When it deadlocks, groups stop merging and compaction group merge backlog will run-away. Also, graceful shutdown will be blocked on it. Found by flaky unit test test_merge_chooses_best_replica_with_odd_count, which timed-out in 1 in 100 runs. Reason for deadlock: When storage groups are merged, the main compaction group of the new storage group takes a compaction lock, which is appended to _compaction_reenablers_for_merging, and released when the merge completion fiber is done with the whole batch. If we accumulate more than 1 merge cycle for the fiber, deadlock occurs. Lock order will be this: Initial state: cg0: main cg1: main cg2: main cg3: main After 1st merge: cg0': main [locked], merging_groups=[cg0.main, cg1.main] cg1': main [locked], merging_groups=[cg2.main, cg3.main] After 2nd merge: cg0'': main [locked], merging_groups=[cg0'.main [locked], cg0.main, cg1.main, cg1'.main [locked], cg2.main, cg3.main] merge completion fiber will try to stop cg0'.main, which will be blocked on the compaction lock, which is held by the reenabler in _compaction_reenablers_for_merging, hence deadlock. The fix is to wait for background merge to finish before we start the next merge. It's achieved by holding old erm in the background merge, and doing a topology barrier from the merge finalizing transition. Background merge is supposed to be a relatively quick operation, it's stopping compaction groups. So may wait for active requests. It shouldn't prolong the barrier indefinitely. Tablet boost unit tests which trigger merge need to be adjusted to call the barrier, otherwise they will be vulnerable to the deadlock. Two cluster tests were removed because they assumed that merge happens in the background. Now that it happens as part of merge finalization, and blocks topology state machine, those tests deadlock because they are unable to make topology changes (node bootstrap) while background merge is blocked. The test "test_tablets_merge_waits_for_lwt" needed to be adjusted. 
It assumed that merge finalization doesn't wait for the erm held by the LWT operation, and triggered tablet movement afterwards, and assumed that this migration will issue a barrier which will block on the LWT operation. After this commit, it's the barrier in merge finalization which is blocked. The test was adjusted to use an earlier log mark when waiting for "Got raft_topology_cmd::barrier_and_drain", which will catch the barrier in merge finalization. Fixes SCYLLADB-928
979 lines
48 KiB
Python
979 lines
48 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
from cassandra import WriteFailure
|
|
from cassandra.auth import PlainTextAuthProvider
|
|
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
|
from cassandra.query import SimpleStatement, ConsistencyLevel
|
|
from cassandra.protocol import InvalidRequest, WriteTimeout
|
|
from cassandra import Unauthorized
|
|
|
|
from test.cluster.lwt.lwt_common import wait_for_tablet_count
|
|
from test.cluster.util import new_test_keyspace, unique_name, reconnect_driver
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.util import wait_for_cql_and_get_hosts
|
|
from test.pylib.internal_types import ServerInfo
|
|
from test.pylib.tablets import get_all_tablet_replicas
|
|
from test.pylib.scylla_cluster import ReplaceConfig
|
|
from test.pylib.rest_client import inject_error_one_shot
|
|
|
|
import pytest
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def inject_error_on(manager, error_name, servers):
    """Concurrently enable a persistent (non-one-shot) error injection on every server."""
    pending = []
    for srv in servers:
        pending.append(manager.api.enable_injection(srv.ip_addr, error_name, False))
    await asyncio.gather(*pending)
|
|
|
|
async def disable_injection_on(manager, error_name, servers):
    """Concurrently disable the named error injection on every server."""
    await asyncio.gather(
        *(manager.api.disable_injection(srv.ip_addr, error_name) for srv in servers))
|
|
|
|
async def inject_error_one_shot_on(manager, error_name, servers):
    """Concurrently arm a one-shot error injection on every server."""
    tasks = (inject_error_one_shot(manager.api, srv.ip_addr, error_name) for srv in servers)
    await asyncio.gather(*tasks)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_lwt(manager: ManagerClient):
    """Basic LWT smoke test on a tablets keyspace.

    Repeatedly creates/drops a base table and verifies that the companion
    Paxos state table ("Test$paxos") appears on at least a quorum of
    replicas after an LWT, and disappears from all replicas after DROP.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc')
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    # Majority size for a 3-node cluster (== 2 here).
    quorum = len(hosts) // 2 + 1

    await manager.disable_tablet_balancing()

    # We use capital letters to check that the proper quotes are used in paxos store queries.
    ks = unique_name() + '_Test'
    await cql.run_async(f"CREATE KEYSPACE IF NOT EXISTS \"{ks}\" WITH replication={{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND tablets={{'initial': 1}}")

    async def check_paxos_state_table(exists: bool, replicas: int) -> bool:
        # Returns True iff at least `replicas` hosts agree with `exists`
        # on whether the "Test$paxos" state table is present in the schema.
        async def query_host(h: Host) -> bool:
            q = f"""
                SELECT table_name FROM system_schema.tables
                WHERE keyspace_name='{ks}' AND table_name='Test$paxos'
            """
            rows = await cql.run_async(q, host=h)
            return len(rows) > 0
        results = await asyncio.gather(*(query_host(h) for h in hosts))
        return results.count(exists) >= replicas

    # We run the checks several times to check that the base table
    # recreation is handled correctly.
    for i in range(3):
        await cql.run_async(f"CREATE TABLE \"{ks}\".\"Test\" (pk int PRIMARY KEY, c int);")

        await cql.run_async(f"INSERT INTO \"{ks}\".\"Test\" (pk, c) VALUES ({i}, {i}) IF NOT EXISTS")
        # For a Paxos round to succeed, a quorum of replicas must respond.
        # Therefore, the Paxos state table must exist on at least a quorum of replicas,
        # though it doesn't need to exist on all of them.
        # The driver calls wait_for_schema_agreement only for DDL queries,
        # so we need to handle quorums explicitly here.
        assert await check_paxos_state_table(True, quorum)

        await cql.run_async(f"UPDATE \"{ks}\".\"Test\" SET c = {i * 10} WHERE pk = {i} IF c = {i}")

        rows = await cql.run_async(f"SELECT pk, c FROM \"{ks}\".\"Test\";")
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == i
        assert row.c == i * 10

        # TRUNCATE goes through the topology_coordinator and triggers a full topology state
        # reload on replicas. We use this to verify that system.tablets remains in a consistent state—
        # e.g., that no orphan tablet maps remain after dropping the Paxos state tables.
        await cql.run_async(f"TRUNCATE TABLE \"{ks}\".\"Test\"")

        await cql.run_async(f"DROP TABLE \"{ks}\".\"Test\"")
        # The driver implicitly calls wait_for_schema_agreement,
        # so the table must be dropped from all replicas, not just a quorum.
        assert await check_paxos_state_table(False, len(hosts))

    await manager.get_cql().run_async(f"DROP KEYSPACE \"{ks}\"")
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_lwt_during_migration(manager: ManagerClient):
    """Verify LWT correctness while a tablet migration is suspended mid-streaming.

    Scenario:
    1. A cluster with three nodes, a table with one tablet and RF=2
    2. Run the tablet migration and suspend it during streaming
    3. Run an LWT and verify it succeeds
    4. Resume the migration, perform a Paxos read, and check the stored values
    5. Do the same for the intranode migration
    """
    logger.info("Bootstrap a cluster with three nodes")
    cmdline = [
        '--logger-log-level', 'paxos=trace',
        '--smp', '2'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, property_file=[
        {'dc': 'my_dc', 'rack': 'r1'},
        {'dc': 'my_dc', 'rack': 'r2'},
        {'dc': 'my_dc', 'rack': 'r1'}
    ])
    (cql, hosts) = await manager.get_ready_cql(servers)
    host_ids = await asyncio.gather(*[manager.get_host_id(s.server_id) for s in servers])

    logger.info("Disable tablet balancing")
    await manager.disable_tablet_balancing()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # With RF=2 on a 3-node cluster exactly one node holds no replica
        # of the single tablet; that node is the migration target.
        logger.info("Detect a node that doesn't contain a tablet replica")
        tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablets) == 1
        tablet = tablets[0]
        assert len(tablet.replicas) == 2
        [r1, r2] = tablet.replicas
        target_host_id = next((h for h in host_ids if h not in {r1[0], r2[0]}), None)
        assert target_host_id is not None
        target_server = next((s for s, h in zip(servers, host_ids) if h == target_host_id), None)
        assert target_server is not None

        logger.info(f"Enable 'migration_streaming_wait' injection on {target_server.ip_addr}")
        await manager.api.enable_injection(target_server.ip_addr, 'migration_streaming_wait', True)

        logger.info(f"Start migration from {r1[0]} to {target_host_id}")
        migration_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                                                     *r1,
                                                                     target_host_id, 0, tablet.last_token))

        logger.info("Open log")
        log = await manager.server_open_log(target_server.server_id)
        logger.info(f"Wait for 'migration_streaming_wait: start' injection on {target_server.ip_addr}")
        await log.wait_for('migration_streaming_wait: start')

        logger.info("Run an LWT while migration is in-progress")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")

        logger.info("Trigger 'migration_streaming_wait'")
        await manager.api.message_injection(target_server.ip_addr, "migration_streaming_wait")

        logger.info("Wait for migration to finish")
        await migration_task

        logger.info("Run paxos SELECT")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.SERIAL))
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 1

        logger.info(f"Enable 'intranode_migration_streaming_wait' injection on {target_server.ip_addr}")
        await manager.api.enable_injection(target_server.ip_addr, 'intranode_migration_streaming_wait', True)

        logger.info(f"Start intranode migration on {target_host_id} from shard 0 to shard 1")
        # Fixed the 'ingranode_migration_task' typo in the local name below.
        intranode_migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test",
                                    target_host_id, 0,
                                    target_host_id, 1, tablet.last_token))

        logger.info(f"Wait for 'intranode_migration_streaming: waiting' injection on {target_server.ip_addr}")
        await log.wait_for('intranode_migration_streaming: waiting')

        logger.info("Run another LWT while intranode migration is in-progress")
        await cql.run_async(f"UPDATE {ks}.test SET c = 2 WHERE pk = 1 IF c = 1")

        # The log message used to say 'migration_streaming_wait' but the call
        # below messages the intranode injection -- keep them consistent.
        logger.info("Trigger 'intranode_migration_streaming_wait'")
        await manager.api.message_injection(target_server.ip_addr, "intranode_migration_streaming_wait")

        logger.info("Wait for intranode migration to finish")
        await intranode_migration_task

        logger.info("Run paxos SELECT")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.SERIAL))
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 2
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_lwt_state_is_preserved_on_tablet_migration(manager: ManagerClient):
    # Scenario:
    # 1. Cells c1 and c2 of some partition are not set.
    # 2. An LWT on {n1, n2} writes 1 to c1, stores accepts on {n1, n2} and learn on n1.
    # 3. Tablet migrates from n2 to n4.
    # 4. Another LWT comes, accepts/learns on {n3, n4} a value "2 if c1 != 1" for the cell c2.
    # If paxos state was not preserved, this LWT would succeed since the quorum {n3, n4} is completely
    # unaware of the decided value 1 for c1. We would miss this value and
    # commit a new write which contradicts it. Depending on the status of the first LWT, this could
    # be valid or not. If the first LWT has failed (e.g. cl_learn == quorum == 2, learn succeeded on n1
    # and failed on n2) this is OK, since the user hasn't observed the effects of the first LWT.
    # In this test we use cl_learn == 1 for simplicity (see the comment below), so this ignoring
    # the effects of the first LWT is not valid -- this breaks linearizability.
    # 5. Do the SERIAL (paxos) read of c1 and c2 from {n1, n4}. If paxos state was not preserved, this
    # read would see c1==1 (from learned value on n1) and c2 == 2 (from n4),
    # which contradicts the CQL from 4.

    logger.info("Bootstrap a cluster with three nodes")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc')
    cql = manager.get_cql()

    async def set_injection(set_to: list[ServerInfo], injection: str):
        # Enable the injection on exactly the servers in `set_to` and make
        # sure it is disabled everywhere else, so each step of the scenario
        # fully controls which replicas fail at which paxos phase.
        logger.info(f"Injecting {injection} on {set_to}")
        await inject_error_on(manager, injection, set_to)
        unset_from = [s for s in servers if s not in set_to]
        logger.info(f"Disabling {injection} on {unset_from}")
        await disable_injection_on(manager, injection, unset_from)

    logger.info("Resolve hosts")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Disable tablet balancing")
    await manager.disable_tablet_balancing()

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c1 int, c2 int);")

        # Step 2: Inject paxos_error_before_save_proposal - on [n3], paxos_error_before_learn on [n2, n3],
        # so that the paxos proposal for the LWT would be accepted
        # only on n1 and n2 and learnt only on n1. The new value for the CAS
        # operation would be decided.
        # Note: we use cl_learn=1 here to simplify the test, the same problem
        # would be with the default cl_learn=quorum, if learn succeeded on n1 but
        # failed on n2.
        await set_injection([servers[2]], 'paxos_error_before_save_proposal')
        await set_injection([servers[1], servers[2]], 'paxos_error_before_learn')
        logger.info("Execute CAS(c1 := 1)")
        lwt1 = SimpleStatement(f"INSERT INTO {ks}.test (pk, c1) VALUES (1, 1) IF NOT EXISTS",
                               consistency_level=ConsistencyLevel.ONE)
        await cql.run_async(lwt1, host=hosts[0])

        # Step3: start n4, migrate the single table tablet from n2 to n4.
        servers += [await manager.server_add(cmdline=cmdline, property_file={'dc': 'my_dc', 'rack': 'rack2'})]
        n4_host_id = await manager.get_host_id(servers[3].server_id)
        logger.info("Migrating the tablet from n2 to n4")
        n2_host_id = await manager.get_host_id(servers[1].server_id)
        tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablets) == 1
        tablet = tablets[0]
        n2_replica = next(r for r in tablet.replicas if r[0] == n2_host_id)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *n2_replica,
                                      *(n4_host_id, 0), tablet.last_token)

        # Step 4: execute LWT on n3, n4 (n1 is made to fail at every paxos phase).
        await set_injection([servers[0]], 'paxos_error_before_save_promise')
        await set_injection([servers[0]], 'paxos_error_before_save_proposal')
        await set_injection([servers[0]], 'paxos_error_before_learn')
        logger.info("Execute CAS(c2 := 2 IF c1 != 1)")
        await cql.run_async(f"UPDATE {ks}.test SET c2 = 2 WHERE pk = 1 IF c1 != 1", host=hosts[2])

        # Step 5. Do the SERIAL (paxos) read of c1, c2 from {n1, n4}
        # (n3 is made to fail at every paxos phase).
        await set_injection([servers[2]], 'paxos_error_before_save_promise')
        await set_injection([servers[2]], 'paxos_error_before_save_proposal')
        await set_injection([servers[2]], 'paxos_error_before_learn')
        logger.info("Execute paxos read")
        lwt_read = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                   consistency_level=ConsistencyLevel.SERIAL)
        rows = await cql.run_async(lwt_read, host=hosts[1])
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        # c1 must keep the decided value 1, and the conditional write of c2
        # must not have applied -- otherwise paxos state was lost on migration.
        assert row.c1 == 1
        assert row.c2 is None
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_no_lwt_with_tablets_feature(manager: ManagerClient):
    """With the LWT_WITH_TABLETS cluster feature suppressed, every conditional
    (IF ...) statement on a tablets-based table must be rejected with
    InvalidRequest at execution time, leaving the data unchanged."""
    startup_injections = [
        {
            'name': 'suppress_features',
            'value': 'LWT_WITH_TABLETS'
        }
    ]
    await manager.server_add(config={'error_injections_at_startup': startup_injections})

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (key int PRIMARY KEY, val int)")
        await cql.run_async(f"INSERT INTO {ks}.test (key, val) VALUES(1, 0)")

        # Every rejected LWT must fail with the same error text.
        rejected = fr"{ks}\.test.*LWT is not yet supported with tablets"

        with pytest.raises(InvalidRequest, match=rejected):
            await cql.run_async(f"INSERT INTO {ks}.test (key, val) VALUES(1, 1) IF NOT EXISTS")

        # The query is rejected during the execution phase,
        # so preparing the LWT query is expected to succeed.
        stmt = cql.prepare(
            f"UPDATE {ks}.test SET val = 1 WHERE KEY = ? IF EXISTS")
        with pytest.raises(InvalidRequest, match=rejected):
            await cql.run_async(stmt, [1])

        with pytest.raises(InvalidRequest, match=rejected):
            await cql.run_async(f"DELETE FROM {ks}.test WHERE key = 1 IF EXISTS")

        # None of the rejected statements may have modified the row.
        res = await cql.run_async(f"SELECT val FROM {ks}.test WHERE key = 1")
        assert res[0].val == 0
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_lwt_state_is_preserved_on_tablet_rebuild(manager: ManagerClient):
    # Scenario:
    # 1. A cluster with 3 nodes, rf=3.
    # 2. A successful LWT(c := 1) with cl_learn = 1 comes, stores accepts on n1 and n2, learn -- only on n1.
    # The user has observed its effects (the LWT is successful), so we are not allowed to lose it.
    # 3. Node n1 is lost permanently.
    # 4. New node n4 is added to replace n1. The tablet is rebuilt on n4 based on n2 and n3.
    # 5. Do the SERIAL (paxos) read of c from {n3, n4}. If paxos state was not rebuilt, we would
    # miss the decided value c == 1 of the first LWT.

    logger.info("Bootstrap a cluster with three nodes")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='my_dc')
    cql = manager.get_cql()

    logger.info("Resolve hosts")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Disable tablet balancing")
    await manager.disable_tablet_balancing()

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Servers that were permanently stopped; set_injection must not try
        # to reach them.
        stopped_servers = set()

        async def set_injection(set_to: list[ServerInfo], injection: str):
            # Enable the injection on `set_to`, disable it on every other
            # still-running server.
            logger.info(f"Injecting {injection} on {set_to}")
            await inject_error_on(manager, injection, set_to)
            unset_from = [
                s for s in servers if s not in set_to and s not in stopped_servers]
            logger.info(f"Disabling {injection} on {unset_from}")
            await disable_injection_on(manager, injection, unset_from)

        # Step2. Inject paxos_error_before_save_proposal - on [n3], paxos_error_before_learn on [n2, n3]
        await set_injection([servers[2]], 'paxos_error_before_save_proposal')
        await set_injection([servers[1], servers[2]], 'paxos_error_before_learn')
        logger.info("Execute CAS(c := 1)")
        lwt1 = SimpleStatement(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS",
                               consistency_level=ConsistencyLevel.ONE)
        await cql.run_async(lwt1, host=hosts[0])

        # Step3: Node n1 is lost permanently
        logger.info("Stop n1")
        await manager.server_stop(servers[0].server_id)
        stopped_servers.add(servers[0])
        # Reconnect so the driver drops the stopped node from its pool.
        cql = await reconnect_driver(manager)

        # Step 4: New node n4 is added to replace n1.
        logger.info("Start n4")
        replace_cfg = ReplaceConfig(
            replaced_id=servers[0].server_id,
            reuse_ip_addr=False,
            use_host_id=True,
            wait_replaced_dead=True
        )
        servers += [await manager.server_add(replace_cfg=replace_cfg,
                                             cmdline=cmdline,
                                             property_file={'dc': 'my_dc', 'rack': 'rack1'})]
        # Check we've actually run rebuild
        logs = []
        for s in servers:
            logs.append(await manager.server_open_log(s.server_id))
        assert sum([len(await log.grep('Initiating repair phase of tablet rebuild')) for log in logs]) == 1

        # Step 5. Do the SERIAL (paxos) read of c from {n3, n4}
        # (n2 fails at promise/proposal; learn errors are cleared everywhere).
        await set_injection([servers[1]], 'paxos_error_before_save_promise')
        await set_injection([servers[1]], 'paxos_error_before_save_proposal')
        await set_injection([], 'paxos_error_before_learn')
        logger.info("Execute paxos read")
        lwt_read = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                   consistency_level=ConsistencyLevel.SERIAL)
        rows = await cql.run_async(lwt_read)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        # The decided value must survive the rebuild.
        assert row.c == 1
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_lwt_concurrent_base_table_recreation(manager: ManagerClient):
    # The test checks that the node doesn't crash when the base table is recreated
    # during LWT execution. A no_such_column_family exception is thrown, and the LWT
    # fails as a result.

    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]

    server = await manager.server_add(cmdline=cmdline)
    cql = manager.get_cql()

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        async def recreate_table():
            # Drop-and-create gives the table a new id, invalidating the
            # schema the in-flight LWT resolved.
            logger.info(f"Drop table if exists {ks}.test")
            await cql.run_async(f"DROP TABLE IF EXISTS {ks}.test;")
            logger.info(f"Create table {ks}.test")
            await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        await recreate_table()

        logger.info("Inject load_paxos_state-enter")
        await inject_error_one_shot_on(manager, "load_paxos_state-enter", [server])

        # Start the LWT without awaiting it: it parks inside the injection
        # while we recreate the base table underneath it.
        logger.info("Start LWT")
        lwt_task = cql.run_async(
            f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")

        logger.info("Open log")
        log = await manager.server_open_log(server.server_id)

        logger.info("Wait for load_paxos_state-enter injection")
        await log.wait_for('load_paxos_state-enter: waiting for message')

        await recreate_table()

        logger.info("Trigger load_paxos_state-enter")
        await manager.api.message_injection(server.ip_addr, "load_paxos_state-enter")

        # The resumed LWT must fail cleanly instead of crashing the node.
        with pytest.raises(WriteFailure):
            await lwt_task
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='debug', reason='aarch64/debug is unpredictably slow', platform_key='aarch64')
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_lwt_timeout_while_creating_paxos_state_table(manager: ManagerClient, build_mode):
    # An LWT that needs to create the paxos state table requires a raft
    # quorum; with 2 of 3 nodes down it must fail with a clear read_barrier
    # timeout error rather than hang.

    # Debug builds are slow, so give them a larger write timeout.
    timeout = 10000 if build_mode == 'debug' else 1000
    config = {
        'write_request_timeout_in_ms': timeout,
        'error_injections_at_startup': [
            {
                # Speed up failure detection so the quorum loss is noticed quickly.
                'name': 'raft-group-registry-fd-threshold-in-ms',
                'value': '500'
            }
        ]
    }

    logger.info("Bootstrap a cluster with three nodes")
    servers = await manager.servers_add(3, config=config)
    cql = manager.get_cql()

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        logger.info(f"Create table {ks}.test")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Stop the second and third nodes")
        await asyncio.gather(manager.server_stop_gracefully(servers[1].server_id),
                             manager.server_stop_gracefully(servers[2].server_id))

        logger.info(f"Running an LWT with timeout {timeout}")
        with pytest.raises(Exception, match="raft operation \\[read_barrier\\] timed out, there is no raft quorum"):
            await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")

        # Restore the quorum so the keyspace can be dropped on exit.
        logger.info("Start the second and third nodes")
        await asyncio.gather(manager.server_start(servers[1].server_id),
                             manager.server_start(servers[2].server_id))
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_paxos_state_table_permissions(manager: ManagerClient):
    # This test checks permission handling for paxos state tables:
    # * Only a superuser is allowed to access a paxos state table
    # * Even a superuser is not allowed to run ALTER or DROP on paxos state tables

    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    config = {
        'authenticator': 'PasswordAuthenticator',
        'authorizer': 'CassandraAuthorizer'
    }

    # Default superuser credentials for the initial connection.
    manager.auth_provider = PlainTextAuthProvider(
        username="cassandra", password="cassandra")

    servers = [await manager.server_add(cmdline=cmdline, config=config)]
    cql, _ = await manager.get_ready_cql(servers)

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Run an LWT first so the "test$paxos" state table gets created.
        logger.info("Run an LWT1")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS")

        logger.info("Create a role_1")
        await cql.run_async("CREATE ROLE role_1 WITH SUPERUSER = true AND PASSWORD = 'psw' AND LOGIN = true")

        logger.info("Create a role_2")
        await cql.run_async("CREATE ROLE role_2 WITH SUPERUSER = false AND PASSWORD = 'psw' AND LOGIN = true")

        logger.info("Grant all permissions on test table to role_2")
        await cql.run_async(f"GRANT ALL PERMISSIONS ON TABLE {ks}.test TO role_2")

        # We don't attempt to prevent users from granting permissions on $paxos tables,
        # for simplicity. These permissions simply won't take effect — for example,
        # role_2 will still be denied access to the $paxos table.
        logger.info("Grant all permissions on test$paxos table to role_2")
        await cql.run_async(f"GRANT ALL PERMISSIONS ON TABLE \"{ks}\".\"test$paxos\" TO role_2")

        # role_1 is a superuser: LWTs and reads of the state table must work,
        # but DROP of the state table must still be refused.
        logger.info("Login as role_1")
        await manager.driver_connect(auth_provider=PlainTextAuthProvider(username="role_1", password="psw"))
        cql = manager.get_cql()

        logger.info("Run an LWT2")
        await cql.run_async(f"UPDATE {ks}.test SET c = 2 WHERE pk = 1 IF c = 1")

        logger.info("Select data after LWT2")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.SERIAL))
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 2

        logger.info("Select paxos state as role_1")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.\"test$paxos\""))
        assert len(rows) == 1

        logger.info("Attempt to run DROP TABLE for paxos state table as role_1")
        with pytest.raises(Unauthorized, match=re.escape(f"Cannot DROP <table {ks}.test$paxos>")):
            await cql.run_async(SimpleStatement(f"DROP TABLE {ks}.\"test$paxos\""))

        # role_2 is not a superuser: plain LWTs still work, but direct access
        # to the state table is denied despite the GRANT above.
        logger.info("Login as role_2")
        await manager.driver_connect(auth_provider=PlainTextAuthProvider(username="role_2", password="psw"))
        cql = manager.get_cql()

        logger.info("Run an LWT3")
        await cql.run_async(f"UPDATE {ks}.test SET c = 3 WHERE pk = 1 IF c = 2")

        logger.info("Select data after LWT3")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.SERIAL))
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 3

        logger.info("Select paxos state as role_2")
        with pytest.raises(Unauthorized, match=re.escape(f"Only superusers are allowed to SELECT <table {ks}.test$paxos>")):
            await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.\"test$paxos\""))

        logger.info("Attempt to run DROP TABLE for paxos state table as role_2")
        with pytest.raises(Unauthorized, match=re.escape(f"Cannot DROP <table {ks}.test$paxos>")):
            await cql.run_async(SimpleStatement(f"DROP TABLE {ks}.\"test$paxos\""))

        # Relogin as a default user to be able to DROP the test keyspace
        await manager.driver_connect()
        cql = manager.get_cql()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_lwt_coordinator_shard(manager: ManagerClient):
    # The test checks that an LWT coordinator runs on a replica shard, and not on a 'default' (zero) shard.
    # Scenario:
    # 1. Start a cluster with one node with --smp 2
    # 2. Create a table with one tablet and one replica, move tablet to the second shard
    # 3. Start another node
    # 4. Run an LWT on the second node, check the logs and assert that an LWT was executed on shard 1
    # Note: Before the changes in storage_proxy/get_cas_shard, the last LWT would be executed
    # on shard 0 of the second node because this node doesn't host any replicas of the
    # tablet, and sharder.shard_for_reads would return 0 as the 'default' shard.

    logger.info("Starting the first node")
    cmdline = [
        '--logger-log-level', 'paxos=trace',
        '--smp', '2'
    ]
    servers = [await manager.server_add(cmdline=cmdline)]
    cql = manager.get_cql()

    logger.info("Disable tablet balancing")
    await manager.disable_tablet_balancing()

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Move the only replica of the only tablet to shard 1 of node 1.
        logger.info("Migrating the tablet to shard 1")
        n1_host_id = await manager.get_host_id(servers[0].server_id)
        tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablets) == 1
        tablet = tablets[0]
        assert len(tablet.replicas) == 1
        old_replica = tablet.replicas[0]
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *old_replica,
                                      *(n1_host_id, 1), tablet.last_token)

        logger.info("Starting a second node")
        servers += [await manager.server_add(cmdline=cmdline)]

        logger.info("Wait for cql and get hosts")
        hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        # Coordinate the CAS from the second node, which holds no replica.
        logger.info(f"Execute CAS on {hosts[1]}")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS", host=hosts[1])

        # The paxos=trace log line must show the CAS ran on shard 1.
        n2_log = await manager.server_open_log(servers[1].server_id)
        matches = await n2_log.grep("CAS\\[0\\] successful")
        assert len(matches) == 1
        assert "shard 1" in matches[0][0]
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='debug', reason='dev is enough: the test checks non-critical functionality')
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_error_message_for_timeout_due_to_write_uncertainty(manager: ManagerClient):
    # LWT can sometimes return WriteTimeout when it is uncertain whether the transaction
    # was applied. In this case, the user should retry the transaction.
    #
    # Scenario:
    # 1. The cluster has three nodes. The first node is configured to throw an error on accept,
    #    and the second node is configured to wait for an explicit signal from the test.
    # 2. Run the first LWT on the first node and wait until the coordinator processes the error from it.
    # 3. Run another LWT, which should invalidate all promises made during the first one.
    # 4. Signal the second node to proceed with accept; it must return a reject.
    # 5. At this point, the coordinator is uncertain about the result of the first LWT. Any subsequent LWT
    #    might either complete its Paxos round (if a quorum of promises observed the accept) or
    #    overwrite it (if a quorum of promises never observed the accept).
    #
    # The test verifies that the WriteTimeout error returned by the system
    # correctly reflects the cause of the problem.

    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="mydc")
    (cql, hosts) = await manager.get_ready_cql(servers)

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # accept on the first node returns an error
        logger.info("Inject paxos_error_before_save_proposal")
        await inject_error_one_shot_on(manager, "paxos_error_before_save_proposal", [servers[0]])

        # accept on the second node returns reject
        logger.info("Inject paxos_accept_proposal_wait")
        await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [servers[1]])

        logger.info("Open log")
        log0 = await manager.server_open_log(servers[0].server_id)

        logger.info(f"Start LWT1 on {hosts[0]}")
        lwt_task = cql.run_async(SimpleStatement(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 1) IF NOT EXISTS;"),
                                 host=hosts[0])

        logger.info(
            "Wait for the coordinator to process error from the first node")
        await log0.wait_for('accept_proposal: failure while sending proposal')

        logger.info(f"Run LWT2 on {hosts[1]}")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 2) IF NOT EXISTS", host=hosts[1])

        logger.info("Trigger paxos_accept_proposal_wait")
        await manager.api.message_injection(servers[1].ip_addr, "paxos_accept_proposal_wait")

        # The error message must mention both the uncertainty and the injected
        # accept failure that caused it.
        with pytest.raises(WriteTimeout, match="write timeout due to uncertainty(.*)injected_error_before_save_proposal"):
            await lwt_task
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='debug', reason='dev is enough')
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_no_uncertainty_for_reads(manager: ManagerClient):
    # This test verifies that LWT reads do not produce 'uncertainty' timeouts.
    #
    # The scenario is similar to the write-uncertainty test:
    # 1. The cluster has three nodes. The first node is configured to fail on accept,
    #    and the second node is configured to block until explicitly signaled by the test.
    # 2. Run a read LWT on the first node and wait until the coordinator processes an accept error
    #    from the first node.
    # 3. Run another LWT, which should invalidate all promises from the first one.
    # 4. Signal the second node to proceed with accept; it must return a reject.
    # 5. At this point, the coordinator is uncertain about the outcome of the first LWT. Any subsequent LWT
    #    might either complete its Paxos round (if a quorum of promises observed the accept) or
    #    overwrite it (if a quorum did not). Since the first LWT is read-only, the coordinator retries it
    #    transparently and returns the effects of the second LWT to the client.

    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="mydc")
    (cql, hosts) = await manager.get_ready_cql(servers)

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # accept on the first node returns an error
        logger.info("Inject paxos_error_before_save_proposal")
        await inject_error_one_shot_on(manager, "paxos_error_before_save_proposal", [servers[0]])

        # accept on the second node returns reject
        logger.info("Inject paxos_accept_proposal_wait")
        await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [servers[1]])

        logger.info("Open log")
        log0 = await manager.server_open_log(servers[0].server_id)

        logger.info(f"Start LWT1 on {hosts[0]}")
        lwt_read = cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                 consistency_level=ConsistencyLevel.SERIAL),
                                 host=hosts[0])

        logger.info(
            "Wait for the coordinator to process error from the first node")
        await log0.wait_for('accept_proposal: failure while sending proposal')

        logger.info(f"Run LWT2 on {hosts[1]}")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (1, 2) IF NOT EXISTS", host=hosts[1])

        logger.info("Trigger paxos_accept_proposal_wait")
        await manager.api.message_injection(servers[1].ip_addr, "paxos_accept_proposal_wait")

        # The read must complete without a timeout and must observe the value
        # written by LWT2 (the coordinator transparently retried the read).
        rows = await lwt_read
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.c == 2
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_lwts_for_special_tables(manager: ManagerClient):
    """
    Check LWT restrictions on views and CDC log tables.

    Historically, SELECT with SERIAL consistency was tolerated on vnode-based
    views even though it gives no linearizability guarantee. For tablet-based
    views (and CDC log tables) such statements are rejected outright; this
    test pins down the rejection and its error messages.
    """
    server_args = [
        '--logger-log-level', 'paxos=trace'
    ]
    await manager.servers_add(1, cmdline=server_args)
    cql = manager.get_cql()

    ks_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1 } AND tablets = {'initial': 1}"
    async with new_test_keyspace(manager, ks_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int, c int, v int, PRIMARY KEY(pk, c)) WITH cdc = {{'enabled': true}}")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (pk, c)")

        # Direct writes to a materialized view are always rejected, LWT or not.
        view_write = f"INSERT INTO {ks}.tv (pk, c, v) VALUES (1, 2, 3) IF NOT EXISTS"
        with pytest.raises(InvalidRequest, match=re.escape("Cannot directly modify a materialized view")):
            await cql.run_async(view_write)

        # SERIAL reads against a tablet-based view must be rejected.
        serial_view_read = SimpleStatement(f"SELECT * FROM {ks}.tv WHERE pk=1 AND c=2",
                                           consistency_level=ConsistencyLevel.SERIAL)
        with pytest.raises(InvalidRequest, match=re.escape(f"LWT is not supported on views: {ks}.tv")):
            await cql.run_async(serial_view_read)

        # The same restriction applies to the auto-generated CDC log table.
        serial_cdc_read = SimpleStatement(f"SELECT * FROM {ks}.test_scylla_cdc_log WHERE \"cdc$stream_id\"=0xAB",
                                          consistency_level=ConsistencyLevel.SERIAL)
        with pytest.raises(InvalidRequest, match=re.escape(f"LWT is not supported on CDC log tables: {ks}.test_scylla_cdc_log")):
            await cql.run_async(serial_cdc_read)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_lwt_shutdown(manager: ManagerClient):
    """
    This is a regression test for #26355:
    * Start a cluster with two nodes (s0, s1) and a tablet-based table.
    * Inject `paxos_state_learn_after_mutate` on s0 for this table.
    * Execute an LWT with `cl_learn=1` on s0 and wait for it to succeed.
      This LWT leaves a background write to s0 storage.
    * Begin s0 shutdown and wait until it reaches “storage proxy RPC verbs”,
      but not “paxos store”. In other words, storage_proxy::remote should start
      draining, but the paxos store should not yet be destroyed since it is still
      in use.
    * Release the `paxos_state_learn_after_mutate` injection.
    * Verify that shutdown completes successfully.
    * After s0 restarts, a node-local read on s0 must return the updated value.
    """
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    # Two racks so that the two replicas of the rf=2 keyspace land on different nodes.
    [s0, s1] = await manager.servers_add(2, cmdline=cmdline, property_file=[
        {'dc': 'my_dc', 'rack': 'r1'},
        {'dc': 'my_dc', 'rack': 'r2'}
    ])
    (cql, [h0, _]) = await manager.get_ready_cql([s0, s1])
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int)")

        logger.info(f"Enable 'paxos_state_learn_after_mutate' injection on {s0.ip_addr}")
        await manager.api.enable_injection(s0.ip_addr, 'paxos_state_learn_after_mutate',
                                           False, parameters={'cf_name': 'test'})

        # CL=ONE lets the LWT succeed while s0's learn-phase write is still
        # parked behind the injection, leaving a background write pending on s0.
        logger.info("Run an LWT")
        await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.test (pk, v) VALUES (1, 2) IF NOT EXISTS",
                                            consistency_level=ConsistencyLevel.ONE),
                            host=h0)

        logger.info("Open log")
        log = await manager.server_open_log(s0.server_id)
        logger.info("Wait for 'paxos_state_learn_after_mutate' injection")
        await log.wait_for('paxos_state_learn_after_mutate: waiting for message')

        # Shutdown must drain the RPC verbs but keep the paxos store alive
        # while the injected learn write is still in flight.
        logger.info("Start node shutdown")
        stop_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))
        await log.wait_for('Shutting down storage proxy RPC verbs')
        assert len(await log.grep('Shutting down paxos store')) == 0

        logger.info("Trigger paxos_state_learn_after_mutate")
        await manager.api.message_injection(s0.ip_addr, "paxos_state_learn_after_mutate")

        logger.info("Waiting for paxos store shutdown")
        await log.wait_for('Shutting down paxos store')

        logger.info("Waiting for the node shutdown")
        await stop_task

        logger.info("Restarting server")
        await manager.server_start(s0.server_id)

        logger.info("Reconnecting the driver")
        cql = await reconnect_driver(manager)

        logger.info("Wait for cql and get hosts")
        [h0, _] = await wait_for_cql_and_get_hosts(cql, [s0, s1], time.time() + 60)

        # Node-local (CL=ONE) read on s0: proves the learn write made it to
        # s0's storage before shutdown completed.
        logger.info("Run simple non-paxos read")
        rows = await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 1;",
                                                   consistency_level=ConsistencyLevel.ONE),
                                   host=h0)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 1
        assert row.v == 2
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='debug', reason='dev is enough')
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablets_merge_waits_for_lwt(manager: ManagerClient, scale_timeout):
    """
    This is a regression test for #26437:
    1. A cluster with one node, a table with rf=1 and two tablets on the same shard.
    2. Run an LWT on the second tablet, make it hang on the paxos_accept_proposal_wait injection.
       The LWT coordinator holds a tablet_metadata_guard with a non-migrating topology erm and global_shard=1.
    3. Trigger tablets merge with the tablet_force_tablet_count_decrease injection, wait for tablet_count == 1.
       This step invalidates the global_shard, the tablet_metadata_guard should stop refreshing erms.
    4. Start tablet migration for the merged tablet, check that a global barrier hangs.
       Note: since merge finalization now issues a topology barrier of its own, the blocked
       barrier observed here is the one in merge finalization, not the migration's.
    5. Release the paxos_accept_proposal_wait injection, check the barrier is released, the tablet is
       migrated and the LWT succeeded.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'paxos=trace',
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'tablets=debug'
    ]
    config = {
        # to force faster tablet balancer reactions
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }
    s0 = await manager.server_add(cmdline=cmdline,
                                  config=config,
                                  property_file={'dc': 'dc1', 'rack': 'r1'})
    (cql, _) = await manager.get_ready_cql([s0])

    # Disable automatic tablet migrations so that they don't interfere with
    # the global barrier checks the test wants to run for the table migration track.
    await manager.api.enable_injection(s0.ip_addr, "tablet_migration_bypass", one_shot=False)

    logger.info("Create a keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        logger.info("Create a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # The test needs both tablets on the same shard (shard0) so that the
        # post-merge migration to shard1 is a genuine shard change.
        tablets = await get_all_tablet_replicas(manager, s0, ks, 'test')
        assert len(tablets) == 2
        for t in tablets:
            assert len(t.replicas) == 1
            (s0_host_id, shard) = t.replicas[0]
            if shard != 0:
                logger.info(f"Migrating tablet {t.last_token} to shard0")
                await manager.api.move_tablet(s0.ip_addr, ks, "test", s0_host_id, shard, s0_host_id, 0, t.last_token)

        logger.info("Inject paxos_accept_proposal_wait")
        await inject_error_one_shot_on(manager, "paxos_accept_proposal_wait", [s0])

        logger.info("Run an LWT")
        # We use 3 as a key since its token is handled by the second tablet, which is important for the test.
        lwt = cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (3, 3) IF NOT EXISTS")

        logger.info("Open log")
        log0 = await manager.server_open_log(s0.server_id)

        logger.info("Wait for paxos_accept_proposal_wait")
        await log0.wait_for("paxos_accept_proposal_wait: waiting for message")

        logger.info("Injecting tablet_force_tablet_count_decrease")
        await manager.api.enable_injection(s0.ip_addr, "tablet_force_tablet_count_decrease", one_shot=False)

        # Mark taken BEFORE the merge so the wait below catches the
        # barrier_and_drain issued by merge finalization.
        m = await log0.mark()

        logger.info("Wait for tablet merge to complete")
        await wait_for_tablet_count(manager, s0, ks, 'test', lambda c: c == 1, 1, scale_timeout=scale_timeout, timeout_s=15)

        logger.info("Ensure the guard decided to retain the erm")
        m, _ = await log0.wait_for("tablet_metadata_guard::check: retain the erm and abort the guard",
                                   from_mark=m, timeout=10)

        tablets = await get_all_tablet_replicas(manager, s0, ks, 'test')
        assert len(tablets) == 1
        tablet = tablets[0]
        assert tablet.replicas == [(s0_host_id, 0)]

        # Since merge now waits for erms before releasing the state machine,
        # the migration initiated below will not start until paxos released the erm.
        # The barrier which is blocked is the one in merge finalization.
        # I keep the tablet movement as a guard against regressions in case the behavior changes.

        migration_task = asyncio.create_task(manager.api.move_tablet(s0.ip_addr, ks, "test",
                                                                     s0_host_id, 0,
                                                                     s0_host_id, 1,
                                                                     tablet.last_token))
        logger.info("Wait for the global barrier to start draining on shard0")
        await log0.wait_for("\\[shard 0: gms\\] raft_topology - Got raft_topology_cmd::barrier_and_drain", from_mark=m)
        # Just to confirm that the guard still holds the erm
        matches = await log0.grep("\\[shard 0: gms\\] raft_topology - raft_topology_cmd::barrier_and_drain done", from_mark=m)
        assert len(matches) == 0

        # Before the fix, the tablet migration global barrier did not wait for the LWT operation.
        # As a result, the merged tablet could be migrated to another shard while the LWT is still running.
        # This caused the LWT to crash in paxos_state::prepare/shards_for_writes because of the
        # unexpected tablet shard change.
        logger.info("Trigger 'paxos_accept_proposal_wait'")
        await manager.api.message_injection(s0.ip_addr, "paxos_accept_proposal_wait")

        logger.info("Wait for the tablet migration task to finish")
        await migration_task

        logger.info("Wait for the LWT to finish")
        await lwt
|