diff --git a/test/cluster/object_store/test_backup.py b/test/cluster/object_store/test_backup.py index fed9de9f9a..d5b21209d6 100644 --- a/test/cluster/object_store/test_backup.py +++ b/test/cluster/object_store/test_backup.py @@ -902,32 +902,35 @@ async def test_restore_primary_replica(manager: ManagerClient, object_storage, d scope = "all" expected_replicas = 1 - ks = 'ks' cf = 'cf' + keys = range(256) + replication_str = f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {topology.rf}}}" servers, host_ids = await create_cluster(topology, manager, logger, object_storage) await manager.disable_tablet_balancing() cql = manager.get_cql() - schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger) + async with new_test_keyspace(manager, replication_str) as ks: + cql.execute(create_schema(ks, cf)) + stmt = cql.prepare(f"INSERT INTO {ks}.{cf} ( pk, value ) VALUES (?, ?)") + stmt.consistency_level = ConsistencyLevel.ALL + await asyncio.gather(*(cql.run_async(stmt, (str(k), k)) for k in keys)) - # validate replicas assertions hold on fresh dataset - await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf) + # validate replicas assertions hold on fresh dataset + await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf) - snap_name, sstables = await take_snapshot(ks, servers, manager, logger) - prefix = f'{cf}/{snap_name}' + snap_name, sstables = await take_snapshot(ks, servers, manager, logger) + prefix = f'{cf}/{snap_name}' - await asyncio.gather(*(do_backup(s, snap_name, prefix, ks, cf, object_storage, manager, logger) for s in servers)) + await asyncio.gather(*(do_backup(s, snap_name, prefix, ks, cf, object_storage, manager, logger) for s in servers)) - logger.info(f'Re-initialize keyspace') - cql.execute(f'DROP KEYSPACE {ks}') - cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};")) - cql.execute(schema) + async with new_test_keyspace(manager, replication_str) as ks: + cql.execute(create_schema(ks, cf)) - await asyncio.gather(*(do_restore_server(manager, logger, ks, cf, s, sstables[s], scope, True, prefix, object_storage) for s in servers)) + await asyncio.gather(*(do_restore_server(manager, logger, ks, cf, s, sstables[s], scope, True, prefix, object_storage) for s in servers)) - await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf, expected_replicas=expected_replicas) + await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf, expected_replicas=expected_replicas) logger.info(f'Validate streaming directions') for i, s in enumerate(servers):