#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
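
"""
Cluster tests for strongly consistent (tablet-based) tables and CQL forwarding.

The tests cover basic reads and writes (unprepared and prepared, issued through
the leader, a non-leader replica and a non-replica node), persistence of the
raft metadata in system.raft_groups across restarts, --smp increases, compaction
and crashes, schema-change corner cases while applying forwarded writes, and
forwarding-specific behaviour such as re-preparing statements on the target node,
prepared-statement cache invalidation, exception passthrough and metric updates.
"""
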
from test.pylib.manager_client import ManagerClient
from test.pylib.util import gather_safely, wait_for
from test.cluster.util import new_test_keyspace, new_test_table, reconnect_driver
from test.pylib.internal_types import HostID, ServerInfo
from cassandra import ReadTimeout, WriteTimeout
from cassandra.cluster import ConsistencyLevel
from cassandra.policies import FallthroughRetryPolicy
from cassandra.protocol import InvalidRequest
from cassandra.query import SimpleStatement, BoundStatement
from test.pylib.tablets import get_all_tablet_replicas, get_tablet_replicas

import pytest
import logging
import time
import uuid
import random
import asyncio


logger = logging.getLogger(__name__)

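# Poll the REST API of server `s` until the given raft group reports a non-zero
# leader host id (up to 60 seconds), and return that host id.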
async def wait_for_leader(manager: ManagerClient, s: ServerInfo, group_id: str):
    async def get_leader_host_id():
        result = await manager.api.get_raft_leader(s.ip_addr, group_id)
        return None if uuid.UUID(result).int == 0 else result
    return await wait_for(get_leader_host_id, time.time() + 60)

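# Snapshot the raft metadata visible on `host` through system.raft_groups,
# returning a map from group_id to (shard, vote_term, commit_idx).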
async def collect_all_raft_state(cql, host):
    state = {}
    rows = await cql.run_async(f"SELECT DISTINCT shard, group_id, vote_term, commit_idx FROM system.raft_groups", host=host)
    logger.info(f"Collected raft state from host {host}:")
    for row in rows:
        assert str(row.group_id) not in state, f"Duplicate raft state for group {row.group_id} shard {row.shard}"
        state[str(row.group_id)] = (row.shard, row.vote_term, row.commit_idx)
    return state

# Verify that:
# - All raft groups present before an event are still present after the event.
# - For each raft group, the shard did not change.
# - For each raft group, vote_term did not decrease (if present).
# - For each raft group, commit_idx did not decrease (if present).
def assert_raft_state_continuity(state_before: dict, state_after: dict, context: str):
    for group_id, (shard_before, vote_term_before, commit_idx_before) in state_before.items():
        assert group_id in state_after, (
            f"Raft state for group {group_id} shard {shard_before} missing after {context}."
        )
        shard_after, vote_term_after, commit_idx_after = state_after[group_id]

        assert shard_before == shard_after, (
            f"Raft state for group {group_id} changed shard from {shard_before} to {shard_after} after {context}."
        )

        if vote_term_before is not None:
            assert vote_term_after is not None and vote_term_after >= vote_term_before, (
                f"vote_term decreased for group {group_id} shard {shard_before} after {context}: "
                f"{vote_term_before} -> {vote_term_after}."
            )

        if commit_idx_before is not None:
            assert commit_idx_after is not None and commit_idx_after >= commit_idx_before, (
                f"commit_idx decreased for group {group_id} shard {shard_before} after {context}: "
                f"{commit_idx_before} -> {commit_idx_after}."
            )

# Verify that reads and writes to raft tables for strongly consistent tablets
# are always routed to the expected shards, according to the tablet's
# assignment.
async def assert_no_cross_shard_routing(manager: ManagerClient, server: ServerInfo):
    log = await manager.server_open_log(server.server_id)

    # Check partitioner logs
    partitioner_logs = await log.grep(r"fixed_shard.*get_token: shard=(\d+), token=(\d+)")
    computed_tokens = {}
    for _, match in partitioner_logs:
        shard = int(match.group(1))
        token = int(match.group(2))
        computed_tokens[token] = shard

    # Check sharder logs
    sharder_logs = await log.grep(r"fixed_shard.*shard_of\((\d+)\) = (\d+)")
    for _, match in sharder_logs:
        token = int(match.group(1))
        shard = int(match.group(2))
        if token not in computed_tokens:
            # This can happen when we read the raft tables as a client, through CQL.
            # In that case, we may try to route a token that does not correspond
            # to any existing raft group.
            continue
        shard_from_partitioner = computed_tokens[token]
        assert shard == shard_from_partitioner, (
            f"Sharder routed token {token} to shard {shard}, "
            f"but partitioner computed shard {shard_from_partitioner}."
        )

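# Return the raft group id backing the first tablet of the given table,
# as recorded in system.tablets.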
async def get_table_raft_group_id(manager: ManagerClient, ks: str, table: str):
    table_id = await manager.get_table_id(ks, table)
    rows = await manager.get_cql().run_async(f"SELECT raft_group_id FROM system.tablets where table_id = {table_id}")
    return str(rows[0].raft_group_id)

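# Basic end-to-end write/read test for a strongly consistent table: statements
# (both unprepared and prepared) are issued through the leader, a non-leader
# replica and a non-replica node, exercising CQL forwarding from each of them;
# server restart and graceful shutdown are checked on the same cluster.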
@pytest.mark.asyncio
async def test_basic_write_read(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug'
    ]
    servers = await manager.servers_add(3, config=config, cmdline=cmdline, auto_rack_dc='my_dc')
    (cql, hosts) = await manager.get_ready_cql(servers)

    logger.info("Load host_id-s for servers")
    host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])

    def host_by_host_id(host_id):
        for hid, host in zip(host_ids, hosts):
            if hid == host_id:
                return host
        raise RuntimeError(f"Can't find host for host_id {host_id}")

    logger.info("Creating a strongly-consistent keyspace")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1} AND consistency = 'local'") as ks:
        logger.info("Creating a table")
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Select raft group id for the tablet")
        group_id = await get_table_raft_group_id(manager, ks, 'test')

        logger.info(f"Get current leader for the group {group_id}")
        try:
            leader_host_id = await wait_for_leader(manager, servers[0], group_id)
        except:
            # We need to wait for the leader on a replica, and the first server might not be one
            leader_host_id = await wait_for_leader(manager, servers[1], group_id)
        leader_host = host_by_host_id(leader_host_id)

        tablet_replicas = await get_tablet_replicas(manager, servers[0], ks, "test", 0)
        assert len(tablet_replicas) == 2
        replica_host_ids = [replica[0] for replica in tablet_replicas]

        logger.info(f"Get the non-leader replica for the group {group_id}")
        non_leader_replica_host_id = [host_id for host_id in replica_host_ids if str(host_id) != str(leader_host_id)][0]
        non_leader_replica_host = host_by_host_id(non_leader_replica_host_id)

        logger.info(f"Get the non-replica for the group {group_id}")
        non_replica_host_id = [host_id for host_id in host_ids if str(host_id) not in replica_host_ids][0]
        non_replica_host = host_by_host_id(non_replica_host_id)

        logger.info(f"Run INSERT statement on leader {leader_host}")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (10, 20)", host=leader_host)

        logger.info(f"Run SELECT statement on leader {leader_host}")
        rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=leader_host)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 20

        logger.info(f"Run INSERT statement on non-leader replica {non_leader_replica_host}")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (10, 30)", host=non_leader_replica_host)

        logger.info(f"Run SELECT statement on non-leader replica {non_leader_replica_host}")
        rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=non_leader_replica_host)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 30

        logger.info(f"Run INSERT statement on non-replica {non_replica_host}")
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (10, 40)", host=non_replica_host)

        logger.info(f"Run SELECT statement on non-replica {non_replica_host}")
        rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=non_replica_host)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 40

        # Test with prepared statements as well
        insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
        bound_insert_stmt = BoundStatement(insert_stmt, consistency_level=ConsistencyLevel.ONE)
        select_stmt = cql.prepare(f"SELECT * FROM {ks}.test WHERE pk = ?")
        bound_select_stmt = BoundStatement(select_stmt, consistency_level=ConsistencyLevel.ONE)
        bound_select_stmt.bind([10])

        logger.info(f"Run prepared INSERT statement on leader {leader_host}")
        bound_insert_stmt.bind([10, 50])
        await cql.run_async(bound_insert_stmt, host=leader_host)

        logger.info(f"Run prepared SELECT statement on leader {leader_host}")
        rows = await cql.run_async(bound_select_stmt, host=leader_host)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 50

        logger.info(f"Run prepared INSERT statement on non-leader replica {non_leader_replica_host}")
        bound_insert_stmt.bind([10, 60])
        await cql.run_async(bound_insert_stmt, host=non_leader_replica_host)

        logger.info(f"Run prepared SELECT statement on non-leader replica {non_leader_replica_host}")
        rows = await cql.run_async(bound_select_stmt, host=non_leader_replica_host)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 60

        logger.info(f"Run prepared INSERT statement on non-replica {non_replica_host}")
        bound_insert_stmt.bind([10, 70])
        await cql.run_async(bound_insert_stmt, host=non_replica_host)

        logger.info(f"Run prepared SELECT statement on non-replica {non_replica_host}")
        rows = await cql.run_async(bound_select_stmt, host=non_replica_host)
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 70

        # Check that we can restart a server with an active tablets raft group
        await manager.server_restart(servers[2].server_id)

    # Check that the servers can be stopped gracefully. By default the test runner just kills them.
    await gather_safely(*[manager.server_stop_gracefully(s.server_id) for s in servers])

@pytest.mark.asyncio
async def test_multi_shard_write_read(manager: ManagerClient):
    """
    Verify that strongly consistent tables work correctly on non-shard-0.

    We create a strongly consistent table with 4 tablets and RF=3
    and a cluster with 3 nodes and 4 shards per node.
    Assuming a uniform distribution of tablets to shards,
    each tablet should be assigned to a different shard.
    We have 3 "shard zeros", so at least one tablet should be
    replicated on non-zero shards only, including its leader.

    We then perform multiple writes to all nodes, some of which
    should go to non-shard-0 leaders, and verify that all writes
    succeed and that we can read back all data correctly.
    """
    logger.info("Bootstrapping cluster with 4 shards per node")
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--smp=4',
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug'
    ]
    servers = await manager.servers_add(3, config=config, cmdline=cmdline, auto_rack_dc='my_dc')
    (cql, hosts) = await manager.get_ready_cql(servers)

    logger.info("Creating a strongly-consistent keyspace with 4 tablets")
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 4} AND consistency = 'local'") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, c int") as table:
            for j in range(50):
                await cql.run_async(f"INSERT INTO {table} (pk, c) VALUES ({j}, {j})")

            # Read all data
            for j in range(50):
                rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = {j};")
                assert len(rows) == 1, f"Expected 1 row for pk={j}, got {len(rows)}"
                row = rows[0]
                assert row.c == j, f"Data integrity check failed for pk={j}: expected c={j}, got c={row.c}"

    await gather_safely(*[manager.server_stop_gracefully(s.server_id) for s in servers])

@pytest.mark.asyncio
async def test_sc_multishard_metadata_reads(manager: ManagerClient):
    """
    Verify that multi-shard reads of raft metadata for strongly-consistent tables work correctly.
    """
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--smp=4',
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug',
        '--logger-log-level', 'fixed_shard=trace',
    ]
    server = await manager.server_add(config=config, cmdline=cmdline)
    (cql, hosts) = await manager.get_ready_cql([server])

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8} AND consistency = 'local'") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, c int") as table:
            table_name = table.split('.')[-1]
            table_id = await manager.get_table_id(ks, table_name)

            # Write some rows to trigger raft table updates
            for pk in range(10):
                await cql.run_async(f"INSERT INTO {table} (pk, c) VALUES ({pk}, {pk})")

            # Verify that tablets are also allocated on non-0 shards
            tablets = await get_all_tablet_replicas(manager, server, ks, table_name)
            logger.info(f"Tablet distribution: {tablets}")
            assert set(shard for tablet in tablets for _, shard in tablet.replicas) != set([0]), "Strongly consistent tables should be allocated also on non-0 shards"

            # Verify we have non-empty state
            state = await collect_all_raft_state(cql, hosts[0])
            has_nonempty_state = any(
                commit_idx is not None and commit_idx > 0
                for _, _, commit_idx in state.values()
            )
            assert has_nonempty_state, "Expected at least one group to have commit_idx > 0"

            # Prepare the list of (shard, group_id) pairs for all tablets of the table.
            raft_partition_keys = []
            for row in await cql.run_async(f"SELECT raft_group_id, replicas FROM system.tablets where table_id = {table_id}"):
                assert len(row.replicas) == 1, "Expected RF=1 for the test table"
                (_, shard) = row.replicas[0]
                raft_partition_keys.append((shard, row.raft_group_id))
            assert len(raft_partition_keys) == 8, f"Expected 8 tablets, got {len(raft_partition_keys)}"

            # Collect raft metadata for all tablets using single-shard queries
            raft_data = {}
            for (shard, group_id) in raft_partition_keys:
                raft_data[(shard, group_id)] = set(await cql.run_async(f"SELECT * FROM system.raft_groups WHERE shard = {shard} AND group_id = {group_id}"))
                # Verify that we can also obtain the same data without knowing the shard using ALLOW FILTERING
                filtered_data = set(await cql.run_async(f"SELECT * FROM system.raft_groups WHERE group_id = {group_id} ALLOW FILTERING"))
                assert raft_data[(shard, group_id)] == filtered_data, f"Data mismatch for group {group_id} when read by shard vs ALLOW FILTERING: {raft_data[(shard, group_id)]} vs {filtered_data}"

            # Now read raft metadata using multi-shard queries and verify correctness
            for k in range(2, 9):
                sample_keys = random.sample(raft_partition_keys, k)
                shards = tuple(shard for (shard, _) in sample_keys)
                group_ids = ", ".join(f"{group_id}" for group_id in set(group_id for (_, group_id) in sample_keys))
                logger.info(f"Testing multi-shard read with {k} keys: {sample_keys}")
                # We can't specify (shard, group_id) partition key pairs in the IN clause because multi-column restrictions are only allowed for clustering keys.
                # Instead, we read using a product of all involved shards and group_ids. This should return the same rows because every group_id
                # is only assigned to one shard.
                rows = await cql.run_async(f"SELECT * FROM system.raft_groups WHERE shard IN {shards} AND group_id IN ({group_ids})")
                for key in sample_keys:
                    for row_data in raft_data[key]:
                        assert row_data in rows, f"Missing data for raft group {key} in multi-shard read: {row_data}"
                for row in rows:
                    assert row in raft_data[(row.shard, row.group_id)], f"Unexpected data for raft group {(row.shard, row.group_id)} in multi-shard read: {row}"

            # Read ranges of partitions using TOKEN()
            for (boundary_shard, boundary_group_id) in raft_partition_keys:
                ith_token = await cql.run_async(f"SELECT TOKEN(shard, group_id) as t FROM system.raft_groups WHERE shard = {boundary_shard} AND group_id = {boundary_group_id} LIMIT 1")
                token_value = ith_token[0].t
                range_below = await cql.run_async(f"SELECT * FROM system.raft_groups WHERE TOKEN(shard, group_id) <= {token_value}")
                range_above = await cql.run_async(f"SELECT * FROM system.raft_groups WHERE TOKEN(shard, group_id) > {token_value}")
                assert set(range_below) & set(range_above) == set(), f"Overlapping data in range reads up to and above token {token_value}"
                for (shard, group_id) in raft_partition_keys:
                    for row_data in raft_data[(shard, group_id)]:
                        if shard < boundary_shard:
                            assert row_data in range_below, f"Missing data for raft group {(shard, group_id)} in range read up to token {token_value}: {row_data}"
                        elif shard > boundary_shard:
                            assert row_data in range_above, f"Missing data for raft group {(shard, group_id)} in range read above token {token_value}: {row_data}"
                        else:
                            if group_id == boundary_group_id:
                                assert row_data in range_below, f"Missing data for raft group {(shard, group_id)} in range read up to token {token_value}: {row_data}"
                            else:
                                assert row_data in range_below or row_data in range_above, f"Missing data for raft group {(shard, group_id)} in range read up to token {token_value}: {row_data}"

    await manager.server_stop_gracefully(server.server_id)

@pytest.mark.asyncio
async def test_sc_persistence_restart_with_smp_increase(manager: ManagerClient):
    """
    Verify that the metadata for strongly-consistent tables
    is preserved after increasing shard count (--smp).

    The raft tables for strongly consistent tables use a custom partitioner
    and sharder which should not change the shard assignments when the
    number of shards changes. This test verifies that after increasing the SMP
    on a single-node cluster, we don't start reading/writing raft metadata
    from/to incorrect shards.
    """
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--smp=2',
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug',
        '--logger-log-level', 'fixed_shard=trace',
    ]
    server = await manager.server_add(config=config, cmdline=cmdline)
    (cql, hosts) = await manager.get_ready_cql([server])

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2} AND consistency = 'local'") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, c int") as table:

            # Write some rows to trigger raft table updates
            for pk in range(10):
                await cql.run_async(f"INSERT INTO {table} (pk, c) VALUES ({pk}, {pk})")

            state_before = await collect_all_raft_state(cql, hosts[0])

            await manager.server_update_cmdline(server.server_id, ['--smp=4'])
            await manager.server_restart(server.server_id)
            await reconnect_driver(manager)
            cql = manager.get_cql()

            # We can't read the internal raft state directly, so we perform extra writes
            # which should cause raft table updates based on the loaded state after restart.
            for pk in range(10, 20):
                await cql.run_async(f"INSERT INTO {table} (pk, c) VALUES ({pk}, {pk})")

            state_after = await collect_all_raft_state(cql, hosts[0])
            assert_raft_state_continuity(state_before, state_after, "SMP increase")

            await assert_no_cross_shard_routing(manager, server)

            for pk in range(20):
                rows = await cql.run_async(f"SELECT * FROM {table} WHERE pk = {pk};")
                assert len(rows) == 1, f"Expected 1 row for pk={pk}, got {len(rows)}"
                row = rows[0]
                assert row.c == pk, f"Incorrect row read for pk={pk}: expected c={pk}, got c={row.c}"

    await manager.server_stop_gracefully(server.server_id)

@pytest.mark.asyncio
async def test_sc_persistence_with_compaction(manager: ManagerClient):
    """
    Verify that compaction of system.raft_groups works correctly.

    Regular (non-reshard) compaction does not use the custom partitioner/sharder
    directly, but it does compact SSTables written with them, so in this test we
    verify that after compaction the raft metadata is still readable and correct.
    """
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug',
        '--logger-log-level', 'fixed_shard=trace',
    ]
    server = await manager.server_add(config=config, cmdline=cmdline)
    (cql, hosts) = await manager.get_ready_cql([server])

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4} AND consistency = 'local'") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, c int") as table:

            # Create multiple SSTables by doing writes with flushes in between
            for batch in range(3):
                for pk in range(batch * 10, (batch + 1) * 10):
                    await cql.run_async(f"INSERT INTO {table} (pk, c) VALUES ({pk}, {pk})")
                await manager.api.keyspace_flush(server.ip_addr, "system", "raft_groups")

            state_before_compaction = await collect_all_raft_state(cql, hosts[0])

            await manager.api.keyspace_compaction(server.ip_addr, "system", "raft_groups")

            state_after_compaction = await collect_all_raft_state(cql, hosts[0])
            assert_raft_state_continuity(state_before_compaction, state_after_compaction, "compaction")

            # Restart to verify compacted SSTables are correctly readable by the raft server
            await manager.server_restart(server.server_id)
            await reconnect_driver(manager)
            cql = manager.get_cql()

            # We can't read the internal raft state directly, so we perform extra writes
            # which should cause raft table updates based on the loaded state after restart.
            for pk in range(30, 40):
                await cql.run_async(f"INSERT INTO {table} (pk, c) VALUES ({pk}, {pk})")

            state_after_restart = await collect_all_raft_state(cql, hosts[0])
            assert_raft_state_continuity(state_after_compaction, state_after_restart, "compaction + restart")

            await assert_no_cross_shard_routing(manager, server)

    await manager.server_stop_gracefully(server.server_id)

@pytest.mark.asyncio
async def test_sc_persistence_after_crash(manager: ManagerClient):
    """
    Verify that metadata for strongly-consistent tables is recovered
    after a non-graceful stop (crash simulation).
    """
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug',
        '--logger-log-level', 'fixed_shard=trace',
    ]
    server = await manager.server_add(config=config, cmdline=cmdline)
    (cql, hosts) = await manager.get_ready_cql([server])

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4} AND consistency = 'local'") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, c int") as table:
            # Write some rows to trigger raft table updates
            for pk in range(20):
                await cql.run_async(f"INSERT INTO {table} (pk, c) VALUES ({pk}, {pk})")

            # Collect raft state before the crash
            state_before_crash = await collect_all_raft_state(cql, hosts[0])

            await manager.server_stop(server.server_id)

            await manager.server_start(server.server_id)
            await reconnect_driver(manager)
            cql = manager.get_cql()

            # We can't read the internal raft state directly, so we perform extra writes
            # which should cause raft table updates based on the loaded state after restart.
            for pk in range(20, 30):
                await cql.run_async(f"INSERT INTO {table} (pk, c) VALUES ({pk}, {pk})")

            state_after_crash = await collect_all_raft_state(cql, hosts[0])
            assert_raft_state_continuity(state_before_crash, state_after_crash, "crash recovery")

            await assert_no_cross_shard_routing(manager, server)

    await manager.server_stop_gracefully(server.server_id)

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_no_schema_when_apply_write(manager: ManagerClient):
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug'
    ]
    servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc')
    # We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups),
    # because we want `servers[2]` to receive Raft commands from others.
    servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc': 'my_dc', 'rack': 'rack3'})]
    (cql, hosts) = await manager.get_ready_cql(servers)
    host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])

    def host_by_host_id(host_id):
        for hid, host in zip(host_ids, hosts):
            if hid == host_id:
                return host
        raise RuntimeError(f"Can't find host for host_id {host_id}")

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Drop incoming append entries from group0 (schema changes) on `servers[2]` after the table is created,
        # so the strong consistency raft groups are created on the node, but it won't receive subsequent ALTER TABLE mutations.
        group0_id = (await cql.run_async("SELECT value FROM system.scylla_local WHERE key = 'raft_group0_id'"))[0].value
        await manager.api.enable_injection(servers[2].ip_addr, "raft_drop_incoming_append_entries_for_specified_group", one_shot=False, parameters={'value': group0_id})
        await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=hosts[0])

        group_id = await get_table_raft_group_id(manager, ks, 'test')
        leader_host_id = await wait_for_leader(manager, servers[0], group_id)
        assert leader_host_id != host_ids[2]
        leader_host = host_by_host_id(leader_host_id)

        s2_log = await manager.server_open_log(servers[2].server_id)
        s2_mark = await s2_log.mark()

        await manager.api.enable_injection(servers[2].ip_addr, "disable_raft_drop_append_entries_for_specified_group", one_shot=True)
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c, new_col) VALUES (10, 20, 30)", host=leader_host)

        await s2_log.wait_for(f"Column definitions for {ks}.test changed", timeout=60, from_mark=s2_mark)
        rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2])
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 20
        assert row.new_col == 30

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_old_schema_when_apply_write(manager: ManagerClient):
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug'
    ]
    servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc='my_dc')
    # We don't want `servers[2]` to be a Raft leader (for both group0 and strong consistency groups),
    # because we want `servers[2]` to receive Raft commands from others.
    servers += [await manager.server_add(config=config | {'error_injections_at_startup': ['avoid_being_raft_leader']}, cmdline=cmdline, property_file={'dc': 'my_dc', 'rack': 'rack3'})]
    (cql, hosts) = await manager.get_ready_cql(servers)
    host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])

    def host_by_host_id(host_id):
        for hid, host in zip(host_ids, hosts):
            if hid == host_id:
                return host
        raise RuntimeError(f"Can't find host for host_id {host_id}")

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1} AND consistency = 'local'") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        group_id = await get_table_raft_group_id(manager, ks, 'test')
        leader_host_id = await wait_for_leader(manager, servers[0], group_id)
        assert leader_host_id != host_ids[2]
        leader_host = host_by_host_id(leader_host_id)

        table_schema_version = (await cql.run_async(f"SELECT version FROM system_schema.scylla_tables WHERE keyspace_name = '{ks}' AND table_name = 'test'"))[0].version

        await manager.api.enable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply", one_shot=False)
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES (10, 20)", host=leader_host)

        await cql.run_async(f"ALTER TABLE {ks}.test ADD new_col int;", host=leader_host)
        # The following injection simulates that the old schema version was already removed from memory
        await manager.api.enable_injection(servers[2].ip_addr, "schema_registry_ignore_version", one_shot=False, parameters={'value': table_schema_version})
        await manager.api.message_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply")
        await manager.api.disable_injection(servers[2].ip_addr, "strong_consistency_state_machine_wait_before_apply")

        rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk = 10;", host=hosts[2])
        assert len(rows) == 1
        row = rows[0]
        assert row.pk == 10
        assert row.c == 20
        assert row.new_col is None

@pytest.mark.asyncio
async def test_reject_user_provided_timestamps(manager: ManagerClient):
    """
    A simple validation test that makes sure that we don't accept
    user-provided timestamps in queries to strongly consistent tables.
    """
    config = {
        'experimental_features': ['strongly-consistent-tables']
    }
    cmdline = [
        '--logger-log-level', 'sc_groups_manager=debug',
        '--logger-log-level', 'sc_coordinator=debug'
    ]
    server = await manager.server_add(config=config, cmdline=cmdline)
    cql, _ = await manager.get_ready_cql([server])

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1} AND consistency = 'local'") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int") as table:
            error_msg = "Strongly consistent queries don't support user-provided timestamps"
            with pytest.raises(InvalidRequest, match=error_msg):
                await cql.run_async(f"INSERT INTO {table} (pk, v) VALUES (0, 13) USING TIMESTAMP 23")
            with pytest.raises(InvalidRequest, match=error_msg):
                await cql.run_async(f"UPDATE {table} USING TIMESTAMP 23 SET v = 13 WHERE pk = 0")
            with pytest.raises(InvalidRequest, match=error_msg):
                await cql.run_async(f"DELETE FROM {table} USING TIMESTAMP 23 WHERE pk = 0")
            # FIXME(SCYLLADB-977):
            # Add test cases for batches with timestamps. Remember to
            # handle both whole-batch timestamps, e.g.
            #   BEGIN BATCH USING TIMESTAMP ts
            #   ...
            #   APPLY BATCH
            # as well as timestamps for individual items, e.g.
            #   BEGIN BATCH
            #   INSERT INTO ... USING TIMESTAMP st;
            #   ...
            #   APPLY BATCH

@pytest.mark.asyncio
async def test_forward_cql_prepared_with_bound_values(manager: ManagerClient):
    """
    When we prepare a statement on a node other than the leader, we should
    still be able to forward it to the leader with bound values.
    In this test we prepare a statement that should be forwarded to
    the leader (so an INSERT statement) and then execute it.
    It should correctly insert the value.
    Then, we execute it again and check that we used the cache
    on the leader side and that it updated the value correctly again.
    """
    cmdline = [
        '--logger-log-level', 'cql_server=trace',
        '--experimental-features', 'strongly-consistent-tables',
    ]
    servers = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc='dc1')
    (cql, hosts) = await manager.get_ready_cql(servers)

    ks_opts = ("WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND consistency = 'local'")
    async with new_test_keyspace(manager, ks_opts) as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, value int") as table:
            table_name = table.split('.')[-1]
            group_id = await get_table_raft_group_id(manager, ks, table_name)
            leader_host_id = await wait_for_leader(manager, servers[0], group_id)
            non_leader_host = [host for host in hosts if str(host.host_id) != str(leader_host_id)][0]

            # Prepare statement
            stmt = cql.prepare(f"INSERT INTO {table} (pk, value) VALUES (0, ?)")

            # Execute prepared INSERT once to verify results and populate the cache on the leader
            await cql.run_async(stmt, [0], host=non_leader_host)
            res = await cql.run_async(f"SELECT value FROM {table} WHERE pk = 0")
            assert len(res) == 1 and res[0].value == 0

            # Execute prepared INSERT again to verify that we use the cache on the leader
            traced_execute = cql.execute(stmt, (0,), host=non_leader_host, trace=True)
            res = await cql.run_async(f"SELECT value FROM {table} WHERE pk = 0")
            assert len(res) == 1 and res[0].value == 0
            trace = traced_execute.get_query_trace()
            for event in trace.events:
                logger.info(f"Trace event: {event.description}")
                assert "Prepared statement not found on target" not in event.description

@pytest.mark.asyncio
async def test_forward_cql_cache_invalidation(manager: ManagerClient):
    """
    Test that cql forwarding works after invalidation of the prepared statement cache on schema changes.
    """
    cmdline = [
        '--logger-log-level', 'cql_server=trace',
        '--experimental-features', 'strongly-consistent-tables',
    ]
    servers = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc='dc1')
    (cql, hosts) = await manager.get_ready_cql(servers)

    ks_opts = ("WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1} AND consistency = 'local'")
    async with new_test_keyspace(manager, ks_opts) as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, value int") as table:
            table_name = table.split('.')[-1]
            group_id = await get_table_raft_group_id(manager, ks, table_name)
            leader_host_id = await wait_for_leader(manager, servers[0], group_id)
            non_leader_host = [host for host in hosts if str(host.host_id) != str(leader_host_id)][0]

            insert_stmt = cql.prepare(f"INSERT INTO {table} (pk, value) VALUES (?, ?)")
            select_stmt = cql.prepare(f"SELECT pk, value FROM {table} WHERE pk = ?")

            await cql.run_async(insert_stmt, [1, 100])
            rows = await cql.run_async(select_stmt, [1])
            assert len(rows) == 1
            assert rows[0].value == 100

            metrics_before = await manager.metrics.query(non_leader_host.address)
            prepared_not_found_before = metrics_before.get('scylla_transport_requests_forwarded_prepared_not_found') or 0

            # Altering schema invalidates prepared statement cache
            await cql.run_async(f"ALTER TABLE {table} ADD extra_column text")
            await cql.run_async(insert_stmt, [2, 200], host=non_leader_host)
            rows = await cql.run_async(select_stmt, [2], host=non_leader_host)
            assert len(rows) == 1
            assert rows[0].value == 200

            metrics_after = await manager.metrics.query(non_leader_host.address)
            prepared_not_found_after = metrics_after.get('scylla_transport_requests_forwarded_prepared_not_found') or 0
            assert prepared_not_found_after > prepared_not_found_before

@pytest.mark.asyncio
@pytest.mark.skip_mode('release', "error injections aren't enabled in release mode")
async def test_forward_cql_exception_passthrough(manager: ManagerClient):
    """
    Verify that a coordinator exception raised on the target replica is correctly returned to the client.
    """
    cmdline = [
        '--logger-log-level', 'cql_server=trace',
        '--experimental-features', 'strongly-consistent-tables',
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc='dc1')
    (cql, hosts) = await manager.get_ready_cql(servers)
    host_ids = await gather_safely(*[manager.get_host_id(s.server_id) for s in servers])

    ks_opts = ("WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1} AND consistency = 'local'")
    async with new_test_keyspace(manager, ks_opts) as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, value int") as table:
            table_name = table.split('.')[-1]
            group_id = await get_table_raft_group_id(manager, ks, table_name)

            logger.info(f"Get current leader for the group {group_id}")
            try:
                leader_host_id = await wait_for_leader(manager, servers[0], group_id)
            except:
                # We need to wait for the leader on a replica, and the first server might not be one
                leader_host_id = await wait_for_leader(manager, servers[1], group_id)
            leader_host = [host for host in hosts if str(host.host_id) == str(leader_host_id)][0]

            tablet_replicas = await get_tablet_replicas(manager, servers[0], ks, table_name, 0)
            assert len(tablet_replicas) == 2
            replica_host_ids = [replica[0] for replica in tablet_replicas]

            logger.info(f"Get the non-leader replica for the group {group_id}")
            non_leader_replica_host_id = [host_id for host_id in replica_host_ids if str(host_id) != str(leader_host_id)][0]
            non_leader_replica_host = [host for host in hosts if str(host.host_id) == str(non_leader_replica_host_id)][0]

            logger.info(f"Get the non-replica for the group {group_id}")
            non_replica_host_id = [host_id for host_id in host_ids if str(host_id) not in replica_host_ids][0]
            non_replica_host = [host for host in hosts if str(host.host_id) == str(non_replica_host_id)][0]

            logger.info("Verify that a timeout on the target node is returned to the client and the fail metric is incremented")
            await manager.api.enable_injection(leader_host.address, "sc_modification_statement_timeout", one_shot=False)
            metrics = await manager.metrics.query(non_leader_replica_host.address)
            errors_before = metrics.get('scylla_transport_requests_forwarded_failed') or 0

            with pytest.raises(WriteTimeout):
                await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, value) VALUES (1, 1)", retry_policy=FallthroughRetryPolicy()), host=non_leader_replica_host)

            metrics = await manager.metrics.query(non_leader_replica_host.address)
            errors_after = metrics.get('scylla_transport_requests_forwarded_failed') or 0
            assert errors_after > errors_before
            await manager.api.disable_injection(leader_host.address, "sc_modification_statement_timeout")

            # Now test that we get the correct exception if the cross-node forwarding RPC times out.
            logger.info("Verify that a timeout of the forwarding RPC is returned as the correct exception to the client")
            await manager.api.enable_injection(leader_host.address, "wait_before_handling_forwarded_request", one_shot=False)
            with pytest.raises(WriteTimeout):
                await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, value) VALUES (1, 1) USING TIMEOUT 500ms", retry_policy=FallthroughRetryPolicy()), host=non_leader_replica_host)

            await manager.api.enable_injection(non_leader_replica_host.address, "wait_before_handling_forwarded_request", one_shot=False)
            with pytest.raises(ReadTimeout):
                await cql.run_async(SimpleStatement(f"SELECT * FROM {table} WHERE pk = 1 USING TIMEOUT 500ms", retry_policy=FallthroughRetryPolicy()), host=non_replica_host)

            await manager.api.message_injection(leader_host.address, "wait_before_handling_forwarded_request")
            await manager.api.message_injection(non_leader_replica_host.address, "wait_before_handling_forwarded_request")