topology_custom/test_mv_topology_change: use new_test_keyspace

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2025-01-19 08:52:50 +02:00
parent 11005b10db
commit ff9c8428df

View File

@@ -16,6 +16,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.tablets import get_tablet_replica
from test.topology.conftest import skip_mode
from test.pylib.util import wait_for
from test.topology.util import new_test_keyspace
logger = logging.getLogger(__name__)
@@ -36,51 +37,51 @@ async def test_mv_topology_change(manager: ManagerClient):
servers = [await manager.server_add(config=cfg, timeout=60) for _ in range(3)]
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};")
await cql.run_async("CREATE TABLE ks.t (pk int primary key, v int)")
await cql.run_async("CREATE materialized view ks.t_view AS select pk, v from ks.t where v is not null primary key (v, pk)")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.t (pk int primary key, v int)")
await cql.run_async(f"CREATE materialized view {ks}.t_view AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
stop_event = asyncio.Event()
concurrency = 10
async def do_writes(start_it, repeat) -> int:
iteration = start_it
while not stop_event.is_set():
start_time = time.time()
try:
await cql.run_async(f"insert into ks.t (pk, v) values ({iteration}, {iteration})")
except NoHostAvailable as e:
for _, err in e.errors.items():
# ConnectionException can be raised when the node is shutting down.
if not isinstance(err, ConnectionException):
logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
raise
except Exception as e:
logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
raise
iteration += concurrency
if not repeat:
break
await asyncio.sleep(0.01)
return iteration
stop_event = asyncio.Event()
concurrency = 10
async def do_writes(start_it, repeat) -> int:
iteration = start_it
while not stop_event.is_set():
start_time = time.time()
try:
await cql.run_async(f"insert into {ks}.t (pk, v) values ({iteration}, {iteration})")
except NoHostAvailable as e:
for _, err in e.errors.items():
# ConnectionException can be raised when the node is shutting down.
if not isinstance(err, ConnectionException):
logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
raise
except Exception as e:
logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
raise
iteration += concurrency
if not repeat:
break
await asyncio.sleep(0.01)
return iteration
# to hit the issue #18709 it's enough to start one batch of writes, the effective
# replication maps for base and view will change after the writes start but before they finish
tasks = [asyncio.create_task(do_writes(i, repeat=False)) for i in range(concurrency)]
# to hit the issue #18709 it's enough to start one batch of writes, the effective
# replication maps for base and view will change after the writes start but before they finish
tasks = [asyncio.create_task(do_writes(i, repeat=False)) for i in range(concurrency)]
server = await manager.server_add()
server = await manager.server_add()
await asyncio.gather(*tasks)
await asyncio.gather(*tasks)
[await manager.api.disable_injection(s.ip_addr, "delay_before_get_view_natural_endpoint") for s in servers]
[await manager.api.disable_injection(s.ip_addr, "delay_before_get_view_natural_endpoint") for s in servers]
# to hit the issue #17786 we need to run multiple batches of writes, so that some write is processed while the
# effective replication maps for base and view are different
tasks = [asyncio.create_task(do_writes(i, repeat=True)) for i in range(concurrency)]
await manager.decommission_node(server.server_id)
# to hit the issue #17786 we need to run multiple batches of writes, so that some write is processed while the
# effective replication maps for base and view are different
tasks = [asyncio.create_task(do_writes(i, repeat=True)) for i in range(concurrency)]
await manager.decommission_node(server.server_id)
stop_event.set()
await asyncio.gather(*tasks)
stop_event.set()
await asyncio.gather(*tasks)
# Reproduces #19152
# Verify a pending replica is not doing unnecessary work of building and sending view updates.
@@ -103,68 +104,68 @@ async def test_mv_update_on_pending_replica(manager: ManagerClient, intranode):
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await cql.run_async("CREATE MATERIALIZED VIEW test.mv1 AS SELECT * FROM test.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, pk);")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, pk);")
table_id = await manager.get_table_id('test', 'test')
table_id = await manager.get_table_id(ks, 'test')
servers.append(await manager.server_add(config=cfg, cmdline=cmd))
servers.append(await manager.server_add(config=cfg, cmdline=cmd))
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 0)")
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token)
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
src_shard = replica[1]
dst_shard = 1-replica[1]
assert replica[0] == s0_host_id
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
src_shard = replica[1]
dst_shard = 1-replica[1]
assert replica[0] == s0_host_id
if intranode:
dst_host = s0_host_id
dst_ip = servers[0].ip_addr
streaming_wait_injection = "intranode_migration_streaming_wait"
else:
dst_host = s1_host_id
dst_ip = servers[1].ip_addr
streaming_wait_injection = "stream_mutation_fragments"
if intranode:
dst_host = s0_host_id
dst_ip = servers[0].ip_addr
streaming_wait_injection = "intranode_migration_streaming_wait"
else:
dst_host = s1_host_id
dst_ip = servers[1].ip_addr
streaming_wait_injection = "stream_mutation_fragments"
await manager.api.enable_injection(dst_ip, streaming_wait_injection, one_shot=True)
await manager.api.enable_injection(dst_ip, streaming_wait_injection, one_shot=True)
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, "test", "test", s0_host_id, src_shard, dst_host, dst_shard, tablet_token))
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", s0_host_id, src_shard, dst_host, dst_shard, tablet_token))
async def tablet_is_streaming():
res = await cql.run_async(f"SELECT stage FROM system.tablets WHERE table_id={table_id}")
stage = res[0].stage
return stage == 'streaming' or None
async def tablet_is_streaming():
res = await cql.run_async(f"SELECT stage FROM system.tablets WHERE table_id={table_id}")
stage = res[0].stage
return stage == 'streaming' or None
await wait_for(tablet_is_streaming, time.time() + 60)
await wait_for(tablet_is_streaming, time.time() + 60)
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, {1})")
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, {1})")
# Release abandoned streaming
await manager.api.message_injection(dst_ip, streaming_wait_injection)
# Release abandoned streaming
await manager.api.message_injection(dst_ip, streaming_wait_injection)
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
def get_view_updates_on_wrong_node_count(server):
metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
pattern = re.compile("^scylla_database_total_view_updates_on_wrong_node")
for metric in metrics.split('\n'):
if pattern.match(metric) is not None:
return int(float(metric.split()[1]))
def get_view_updates_on_wrong_node_count(server):
metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
pattern = re.compile("^scylla_database_total_view_updates_on_wrong_node")
for metric in metrics.split('\n'):
if pattern.match(metric) is not None:
return int(float(metric.split()[1]))
assert all(map(lambda x: x is None or x == 0, [get_view_updates_on_wrong_node_count(server) for server in servers]))
assert all(map(lambda x: x is None or x == 0, [get_view_updates_on_wrong_node_count(server) for server in servers]))
res = await cql.run_async(f"SELECT c FROM test.test WHERE pk={key}")
assert [1] == [x.c for x in res]
res = await cql.run_async(f"SELECT c FROM test.mv1 WHERE pk={key} ALLOW FILTERING")
assert [1] == [x.c for x in res]
res = await cql.run_async(f"SELECT c FROM {ks}.test WHERE pk={key}")
assert [1] == [x.c for x in res]
res = await cql.run_async(f"SELECT c FROM {ks}.mv1 WHERE pk={key} ALLOW FILTERING")
assert [1] == [x.c for x in res]
# Reproduces issue #19529
# Write to a table with MV while one node is stopped, and verify
@@ -179,18 +180,18 @@ async def test_mv_write_to_dead_node(manager: ManagerClient):
servers = await manager.servers_add(4)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}")
await cql.run_async("CREATE TABLE ks.t (pk int primary key, v int)")
await cql.run_async("CREATE materialized view ks.t_view AS select pk, v from ks.t where v is not null primary key (v, pk)")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.t (pk int primary key, v int)")
await cql.run_async(f"CREATE materialized view {ks}.t_view AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
await manager.server_stop_gracefully(servers[-1].server_id)
await manager.server_stop_gracefully(servers[-1].server_id)
# Do inserts. some should generate MV writes to the stopped node
for i in range(100):
await cql.run_async(f"insert into ks.t (pk, v) values ({i}, {i+1})")
# Do inserts. some should generate MV writes to the stopped node
for i in range(100):
await cql.run_async(f"insert into {ks}.t (pk, v) values ({i}, {i+1})")
# Remove the node to trigger a topology change.
# If the MV write is not completed, as in issue #19529, the topology change
# will be held for long time until the write timeouts.
# Otherwise, it is expected to complete in short time.
await manager.remove_node(servers[0].server_id, servers[-1].server_id, timeout=30)
# Remove the node to trigger a topology change.
# If the MV write is not completed, as in issue #19529, the topology change
# will be held for long time until the write timeouts.
# Otherwise, it is expected to complete in short time.
await manager.remove_node(servers[0].server_id, servers[-1].server_id, timeout=30)