test/backup: use new_test_keyspace in test_restore_primary_replica

Replace create_dataset + manual DROP/CREATE KEYSPACE with two sequential
new_test_keyspace context manager blocks, matching the pattern used by
do_test_streaming_scopes. The first block covers backup, the second
covers restore. Keyspace lifecycle is now automatic.

The streaming directions validation loop is moved outside of the second
context block, since it only parses logs and has no dependency on the
keyspace being alive.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Pavel Emelyanov
2026-03-23 16:59:47 +03:00
parent 051107f5bc
commit 2353091cbd

View File

@@ -902,32 +902,35 @@ async def test_restore_primary_replica(manager: ManagerClient, object_storage, d
scope = "all"
expected_replicas = 1
ks = 'ks'
cf = 'cf'
keys = range(256)
replication_str = f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}"
servers, host_ids = await create_cluster(topology, manager, logger, object_storage)
await manager.disable_tablet_balancing()
cql = manager.get_cql()
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
async with new_test_keyspace(manager, replication_str) as ks:
cql.execute(create_schema(ks, cf))
stmt = cql.prepare(f"INSERT INTO {ks}.{cf} ( pk, value ) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ALL
await asyncio.gather(*(cql.run_async(stmt, (str(k), k)) for k in keys))
# validate replicas assertions hold on fresh dataset
await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf)
# validate replicas assertions hold on fresh dataset
await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf)
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
prefix = f'{cf}/{snap_name}'
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
prefix = f'{cf}/{snap_name}'
await asyncio.gather(*(do_backup(s, snap_name, prefix, ks, cf, object_storage, manager, logger) for s in servers))
await asyncio.gather(*(do_backup(s, snap_name, prefix, ks, cf, object_storage, manager, logger) for s in servers))
logger.info(f'Re-initialize keyspace')
cql.execute(f'DROP KEYSPACE {ks}')
cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};"))
cql.execute(schema)
async with new_test_keyspace(manager, replication_str) as ks:
cql.execute(create_schema(ks, cf))
await asyncio.gather(*(do_restore_server(manager, logger, ks, cf, s, sstables[s], scope, True, prefix, object_storage) for s in servers))
await asyncio.gather(*(do_restore_server(manager, logger, ks, cf, s, sstables[s], scope, True, prefix, object_storage) for s in servers))
await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf, expected_replicas=expected_replicas)
await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf, expected_replicas=expected_replicas)
logger.info(f'Validate streaming directions')
for i, s in enumerate(servers):