# The skip_mode decorator used to work only on individual test functions, and
# only in cluster tests. That is fine for skipping a single test, but it cannot
# be combined with pytestmark to automatically mark every test in a file.
# This change migrates skip_mode to a dynamic pytest mark
# (@pytest.mark.skip_mode) that can be used like any ordinary mark.
# Closes scylladb/scylladb#27853
# [avi: apply to test/cluster/test_tablets.py::test_table_creation_wakes_up_balancer]
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
from contextlib import asynccontextmanager
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.rest_client import inject_error_one_shot, read_barrier
|
|
from test.pylib.tablets import get_tablet_replica, get_base_table, get_tablet_count, get_tablet_info
|
|
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, wait_for_view
|
|
from test.cluster.conftest import skip_mode
|
|
from test.cluster.util import new_test_keyspace, new_test_table, new_materialized_view
|
|
import time
|
|
import pytest
|
|
import logging
|
|
import asyncio
|
|
import random
|
|
from cassandra.query import SimpleStatement, ConsistencyLevel
|
|
from cassandra.protocol import InvalidRequest, Unauthorized
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
async def inject_error_one_shot_on(manager, error_name, servers):
    """Arm the one-shot error injection `error_name` on every node in `servers`, concurrently."""
    shots = (inject_error_one_shot(manager.api, node.ip_addr, error_name) for node in servers)
    await asyncio.gather(*shots)
|
async def inject_error_on(manager, error_name, servers):
    """Enable the persistent (non one-shot) error injection `error_name` on all `servers`."""
    await asyncio.gather(*(manager.api.enable_injection(node.ip_addr, error_name, False)
                           for node in servers))
|
async def disable_injection_on(manager, error_name, servers):
    """Clear the error injection `error_name` on every node in `servers`, concurrently."""
    await asyncio.gather(*(manager.api.disable_injection(node.ip_addr, error_name)
                           for node in servers))
|
async def disable_tablet_balancing(manager, servers):
    """Turn the tablet load balancer off on every node in `servers`, concurrently."""
    requests = (manager.api.disable_tablet_balancing(node.ip_addr) for node in servers)
    await asyncio.gather(*requests)
|
async def enable_tablet_balancing(manager, servers):
    """Turn the tablet load balancer back on for every node in `servers`, concurrently."""
    requests = (manager.api.enable_tablet_balancing(node.ip_addr) for node in servers)
    await asyncio.gather(*requests)
|
@asynccontextmanager
async def no_tablet_balancing(manager, servers):
    """Async context manager: tablet balancing is disabled on all `servers` for
    the duration of the `async with` block and re-enabled on exit, even if the
    body raises."""
    await disable_tablet_balancing(manager, servers)
    try:
        yield
    finally:
        # Always restore balancing so one failing test doesn't poison the cluster
        # state for subsequent operations (e.g. keyspace teardown).
        await enable_tablet_balancing(manager, servers)
|
async def wait_for_tablet_stage(manager, server, keyspace_name, table_name, token, stage):
    """Poll (via `wait_for`) until the tablet owning `token` in
    `keyspace_name.table_name` reports transition stage `stage`, with a
    60-second deadline."""
    deadline = time.time() + 60

    async def reached_stage():
        info = await get_tablet_info(manager, server, keyspace_name, table_name, token)
        # wait_for retries while the condition returns a falsy value.
        return True if info.stage == stage else None

    await wait_for(reached_stage, deadline)
|
# When a materialized view shares the partition key of its base table it must
# be co-located with the base: its entry in system.tablets has base_table set
# and it shares the base's tablet count. A view with a different partition key
# gets its own independent tablet map. Create one view of each kind and check
# both behaviors by reading the tablet metadata.
@pytest.mark.asyncio
async def test_base_view_colocation(manager: ManagerClient):
    cfg = {'enable_tablets': True}
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--logger-log-level', 'load_balancer=debug',
    ]
    servers = await manager.servers_add(1, config=cfg, cmdline=cmdline)

    cql = manager.get_cql()

    min_tablet_count = 8

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1 } AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int, c int, v int, PRIMARY KEY(pk, c)) WITH tablets={{'min_tablet_count':{min_tablet_count}}};")
        # tv1/tv2 keep pk as the partition key; tv3 uses c instead.
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv1 AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (pk, c)")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv2 AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL AND v IS NOT NULL PRIMARY KEY (pk, v, c)")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv3 AS SELECT * FROM {ks}.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (c, pk)")

        base_id = await manager.get_table_id(ks, 'test')
        view_ids = {}
        for view in ('tv1', 'tv2', 'tv3'):
            view_ids[view] = await manager.get_view_id(ks, view)

        # Same partition key as the base => co-located, base_table points at it.
        for view in ('tv1', 'tv2'):
            assert await get_base_table(manager, view_ids[view]) == base_id

        # Different partition key => tv3 stands alone, its own base is itself.
        assert await get_base_table(manager, view_ids['tv3']) == view_ids['tv3']

        # Co-located views must mirror the base table's tablet count.
        base_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        for view in ('tv1', 'tv2'):
            assert await get_tablet_count(manager, servers[0], ks, view) == base_tablet_count

        for view in ('tv1', 'tv2', 'tv3'):
            await cql.run_async(f"DROP MATERIALIZED VIEW {ks}.{view}")
|
# Create co-located base and view tables with a single tablet each in a 2 node
# cluster, and request to move the tablet of some table (either base or view,
# according to the parameter) to the other node. It should succeed and move
# both tablets to be co-located on the other node. After they are moved we
# stop the other node, remaining only with the one node that should hold the
# base and view tablets, and verify we can read both tables from this node.
@pytest.mark.parametrize("move_table", ["base", "child"])
@pytest.mark.asyncio
async def test_move_tablet(manager: ManagerClient, move_table: str):
    """Moving one tablet of a co-located base/view pair must move both, and a
    single node holding both replicas must serve reads of base and view."""
    cfg = {'enable_tablets': True}
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
    ]
    servers = await manager.servers_add(2, config=cfg, cmdline=cmdline)
    # Manual migration test: keep the balancer from moving tablets on its own.
    await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        # The base and view table should be co-located on one of the nodes with a single tablet each.
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (pk, c)")

        row_count = 100
        insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
        for pk in range(row_count):
            cql.execute(insert, [pk, pk+1])

        # Parametrized: request the move either on the base table or the view.
        if move_table == 'base':
            table = 'test'
        else:
            table = 'tv'

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Request to move one of the tablets to the other node. it should move
        # both tablets because they must remain co-located.

        tablet_token = 0  # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, table, tablet_token)
        # Whichever node currently holds the replica becomes the migration source.
        if replica[0] == s0_host_id:
            src_server = 0
            dst_host_id = s1_host_id
        else:
            src_server = 1
            dst_host_id = s0_host_id

        logger.info(f"Migrate the tablet to {dst_host_id}")
        await manager.api.move_tablet(servers[0].ip_addr, ks, table, replica[0], replica[1], dst_host_id, 0, tablet_token)
        logger.info("Migration done")

        new_replica = await get_tablet_replica(manager, servers[0], ks, table, tablet_token)
        assert new_replica[0] == dst_host_id

        # Now the dst node should hold both tablets. Stop the other node and
        # verify we can read from both tables.
        await manager.server_stop(servers[src_server].server_id)

        rows = await cql.run_async(f"SELECT * FROM {ks}.test")
        assert len(rows) == row_count

        rows = await cql.run_async(f"SELECT * FROM {ks}.tv")
        assert len(rows) == row_count

        # Start the server back because we need quorum when dropping the keyspace
        await manager.server_start(servers[src_server].server_id)
|
# Basic test of tablet split and merge of co-located tablets.
# Create co-located base and view tablets and populate the base table with enough data to trigger tablet splits.
# The view table has a filter so it's empty and wouldn't be a candidate for tablet split normally, but it should
# be split because it's co-located with the base table and their combined sizes are large enough.
# We verify that the tablets of both tables are split, and the tables have the same tablet count.
# Then delete some keys and verify the tablets of both tables are merged.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.parametrize(
    "with_merge",
    [
        pytest.param(False, id="no_merge"),
        pytest.param(True, id="with_merge", marks=pytest.mark.xfail(reason="issue #17265")),
    ],
)
async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
    """Split (and optionally merge) tablets of a co-located base/view pair and
    verify their tablet counts stay equal throughout."""
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--logger-log-level', 'load_balancer=debug',
        # Small tablet size target so ~400KB of data forces splits.
        '--target-tablet-size-in-bytes', '30000',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.api.disable_tablet_balancing(servers[0].ip_addr)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=1 AND bloom_filter_fp_chance=1;")
        # The pk > 1000000 filter keeps the view empty; it must still be split
        # because it is co-located with the (large) base table.
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL AND pk > 1000000 PRIMARY KEY (pk, c)")

        # Initial average table size of 400k (1 tablet), so triggers some splits.
        total_keys = 200
        keys = range(total_keys)

        def populate(keys):
            # Insert ~2KB blobs per key to reach the split threshold.
            insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)")
            for pk in keys:
                value = random.randbytes(2000)
                cql.execute(insert, [pk, value])

        populate(keys)

        async def check():
            # Full-scan sanity check: all currently-live keys are readable.
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
            assert len(rows) == len(keys)

        await check()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        logger.info("Adding new server")
        servers.append(await manager.server_add(cmdline=cmdline))
        # NOTE(review): s1_host_id is not used below — confirm whether it can be dropped.
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        # Increases the chance of tablet migration concurrent with split
        await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

        # NOTE(review): named s1_log but opened on servers[0] — presumably intentional
        # (servers[0] is the topology coordinator's log); confirm.
        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        # Now there's a split and migration need, so they'll potentially run concurrently.
        await manager.api.enable_tablet_balancing(servers[0].ip_addr)

        await check()
        await asyncio.sleep(2)  # Give load balancer some time to do work

        await s1_log.wait_for(f"Detected tablet split for table {ks}.test", from_mark=s1_mark, timeout=60)
        await s1_log.wait_for(f"Detected tablet split for table {ks}.tv", from_mark=s1_mark, timeout=60)

        await check()

        # Pause balancing so the tablet counts read below are from a stable state.
        async with no_tablet_balancing(manager, servers):
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            assert tablet_count > 1
            other_tablet_count = await get_tablet_count(manager, servers[0], ks, 'tv')
            assert tablet_count == other_tablet_count

        if not with_merge:
            return

        # Allow shuffling of tablet replicas to make co-location work harder
        async def shuffle():
            await inject_error_on(manager, "tablet_allocator_shuffle", servers)
            await asyncio.sleep(2)
            await disable_injection_on(manager, "tablet_allocator_shuffle", servers)

        await shuffle()

        # This will allow us to simulate some balancing after co-location with shuffling, to make sure that
        # balancer won't break co-location.
        await inject_error_on(manager, "tablet_merge_completion_bypass", servers)

        # Shrinks table significantly, forcing merge.
        delete_keys = range(total_keys - 1)
        await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys])
        # Only the last key survives; check() reads len(keys) via closure.
        keys = range(total_keys - 1, total_keys)

        # To avoid race of major with migration
        async with no_tablet_balancing(manager, servers):
            for server in servers:
                await manager.api.flush_keyspace(server.ip_addr, ks)
                await manager.api.keyspace_compaction(server.ip_addr, ks)

        await s1_log.wait_for("Emitting resize decision of type merge", from_mark=s1_mark, timeout=60)
        # Waits for balancer to co-locate sibling tablets
        await s1_log.wait_for("All sibling tablets are co-located", timeout=60)
        # Do some shuffling to make sure balancer works with co-located tablets
        await shuffle()

        old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        s1_mark = await s1_log.mark()

        # Let the merge complete: wait for replicas, then stop bypassing completion.
        await inject_error_on(manager, "replica_merge_completion_wait", servers)
        await disable_injection_on(manager, "tablet_merge_completion_bypass", servers)

        await s1_log.wait_for(f"Detected tablet merge for table {ks}.test", from_mark=s1_mark, timeout=60)
        await s1_log.wait_for(f"Detected tablet merge for table {ks}.tv", from_mark=s1_mark, timeout=60)

        # Again read tablet counts from a quiesced state.
        async with no_tablet_balancing(manager, servers):
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            assert tablet_count < old_tablet_count
            other_tablet_count = await get_tablet_count(manager, servers[0], ks, 'tv')
            assert tablet_count == other_tablet_count

        await check()
|
# Test creating a co-located table while the base table is migrating. We
# create a table with a single tablet and start migrating it to the other node.
# While it's in some transition stage, we hold it and create a co-located view.
# We verify we can continue read and write to both tables. Then we complete the
# migration and verify everything continues to work as expected.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.parametrize("wait_stage", [("streaming", "stream_tablet_wait"), ("cleanup", "cleanup_tablet_wait")])
async def test_create_colocated_table_while_base_is_migrating(manager: ManagerClient, wait_stage):
    """Create a co-located view while the base tablet migration is held at the
    parametrized stage; reads/writes and view updates must keep working during
    and after the migration.

    wait_stage: (stage_name, injection_name) — the tablet transition stage to
    hold the migration at, and the error injection that holds it there.
    """
    cfg = {'enable_tablets': True, 'tablet_load_stats_refresh_interval_in_seconds': 1 }
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
    ]
    servers = await manager.servers_add(2, config=cfg, cmdline=cmdline)
    # Manual migration test: keep the balancer from moving tablets on its own.
    await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        total_keys = 100
        keys = range(total_keys)

        def populate(table, keys):
            insert = cql.prepare(f"INSERT INTO {ks}.{table}(pk, c) VALUES(?, ?)")
            for pk in keys:
                cql.execute(insert, [pk, pk+1])

        populate('test', keys)

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        tablet_token = 0  # There is a single tablet, any token addresses it.
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        # Migrate to whichever node does not currently hold the replica.
        dst_host_id = s1_host_id if replica[0] == s0_host_id else s0_host_id

        # Hold the migration at the requested transition stage, then start it.
        wait_injection = wait_stage[1]
        await inject_error_on(manager, wait_injection, servers)
        move_task = asyncio.create_task(manager.api.move_tablet(servers[0].ip_addr, ks, 'test', replica[0], replica[1], dst_host_id, 0, tablet_token))
        await wait_for_tablet_stage(manager, servers[0], ks, 'test', tablet_token, wait_stage[0])

        # Create the co-located view while the base tablet is mid-migration.
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (pk, c) WITH synchronous_updates = true")
        await asyncio.sleep(2)

        # check we can write and read from the base table during the migration, and
        # that the view is also updated
        cql.execute(f"INSERT INTO {ks}.test(pk, c) VALUES(200, 201)")
        rows = await cql.run_async(f"SELECT * FROM {ks}.test WHERE pk=200")
        assert len(rows) == 1 and rows[0].c == 201

        # Bug fix: the view query's result must be captured; previously it was
        # discarded and the assertion below re-checked the base-table rows.
        rows = await cql.run_async(f"SELECT * FROM {ks}.tv WHERE pk=200 BYPASS CACHE")
        assert len(rows) == 1 and rows[0].c == 201

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        # Release the injection and let the held migration finish.
        await disable_injection_on(manager, wait_injection, servers)
        await move_task

        new_replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        assert new_replica[0] == dst_host_id

        # All pre-existing rows plus the one written during migration.
        await wait_for_view(cql, 'tv', 2)
        rows = await cql.run_async(f"SELECT * FROM {ks}.tv")
        assert len(rows) == total_keys+1

        cql.execute(f"INSERT INTO {ks}.test(pk, c) VALUES(300, 301)")
        rows = await cql.run_async(f"SELECT * FROM {ks}.tv WHERE pk=300 BYPASS CACHE")
        assert len(rows) == 1 and rows[0].c == 301

        # Migrate the tablet back to its original node and verify the view is
        # still updated after the second migration.
        src_host_id, dst_host_id = dst_host_id, replica[0]
        await manager.api.move_tablet(servers[0].ip_addr, ks, 'test', src_host_id, 0, dst_host_id, 0, tablet_token)

        cql.execute(f"INSERT INTO {ks}.test(pk, c) VALUES(400, 401)")
        rows = await cql.run_async(f"SELECT * FROM {ks}.tv WHERE pk=400 BYPASS CACHE")
        assert len(rows) == 1 and rows[0].c == 401
|
# Test basic tablet repair of a co-located base and view table.
# 1. start 2 nodes
# 2. Create a base table and a co-located view table with a single tablet and RF=2 (replica on each node)
# 3. write data to the base table while one node is down
# 4. bring the node back up - it is now missing some data
# 5. run tablet repair on the base table
# 6. verify both the base table and the view contain the missing data on the node that was down
@pytest.mark.asyncio
@pytest.mark.skip(reason="tablet repair of colocated tables is not supported currently")
async def test_repair_colocated_base_and_view(manager: ManagerClient):
    """Tablet repair of a co-located base table must also repair its view on
    the node that missed the writes."""
    cfg = {'enable_tablets': True}
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'repair=debug',
    ]
    servers = await manager.servers_add(2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
    await asyncio.gather(*[manager.api.disable_tablet_balancing(s.ip_addr) for s in servers])

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (pk, c)")

        # CL=ONE so the write succeeds even when only one replica is up.
        cql.execute(SimpleStatement(f"INSERT INTO {ks}.test(pk, c) VALUES(1, 10)", consistency_level=ConsistencyLevel.ONE))
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)
        await manager.api.flush_keyspace(servers[1].ip_addr, ks)

        # Stop node 2 and write data while it is down
        await manager.server_stop(servers[1].server_id)

        cql.execute(SimpleStatement(f"INSERT INTO {ks}.test(pk, c) VALUES(2, 20)", consistency_level=ConsistencyLevel.ONE))
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        # Start node 2 back up
        await manager.server_start(servers[1].server_id)
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)

        # At this point, node 2 is missing pk=2
        # Verify that pk=2 is missing on node 2 before repair
        # Connect directly to server 2 to check its data
        cql_server2 = await manager.get_cql_exclusive(servers[1])
        rows = await cql_server2.run_async(SimpleStatement(f"SELECT * FROM {ks}.test", consistency_level=ConsistencyLevel.ONE))
        pks = set(row.pk for row in rows)
        assert 1 in pks and 2 not in pks

        # Trigger repair of the single tablet
        tablet_token = 0
        await manager.api.tablet_repair(servers[0].ip_addr, ks, 'test', tablet_token)

        # Verify the view is repaired on server 2
        rows = await cql_server2.run_async(SimpleStatement(f"SELECT * FROM {ks}.tv", consistency_level=ConsistencyLevel.ONE))
        pks = set(row.pk for row in rows)
        assert 1 in pks and 2 in pks
|
# Verify the default tombstone GC mode for colocated tables is 'timeout',
# and that altering it to 'repair' is not allowed.
@pytest.mark.asyncio
async def test_colocated_tables_gc_mode(manager: ManagerClient):
    """Co-located tables (same-key MVs, index views, CDC log, paxos state)
    default to tombstone_gc 'timeout' and reject the 'repair' mode."""
    servers = await manager.servers_add(3, auto_rack_dc="dc1")
    cql = manager.get_cql()

    def check_tombstone_gc_mode_timeout(cql, table):
        # DESC omits tombstone_gc when it is at its default, hence the or-clause.
        s = list(cql.execute(f"DESC {table}"))[0].create_statement
        assert "'mode': 'timeout'" in s or "mode" not in s  # default is timeout

    async with new_test_keyspace(manager, "WITH replication = {'dc1': 3 }") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int") as table:
            # A view sharing the base's partition key is co-located.
            async with new_materialized_view(manager, table, "*", "pk, v", "pk IS NOT NULL AND v IS NOT NULL") as mv:
                check_tombstone_gc_mode_timeout(cql, mv)

                with pytest.raises(InvalidRequest, match="The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables."):
                    await cql.run_async(f"ALTER MATERIALIZED VIEW {mv} WITH tombstone_gc = {{'mode': 'repair'}}")

                with pytest.raises(InvalidRequest, match="The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables."):
                    await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv2 AS SELECT * FROM {table} WHERE pk IS NOT NULL AND v IS NOT NULL PRIMARY KEY (pk, v) WITH tombstone_gc = {{'mode': 'repair'}}")

            # a not colocated view with 'repair' mode - should succeed
            async with new_materialized_view(manager, table, "*", "v, pk", "pk IS NOT NULL AND v IS NOT NULL", "WITH tombstone_gc = {'mode': 'repair'}") as mv:
                await cql.run_async(f"ALTER MATERIALIZED VIEW {mv} WITH tombstone_gc = {{'mode': 'timeout'}}")
                await cql.run_async(f"ALTER MATERIALIZED VIEW {mv} WITH tombstone_gc = {{'mode': 'repair'}}")

            # An index's backing view is co-located with the base table too.
            await cql.run_async(f"CREATE INDEX ON {table}((pk),v)")
            # The index view is the only view left at this point, so .one() is safe.
            view_name = cql.execute("SELECT view_name FROM system_schema.views").one().view_name
            check_tombstone_gc_mode_timeout(cql, f"{ks}.{view_name}")

            with pytest.raises(InvalidRequest, match="The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables."):
                await cql.run_async(f"ALTER MATERIALIZED VIEW {ks}.{view_name} WITH tombstone_gc = {{'mode': 'repair'}}")

        # CDC log tables are co-located with their base as well.
        async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", " WITH cdc={'enabled': true}") as table:
            check_tombstone_gc_mode_timeout(cql, f"{table}_scylla_cdc_log")

            with pytest.raises(InvalidRequest, match="The 'repair' mode for tombstone_gc is not allowed on CDC log tables."):
                await cql.run_async(f"ALTER TABLE {table}_scylla_cdc_log WITH tombstone_gc = {{'mode': 'repair'}}")

        async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int") as table:
            # An LWT write creates the table's paxos state table.
            await cql.run_async(f"INSERT INTO {table}(pk, v) VALUES(0, 0)")
            await cql.run_async(f"UPDATE {table} SET v = 1 WHERE pk = 0 IF v = 0")

            # ensure paxos state table is created on all nodes
            await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])

            ks_name, cf_name = table.split('.')
            table_name = f"{ks_name}.\"{cf_name}$paxos\""
            check_tombstone_gc_mode_timeout(cql, table_name)

            # paxos table is not allowed to be altered, even not by a superuser.
            with pytest.raises(Unauthorized):
                await cql.run_async(f"ALTER TABLE {table_name} WITH tombstone_gc = {{'mode': 'repair'}}")
|