Files
scylladb/test/cluster/test_mutation_schema_change.py
Benny Halevy a4aa4d74c1 test/pylib: servers_add: add auto_rack_dc parameter
To quickly populate nodes in a single dc,
each node in its own rack.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-03-30 19:23:40 +03:00

154 lines
6.5 KiB
Python

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
"""
Reproducer for a failure during an LWT operation caused by a missing column mapping in the schema history table.
"""
import asyncio
import logging
import time
from test.pylib.rest_client import inject_error_one_shot, inject_error
from test.pylib.util import wait_for_cql_and_get_hosts
import pytest
from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Module-wide pytest marker: pytest applies `pytestmark` to every test in this file.
# NOTE(review): presumably selects a pre-built cluster spread over three racks —
# confirm against the marker's definition in the test framework.
pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio
@pytest.mark.enable_tablets(False) # uses lightweight transactions
async def test_mutation_schema_change(manager, random_tables):
    """Run an LWT update through a node that was down during schema changes.

    Scenario on a three-node cluster (A, B, C):

    1. Create a table.
    2. Stop C.
    3. With the 'paxos_error_before_learn' injection active on B:
       add a column, do an LWT insert, add another column.
    4. Stop B.
    5. Start C.
    6. Issue a conditional update for the same key through C.
    """
    server_a, server_b, server_c = await manager.running_servers()
    table = await random_tables.add_table(ncolumns=5, pks=1)
    manager.driver_close()

    # Lower the Raft snapshot thresholds on all three nodes.
    await manager.mark_dirty()
    await asyncio.gather(*[
        inject_error_one_shot(manager.api, node.ip_addr, "raft_server_set_snapshot_thresholds",
                              parameters={'snapshot_threshold': '3', 'snapshot_trailing': '1'})
        for node in (server_a, server_b, server_c)
    ])

    logger.info("Stopping C %s", server_c)
    await manager.server_stop_gracefully(server_c.server_id)
    await manager.driver_connect()

    # NOTE(review): the extent of this block is reconstructed from the scenario
    # description (schema change + LWT write + schema change) — confirm.
    async with inject_error(manager.api, server_b.ip_addr, 'paxos_error_before_learn'):
        await table.add_column()

        row_count = 1
        seeds = [table.next_seq() for _ in range(row_count)]

        column_list = ','.join(col.name for col in table.columns)
        placeholders = ', '.join(['%s'] * len(table.columns))
        insert_cql = (f"INSERT INTO {table} ({column_list}) "
                      f"VALUES ({placeholders}) "
                      f"IF NOT EXISTS")
        insert_stmt = SimpleStatement(insert_cql, consistency_level=ConsistencyLevel.ONE)
        for seed in seeds:
            logger.info("INSERT row seed %s", seed)
            row_values = [col.val(seed) for col in table.columns]
            await manager.cql.run_async(insert_stmt, parameters=row_values)

        await table.add_column()

    logger.info("Stopping B %s", server_b)
    await manager.server_stop_gracefully(server_b.server_id)

    logger.info("Starting C %s", server_c)
    await manager.server_start(server_c.server_id, wait_others=1)

    logger.info("Driver connecting to C %s", server_c)
    await manager.driver_connect(server=server_c)
    await wait_for_cql_and_get_hosts(manager.cql, [server_a, server_c], time.time() + 60)

    # Conditional update keyed on the first column and guarded by the
    # current value of the fourth column.
    key_col = table.columns[0]
    value_col = table.columns[3]
    update_cql = (f"UPDATE {table} "
                  f"SET {value_col.name} = %s "
                  f"WHERE {key_col.name} = %s "
                  f"IF {value_col.name} = %s")
    update_stmt = SimpleStatement(update_cql, consistency_level=ConsistencyLevel.ONE)
    for seed in seeds:
        logger.info("UPDATE with seed %s", seed)
        update_args = [
            value_col.val(seed + 1),  # new value: seed + 1
            key_col.val(seed),        # key: seed
            value_col.val(seed),      # expected current value: seed
        ]
        await manager.cql.run_async(update_stmt, parameters=update_args,
                                    execution_profile='whitelist')
@pytest.mark.asyncio
@pytest.mark.enable_tablets(False) # uses lightweight transactions
async def test_mutation_schema_change_restart(manager, random_tables):
    """Run an LWT update through a restarted node after schema changes.

    Scenario on a three-node cluster (A, B, C):

    1. Create a table.
    2. Stop C.
    3. With the 'paxos_error_before_learn' injection active on B:
       add a column, do an LWT insert, add another column.
    4. Stop B.
    5. Restart A.
    6. Start C.
    7. Issue a conditional update for the same key through A.
    """
    server_a, server_b, server_c = await manager.running_servers()
    table = await random_tables.add_table(ncolumns=5, pks=1)
    manager.driver_close()

    # Lower the Raft snapshot thresholds on all three nodes.
    await manager.mark_dirty()
    await asyncio.gather(*[
        inject_error_one_shot(manager.api, node.ip_addr, "raft_server_set_snapshot_thresholds",
                              parameters={'snapshot_threshold': '3', 'snapshot_trailing': '1'})
        for node in (server_a, server_b, server_c)
    ])

    logger.info("Stopping C %s", server_c)
    await manager.server_stop_gracefully(server_c.server_id)
    await manager.driver_connect()

    await inject_error_one_shot(manager.api, server_a.ip_addr,
                                'raft_server_reduce_threshold')

    # NOTE(review): the extent of this block is reconstructed from the scenario
    # description (schema change + LWT write + schema change) — confirm.
    async with inject_error(manager.api, server_b.ip_addr, 'paxos_error_before_learn'):
        await table.add_column()

        row_count = 1
        seeds = [table.next_seq() for _ in range(row_count)]

        column_list = ','.join(col.name for col in table.columns)
        placeholders = ', '.join(['%s'] * len(table.columns))
        insert_cql = (f"INSERT INTO {table} ({column_list}) "
                      f"VALUES ({placeholders}) "
                      f"IF NOT EXISTS")
        insert_stmt = SimpleStatement(insert_cql, consistency_level=ConsistencyLevel.ONE)
        for seed in seeds:
            logger.info("INSERT row seed %s", seed)
            row_values = [col.val(seed) for col in table.columns]
            await manager.cql.run_async(insert_stmt, parameters=row_values)

        await table.add_column()

    manager.driver_close()

    logger.info("Stopping B %s", server_b)
    await manager.server_stop_gracefully(server_b.server_id)

    logger.info("Restarting A %s", server_a)
    await manager.server_restart(server_a.server_id)

    logger.info("Starting C %s", server_c)
    # C waits until it sees one other node (A).
    await manager.server_start(server_c.server_id, wait_others=1)
    # And A must see C as well before the driver connects.
    await manager.server_sees_other_server(server_a.ip_addr, server_c.ip_addr)

    logger.info("Driver connecting to A %s", server_a)
    await manager.driver_connect(server=server_a)
    await wait_for_cql_and_get_hosts(manager.cql, [server_a, server_c], time.time() + 60)

    # Conditional update keyed on the first column and guarded by the
    # current value of the fourth column.
    key_col = table.columns[0]
    value_col = table.columns[3]
    update_cql = (f"UPDATE {table} "
                  f"SET {value_col.name} = %s "
                  f"WHERE {key_col.name} = %s "
                  f"IF {value_col.name} = %s")
    update_stmt = SimpleStatement(update_cql, consistency_level=ConsistencyLevel.ONE)
    for seed in seeds:
        logger.info("UPDATE with seed %s", seed)
        update_args = [
            value_col.val(seed + 1),  # new value: seed + 1
            key_col.val(seed),        # key: seed
            value_col.val(seed),      # expected current value: seed
        ]
        await manager.cql.run_async(update_stmt, parameters=update_args,
                                    execution_profile='whitelist')