# Files
# scylladb/test/cluster/test_tablets_lwt.py
#
# 1018 lines
# 49 KiB
# Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from cassandra import WriteFailure
from cassandra.auth import PlainTextAuthProvider
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import SimpleStatement, ConsistencyLevel
from cassandra.protocol import InvalidRequest, WriteTimeout
from cassandra import Unauthorized
from test.cluster.lwt.lwt_common import wait_for_tablet_count
from test.cluster.util import new_test_keyspace, unique_name, reconnect_driver
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.conftest import skip_mode
from test.pylib.internal_types import ServerInfo
from test.pylib.tablets import get_all_tablet_replicas
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.rest_client import inject_error_one_shot
import pytest
import asyncio
import logging
import time
import re
logger = logging.getLogger(__name__)
async def inject_error_on(manager, error_name, servers):
    """Enable the error injection `error_name` (non one-shot) on every server in `servers`."""
    await asyncio.gather(
        *(manager.api.enable_injection(srv.ip_addr, error_name, False)
          for srv in servers))
async def disable_injection_on(manager, error_name, servers):
    """Disable the error injection `error_name` on every server in `servers`."""
    await asyncio.gather(
        *(manager.api.disable_injection(srv.ip_addr, error_name)
          for srv in servers))
async def inject_error_one_shot_on(manager, error_name, servers):
    """Enable `error_name` as a one-shot injection on every server in `servers`."""
    await asyncio.gather(
        *(inject_error_one_shot(manager.api, srv.ip_addr, error_name)
          for srv in servers))
@pytest.mark.asyncio
async def test_lwt(manager: ManagerClient):
    """Basic LWT smoke test on a tablets keyspace.

    Checks that the paxos state table ("Test$paxos") appears on at least a
    quorum of replicas once LWT statements run, and disappears from all
    replicas when the base table is dropped — across several recreations of
    the base table.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc')
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    quorum = len(hosts) // 2 + 1
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
    # We use capital letters to check that the proper quotes are used in paxos store queries.
    ks = unique_name() + '_Test'
    await cql.run_async(f"CREATE KEYSPACE IF NOT EXISTS \"{ks}\" WITH replication={{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets={{'initial': 1}}")

    async def check_paxos_state_table(exists: bool, replicas: int) -> bool:
        # True iff at least `replicas` hosts agree that the presence of the
        # 'Test$paxos' table in their local schema matches `exists`.
        async def query_host(h: Host):
            q = f"""
                SELECT table_name FROM system_schema.tables
                WHERE keyspace_name='{ks}' AND table_name='Test$paxos'
                """
            rows = await cql.run_async(q, host=h)
            return len(rows) > 0
        results = await asyncio.gather(*(query_host(h) for h in hosts))
        return results.count(exists) >= replicas
    # We run the checks several times to check that the base table
    # recreation is handled correctly.
    for i in range(3):
        await cql.run_async(f"CREATE TABLE \"{ks}\".\"Test\" (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"INSERT INTO \"{ks}\".\"Test\" (pk, c) VALUES ({i}, {i}) IF NOT EXISTS")
        # For a Paxos round to succeed, a quorum of replicas must respond.
        # Therefore, the Paxos state table must exist on at least a quorum of replicas,
        # though it doesn't need to exist on all of them.
        # The driver calls wait_for_schema_agreement only for DDL queries,
        # so we need to handle quorums explicitly here.
        assert await check_paxos_state_table(True, quorum)
        await cql.run_async(f"UPDATE \"{ks}\".\"Test\" SET c = {i * 10} WHERE pk = {i} IF c = {i}")
        rows = await cql.run_async(f"SELECT pk, c FROM \"{ks}\".\"Test\";")
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == i
        assert row.c == i * 10
        # TRUNCATE goes through the topology_coordinator and triggers a full topology state
        # reload on replicas. We use this to verify that system.tablets remains in a consistent state—
        # e.g., that no orphan tablet maps remain after dropping the Paxos state tables.
        await cql.run_async(f"TRUNCATE TABLE \"{ks}\".\"Test\"")
        await cql.run_async(f"DROP TABLE \"{ks}\".\"Test\"")
        # The driver implicitly calls wait_for_schema_agreement,
        # so the table must be dropped from all replicas, not just a quorum.
        assert await check_paxos_state_table(False, len(hosts))
    await manager.get_cql().run_async(f"DROP KEYSPACE \"{ks}\"")
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_lwt_during_migration(manager: ManagerClient):
    """LWT must keep working while a tablet migration is suspended mid-streaming.

    Scenario:
    1. A cluster with three nodes, a table with one tablet and RF=2
    2. Run the tablet migration and suspend it during streaming
    3. Run an LWT and verify it succeeds
    4. Resume the migration, perform a Paxos read, and check the stored values
    5. Do the same for the intranode migration
    """
    logger.info("Bootstrap a cluster with three nodes")
    cmdline = [
        '--logger-log-level', 'paxos=trace',
        '--smp', '2'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, property_file=[
        {'dc': 'my_dc', 'rack': 'r1'},
        {'dc': 'my_dc', 'rack': 'r2'},
        {'dc': 'my_dc', 'rack': 'r1'}
    ])
    (cql, hosts) = await manager.get_ready_cql(servers)
    host_ids = await asyncio.gather(*[manager.get_host_id(s.server_id) for s in servers])
    logger.info("Disable tablet balancing")
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        # With RF=2 in a 3-node cluster, exactly one node has no replica of the
        # single tablet — that node is the migration target.
        logger.info("Detect a node that doesn't contain a tablet replica")
        tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablets) == 1
        tablet = tablets[0]
        assert len(tablet.replicas) == 2
        [r1, r2] = tablet.replicas
        target_host_id = next((h for h in host_ids if h not in {r1[0], r2[0]}), None)
        assert target_host_id is not None
        target_server = next((s for s, h in zip(servers, host_ids) if h == target_host_id), None)
        assert target_server is not None
        logger.info(f"Enable 'migration_streaming_wait' injection on {target_server.ip_addr}")
        await manager.api.enable_injection(target_server.ip_addr, 'migration_streaming_wait', True)
        logger.info(f"Start migration from {r1[0]} to {target_host_id}")
        migration_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                                                     *r1,
                                                                     target_host_id, 0, tablet.last_token))
        logger.info("Open log")
        log = await manager.server_open_log(target_server.server_id)
        logger.info(f"Wait for 'migration_streaming_wait: start' injection on {target_server.ip_addr}")
        await log.wait_for('migration_streaming_wait: start')
        logger.info("Run an LWT while migration is in-progress")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")
        logger.info("Trigger 'migration_streaming_wait'")
        await manager.api.message_injection(target_server.ip_addr, "migration_streaming_wait")
        logger.info("Wait for migration to finish")
        await migration_task
        logger.info("Run paxos SELECT")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.SERIAL))
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 1
        # Repeat the exercise with an intranode (shard 0 -> shard 1) migration.
        logger.info(f"Enable 'intranode_migration_streaming_wait' injection on {target_server.ip_addr}")
        await manager.api.enable_injection(target_server.ip_addr, 'intranode_migration_streaming_wait', True)
        logger.info(f"Start intranode migration on {target_host_id} from shard 0 to shard 1")
        intranode_migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                    target_host_id, 0,
                                    target_host_id, 1, tablet.last_token))
        logger.info(f"Wait for 'intranode_migration_streaming: waiting' injection on {target_server.ip_addr}")
        await log.wait_for('intranode_migration_streaming: waiting')
        logger.info("Run another LWT while intranode migration is in-progress")
        await cql.run_async(f"UPDATE {ks}.test SET c = 2 WHERE pk = 1 IF c = 1")
        logger.info("Trigger 'intranode_migration_streaming_wait'")
        await manager.api.message_injection(target_server.ip_addr, "intranode_migration_streaming_wait")
        logger.info("Wait for intranode migration to finish")
        await intranode_migration_task
        logger.info("Run paxos SELECT")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.SERIAL))
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 2
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_lwt_state_is_preserved_on_tablet_migration(manager: ManagerClient):
    # Scenario:
    # 1. Cells c1 and c2 of some partition are not set.
    # 2. An LWT on {n1, n2} writes 1 to c1, stores accepts on {n1, n2} and learn on n1.
    # 3. Tablet migrates from n2 to n4.
    # 4. Another LWT comes, accepts/learns on {n3, n4} a value "2 if c1 != 1" for the cell c2.
    #    If paxos state was not preserved, this LWT would succeed since the quorum {n3, n4} is completely
    #    unaware of the decided value 1 for c1. We would miss this value and
    #    commit a new write which contradicts it. Depending on the status of the first LWT, this could
    #    be valid or not. If the first LWT has failed (e.g. cl_learn == quorum == 2, learn succeeded on n1
    #    and failed on n2) this is OK, since the user hasn't observed the effects of the first LWT.
    #    In this test we use cl_learn == 1 for simplicity (see the comment below), so this ignoring
    #    the effects of the first LWT is not valid -- this breaks linearizability.
    # 5. Do the SERIAL (paxos) read of c1 and c2 from {n1, n4}. If paxos state was not preserved, this
    #    read would see c1==1 (from learned value on n1) and c2 == 2 (from n4),
    #    which contradicts the CQL from 4.
    logger.info("Bootstrap a cluster with three nodes")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc')
    cql = manager.get_cql()

    async def set_injection(set_to: list[ServerInfo], injection: str):
        # Enable `injection` exactly on `set_to` and disable it on every
        # other server, so each step fully controls which replicas fail.
        logger.info(f"Injecting {injection} on {set_to}")
        await inject_error_on(manager, injection, set_to)
        unset_from = [s for s in servers if s not in set_to]
        logger.info(f"Disabling {injection} on {unset_from}")
        await disable_injection_on(manager, injection, unset_from)
    logger.info("Resolve hosts")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logger.info("Disable tablet balancing")
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c1 int, c2 int);")
        # Step 2: Inject paxos_error_before_save_proposal - on [n3], paxos_error_before_learn on [n2, n3],
        # so that the paxos proposal for the LWT would be accepted
        # only on n1 and n2 and learnt only on n1. The new value for the CAS
        # operation would be decided.
        # Note: we use cl_learn=1 here to simplify the test, the same problem
        # would be with the default cl_learn=quorum, if learn succeeded on n1 but
        # failed on n2.
        await set_injection([servers[2]], 'paxos_error_before_save_proposal')
        await set_injection([servers[1], servers[2]], 'paxos_error_before_learn')
        logger.info("Execute CAS(c1 := 1)")
        lwt1 = SimpleStatement(f"INSERT INTO {ks}.test (pk, c1) VALUES (1, 1) IF NOT EXISTS",
                               consistency_level=ConsistencyLevel.ONE)
        await cql.run_async(lwt1, host=hosts[0])
        # Step3: start n4, migrate the single table tablet from n2 to n4.
        servers += [await manager.server_add(cmdline=cmdline, property_file={'dc': 'my_dc', 'rack': 'rack2'})]
        n4_host_id = await manager.get_host_id(servers[3].server_id)
        logger.info("Migrating the tablet from n2 to n4")
        n2_host_id = await manager.get_host_id(servers[1].server_id)
        tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablets) == 1
        tablet = tablets[0]
        n2_replica = next(r for r in tablet.replicas if r[0] == n2_host_id)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *n2_replica,
                                      *(n4_host_id, 0), tablet.last_token)
        # Step 4: execute LWT on n3, n4 — make n1 fail at every paxos stage so
        # only {n3, n4} participate.
        await set_injection([servers[0]], 'paxos_error_before_save_promise')
        await set_injection([servers[0]], 'paxos_error_before_save_proposal')
        await set_injection([servers[0]], 'paxos_error_before_learn')
        logger.info("Execute CAS(c2 := 2 IF c1 != 1)")
        await cql.run_async(f"UPDATE {ks}.test SET c2 = 2 WHERE pk = 1 IF c1 != 1", host=hosts[2])
        # Step 5. Do the SERIAL (paxos) read of c1, c2 from {n1, n4}:
        # now fail n3 at every paxos stage instead.
        await set_injection([servers[2]], 'paxos_error_before_save_promise')
        await set_injection([servers[2]], 'paxos_error_before_save_proposal')
        await set_injection([servers[2]], 'paxos_error_before_learn')
        logger.info("Execute paxos read")
        lwt_read = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                   consistency_level=ConsistencyLevel.SERIAL)
        rows = await cql.run_async(lwt_read, host=hosts[1])
        # The decided c1 == 1 must have survived the migration, so the
        # conditional write of c2 must NOT have been applied.
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c1 == 1
        assert row.c2 is None
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_no_lwt_with_tablets_feature(manager: ManagerClient):
    """LWT statements must be rejected while the LWT_WITH_TABLETS cluster
    feature is suppressed.

    Every LWT flavor (plain INSERT ... IF NOT EXISTS, a prepared
    UPDATE ... IF EXISTS, DELETE ... IF EXISTS) should fail with
    InvalidRequest, and the pre-existing row must stay untouched.
    """
    config = {
        'error_injections_at_startup': [
            {
                'name': 'suppress_features',
                'value': 'LWT_WITH_TABLETS'
            }
        ]
    }
    await manager.server_add(config=config)
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (key int PRIMARY KEY, val int)")
        await cql.run_async(f"INSERT INTO {ks}.test (key, val) VALUES(1, 0)")
        # Raw f-strings: '\.' is a regex-escaped dot, not a Python string
        # escape (a plain f-string would emit a SyntaxWarning on newer Python).
        with pytest.raises(InvalidRequest, match=rf"{ks}\.test.*LWT is not yet supported with tablets"):
            await cql.run_async(f"INSERT INTO {ks}.test (key, val) VALUES(1, 1) IF NOT EXISTS")
        # The query is rejected during the execution phase,
        # so preparing the LWT query is expected to succeed.
        stmt = cql.prepare(
            f"UPDATE {ks}.test SET val = 1 WHERE KEY = ? IF EXISTS")
        with pytest.raises(InvalidRequest, match=rf"{ks}\.test.*LWT is not yet supported with tablets"):
            await cql.run_async(stmt, [1])
        with pytest.raises(InvalidRequest, match=rf"{ks}\.test.*LWT is not yet supported with tablets"):
            await cql.run_async(f"DELETE FROM {ks}.test WHERE key = 1 IF EXISTS")
        res = await cql.run_async(f"SELECT val FROM {ks}.test WHERE key = 1")
        assert res[0].val == 0
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_lwt_state_is_preserved_on_tablet_rebuild(manager: ManagerClient):
    # Scenario:
    # 1. A cluster with 3 nodes, rf=3.
    # 2. A successful LWT(c := 1) with cl_learn = 1 comes, stores accepts on n1 and n2, learn -- only on n1.
    #    The user has observed its effects (the LWT is successful), so we are not allowed to lose it.
    # 3. Node n1 is lost permanently.
    # 4. New node n4 is added to replace n1. The tablet is rebuilt on n4 based on n2 and n3.
    # 5. Do the SERIAL (paxos) read of c from {n3, n4}. If paxos state was not rebuilt, we would
    #    miss the decided value c == 1 of the first LWT.
    logger.info("Bootstrap a cluster with three nodes")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc')
    cql = manager.get_cql()
    logger.info("Resolve hosts")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logger.info("Disable tablet balancing")
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        # Servers that were stopped and must not receive injection REST calls.
        stopped_servers = set()

        async def set_injection(set_to: list[ServerInfo], injection: str):
            # Enable `injection` on `set_to` and disable it on every other
            # still-running server.
            logger.info(f"Injecting {injection} on {set_to}")
            await inject_error_on(manager, injection, set_to)
            unset_from = [
                s for s in servers if s not in set_to and s not in stopped_servers]
            logger.info(f"Disabling {injection} on {unset_from}")
            await disable_injection_on(manager, injection, unset_from)
        # Step2. Inject paxos_error_before_save_proposal - on [n3], paxos_error_before_learn on [n2, n3]
        await set_injection([servers[2]], 'paxos_error_before_save_proposal')
        await set_injection([servers[1], servers[2]], 'paxos_error_before_learn')
        logger.info("Execute CAS(c := 1)")
        # cl_learn = ONE: the LWT succeeds once a single replica (n1) learns it.
        lwt1 = SimpleStatement(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS",
                               consistency_level=ConsistencyLevel.ONE)
        await cql.run_async(lwt1, host=hosts[0])
        # Step3: Node n1 is lost permanently
        logger.info("Stop n1")
        await manager.server_stop(servers[0].server_id)
        stopped_servers.add(servers[0])
        cql = await reconnect_driver(manager)
        # Step 4: New node n4 is added to replace n1.
        logger.info("Start n4")
        replace_cfg = ReplaceConfig(
            replaced_id=servers[0].server_id,
            reuse_ip_addr=False,
            use_host_id=True,
            wait_replaced_dead=True
        )
        servers += [await manager.server_add(replace_cfg=replace_cfg,
                                             cmdline=cmdline,
                                             property_file={'dc': 'my_dc', 'rack': 'rack1'})]
        # Check we've actually run rebuild
        logs = []
        for s in servers:
            logs.append(await manager.server_open_log(s.server_id))
        assert sum([len(await log.grep('Initiating repair phase of tablet rebuild')) for log in logs]) == 1
        # Step 5. Do the SERIAL (paxos) read of c from {n3, n4}:
        # fail n2 at every paxos stage so only {n3, n4} can answer.
        await set_injection([servers[1]], 'paxos_error_before_save_promise')
        await set_injection([servers[1]], 'paxos_error_before_save_proposal')
        await set_injection([], 'paxos_error_before_learn')
        logger.info("Execute paxos read")
        lwt_read = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                   consistency_level=ConsistencyLevel.SERIAL)
        rows = await cql.run_async(lwt_read)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 1
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_lwt_concurrent_base_table_recreation(manager: ManagerClient):
    # The test checks that the node doesn't crash when the base table is recreated
    # during LWT execution. A no_such_column_family exception is thrown, and the LWT
    # fails as a result.
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    server = await manager.server_add(cmdline=cmdline)
    cql = manager.get_cql()
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        async def recreate_table():
            # Drop-and-create: the table keeps its name but gets a new identity.
            logger.info(f"Drop table if exists {ks}.test")
            await cql.run_async(f"DROP TABLE IF EXISTS {ks}.test;")
            logger.info(f"Create table {ks}.test")
            await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await recreate_table()
        logger.info("Inject load_paxos_state-enter")
        # One-shot injection: the LWT below parks inside load_paxos_state
        # until we message the injection.
        await inject_error_one_shot_on(manager, "load_paxos_state-enter", [server])
        logger.info("Start LWT")
        # Note: not awaited yet — it must be in-flight while we recreate the table.
        lwt_task = cql.run_async(
            f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")
        logger.info("Open log")
        log = await manager.server_open_log(server.server_id)
        logger.info("Wait for load_paxos_state-enter injection")
        await log.wait_for('load_paxos_state-enter: waiting for message')
        # Recreate the base table while the LWT is paused mid-flight.
        await recreate_table()
        logger.info("Trigger load_paxos_state-enter")
        await manager.api.message_injection(server.ip_addr, "load_paxos_state-enter")
        with pytest.raises(WriteFailure):
            await lwt_task
@pytest.mark.asyncio
@skip_mode('debug', 'aarch64/debug is unpredictably slow', platform_key='aarch64')
@skip_mode('release', 'error injections are not supported in release mode')
async def test_lwt_timeout_while_creating_paxos_state_table(manager: ManagerClient, build_mode):
    # An LWT triggers lazy creation of the paxos state table, which needs a
    # group 0 (raft) read barrier. With two of the three nodes stopped there is
    # no raft quorum, so the LWT must fail with a read_barrier timeout error
    # rather than hang.
    timeout = 10000 if build_mode == 'debug' else 1000
    config = {
        'write_request_timeout_in_ms': timeout,
        # NOTE(review): presumably lowers the raft failure-detector threshold
        # so the stopped nodes are detected quickly — confirm against the
        # injection's definition.
        'error_injections_at_startup': [
            {
                'name': 'raft-group-registry-fd-threshold-in-ms',
                'value': '500'
            }
        ]
    }
    logger.info("Bootstrap a cluster with three nodes")
    servers = await manager.servers_add(3, config=config)
    cql = manager.get_cql()
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        logger.info(f"Create table {ks}.test")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        logger.info("Stop the second and third nodes")
        await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
                             manager.server_stop_gracefully(servers[2].server_id))
        logger.info(f"Running an LWT with timeout {timeout}")
        with pytest.raises(Exception, match="raft operation \\[read_barrier\\] timed out, there is no raft quorum"):
            await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")
        logger.info("Start the second and third nodes")
        # Bring the cluster back to a healthy state before keyspace teardown.
        await asyncio.gather(manager.server_start(servers[1].server_id),
                             manager.server_start(servers[2].server_id))
@pytest.mark.asyncio
async def test_lwt_for_tablets_is_not_supported_without_raft(manager: ManagerClient):
    # This test checks that LWT for tablets requires raft-based schema management.
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = [await manager.server_add(cmdline=cmdline)]
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        # In RECOVERY mode the node doesn't join group 0 or start the group 0 Raft server,
        # it performs all operations as in `use_pre_raft_procedures` and doesn't attempt to
        # perform the upgrade.
        # We use this mode to verify the behaviour of LWT for tables when RAFT is disabled.
        logger.info("Set group0_upgrade_state := 'recovery'")
        await cql.run_async("UPDATE system.scylla_local SET value = 'recovery' WHERE key = 'group0_upgrade_state'",
                            host = hosts[0])
        logger.info(f"Rebooting {servers[0]} in RECOVERY mode")
        await manager.server_restart(servers[0].server_id)
        # The restart invalidates the old driver session.
        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        logger.info("Run an LWT")
        # Lazy paxos-state-table creation must be refused without raft.
        with pytest.raises(Exception, match=f"Cannot create paxos state table for {ks}.test because raft-based schema management is not enabled."):
            await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")
@pytest.mark.asyncio
async def test_paxos_state_table_permissions(manager: ManagerClient):
    # This test checks permission handling for paxos state tables:
    # * Only a superuser is allowed to access a paxos state table
    # * Even a superuser is not allowed to run ALTER or DROP on paxos state tables
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    config = {
        'authenticator': 'PasswordAuthenticator',
        'authorizer': 'CassandraAuthorizer'
    }
    # Connect as the default superuser so we can create roles below.
    manager.auth_provider = PlainTextAuthProvider(
        username="cassandra", password="cassandra")
    servers = [await manager.server_add(cmdline=cmdline, config=config)]
    cql, _ = await manager.get_ready_cql(servers)
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        logger.info("Run an LWT1")
        # The first LWT lazily creates the "test$paxos" state table.
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")
        logger.info("Create a role_1")
        await cql.run_async("CREATE ROLE role_1 WITH SUPERUSER = true AND PASSWORD = 'psw' AND LOGIN = true")
        logger.info("Create a role_2")
        await cql.run_async("CREATE ROLE role_2 WITH SUPERUSER = false AND PASSWORD = 'psw' AND LOGIN = true")
        logger.info("Grant all permissions on test table to role_2")
        await cql.run_async(f"GRANT ALL PERMISSIONS ON TABLE {ks}.test TO role_2")
        # We don't attempt to prevent users from granting permissions on $paxos tables,
        # for simplicity. These permissions simply won't take effect — for example,
        # role_2 will still be denied access to the $paxos table.
        logger.info("Grant all permissions on test$paxos table to role_2")
        await cql.run_async(f"GRANT ALL PERMISSIONS ON TABLE \"{ks}\".\"test$paxos\" TO role_2")
        logger.info("Login as role_1")
        await manager.driver_connect(auth_provider=PlainTextAuthProvider(username="role_1", password="psw"))
        cql = manager.get_cql()
        logger.info("Run an LWT2")
        await cql.run_async(f"UPDATE {ks}.test SET c = 2 WHERE pk = 1 IF c = 1")
        logger.info("Select data after LWT2")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.SERIAL))
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 2
        logger.info("Select paxos state as role_1")
        # role_1 is a superuser, so reading the paxos state table is allowed.
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.\"test$paxos\""))
        assert len(rows) == 1
        logger.info("Attempt to run DROP TABLE for paxos state table as role_1")
        # Even a superuser must not be able to DROP a paxos state table.
        with pytest.raises(Unauthorized, match=re.escape(f"Cannot DROP <table {ks}.test$paxos>")):
            await cql.run_async(SimpleStatement(f"DROP TABLE {ks}.\"test$paxos\""))
        logger.info("Login as role_2")
        await manager.driver_connect(auth_provider=PlainTextAuthProvider(username="role_2", password="psw"))
        cql = manager.get_cql()
        logger.info("Run an LWT3")
        # LWT on the base table works for the non-superuser with granted permissions.
        await cql.run_async(f"UPDATE {ks}.test SET c = 3 WHERE pk = 1 IF c = 2")
        logger.info("Select data after LWT3")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.SERIAL))
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 3
        logger.info("Select paxos state as role_2")
        # Direct access to the paxos state table is denied to non-superusers,
        # despite the GRANT above.
        with pytest.raises(Unauthorized, match=re.escape(f"Only superusers are allowed to SELECT <table {ks}.test$paxos>")):
            await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.\"test$paxos\""))
        logger.info("Attempt to run DROP TABLE for paxos state table as role_2")
        with pytest.raises(Unauthorized, match=re.escape(f"Cannot DROP <table {ks}.test$paxos>")):
            await cql.run_async(SimpleStatement(f"DROP TABLE {ks}.\"test$paxos\""))
        # Relogin as a default user to be able to DROP the test keyspace
        await manager.driver_connect()
        cql = manager.get_cql()
@pytest.mark.asyncio
async def test_lwt_coordinator_shard(manager: ManagerClient):
    # The test checks that an LWT coordinator runs on a replica shard, and not on a 'default' (zero) shard.
    # Scenario:
    # 1. Start a cluster with one node with --smp 2
    # 2. Create a table with one tablet and one replica, move tablet to the second shard
    # 3. Start another node
    # 4. Run an LWT on the second node, check the logs and assert that an LWT was executed on shard 1
    # Note: Before the changes in storage_proxy/get_cas_shard, the last LWT would be executed
    # on shard 0 of the second node because this node doesn't host any replicas of the
    # tablet, and sharder.shard_for_reads would return 0 as the 'default' shard.
    logger.info("Starting the first node")
    cmdline = [
        '--logger-log-level', 'paxos=trace',
        '--smp', '2'
    ]
    servers = [await manager.server_add(cmdline=cmdline)]
    cql = manager.get_cql()
    logger.info("Disable tablet balancing")
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        logger.info("Migrating the tablet to shard 1")
        n1_host_id = await manager.get_host_id(servers[0].server_id)
        tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablets) == 1
        tablet = tablets[0]
        assert len(tablet.replicas) == 1
        old_replica = tablet.replicas[0]
        # Intranode move: same host, target shard 1.
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *old_replica,
                                      *(n1_host_id, 1), tablet.last_token)
        logger.info("Starting a second node")
        servers += [await manager.server_add(cmdline=cmdline)]
        logger.info("Wait for cql and get hosts")
        hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        logger.info(f"Execute CAS on {hosts[1]}")
        # Run the LWT via the second node, which hosts no replica of the tablet.
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS", host=hosts[1])
        n2_log = await manager.server_open_log(servers[1].server_id)
        matches = await n2_log.grep("CAS\\[0\\] successful")
        assert len(matches) == 1
        # The trace line must come from shard 1, i.e. the replica's shard.
        assert "shard 1" in matches[0][0]
@pytest.mark.asyncio
@skip_mode('debug', 'dev is enough: the test checks non-critical functionality')
@skip_mode('release', 'error injections are not supported in release mode')
async def test_error_message_for_timeout_due_to_write_uncertainty(manager: ManagerClient):
    """Verify the WriteTimeout message when an LWT outcome is uncertain.

    LWT can sometimes return WriteTimeout when it is uncertain whether the transaction
    was applied. In this case, the user should retry the transaction.

    Scenario:
    1. The cluster has three nodes. The first node is configured to throw an error on accept,
       and the second node is configured to wait for an explicit signal from the test.
    2. Run the first LWT on the first node and wait until the coordinator processes the error from it.
    3. Run another LWT, which should invalidate all promises made during the first one.
    4. Signal the second node to proceed with accept; it must return a reject.
    5. At this point, the coordinator is uncertain about the result of the first LWT. Any subsequent LWT
       might either complete its Paxos round (if a quorum of promises observed the accept) or
       overwrite it (if a quorum of promises never observed the accept).

    The test verifies that the WriteTimeout error returned by the system
    correctly reflects the cause of the problem.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="mydc")
    (cql, hosts) = await manager.get_ready_cql(servers)
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        # accept on the first node returns an error
        logger.info("Inject paxos_error_before_save_proposal")
        await inject_error_one_shot_on(manager, "paxos_error_before_save_proposal", [servers[0]])
        # accept on the second node returns reject
        logger.info("Inject paxos_accept_proposal_wait")
        await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [servers[1]])
        logger.info("Open log")
        log0 = await manager.server_open_log(servers[0].server_id)
        logger.info(f"Start LWT1 on {hosts[0]}")
        # Not awaited yet — LWT1 must stay in flight while LWT2 runs.
        lwt_task = cql.run_async(SimpleStatement(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS;"),
                                 host=hosts[0])
        logger.info(
            "Wait for the coordinator to process error from the first node")
        await log0.wait_for('accept_proposal: failure while sending proposal')
        logger.info(f"Run LWT2 on {hosts[1]}")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 2) IF NOT EXISTS", host=hosts[1])
        logger.info("Trigger paxos_accept_proposal_wait")
        await manager.api.message_injection(servers[1].ip_addr, "paxos_accept_proposal_wait")
        # LWT1 must fail with an uncertainty timeout that names the injected error.
        with pytest.raises(WriteTimeout, match="write timeout due to uncertainty(.*)injected_error_before_save_proposal"):
            await lwt_task
@pytest.mark.asyncio
@skip_mode('debug', 'dev is enough')
@skip_mode('release', 'error injections are not supported in release mode')
async def test_no_uncertainty_for_reads(manager: ManagerClient):
    """
    Verify that LWT reads do not produce 'uncertainty' timeouts.

    The scenario is similar to the write-uncertainty test:
    1. The cluster has three nodes. The first node is configured to fail on accept,
       and the second node is configured to block until explicitly signaled by the test.
    2. Run a read LWT on the first node and wait until the coordinator processes an accept error
       from the first node.
    3. Run another LWT, which should invalidate all promises from the first one.
    4. Signal the second node to proceed with accept; it must return a reject.
    5. At this point, the coordinator is uncertain about the outcome of the first LWT. Any subsequent LWT
       might either complete its Paxos round (if a quorum of promises observed the accept) or
       overwrite it (if a quorum did not). Since the first LWT is read-only, the coordinator retries it
       transparently and returns the effects of the second LWT to the client.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="mydc")
    (cql, hosts) = await manager.get_ready_cql(servers)
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        # accept on the first node returns an error
        logger.info("Inject paxos_error_before_save_proposal")
        await inject_error_one_shot_on(manager, "paxos_error_before_save_proposal", [servers[0]])
        # accept on the second node returns reject
        logger.info("Inject paxos_accept_proposal_wait")
        await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [servers[1]])
        logger.info("Open log")
        log0 = await manager.server_open_log(servers[0].server_id)
        logger.info(f"Start LWT1 on {hosts[0]}")
        lwt_read = cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                 consistency_level=ConsistencyLevel.SERIAL),
                                 host=hosts[0])
        logger.info(
            "Wait for the coordinator to process error from the first node")
        await log0.wait_for('accept_proposal: failure while sending proposal')
        logger.info(f"Run LWT2 on {hosts[1]}")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 2) IF NOT EXISTS", host=hosts[1])
        logger.info("Trigger paxos_accept_proposal_wait")
        await manager.api.message_injection(servers[1].ip_addr, "paxos_accept_proposal_wait")
        # The read-only LWT must be retried transparently by the coordinator
        # and observe the value written by LWT2 — no WriteTimeout for reads.
        rows = await lwt_read
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 2
@pytest.mark.asyncio
async def test_lwts_for_special_tables(manager: ManagerClient):
    """
    Historically, SELECT statements with SERIAL consistency have been permitted
    on vnode-based materialized views even though they give no linearizability
    guarantee. LWTs are prohibited on tablet-based views, while the old
    vnode-based behavior is kept for compatibility. CDC log tables are handled
    with similar logic. This test exercises the tablet-based rejection paths.
    """
    await manager.servers_add(1, cmdline=['--logger-log-level', 'paxos=trace'])
    session = manager.get_cql()
    replication_opts = ("WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }"
                        " AND tablets = {'initial': 1}")
    async with new_test_keyspace(manager, replication_opts) as ks:
        # A CDC-enabled base table plus a materialized view on top of it.
        await session.run_async(f"CREATE TABLE {ks}.test (pk int, c int, v int, PRIMARY KEY(pk, c)) WITH cdc = {{'enabled': true}}")
        await session.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (pk, c)")
        # Direct writes to a view are always rejected, conditional or not.
        with pytest.raises(InvalidRequest, match=re.escape("Cannot directly modify a materialized view")):
            await session.run_async(f"INSERT INTO {ks}.tv (pk, c, v) VALUES (1, 2, 3) IF NOT EXISTS")
        # SERIAL reads are refused on a tablet-based view...
        serial_view_read = SimpleStatement(f"SELECT * FROM {ks}.tv WHERE pk=1 AND c=2",
                                           consistency_level=ConsistencyLevel.SERIAL)
        with pytest.raises(InvalidRequest, match=re.escape(f"LWT is not supported on views: {ks}.tv")):
            await session.run_async(serial_view_read)
        # ...and on the CDC log table as well.
        serial_cdc_read = SimpleStatement(f"SELECT * FROM {ks}.test_scylla_cdc_log WHERE \"cdc$stream_id\"=0xAB",
                                          consistency_level=ConsistencyLevel.SERIAL)
        with pytest.raises(InvalidRequest, match=re.escape(f"LWT is not supported on CDC log tables: {ks}.test_scylla_cdc_log")):
            await session.run_async(serial_cdc_read)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_lwt_shutdown(manager: ManagerClient):
    """
    This is a regression test for #26355:
    * Start a cluster with two nodes (s0, s1) and a tablet-based table.
    * Inject `paxos_state_learn_after_mutate` on s0 for this table.
    * Inject `storage_proxy::stop` on s0 to ensure that s0 shutdown does not
      complete before the LWT write on s0 finishes.
    * Execute an LWT with `cl_learn=1` on s0 and wait for it to succeed.
      This LWT leaves a background write to s0 storage.
    * Begin s0 shutdown and wait until it reaches “storage proxy RPC verbs”,
      but not “paxos store”. In other words, storage_proxy::remote should start
      draining, but the paxos store should not yet be destroyed since it is still
      in use.
    * Release the `paxos_state_learn_after_mutate` injection.
    * Verify that shutdown completes successfully.
    * After s0 restarts, a node-local read on s0 must return the updated value.
    """
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    [s0, s1] = await manager.servers_add(2, cmdline=cmdline, property_file=[
        {'dc': 'my_dc', 'rack': 'r1'},
        {'dc': 'my_dc', 'rack': 'r2'}
    ])
    (cql, [h0, _]) = await manager.get_ready_cql([s0, s1])
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int)")
        logger.info(f"Enable 'paxos_state_learn_after_mutate' injection on {s0.ip_addr}")
        await manager.api.enable_injection(s0.ip_addr, 'paxos_state_learn_after_mutate',
                                           False, parameters={'cf_name': 'test'})
        logger.info("Run an LWT")
        # cl_learn=ONE: the LWT completes once one replica learns, leaving a
        # background write on s0 blocked by paxos_state_learn_after_mutate.
        await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.test (pk, v) VALUES (1, 2) IF NOT EXISTS",
                                            consistency_level=ConsistencyLevel.ONE),
                            host=h0)
        logger.info("Open log")
        log = await manager.server_open_log(s0.server_id)
        logger.info("Wait for 'paxos_state_learn_after_mutate' injection")
        await log.wait_for('paxos_state_learn_after_mutate: waiting for message')
        logger.info(f"Enable 'storage_proxy::stop' injection on {s0.ip_addr}")
        await manager.api.enable_injection(s0.ip_addr, 'storage_proxy::stop', True)
        logger.info("Start node shutdown")
        stop_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))
        await log.wait_for('Shutting down storage proxy RPC verbs')
        # NOTE(review): disabled check that the paxos store shutdown has not
        # started yet — presumably racy; confirm before re-enabling:
        # assert len(await log.grep('Shutting down paxos store')) == 0
        logger.info("Trigger paxos_state_learn_after_mutate and storage_proxy::stop")
        await manager.api.message_injection(s0.ip_addr, "paxos_state_learn_after_mutate")
        await manager.api.message_injection(s0.ip_addr, 'storage_proxy::stop')
        logger.info("Waiting for paxos store shutdown")
        await log.wait_for('Shutting down paxos store')
        logger.info("Waiting for the node shutdown")
        await stop_task
        logger.info("Restarting server")
        await manager.server_start(s0.server_id)
        logger.info("Reconnecting the driver")
        cql = await reconnect_driver(manager)
        logger.info("Wait for cql and get hosts")
        [h0, _] = await wait_for_cql_and_get_hosts(cql, [s0, s1], time.time() + 60)
        logger.info("Run simple non-paxos read")
        # A CL=ONE read pinned to s0 verifies that the background paxos learn
        # was persisted on s0 before shutdown completed.
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.ONE),
                                   host=h0)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.v == 2
@pytest.mark.asyncio
@skip_mode('debug', 'dev is enough')
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablets_merge_waits_for_lwt(manager: ManagerClient):
    """
    This is a regression test for #26437:
    1. A cluster with one node, a table with rf=1 and two tablets on the same shard.
    2. Run an LWT on the second tablet, make it hang on the paxos_accept_proposal_wait injection.
       The LWT coordinator holds a tablet_metadata_guard with a non-migrating topology erm and global_shard=1.
    3. Trigger tablets merge with the tablet_force_tablet_count_decrease injection, wait for tablet_count == 1.
       This step invalidates the global_shard, the tablet_metadata_guard should stop refreshing erms.
    4. Start tablet migration for the merged tablet, check it hangs on the first global barrier.
    5. Release the paxos_accept_proposal_wait injection, check the barrier is released, the tablet is
       migrated and the LWT succeeded.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace',
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'tablets=debug'
    ]
    config = {
        # to force faster tablet balancer reactions
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }
    s0 = await manager.server_add(cmdline=cmdline,
                                  config=config,
                                  property_file={'dc': 'dc1', 'rack': 'r1'})
    (cql, _) = await manager.get_ready_cql([s0])
    # Disable automatic tablet migrations so that they don't interfere with
    # the global barrier checks the test wants to run for the table migration track.
    await manager.api.enable_injection(s0.ip_addr, "tablet_migration_bypass", one_shot=False)
    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        # The test needs both tablets on shard 0; move any replica that the
        # initial allocation placed elsewhere.
        tablets = await get_all_tablet_replicas(manager, s0, ks, 'test')
        assert len(tablets) == 2
        for t in tablets:
            assert len(t.replicas) == 1
            (s0_host_id, shard) = t.replicas[0]
            if shard != 0:
                logger.info(f"Migrating tablet {t.last_token} to shard0")
                await manager.api.move_tablet(s0.ip_addr, ks, "test", s0_host_id, shard, s0_host_id, 0, t.last_token)
        logger.info("Inject paxos_accept_proposal_wait")
        await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [s0])
        logger.info("Run an LWT")
        # We use 3 as a key since its token is handled by the second tablet, which is important for the test.
        lwt = cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (3, 3) IF NOT EXISTS")
        logger.info("Open log")
        log0 = await manager.server_open_log(s0.server_id)
        logger.info("Wait for paxos_accept_proposal_wait")
        await log0.wait_for("paxos_accept_proposal_wait: waiting for message")
        logger.info("Injecting tablet_force_tablet_count_decrease")
        await manager.api.enable_injection(s0.ip_addr, "tablet_force_tablet_count_decrease", one_shot=False)
        m = await log0.mark()
        logger.info("Wait for tablet merge to complete")
        await wait_for_tablet_count(manager, s0, ks, 'test', lambda c: c == 1, 1, timeout_s=15)
        logger.info("Ensure the guard decided to retain the erm")
        await log0.wait_for("tablet_metadata_guard::check: retain the erm and abort the guard",
                            from_mark=m, timeout=10)
        tablets = await get_all_tablet_replicas(manager, s0, ks, 'test')
        assert len(tablets) == 1
        tablet = tablets[0]
        assert tablet.replicas == [(s0_host_id, 0)]
        m = await log0.mark()
        # Start migrating the merged tablet shard0 -> shard1; it must block on
        # the global barrier while the LWT still holds its guard.
        migration_task = asyncio.create_task(manager.api.move_tablet(s0.ip_addr, ks, "test",
                                                                     s0_host_id, 0,
                                                                     s0_host_id, 1,
                                                                     tablet.last_token))
        logger.info("Wait for the global barrier to start draining on shard0")
        await log0.wait_for("\\[shard 0: gms\\] raft_topology - Got raft_topology_cmd::barrier_and_drain", from_mark=m)
        # Just to confirm that the guard still holds the erm
        matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain done", from_mark=m)
        assert len(matches) == 0
        # Before the fix, the tablet migration global barrier did not wait for the LWT operation.
        # As a result, the merged tablet could be migrated to another shard while the LWT is still running.
        # This caused the LWT to crash in paxos_state::prepare/shards_for_writes because of the
        # unexpected tablet shard change.
        logger.info("Trigger 'paxos_accept_proposal_wait'")
        await manager.api.message_injection(s0.ip_addr, "paxos_accept_proposal_wait")
        logger.info("Wait for the tablet migration task to finish")
        await migration_task
        logger.info("Wait for the LWT to finish")
        await lwt