Files
scylladb/test/cluster/tasks/test_tablet_tasks.py
Aleksandra Martyniuk 02257d1429 test: add test_tablet_repair_wait
Add a test that checks if tablet_virtual_task::wait won't fail if
tablets are merged.
2026-03-10 14:42:27 +01:00

639 lines
30 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import random
from typing import Optional
import pytest
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.repair import create_table_insert_data_for_repair, get_tablet_task_id
from test.pylib.rest_client import read_barrier
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.util import create_new_test_keyspace, new_test_keyspace, get_topology_coordinator, find_server_by_host_id
from test.cluster.test_incremental_repair import trigger_tablet_merge
from test.cluster.test_tablets2 import inject_error_on
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.tasks.task_manager_types import TaskStatus, TaskStats
from test.cluster.tasks import extra_scylla_cmdline_options
async def enable_injection(manager: ManagerClient, servers: list[ServerInfo], injection: str):
    """Enable a non-one-shot error injection on every given server."""
    for srv in servers:
        await manager.api.enable_injection(srv.ip_addr, injection, False)
async def disable_injection(manager: ManagerClient, servers: list[ServerInfo], injection: str):
    """Disable the named error injection on every given server."""
    for srv in servers:
        await manager.api.disable_injection(srv.ip_addr, injection)
async def message_injection(manager: ManagerClient, servers: list[ServerInfo], injection: str):
    """Send a message to the named injection on every given server,
    releasing code paused at the injection point."""
    for srv in servers:
        await manager.api.message_injection(srv.ip_addr, injection)
async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_name: str, expected_number: int, type: str, keyspace: str, table: Optional[str] = None):
    """Poll the task manager on `server` until exactly `expected_number`
    cluster tasks of the given `type` exist for `keyspace` (and `table`,
    if given), then return them.

    NOTE: loops until the expected count is observed; relies on the
    test-level timeout to abort if it never is.
    """
    async def get_tasks():
        tasks = [task for task in await tm.list_tasks(server.ip_addr, module_name)
                 if task.kind == "cluster" and task.type == type and task.keyspace == keyspace]
        return [task for task in tasks if not table or table == task.table]

    tasks = await get_tasks()
    while len(tasks) != expected_number:
        # Back off briefly between polls instead of hammering the REST API
        # in a tight loop.
        await asyncio.sleep(0.1)
        tasks = await get_tasks()
    return tasks
def check_task_status(status: TaskStatus, states: list[str], type: str, scope: str, abortable: bool, keyspace: str, table: str = "test", possible_child_num: Optional[list[int]] = None):
    """Assert that a virtual task status matches the expected shape.

    `states` and `possible_child_num` are sets of acceptable values; the
    remaining parameters are exact expectations.
    """
    # Avoid a mutable default argument; None stands in for the common
    # "no children" case.
    if possible_child_num is None:
        possible_child_num = [0]
    assert status.scope == scope
    assert status.kind == "cluster"
    assert status.type == type
    assert status.keyspace == keyspace
    assert status.table == table
    assert status.is_abortable == abortable
    assert len(status.children_ids) in possible_child_num
    assert status.state in states
async def check_and_abort_repair_task(manager: ManagerClient, tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str):
    """Wait for the user repair virtual task, validate its status, then
    abort it once the coordinator starts waiting on the tablet operation."""
    # Wait until user repair task is created.
    task = (await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=keyspace))[0]
    assert task.scope == "table"
    assert task.keyspace == keyspace
    assert task.table == "test"
    assert task.state in ["created", "running"]
    initial_status = await tm.get_task_status(servers[0].ip_addr, task.task_id)
    check_task_status(initial_status, ["created", "running"], "user_repair", "table", True, keyspace)

    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()

    async def wait_until_done():
        final_status = await tm.wait_for_task(servers[0].ip_addr, task.task_id)
        check_task_status(final_status, ["done"], "user_repair", "table", True, keyspace)

    async def abort_when_waiting():
        await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
        await tm.abort_task(servers[0].ip_addr, task.task_id)

    await asyncio.gather(wait_until_done(), abort_when_waiting())
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_repair_task(manager: ManagerClient):
    """Start a tablet repair that keeps failing and verify its virtual task
    can be observed and aborted."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
    assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"

    async def start_repair():
        # Keep retrying tablet repair.
        await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", -1)

    await asyncio.gather(start_repair(), check_and_abort_repair_task(manager, tm, servers, module_name, ks))
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_repair_wait_with_table_drop(manager: ManagerClient):
    """Check that waiting on a tablet repair virtual task completes ("done")
    even if the repaired table is dropped while the wait is in progress."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    injection = "tablet_virtual_task_wait"
    cmdline = [
        '--logger-log-level', 'debug_error_injection=debug',
    ]
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, cmdline=cmdline)
    assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
    token = -1
    # Keep the repair retrying so the task stays alive while we set up the wait.
    await enable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
    # Wait until the user repair virtual task is created, and sanity-check it.
    repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=ks)
    task = repair_tasks[0]
    assert task.scope == "table"
    assert task.keyspace == ks
    assert task.table == "test"
    assert task.state in ["created", "running"]
    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()
    # Pause tablet_virtual_task::wait on servers[0] so the drop happens mid-wait.
    await enable_injection(manager, [servers[0]], injection)
    async def wait_for_task():
        # The wait must still finish with "done" despite the table drop.
        status_wait = await tm.wait_for_task(servers[0].ip_addr, task.task_id)
        assert status_wait.state == "done"
    async def drop_table():
        # Wait until the paused wait reaches the injection point.
        await log.wait_for(f'"{injection}"', from_mark=mark)
        # Let the repair proceed, then drop the table under the waiter.
        await disable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
        await manager.get_cql().run_async(f"DROP TABLE {ks}.test")
        # Resume the paused wait on servers[0].
        await manager.api.message_injection(servers[0].ip_addr, injection)
    await asyncio.gather(wait_for_task(), drop_table())
    await disable_injection(manager, servers, injection)
async def check_repair_task_list(tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str):
    """Verify all three nodes expose a consistent list of user repair virtual
    tasks for `keyspace`, then abort each task via servers[0]."""
    def find_task(tasks, task_id):
        matching = [t for t in tasks if t.task_id == task_id]
        assert len(matching) == 1
        return matching[0]

    # Wait until user repair tasks are created.
    expected = len(servers)
    repair_tasks0 = await wait_tasks_created(tm, servers[0], module_name, expected, "user_repair", keyspace=keyspace)
    repair_tasks1 = await wait_tasks_created(tm, servers[1], module_name, expected, "user_repair", keyspace=keyspace)
    repair_tasks2 = await wait_tasks_created(tm, servers[2], module_name, expected, "user_repair", keyspace=keyspace)
    assert len(repair_tasks0) == len(repair_tasks1), f"Different number of repair virtual tasks on nodes {servers[0].server_id} and {servers[1].server_id}"
    assert len(repair_tasks0) == len(repair_tasks2), f"Different number of repair virtual tasks on nodes {servers[0].server_id} and {servers[2].server_id}"

    for task0 in repair_tasks0:
        # Every task listed on servers[0] must appear, identically, on the others.
        task1 = find_task(repair_tasks1, task0.task_id)
        task2 = find_task(repair_tasks2, task0.task_id)
        assert task0.table in ["test", "test2", "test3"]
        assert task0.table == task1.table, f"Inconsistent table for task {task0.task_id}"
        assert task0.table == task2.table, f"Inconsistent table for task {task0.task_id}"
        for task in (task0, task1, task2):
            assert task.state in ["created", "running"]
            assert task.type == "user_repair"
            assert task.kind == "cluster"
            assert task.scope == "table"
            assert task.keyspace == keyspace
        await tm.abort_task(servers[0].ip_addr, task0.task_id)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_repair_task_list(manager: ManagerClient):
    """Run repairs on three tables concurrently and verify their virtual
    tasks are listed consistently on all nodes."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
    assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
    # Create other tables.
    await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
    await cql.run_async(f"CREATE TABLE {ks}.test3 (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
    keys = range(256)
    await asyncio.gather(*(cql.run_async(f"INSERT INTO {ks}.test2 (pk, c) VALUES ({k}, {k});") for k in keys))
    await asyncio.gather(*(cql.run_async(f"INSERT INTO {ks}.test3 (pk, c) VALUES ({k}, {k});") for k in keys))

    async def run_repair(server_id, table_name):
        await manager.api.tablet_repair(servers[server_id].ip_addr, ks, table_name, -1)

    # Keep the repairs retrying so all three tasks coexist in the listing.
    await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
    await asyncio.gather(run_repair(0, "test"),
                         run_repair(1, "test2"),
                         run_repair(2, "test3"),
                         check_repair_task_list(tm, servers, module_name, ks))
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_repair_wait(manager: ManagerClient):
    """Check that tablet_virtual_task::wait won't fail if tablets are merged
    while a repair virtual task is being waited on."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    stop_repair_injection = "repair_tablet_repair_task_impl_run"
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
    assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
    # Pause the repair so the virtual task is still alive when we merge.
    await inject_error_on(manager, stop_repair_injection, servers)
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", "all", await_completion=False)
    repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=ks)
    task = repair_tasks[0]
    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()

    async def wait_for_task():
        await enable_injection(manager, servers, "tablet_virtual_task_wait")
        # The wait must complete even though tablets are merged under it.
        # (The returned status was previously bound to an unused local.)
        await tm.wait_for_task(servers[0].ip_addr, task.task_id)

    async def merge_tablets():
        await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
        # Resume repair.
        await message_injection(manager, servers, stop_repair_injection)
        # Merge tablets.
        coord = await find_server_by_host_id(manager, servers, await get_topology_coordinator(manager))
        log2 = await manager.server_open_log(coord.server_id)
        await trigger_tablet_merge(manager, servers, [log2])
        await read_barrier(manager.api, servers[0].ip_addr)
        # Release the paused tablet_virtual_task::wait.
        await message_injection(manager, servers, "tablet_virtual_task_wait")

    await asyncio.gather(wait_for_task(), merge_tablets())
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_repair_task_children(manager: ManagerClient):
    """Check that a user repair virtual task reports exactly one child and
    that the child points back at the virtual task as its parent."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    injection = "repair_tablet_repair_task_impl_run"
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
    # Keep finished tasks around long enough to query the child's status.
    for server in servers:
        await tm.set_task_ttl(server.ip_addr, 3600)
    assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()
    async def repair_task():
        token = -1
        # Keep retrying tablet repair.
        await inject_error_on(manager, injection, servers)
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
    async def resume_repair():
        # Once the coordinator is waiting on the operation, release the
        # paused repair so it can finish.
        await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
        await message_injection(manager, servers, injection)
    async def check_children():
        # Wait until user repair task is created.
        repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", ks)
        status = await tm.wait_for_task(servers[0].ip_addr, repair_tasks[0].task_id)
        assert len(status.children_ids) == 1
        child = status.children_ids[0]
        # Query the child's status on the node recorded in children_ids.
        child_status = await tm.get_task_status(child["node"], child["task_id"])
        assert child_status.parent_id == repair_tasks[0].task_id
    await asyncio.gather(repair_task(), resume_repair(), check_children())
async def prepare_migration_test(manager: ManagerClient):
    """Set up a two-node cluster with a single-tablet table holding one row.

    Returns (keyspace, servers, host_ids). Tablet balancing is left disabled
    so the tablet stays wherever the test moves it.
    """
    servers = []
    host_ids = []

    async def add_node():
        node = await manager.server_add(cmdline=extra_scylla_cmdline_options)
        servers.append(node)
        host_ids.append(await manager.get_host_id(node.server_id))

    await add_node()
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
    await add_node()
    await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({1}, {1});")
    return (ks, servers, host_ids)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_migration_task(manager: ManagerClient):
    """Check the virtual task created for intranode and internode tablet
    migrations."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    ks, servers, host_ids = await prepare_migration_test(manager)
    injection = "handle_tablet_migration_end_migration"

    async def move_tablet(src, dst):
        # Pause the migration at its end so the task is observable mid-flight.
        await manager.api.enable_injection(servers[0].ip_addr, injection, False)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src[0], src[1], dst[0], dst[1], 0)

    async def check(type):
        # Wait until migration task is created.
        tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks)
        assert len(tasks) == 1
        status = await tm.get_task_status(servers[0].ip_addr, tasks[0].task_id)
        check_task_status(status, ["created", "running"], type, "tablet", False, keyspace=ks)
        # Release the paused migration.
        await manager.api.disable_injection(servers[0].ip_addr, injection)

    replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
    assert len(replicas) == 1 and len(replicas[0].replicas) == 1
    # Intranode: move the replica to the other shard of the same host.
    intranode_migration_src = replicas[0].replicas[0]
    intranode_migration_dst = (intranode_migration_src[0], 1 - intranode_migration_src[1])
    await asyncio.gather(move_tablet(intranode_migration_src, intranode_migration_dst), check("intranode_migration"))
    # Internode: move the replica to shard 0 of the second host.
    migration_src = intranode_migration_dst
    assert migration_src[0] != host_ids[1]
    migration_dst = (host_ids[1], 0)
    await asyncio.gather(move_tablet(migration_src, migration_dst), check("migration"))
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_migration_task_list(manager: ManagerClient):
    """Check that a tablet migration virtual task is listed identically on
    both nodes, for intranode and internode migrations."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    ks, servers, host_ids = await prepare_migration_test(manager)
    injection = "handle_tablet_migration_end_migration"
    async def move_tablet(server, old_replica, new_replica):
        await manager.api.move_tablet(server.ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
    async def check_migration_task_list(type: str):
        # Wait until migration tasks are created.
        migration_tasks0 = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks)
        migration_tasks1 = await wait_tasks_created(tm, servers[1], module_name, 1, type, keyspace=ks)
        assert len(migration_tasks0) == len(migration_tasks1), f"Different number of migration virtual tasks on nodes {servers[0].server_id} and {servers[1].server_id}"
        assert len(migration_tasks0) == 1, f"Wrong number of migration virtual tasks"
        task0 = migration_tasks0[0]
        task1 = migration_tasks1[0]
        # Both nodes must expose the same virtual task.
        assert task0.task_id == task1.task_id
        for task in [task0, task1]:
            assert task.state in ["created", "running"]
            assert task.type == type
            assert task.kind == "cluster"
            assert task.scope == "tablet"
            assert task.table == "test"
            assert task.keyspace == ks
        # Release the migration paused by the injection.
        await disable_injection(manager, servers, injection)
    replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
    assert len(replicas) == 1 and len(replicas[0].replicas) == 1
    # Intranode migration: move the replica to the other shard of the same host.
    intranode_migration_src = replicas[0].replicas[0]
    intranode_migration_dst = (intranode_migration_src[0], 1 - intranode_migration_src[1])
    await enable_injection(manager, servers, injection)
    await asyncio.gather(move_tablet(servers[0], intranode_migration_src, intranode_migration_dst), check_migration_task_list("intranode_migration"))
    # Internode migration: move the replica to shard 0 of the second host.
    migration_src = intranode_migration_dst
    assert migration_src[0] != host_ids[1]
    migration_dst = (host_ids[1], 0)
    await enable_injection(manager, servers, injection)
    await asyncio.gather(move_tablet(servers[0], migration_src, migration_dst), check_migration_task_list("migration"))
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_migration_task_failed(manager: ManagerClient):
    """Check that a tablet migration virtual task ends in the "failed" state
    when tablet streaming throws."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    ks, servers, host_ids = await prepare_migration_test(manager)
    # First pause streaming, then make it throw once resumed.
    wait_injection = "stream_tablet_wait"
    throw_injection = "stream_tablet_move_to_cleanup"
    async def move_tablet(old_replica, new_replica):
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
    async def wait_for_task(task_id, type):
        status = await tm.wait_for_task(servers[0].ip_addr, task_id)
        check_task_status(status, ["failed"], type, "tablet", False, keyspace=ks)
    async def resume_migration(log, mark):
        # Once the coordinator is waiting on the operation, release the
        # paused streaming so it can hit the throwing injection.
        await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
        await disable_injection(manager, servers, wait_injection)
    async def check(type, log, mark):
        # Wait until migration task is created.
        migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks)
        assert len(migration_tasks) == 1
        await asyncio.gather(wait_for_task(migration_tasks[0].task_id, type), resume_migration(log, mark))
    await enable_injection(manager, servers, wait_injection)
    await enable_injection(manager, servers, throw_injection)
    log = await manager.server_open_log(servers[0].server_id)
    mark = await log.mark()
    replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
    assert len(replicas) == 1 and len(replicas[0].replicas) == 1
    # Intranode move: other shard of the same host.
    src = replicas[0].replicas[0]
    dst = (src[0], 1 - src[1])
    await asyncio.gather(move_tablet(src, dst), check("intranode_migration", log, mark))
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_repair_task_info_is_none_when_no_running_repair(manager: ManagerClient):
    """Check that the tablet's repair task info is unset both before a repair
    starts and after it finishes."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    token = -1
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
    assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"

    async def check_none():
        tablet_task_id = await get_tablet_task_id(cql, hosts[0], table_id, token)
        assert tablet_task_id is None

    # No repair is running yet, so there must be no task info.
    await check_none()

    async def repair_task():
        # Keep the repair retrying so the task stays alive until released.
        await enable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)

    async def wait_and_check_none():
        task = (await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=ks))[0]
        await disable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
        # Wait for the repair to finish; the info must be cleared afterwards.
        # (The returned status was previously bound to an unused local.)
        await tm.wait_for_task(servers[0].ip_addr, task.task_id)
        await check_none()

    await asyncio.gather(repair_task(), wait_and_check_none())
async def prepare_split(manager: ManagerClient, server: ServerInfo, keyspace: str, table: str, keys: list[int]):
    """Write ~1 KB of random data per key into the table and flush it, with
    tablet balancing disabled."""
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    stmt = cql.prepare(f"INSERT INTO {keyspace}.{table}(pk, c) VALUES(?, ?)")
    for key in keys:
        cql.execute(stmt, [key, random.randbytes(1000)])
    await manager.api.flush_keyspace(server.ip_addr, keyspace)
async def prepare_merge(manager: ManagerClient, server: ServerInfo, keyspace: str, table: str, keys: list[int]):
    """Delete the given keys from the table and flush it, with tablet
    balancing disabled."""
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    deletions = [cql.run_async(f"DELETE FROM {keyspace}.{table} WHERE pk={k};") for k in keys]
    await asyncio.gather(*deletions)
    await manager.api.flush_keyspace(server.ip_addr, keyspace)
async def enable_tablet_balancing_and_wait(manager: ManagerClient, server: ServerInfo, message: str):
    """Re-enable tablet balancing and block until `message` appears in the
    server's log (only log lines after the call are considered)."""
    server_log = await manager.server_open_log(server.server_id)
    log_mark = await server_log.mark()
    await manager.enable_tablet_balancing()
    await server_log.wait_for(message, from_mark=log_mark)
# Common cmdline for the server_add command in all tests below.
cmdline = ['--target-tablet-size-in-bytes', '30000', *extra_scylla_cmdline_options]
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_resize_task(manager: ManagerClient):
    """Check the virtual tasks created for tablet split and merge."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    servers = [await manager.server_add(cmdline=cmdline, config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    })]
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    table1 = "test1"
    table2 = "test2"
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace:
        await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
        await cql.run_async(f"CREATE TABLE {keyspace}.{table2} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
        total_keys = 60
        keys = range(total_keys)
        # Grow table1, let its split be detected, then wait until no split
        # task is listed for it anymore.
        await prepare_split(manager, servers[0], keyspace, table1, keys)
        await enable_tablet_balancing_and_wait(manager, servers[0], "Detected tablet split for table")
        await wait_tasks_created(tm, servers[0], module_name, 0, "split", keyspace, table1)
        # Grow table2 (to trigger a split) and empty table1 (to trigger a
        # merge), then postpone finalization so both tasks stay "running".
        await prepare_split(manager, servers[0], keyspace, table2, keys)
        await prepare_merge(manager, servers[0], keyspace, table1, keys[:-1])
        await manager.api.keyspace_compaction(servers[0].ip_addr, keyspace)
        injection = "tablet_split_finalization_postpone"
        await enable_injection(manager, servers, injection)
        await manager.enable_tablet_balancing()
        async def wait_and_check_status(server, type, keyspace, table):
            task = (await wait_tasks_created(tm, server, module_name, 1, type, keyspace, table))[0]
            status = await tm.get_task_status(server.ip_addr, task.task_id)
            # With incremental repair, we have doubled the tasks for repaired and unrepaired set
            check_task_status(status, ["running"], type, "table", False, keyspace, table, [0, 1, 2, 3, 4])
        await wait_and_check_status(servers[0], "split", keyspace, table2)
        await wait_and_check_status(servers[0], "merge", keyspace, table1)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_resize_list(manager: ManagerClient):
    """Check that a split virtual task is listed identically on two nodes and
    that the children reported by one node are contained in the other's."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    servers = [await manager.server_add(cmdline=cmdline, config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    })]
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    table1 = "test1"
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace:
        await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
        total_keys = 60
        keys = range(total_keys)
        await prepare_split(manager, servers[0], keyspace, table1, keys)
        servers.append(await manager.server_add(cmdline=cmdline, config={
            'tablet_load_stats_refresh_interval_in_seconds': 1
        }))
        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()
        # Postpone split finalization so the task stays observable, and pause
        # the sstable rewrite on servers[0] once.
        injection = "tablet_split_finalization_postpone"
        compaction_injection = "split_sstable_rewrite"
        await enable_injection(manager, servers, injection)
        await manager.api.enable_injection(servers[0].ip_addr, compaction_injection, one_shot=True)
        await manager.enable_tablet_balancing()
        task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0]
        task1 = (await wait_tasks_created(tm, servers[1], module_name, 1, "split", keyspace, table1))[0]
        # Both nodes must list the same virtual task.
        assert task0.task_id == task1.task_id
        for task in [task0, task1]:
            assert task.state == "running"
            assert task.type == "split"
            assert task.kind == "cluster"
            assert task.scope == "table"
            assert task.table == table1
            assert task.keyspace == keyspace
        # Release the paused sstable rewrite, then compare children.
        await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark)
        await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite")
        status1 = await tm.get_task_status(servers[1].ip_addr, task0.task_id)
        status0 = await tm.get_task_status(servers[0].ip_addr, task0.task_id)
        children_ids_len1 = len(status1.children_ids)
        children_ids_len0 = len(status0.children_ids)
        # servers[1]'s children must be a non-empty subset of servers[0]'s.
        assert 0 < children_ids_len1 and children_ids_len1 <= children_ids_len0 and children_ids_len0 <= 4
        assert all(child_id in status0.children_ids for child_id in status1.children_ids)
        await disable_injection(manager, servers, injection)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.skip_mode(mode='debug', reason='debug mode is too time-sensitive')
async def test_tablet_resize_revoked(manager: ManagerClient):
    """Check that a split virtual task ends up "suspended" when the resize
    decision is forcibly cancelled."""
    module_name = "tablets"
    tm = TaskManagerClient(manager.api)
    servers = [await manager.server_add(cmdline=cmdline, config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    })]
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    table1 = "test1"
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace:
        await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
        total_keys = 60
        keys = range(total_keys)
        await prepare_split(manager, servers[0], keyspace, table1, keys)
        # Postpone finalization so the split task is still alive when revoked.
        injection = "tablet_split_finalization_postpone"
        await enable_injection(manager, servers, injection)
        await manager.enable_tablet_balancing()
        task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0]
        log = await manager.server_open_log(servers[0].server_id)
        mark = await log.mark()
        async def revoke_resize(log, mark):
            # Once the virtual task wait is underway, force cancellation of
            # the resize decision.
            await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
            revoke_injection = "force_resize_cancellation"
            await enable_injection(manager, servers, revoke_injection)
        async def wait_for_task(task_id):
            status = await tm.wait_for_task(servers[0].ip_addr, task_id)
            # With incremental repair, we have doubled the tasks for repaired and unrepaired set
            check_task_status(status, ["suspended"], "split", "table", False, keyspace, table1, [0, 1, 2, 3, 4])
        await asyncio.gather(revoke_resize(log, mark), wait_for_task(task0.task_id))
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_task_sees_latest_state(manager: ManagerClient):
    """Check that a failing repair request can be aborted via the tablet task
    id read from the tablet state."""
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
    token = -1

    async def start_failing_repair():
        await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
        # Check failed repair request can be deleted
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)

    async def abort_repair():
        tablet_task_id = None
        # Poll the tablet state until the repair's task id shows up.
        while tablet_task_id is None:
            tablet_task_id = await get_tablet_task_id(cql, hosts[0], table_id, token)
        await manager.api.abort_task(servers[0].ip_addr, tablet_task_id)

    await asyncio.gather(start_failing_repair(), abort_repair())