#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from cassandra import WriteFailure
from cassandra.auth import PlainTextAuthProvider
from cassandra.pool import Host  # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import SimpleStatement, ConsistencyLevel
from cassandra.protocol import InvalidRequest, WriteTimeout
from cassandra import Unauthorized

from test.cluster.lwt.lwt_common import wait_for_tablet_count
from test.cluster.util import new_test_keyspace, unique_name, reconnect_driver
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.conftest import skip_mode
from test.pylib.internal_types import ServerInfo
from test.pylib.tablets import get_all_tablet_replicas
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.rest_client import inject_error_one_shot

import pytest
import asyncio
import logging
import time
import re

logger = logging.getLogger(__name__)


async def inject_error_on(manager, error_name, servers):
    """Enable a persistent (non-one-shot) error injection on every server in `servers`."""
    errs = [manager.api.enable_injection(s.ip_addr, error_name, False)
            for s in servers]
    await asyncio.gather(*errs)


async def disable_injection_on(manager, error_name, servers):
    """Disable an error injection on every server in `servers`."""
    errs = [manager.api.disable_injection(s.ip_addr, error_name)
            for s in servers]
    await asyncio.gather(*errs)


async def inject_error_one_shot_on(manager, error_name, servers):
    """Enable a one-shot error injection on every server in `servers`."""
    errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name)
            for s in servers]
    await asyncio.gather(*errs)


@pytest.mark.asyncio
async def test_lwt(manager: ManagerClient):
    """Basic LWT on tablets: the per-table paxos state table is created on a
    quorum of replicas on demand and dropped together with the base table."""
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc')
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    quorum = len(hosts) // 2 + 1

    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    # We use capital letters to check that the proper quotes are used in paxos store queries.
    ks = unique_name() + '_Test'
    await cql.run_async(f"CREATE KEYSPACE IF NOT EXISTS \"{ks}\" WITH replication={{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets={{'initial': 1}}")

    async def check_paxos_state_table(exists: bool, replicas: int):
        # Returns True iff at least `replicas` hosts agree with `exists` about
        # the presence of the paxos state table in the schema tables.
        async def query_host(h: Host):
            q = f"""
                SELECT table_name FROM system_schema.tables
                WHERE keyspace_name='{ks}' AND table_name='Test$paxos'
            """
            rows = await cql.run_async(q, host=h)
            return len(rows) > 0
        results = await asyncio.gather(*(query_host(h) for h in hosts))
        return results.count(exists) >= replicas

    # We run the checks several times to check that the base table
    # recreation is handled correctly.
    for i in range(3):
        await cql.run_async(f"CREATE TABLE \"{ks}\".\"Test\" (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"INSERT INTO \"{ks}\".\"Test\" (pk, c) VALUES ({i}, {i}) IF NOT EXISTS")
        # For a Paxos round to succeed, a quorum of replicas must respond.
        # Therefore, the Paxos state table must exist on at least a quorum of replicas,
        # though it doesn't need to exist on all of them.
        # The driver calls wait_for_schema_agreement only for DDL queries,
        # so we need to handle quorums explicitly here.
        assert await check_paxos_state_table(True, quorum)
        await cql.run_async(f"UPDATE \"{ks}\".\"Test\" SET c = {i * 10} WHERE pk = {i} IF c = {i}")
        rows = await cql.run_async(f"SELECT pk, c FROM \"{ks}\".\"Test\";")
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == i
        assert row.c == i * 10
        # TRUNCATE goes through the topology_coordinator and triggers a full topology state
        # reload on replicas. We use this to verify that system.tablets remains in a consistent state—
        # e.g., that no orphan tablet maps remain after dropping the Paxos state tables.
await cql.run_async(f"TRUNCATE TABLE \"{ks}\".\"Test\"") await cql.run_async(f"DROP TABLE \"{ks}\".\"Test\"") # The driver implicitly calls wait_for_schema_agreement, # so the table must be dropped from all replicas, not just a quorum. assert await check_paxos_state_table(False, len(hosts)) await manager.get_cql().run_async(f"DROP KEYSPACE \"{ks}\"") @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') async def test_lwt_during_migration(manager: ManagerClient): # Scenario: # 1. A cluster with three nodes, a table with one tablet and RF=2 # 2. Run the tablet migration and suspend it during streaming # 3. Run an LWT and verify it succeeds # 4. Resume the migration, perform a Paxos read, and check the stored values # 5. Do the same for the intranode migration logger.info("Bootstrap a cluster with three nodes") cmdline = [ '--logger-log-level', 'paxos=trace', '--smp', '2' ] servers = await manager.servers_add(3, cmdline=cmdline, property_file=[ {'dc': 'my_dc', 'rack': 'r1'}, {'dc': 'my_dc', 'rack': 'r2'}, {'dc': 'my_dc', 'rack': 'r1'} ]) (cql, hosts) = await manager.get_ready_cql(servers) host_ids = await asyncio.gather(*[manager.get_host_id(s.server_id) for s in servers]) logger.info("Disable tablet balancing") await manager.api.disable_tablet_balancing(servers[0].ip_addr) async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks: logger.info("Create a table") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") logger.info("Detect a node that doesn't contain a tablet replica") tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test') assert len(tablets) == 1 tablet = tablets[0] assert len(tablet.replicas) == 2 [r1, r2] = tablet.replicas target_host_id = next((h for h in host_ids if h not in {r1[0], r2[0]}), None) assert target_host_id is not None target_server = next((s for s, h in 
zip(servers, host_ids) if h == target_host_id), None) assert target_server is not None logger.info(f"Enable 'migration_streaming_wait' injection on {target_server.ip_addr}") await manager.api.enable_injection(target_server.ip_addr, 'migration_streaming_wait', True) logger.info(f"Start migration from {r1[0]} to {target_host_id}") migration_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test", *r1, target_host_id, 0, tablet.last_token)) logger.info("Open log") log = await manager.server_open_log(target_server.server_id) logger.info(f"Wait for 'migration_streaming_wait: start' injection on {target_server.ip_addr}") await log.wait_for('migration_streaming_wait: start') logger.info("Run an LWT while migration is in-progress") await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS") logger.info("Trigger 'migration_streaming_wait'") await manager.api.message_injection(target_server.ip_addr, "migration_streaming_wait") logger.info("Wait for migration to finish") await migration_task logger.info("Run paxos SELECT") rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;", consistency_level=ConsistencyLevel.SERIAL)) assert len(rows) == 1 row = rows[0] assert row.pk == 1 assert row.c == 1 logger.info(f"Enable 'intranode_migration_streaming_wait' injection on {target_server.ip_addr}") await manager.api.enable_injection(target_server.ip_addr, 'intranode_migration_streaming_wait', True) logger.info(f"Start intranode migration from on {target_host_id} from shard 0 to shard 1") ingranode_migration_task = asyncio.create_task( manager.api.move_tablet(servers[0].ip_addr, ks, "test", target_host_id, 0, target_host_id, 1, tablet.last_token)) logger.info(f"Wait for 'intranode_migration_streaming: waiting' injection on {target_server.ip_addr}") await log.wait_for('intranode_migration_streaming: waiting') logger.info("Run another LWT while ingranode migration is in-progress") await cql.run_async(f"UPDATE 
{ks}.test SET c = 2 WHERE pk = 1 IF c = 1") logger.info("Trigger 'migration_streaming_wait'") await manager.api.message_injection(target_server.ip_addr, "intranode_migration_streaming_wait") logger.info("Wait for intranode migration to finish") await ingranode_migration_task logger.info("Run paxos SELECT") rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;", consistency_level=ConsistencyLevel.SERIAL)) assert len(rows) == 1 row = rows[0] assert row.pk == 1 assert row.c == 2 @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') async def test_lwt_state_is_preserved_on_tablet_migration(manager: ManagerClient): # Scenario: # 1. Cells c1 and c2 of some partition are not set. # 2. An LWT on {n1, n2} writes 1 to c1, stores accepts on {n1, n2} and learn on n1. # 3. Tablet migrates from n2 to n4. # 4. Another LWT comes, accepts/learns on {n3, n4} a value "2 if c1 != 1" for the cell c2. # If paxos state was not preserved, this LWT would succeed since the quorum {n3, n4} is completely # unaware of the decided value 1 for c1. We would miss this value and # commit a new write which contradicts it. Depending in the status of the first LWT, this could # be valid or. If the first LWT has failed (e.g. cl_learn == quorum == 2, learn succeeded on n1 # and failed on n2) this is OK, since the user hasn't observed the effects of the first LWT. # In this test we use cl_learn == 1 for simplicity (see the comment below), so this ignoring # the effects of the first LWT is not valid -- this breaks linearizability. # 5. Do the SERIAL (paxos) read of c1 and c2 from {n1, n4}. If paxos state was not preserved, this # read would see c1==1 (from learned value on n1) and c2 == 2 (from n4), # which contradicts the CQL from 4. 
logger.info("Bootstrap a cluster with three nodes") cmdline = [ '--logger-log-level', 'paxos=trace' ] servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc') cql = manager.get_cql() async def set_injection(set_to: list[ServerInfo], injection: str): logger.info(f"Injecting {injection} on {set_to}") await inject_error_on(manager, injection, set_to) unset_from = [s for s in servers if s not in set_to] logger.info(f"Disabling {injection} on {unset_from}") await disable_injection_on(manager, injection, unset_from) logger.info("Resolve hosts") hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) logger.info("Disable tablet balancing") await manager.api.disable_tablet_balancing(servers[0].ip_addr) logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks: logger.info("Create a table") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c1 int, c2 int);") # Step 2: Inject paxos_error_before_save_proposal - on [n3], paxos_error_before_learn on [n2, n3], # so that the paxos proposal for the LWT would be accepted # only on n1 and n2 and learnt only on n1. The new value for the CAS # operation would be decided. # Note: we use cl_learn=1 here to simplify the test, the same problem # would be with the default cl_learn=quorum, if learn succeeded on n1 but # failed on n2. await set_injection([servers[2]], 'paxos_error_before_save_proposal') await set_injection([servers[1], servers[2]], 'paxos_error_before_learn') logger.info("Execute CAS(c1 := 1)") lwt1 = SimpleStatement(f"INSERT INTO {ks}.test (pk, c1) VALUES (1, 1) IF NOT EXISTS", consistency_level=ConsistencyLevel.ONE) await cql.run_async(lwt1, host=hosts[0]) # Step3: start n4, migrate the single table tablet from n2 to n4. 
servers += [await manager.server_add(cmdline=cmdline, property_file={'dc': 'my_dc', 'rack': 'rack2'})] n4_host_id = await manager.get_host_id(servers[3].server_id) logger.info("Migrating the tablet from n2 to n4") n2_host_id = await manager.get_host_id(servers[1].server_id) tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test') assert len(tablets) == 1 tablet = tablets[0] n2_replica = next(r for r in tablet.replicas if r[0] == n2_host_id) await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *n2_replica, *(n4_host_id, 0), tablet.last_token) # Step 4: execute LWT on n3, n4 await set_injection([servers[0]], 'paxos_error_before_save_promise') await set_injection([servers[0]], 'paxos_error_before_save_proposal') await set_injection([servers[0]], 'paxos_error_before_learn') logger.info("Execute CAS(c2 := 2 IF c1 != 1)") await cql.run_async(f"UPDATE {ks}.test SET c2 = 2 WHERE pk = 1 IF c1 != 1", host=hosts[2]) # Step 5. Do the SERIAL (paxos) read of c1, c2 from {n1, n4}. 
await set_injection([servers[2]], 'paxos_error_before_save_promise') await set_injection([servers[2]], 'paxos_error_before_save_proposal') await set_injection([servers[2]], 'paxos_error_before_learn') logger.info("Execute paxos read") lwt_read = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;", consistency_level=ConsistencyLevel.SERIAL) rows = await cql.run_async(lwt_read, host=hosts[1]) assert len(rows) == 1 row = rows[0] assert row.pk == 1 assert row.c1 == 1 assert row.c2 is None @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') async def test_no_lwt_with_tablets_feature(manager: ManagerClient): config = { 'error_injections_at_startup': [ { 'name': 'suppress_features', 'value': 'LWT_WITH_TABLETS' } ] } await manager.server_add(config=config) cql = manager.get_cql() async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks: await cql.run_async(f"CREATE TABLE {ks}.test (key int PRIMARY KEY, val int)") await cql.run_async(f"INSERT INTO {ks}.test (key, val) VALUES(1, 0)") with pytest.raises(InvalidRequest, match=f"{ks}\.test.*LWT is not yet supported with tablets"): await cql.run_async(f"INSERT INTO {ks}.test (key, val) VALUES(1, 1) IF NOT EXISTS") # The query is rejected during the execution phase, # so preparing the LWT query is expected to succeed. stmt = cql.prepare( f"UPDATE {ks}.test SET val = 1 WHERE KEY = ? 
IF EXISTS") with pytest.raises(InvalidRequest, match=f"{ks}\.test.*LWT is not yet supported with tablets"): await cql.run_async(stmt, [1]) with pytest.raises(InvalidRequest, match=f"{ks}\.test.*LWT is not yet supported with tablets"): await cql.run_async(f"DELETE FROM {ks}.test WHERE key = 1 IF EXISTS") res = await cql.run_async(f"SELECT val FROM {ks}.test WHERE key = 1") assert res[0].val == 0 @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') async def test_lwt_state_is_preserved_on_tablet_rebuild(manager: ManagerClient): # Scenario: # 1. A cluster with 3 nodes, rf=3. # 2. A successful LWT(c := 1) with cl_learn = 1 comes, stores accepts on n1 and n2, learn -- only on n1. # The user has observed its effects (the LWT is successfull), so we are not allowed to lose it. # 3. Node n1 is lost permanently. # 4. New node n4 is added to replace n1. The tablet is rebuilt on n4 based on n2 and n3. # 5. Do the SERIAL (paxos) read of c from {n3, n4}. If paxos state was not rebuilt, we would # miss the decided value c == 1 of the first LWT. 
logger.info("Bootstrap a cluster with three nodes") cmdline = [ '--logger-log-level', 'paxos=trace' ] servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc') cql = manager.get_cql() logger.info("Resolve hosts") hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) logger.info("Disable tablet balancing") await manager.api.disable_tablet_balancing(servers[0].ip_addr) logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks: logger.info("Create a table") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") stopped_servers = set() async def set_injection(set_to: list[ServerInfo], injection: str): logger.info(f"Injecting {injection} on {set_to}") await inject_error_on(manager, injection, set_to) unset_from = [ s for s in servers if s not in set_to and s not in stopped_servers] logger.info(f"Disabling {injection} on {unset_from}") await disable_injection_on(manager, injection, unset_from) # Step2. Inject paxos_error_before_save_proposal - on [n3], paxos_error_before_learn on [n2, n3] await set_injection([servers[2]], 'paxos_error_before_save_proposal') await set_injection([servers[1], servers[2]], 'paxos_error_before_learn') logger.info("Execute CAS(c := 1)") lwt1 = SimpleStatement(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS", consistency_level=ConsistencyLevel.ONE) await cql.run_async(lwt1, host=hosts[0]) # Step3: Node n1 is lost permanently logger.info("Stop n1") await manager.server_stop(servers[0].server_id) stopped_servers.add(servers[0]) cql = await reconnect_driver(manager) # Step 4: New node n4 is added to replace n1. 
logger.info("Start n4") replace_cfg = ReplaceConfig( replaced_id=servers[0].server_id, reuse_ip_addr=False, use_host_id=True, wait_replaced_dead=True ) servers += [await manager.server_add(replace_cfg=replace_cfg, cmdline=cmdline, property_file={'dc': 'my_dc', 'rack': 'rack1'})] # Check we've actually run rebuild logs = [] for s in servers: logs.append(await manager.server_open_log(s.server_id)) assert sum([len(await log.grep('Initiating repair phase of tablet rebuild')) for log in logs]) == 1 # Step 5. Do the SERIAL (paxos) read of c from {n3, n4}. await set_injection([servers[1]], 'paxos_error_before_save_promise') await set_injection([servers[1]], 'paxos_error_before_save_proposal') await set_injection([], 'paxos_error_before_learn') logger.info("Execute paxos read") lwt_read = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;", consistency_level=ConsistencyLevel.SERIAL) rows = await cql.run_async(lwt_read) assert len(rows) == 1 row = rows[0] assert row.pk == 1 assert row.c == 1 @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') async def test_lwt_concurrent_base_table_recreation(manager: ManagerClient): # The test checks that the node doesn't crash when the base table is recreated # during LWT execution. A no_such_column_family exception is thrown, and the LWT # fails as a result. 
cmdline = [ '--logger-log-level', 'paxos=trace' ] server = await manager.server_add(cmdline=cmdline) cql = manager.get_cql() logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks: async def recreate_table(): logger.info(f"Drop table if exists {ks}.test") await cql.run_async(f"DROP TABLE IF EXISTS {ks}.test;") logger.info(f"Create table {ks}.test") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") await recreate_table() logger.info("Inject load_paxos_state-enter") await inject_error_one_shot_on(manager, "load_paxos_state-enter", [server]) logger.info("Start LWT") lwt_task = cql.run_async( f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS") logger.info("Open log") log = await manager.server_open_log(server.server_id) logger.info("Wait for load_paxos_state-enter injection") await log.wait_for('load_paxos_state-enter: waiting for message') await recreate_table() logger.info("Trigger load_paxos_state-enter") await manager.api.message_injection(server.ip_addr, "load_paxos_state-enter") with pytest.raises(WriteFailure): await lwt_task @pytest.mark.asyncio @skip_mode('debug', 'aarch64/debug is unpredictably slow', platform_key='aarch64') @skip_mode('release', 'error injections are not supported in release mode') async def test_lwt_timeout_while_creating_paxos_state_table(manager: ManagerClient, build_mode): timeout = 10000 if build_mode == 'debug' else 1000 config = { 'write_request_timeout_in_ms': timeout, 'error_injections_at_startup': [ { 'name': 'raft-group-registry-fd-threshold-in-ms', 'value': '500' } ] } logger.info("Bootstrap a cluster with three nodes") servers = await manager.servers_add(3, config=config) cql = manager.get_cql() logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = 
{'initial': 1}") as ks: logger.info(f"Create table {ks}.test") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") logger.info("Stop the second and third nodes") await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id), manager.server_stop_gracefully(servers[2].server_id)) logger.info(f"Running an LWT with timeout {timeout}") with pytest.raises(Exception, match="raft operation \\[read_barrier\\] timed out, there is no raft quorum"): await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS") logger.info("Start the second and third nodes") await asyncio.gather(manager.server_start(servers[1].server_id), manager.server_start(servers[2].server_id)) @pytest.mark.asyncio async def test_lwt_for_tablets_is_not_supported_without_raft(manager: ManagerClient): # This test checks that LWT for tablets requires raft-based schema management. cmdline = [ '--logger-log-level', 'paxos=trace' ] servers = [await manager.server_add(cmdline=cmdline)] cql = manager.get_cql() hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks: await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") # In RECOVERY mode the node doesn't join group 0 or start the group 0 Raft server, # it performs all operations as in `use_pre_raft_procedures` and doesn't attempt to # perform the upgrade. # We use this mode to verify the behaviour of LWT for tables when RAFT is disabled. 
logger.info("Set group0_upgrade_state := 'recovery'") await cql.run_async("UPDATE system.scylla_local SET value = 'recovery' WHERE key = 'group0_upgrade_state'", host = hosts[0]) logger.info(f"Rebooting {servers[0]} in RECOVERY mode") await manager.server_restart(servers[0].server_id) cql = await reconnect_driver(manager) logger.info("Waiting for driver") await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) logger.info("Run an LWT") with pytest.raises(Exception, match=f"Cannot create paxos state table for {ks}.test because raft-based schema management is not enabled."): await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS") @pytest.mark.asyncio async def test_paxos_state_table_permissions(manager: ManagerClient): # This test checks permission handling for paxos state tables: # * Only a superuser is allowed to access a paxos state table # * Even a superuser is not allowed to run ALTER or DROP on paxos state tables cmdline = [ '--logger-log-level', 'paxos=trace' ] config = { 'authenticator': 'PasswordAuthenticator', 'authorizer': 'CassandraAuthorizer' } manager.auth_provider = PlainTextAuthProvider( username="cassandra", password="cassandra") servers = [await manager.server_add(cmdline=cmdline, config=config)] cql, _ = await manager.get_ready_cql(servers) logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks: logger.info("Create a table") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") logger.info("Run an LWT1") await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS") logger.info("Create a role_1") await cql.run_async("CREATE ROLE role_1 WITH SUPERUSER = true AND PASSWORD = 'psw' AND LOGIN = true") logger.info("Create a role_2") await cql.run_async("CREATE ROLE role_2 WITH SUPERUSER = false AND PASSWORD = 'psw' AND LOGIN = true") 
logger.info("Grant all permissions on test table to role_2") await cql.run_async(f"GRANT ALL PERMISSIONS ON TABLE {ks}.test TO role_2") # We don't attempt to prevent users from granting permissions on $paxos tables, # for simplicity. These permissions simply won't take effect — for example, # role_2 will still be denied access to the $paxos table. logger.info("Grant all permissions on test$paxos table to role_2") await cql.run_async(f"GRANT ALL PERMISSIONS ON TABLE \"{ks}\".\"test$paxos\" TO role_2") logger.info("Login as role_1") await manager.driver_connect(auth_provider=PlainTextAuthProvider(username="role_1", password="psw")) cql = manager.get_cql() logger.info("Run an LWT2") await cql.run_async(f"UPDATE {ks}.test SET c = 2 WHERE pk = 1 IF c = 1") logger.info("Select data after LWT2") rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;", consistency_level=ConsistencyLevel.SERIAL)) assert len(rows) == 1 row = rows[0] assert row.pk == 1 assert row.c == 2 logger.info("Select paxos state as role_1") rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.\"test$paxos\"")) assert len(rows) == 1 logger.info("Attempt to run DROP TABLE for paxos state table as role_1") with pytest.raises(Unauthorized, match=re.escape(f"Cannot DROP ")): await cql.run_async(SimpleStatement(f"DROP TABLE {ks}.\"test$paxos\"")) logger.info("Login as role_2") await manager.driver_connect(auth_provider=PlainTextAuthProvider(username="role_2", password="psw")) cql = manager.get_cql() logger.info("Run an LWT3") await cql.run_async(f"UPDATE {ks}.test SET c = 3 WHERE pk = 1 IF c = 2") logger.info("Select data after LWT3") rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;", consistency_level=ConsistencyLevel.SERIAL)) assert len(rows) == 1 row = rows[0] assert row.pk == 1 assert row.c == 3 logger.info("Select paxos state as role_2") with pytest.raises(Unauthorized, match=re.escape(f"Only superusers are allowed to SELECT
")): await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.\"test$paxos\"")) logger.info("Attempt to run DROP TABLE for paxos state table as role_2") with pytest.raises(Unauthorized, match=re.escape(f"Cannot DROP
")): await cql.run_async(SimpleStatement(f"DROP TABLE {ks}.\"test$paxos\"")) # Relogin as a default user to be able to DROP the test keyspace await manager.driver_connect() cql = manager.get_cql() @pytest.mark.asyncio async def test_lwt_coordinator_shard(manager: ManagerClient): # The test checks that an LWT coordinator runs on a replica shard, and not on a 'default' (zero) shard. # Scenario: # 1. Start a cluster with one node with --smp 2 # 2. Create a table with one tablet and one replica, move tablet to the second shard # 3. Start another node # 4. Run an LWT on the second node, check the logs and assert that an LWT was executed on shard 1 # Note: Before the changes in storage_proxy/get_cas_shard, the last LWT would be executed # on shard 0 of the second node because this node doesn't host any replicas of the # tablet, and sharder.shard_for_reads would return 0 as the 'default' shard. logger.info("Starting the first node") cmdline = [ '--logger-log-level', 'paxos=trace', '--smp', '2' ] servers = [await manager.server_add(cmdline=cmdline)] cql = manager.get_cql() logger.info("Disable tablet balancing") await manager.api.disable_tablet_balancing(servers[0].ip_addr) logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks: logger.info("Create a table") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") logger.info("Migrating the tablet to shard 1") n1_host_id = await manager.get_host_id(servers[0].server_id) tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test') assert len(tablets) == 1 tablet = tablets[0] assert len(tablet.replicas) == 1 old_replica = tablet.replicas[0] await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *old_replica, *(n1_host_id, 1), tablet.last_token) logger.info("Starting a second node") servers += [await manager.server_add(cmdline=cmdline)] logger.info("Wait 
for cql and get hosts") hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) logger.info(f"Execute CAS on {hosts[1]}") await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS", host=hosts[1]) n2_log = await manager.server_open_log(servers[1].server_id) matches = await n2_log.grep("CAS\\[0\\] successful") assert len(matches) == 1 assert "shard 1" in matches[0][0] @pytest.mark.asyncio @skip_mode('debug', 'dev is enought: the test checks non-critical functionality') @skip_mode('release', 'error injections are not supported in release mode') async def test_error_message_for_timeout_due_to_write_uncertainty(manager: ManagerClient): # LWT can sometimes return WriteTimeout when it is uncertain whether the transaction # was applied. In this case, the user should retry the transaction. # # Scenario: # 1. The cluster has three nodes. The first node is configured to throw an error on accept, # and the second node is configured to wait for an explicit signal from the test. # 2. Run the first LWT on the first node and wait until the coordinator processes the error from it. # 3. Run another LWT, which should invalidate all promises made during the first one. # 4. Signal the second node to proceed with accept; it must return a reject. # 5. At this point, the coordinator is uncertain about the result of the first LWT. Any subsequent LWT # might either complete its Paxos round (if a quorum of promises observed the accept) or # overwrite it (if a quorum of promises never observed the accept). # # The test verifies that the WriteTimeout error returned by the system # correctly reflects the cause of the problem. 
logger.info("Bootstrapping cluster") cmdline = [ '--logger-log-level', 'paxos=trace' ] servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="mydc") (cql, hosts) = await manager.get_ready_cql(servers) logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks: logger.info("Create a table") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") # accept on the first node returns an error logger.info("Inject paxos_error_before_save_proposal") await inject_error_one_shot_on(manager, "paxos_error_before_save_proposal", [servers[0]]) # accept on the second node returns reject logger.info("Inject paxos_accept_proposal_wait") await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [servers[1]]) logger.info("Open log") log0 = await manager.server_open_log(servers[0].server_id) logger.info(f"Start LWT1 on {hosts[0]}") lwt_task = cql.run_async(SimpleStatement(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS;"), host=hosts[0]) logger.info( "Wait for the coordinator to process error from the first node") await log0.wait_for('accept_proposal: failure while sending proposal') logger.info(f"Run LWT2 on {hosts[1]}") await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 2) IF NOT EXISTS", host=hosts[1]) logger.info("Trigger paxos_accept_proposal_wait") await manager.api.message_injection(servers[1].ip_addr, "paxos_accept_proposal_wait") with pytest.raises(WriteTimeout, match="write timeout due to uncertainty(.*)injected_error_before_save_proposal"): await lwt_task @pytest.mark.asyncio @skip_mode('debug', 'dev is enought') @skip_mode('release', 'error injections are not supported in release mode') async def test_no_uncertainty_for_reads(manager: ManagerClient): # This test verifies that LWT reads do not produce 'uncertainty' timeouts. 
# # The scenario is similar to the write-uncertainty test: # 1. The cluster has three nodes. The first node is configured to fail on accept, # and the second node is configured to block until explicitly signaled by the test. # 2. Run a read LWT on the first node and wait until the coordinator processes an accept error # from the first node. # 3. Run another LWT, which should invalidate all promises from the first one. # 4. Signal the second node to proceed with accept; it must return a reject. # 5. At this point, the coordinator is uncertain about the outcome of the first LWT. Any subsequent LWT # might either complete its Paxos round (if a quorum of promises observed the accept) or # overwrite it (if a quorum did not). Since the first LWT is read-only, the coordinator retries it # transparently and returns the effects of the second LWT to the client. logger.info("Bootstrapping cluster") cmdline = [ '--logger-log-level', 'paxos=trace' ] servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="mydc") (cql, hosts) = await manager.get_ready_cql(servers) logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks: logger.info("Create a table") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") # accept on the first node returns an error logger.info("Inject paxos_error_before_save_proposal") await inject_error_one_shot_on(manager, "paxos_error_before_save_proposal", [servers[0]]) # accept on the second node returns reject logger.info("Inject paxos_accept_proposal_wait") await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [servers[1]]) logger.info("Open log") log0 = await manager.server_open_log(servers[0].server_id) logger.info(f"Start LWT1 on {hosts[0]}") lwt_read = cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;", consistency_level=ConsistencyLevel.SERIAL), 
host=hosts[0]) logger.info( "Wait for the coordinator to process error from the first node") await log0.wait_for('accept_proposal: failure while sending proposal') logger.info(f"Run LWT2 on {hosts[1]}") await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 2) IF NOT EXISTS", host=hosts[1]) logger.info("Trigger paxos_accept_proposal_wait") await manager.api.message_injection(servers[1].ip_addr, "paxos_accept_proposal_wait") rows = await lwt_read assert len(rows) == 1 row = rows[0] assert row.pk == 1 assert row.c == 2 @pytest.mark.asyncio async def test_lwts_for_special_tables(manager: ManagerClient): """ SELECT commands with SERIAL consistency level are historically allowed for vnode-based views, even though they don't provide linearizability guarantees. We prohibit LWTs for tablet-based views, but preserve old behavior for vnode-based view for compatibility. Similar logic is applied to CDC log tables. """ cmdline = [ '--logger-log-level', 'paxos=trace' ] await manager.servers_add(1, cmdline=cmdline) cql = manager.get_cql() async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1 } AND tablets = {'initial': 1}") as ks: await cql.run_async(f"CREATE TABLE {ks}.test (pk int, c int, v int, PRIMARY KEY(pk, c)) WITH cdc = {{'enabled': true}}") await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (pk, c)") with pytest.raises(InvalidRequest, match=re.escape("Cannot directly modify a materialized view")): await cql.run_async(f"INSERT INTO {ks}.tv (pk, c, v) VALUES (1, 2, 3) IF NOT EXISTS") with pytest.raises(InvalidRequest, match=re.escape(f"LWT is not supported on views: {ks}.tv")): await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.tv WHERE pk=1 AND c=2", consistency_level=ConsistencyLevel.SERIAL)) with pytest.raises(InvalidRequest, match=re.escape(f"LWT is not supported on CDC log tables: {ks}.test_scylla_cdc_log")): 
await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test_scylla_cdc_log WHERE \"cdc$stream_id\"=0xAB", consistency_level=ConsistencyLevel.SERIAL)) @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') async def test_lwt_shutdown(manager: ManagerClient): """ This is a regression test for #26355: * Start a cluster with two nodes (s0, s1) and a tablet-based table. * Inject `paxos_state_learn_after_mutate` on s0 for this table. * Inject `storage_proxy::stop` on s0 to ensure that s0 shutdown does not complete before the LWT write on s0 finishes. * Execute an LWT with `cl_learn=1` on s0 and wait for it to succeed. This LWT leaves a background write to s0 storage. * Begin s0 shutdown and wait until it reaches “storage proxy RPC verbs”, but not “paxos store”. In other words, storage_proxy::remote should start draining, but the paxos store should not yet be destroyed since it is still in use. * Release the `database_apply_wait` injection. * Verify that shutdown completes successfully. * After s0 restarts, a node-local read on s0 must return the updated value. 
""" cmdline = [ '--logger-log-level', 'paxos=trace' ] [s0, s1] = await manager.servers_add(2, cmdline=cmdline, property_file=[ {'dc': 'my_dc', 'rack': 'r1'}, {'dc': 'my_dc', 'rack': 'r2'} ]) (cql, [h0, _]) = await manager.get_ready_cql([s0, s1]) async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks: await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int)") logger.info(f"Enable 'paxos_state_learn_after_mutate' injection on {s0.ip_addr}") await manager.api.enable_injection(s0.ip_addr, 'paxos_state_learn_after_mutate', False, parameters={'cf_name': 'test'}) logger.info("Run an LWT") await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.test (pk, v) VALUES (1, 2) IF NOT EXISTS", consistency_level=ConsistencyLevel.ONE), host=h0) logger.info("Open log") log = await manager.server_open_log(s0.server_id) logger.info("Wait for 'paxos_state_learn_after_mutate' injection") await log.wait_for('paxos_state_learn_after_mutate: waiting for message') logger.info(f"Enable 'storage_proxy::stop' injection on {s0.ip_addr}") await manager.api.enable_injection(s0.ip_addr, 'storage_proxy::stop', True) logger.info("Start node shutdown") stop_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id)) await log.wait_for('Shutting down storage proxy RPC verbs') # assert len(await log.grep('Shutting down paxos store')) == 0 logger.info("Trigger storage_proxy::stop and database_apply_wait") await manager.api.message_injection(s0.ip_addr, "paxos_state_learn_after_mutate") await manager.api.message_injection(s0.ip_addr, 'storage_proxy::stop') logger.info("Waiting for paxos store shutdown") await log.wait_for('Shutting down paxos store') logger.info("Waiting for the node shutdown") await stop_task logger.info("Restarting server") await manager.server_start(s0.server_id) logger.info("Reconnecting the driver") cql = await reconnect_driver(manager) logger.info("Wait 
for cql and get hosts") [h0, _] = await wait_for_cql_and_get_hosts(cql, [s0, s1], time.time() + 60) logger.info("Run simple non-paxos read") rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;", consistency_level=ConsistencyLevel.ONE), host=h0) assert len(rows) == 1 row = rows[0] assert row.pk == 1 assert row.v == 2 @pytest.mark.asyncio @skip_mode('debug', 'dev is enough') @skip_mode('release', 'error injections are not supported in release mode') async def test_tablets_merge_waits_for_lwt(manager: ManagerClient): """ This is a regression test for #26437: 1. A cluster with one node, a table with rf=1 and two tablets on the same shard. 2. Run an LWT on the second tablet, make it hang on the paxos_accept_proposal_wait injection. The LWT coordinator holds a tablet_metadata_guard with a non-migrating topology erm and global_shard=1. 3. Trigger tablets merge with the tablet_force_tablet_count_decrease injection, wait for tablet_count == 1. This step invalidates the global_shard, the tablet_metadata_guard should stop refreshing erms. 4. Start tablet migration for the merged tablet, check it hangs on the first global barrier. 4. Release the paxos_accept_proposal_wait injection, check the barrier is released, the tablet is migrated and the LWT succeeded. """ logger.info("Bootstrapping cluster") cmdline = [ '--logger-log-level', 'paxos=trace', '--logger-log-level', 'load_balancer=debug', '--logger-log-level', 'tablets=debug' ] config = { # to force faster tablet balancer reactions 'tablet_load_stats_refresh_interval_in_seconds': 1 } s0 = await manager.server_add(cmdline=cmdline, config=config, property_file={'dc': 'dc1', 'rack': 'r1'}) (cql, _) = await manager.get_ready_cql([s0]) # Disable automatic tablet migrations so that they don't interfere with # the global barrier checks the test wants to run for the table migration track. 
await manager.api.enable_injection(s0.ip_addr, "tablet_migration_bypass", one_shot=False) logger.info("Create a keyspace") async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks: logger.info("Create a table") await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") tablets = await get_all_tablet_replicas(manager, s0, ks, 'test') assert len(tablets) == 2 for t in tablets: assert len(t.replicas) == 1 (s0_host_id, shard) = t.replicas[0] if shard != 0: logger.info(f"Migrating tablet {t.last_token} to shard0") await manager.api.move_tablet(s0.ip_addr, ks, "test", s0_host_id, shard, s0_host_id, 0, t.last_token) logger.info("Inject paxos_accept_proposal_wait") await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [s0]) logger.info("Run an LWT") # We use 3 as a key since its token is handled by the second tablet, which is important for the test. lwt = cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (3, 3) IF NOT EXISTS") logger.info("Open log") log0 = await manager.server_open_log(s0.server_id) logger.info("Wait for paxos_accept_proposal_wait") await log0.wait_for("paxos_accept_proposal_wait: waiting for message") logger.info("Injecting tablet_force_tablet_count_decrease") await manager.api.enable_injection(s0.ip_addr, "tablet_force_tablet_count_decrease", one_shot=False) m = await log0.mark() logger.info("Wait for tablet merge to complete") await wait_for_tablet_count(manager, s0, ks, 'test', lambda c: c == 1, 1, timeout_s=15) logger.info("Ensure the guard decided to retain the erm") await log0.wait_for("tablet_metadata_guard::check: retain the erm and abort the guard", from_mark=m, timeout=10) tablets = await get_all_tablet_replicas(manager, s0, ks, 'test') assert len(tablets) == 1 tablet = tablets[0] assert tablet.replicas == [(s0_host_id, 0)] m = await log0.mark() migration_task = 
asyncio.create_task(manager.api.move_tablet(s0.ip_addr, ks, "test", s0_host_id, 0, s0_host_id, 1, tablet.last_token)) logger.info("Wait for the global barrier to start draining on shard0") await log0.wait_for("\\[shard 0: gms\\] raft_topology - Got raft_topology_cmd::barrier_and_drain", from_mark=m) # Just to confirm that the guard still holds the erm matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain done", from_mark=m) assert len(matches) == 0 # Before the fix, the tablet migration global barrier did not wait for the LWT operation. # As a result, the merged tablet could be migrated to another shard while the LWT is still running. # This caused the LWT to crash in paxos_state::prepare/shards_for_writes because of the # unexpected tablet shard change. logger.info("Trigger 'paxos_accept_proposal_wait'") await manager.api.message_injection(s0.ip_addr, "paxos_accept_proposal_wait") logger.info("Wait for the tablet migration task to finish") await migration_task logger.info("Wait for the LWT to finish") await lwt