Files
scylladb/test/cluster/test_commitlog.py
Andrei Chekun c950c2e582 test.py: convert skip_mode function to pytest.mark
Function skip_mode works only on function and only in cluster test. This if OK
when we need to skip one test, but it's not possible to use it with pytestmark
to automatically mark all tests in the file. The goal of this PR is to migrate

skip_mode to be dynamic pytest.mark that can be used as ordinary mark.

Closes scylladb/scylladb#27853

[avi: apply to test/cluster/test_tablets.py::test_table_creation_wakes_up_balancer]
2026-01-08 21:55:16 +02:00

93 lines
3.7 KiB
Python

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import pytest
import logging
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import reconnect_driver
from test.cluster.conftest import skip_mode
from test.pylib.random_tables import Column, TextType
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_reboot(request, manager: ManagerClient):
# Check that commitlog provides durability in case of a node reboot.
#
# 1. start a node with commitlog_sync: batch
# 2. create a test table t
# 3. write some data to the table and truncate it, now a truncation record should be created
# 4. cleanly restart a node with enabled injection decrease_schema_commitlog_base_segment_id,
# this allows to set new segment id to zero, which imitates machine reboot
# 5. write some data into t
# 6. save its current content in local variable
# 7. kill -9 the node and restart it
# 8. check the table content is preserved
server_info = await manager.server_add(config={
'commitlog_sync': 'batch'
})
cql = manager.cql
await wait_for_cql_and_get_hosts(cql, [server_info], time.time() + 60)
logger.info("Node started")
random_tables = RandomTables(request.node.name, manager, "ks", 1)
await random_tables.add_table(name='t', pks=1, columns=[
Column(name="key", ctype=TextType),
Column(name="value", ctype=TextType)
])
logger.info("Test table created")
async def load_table() -> list:
result = await cql.run_async("select * from ks.t")
result.sort()
return result
def save_table(k: str, v: str):
return cql.run_async("insert into ks.t(key, value) values (%s, %s)",
parameters = [k, v])
test_rows_count = 10
futures = []
for i in range(0, test_rows_count):
futures.append(save_table(f'key_{i}', f'value_{i}'))
await asyncio.gather(*futures)
logger.info("Some data is written into test table")
await cql.run_async("truncate table ks.t")
logger.info("Test table is truncated")
await manager.server_stop_gracefully(server_info.server_id)
await manager.server_update_config(server_info.server_id,
'error_injections_at_startup',
['decrease_commitlog_base_segment_id'])
await manager.server_start(server_info.server_id)
cql = await reconnect_driver(manager)
await wait_for_cql_and_get_hosts(cql, [server_info], time.time() + 60)
logging.info("Node is restarted with decrease_schema_commitlog_base_segment_id injection")
futures = []
for i in range(0, test_rows_count):
futures.append(save_table(f'new_key_{i}', f'new_value_{i}'))
await asyncio.gather(*futures)
logger.info("Some new data is written into test table")
table_content_before_crash = await load_table()
await manager.server_stop(server_info.server_id)
await manager.server_start(server_info.server_id)
cql = await reconnect_driver(manager)
await wait_for_cql_and_get_hosts(cql, [server_info], time.time() + 60)
logging.info("Node is killed and restarted")
table_content_after_crash = await load_table()
logging.info(f"table content before crash [{table_content_before_crash}], "
f"after crash [{table_content_after_crash}]")
assert table_content_before_crash == table_content_after_crash