diff --git a/test/topology/test_tablets.py b/test/topology/test_tablets.py index 8172e7bf8f..fdd8e23c65 100644 --- a/test/topology/test_tablets.py +++ b/test/topology/test_tablets.py @@ -34,17 +34,18 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m # s0 should miss schema and tablet changes await manager.server_stop_gracefully(s0) - manager.cql.execute("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', " + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', " "'replication_factor': 3, 'initial_tablets': 100};") # force s0 to catch up later from the snapshot and not the raft log await inject_error_one_shot_on(manager, 'raft_server_force_snapshot', not_s0) - manager.cql.execute("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") keys = range(10) - await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 1);") for k in keys]) + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 1);") for k in keys]) - rows = manager.cql.execute("SELECT * FROM test.test;") + rows = await cql.run_async("SELECT * FROM test.test;") assert len(list(rows)) == len(keys) for r in rows: assert r.c == 1 @@ -52,16 +53,17 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m manager.driver_close() await manager.server_start(s0, wait_others=2) await manager.driver_connect(server=servers[0]) - await wait_for_cql_and_get_hosts(manager.cql, [servers[0]], time.time() + 60) + cql = manager.get_cql() + await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60) # Trigger a schema change to invoke schema agreement waiting to make sure that s0 has the latest schema - manager.cql.execute("CREATE KEYSPACE test_dummy WITH replication = {'class': 'NetworkTopologyStrategy', " + await cql.run_async("CREATE KEYSPACE test_dummy WITH replication = {'class': 'NetworkTopologyStrategy', " "'replication_factor': 1, 'initial_tablets': 1};") - await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 2);", execution_profile='whitelist') + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 2);", execution_profile='whitelist') for k in keys]) - rows = manager.cql.execute("SELECT * FROM test.test;").all() + rows = await cql.run_async("SELECT * FROM test.test;") assert len(rows) == len(keys) for r in rows: assert r.c == 2 @@ -70,35 +72,36 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m for s in servers: await manager.server_restart(s.server_id, wait_others=2) - await wait_for_cql_and_get_hosts(manager.cql, [servers[0]], time.time() + 60) + await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60) - await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 3);", execution_profile='whitelist') + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 3);", execution_profile='whitelist') for k in keys]) - rows = manager.cql.execute("SELECT * FROM test.test;").all() + rows = await cql.run_async("SELECT * FROM test.test;") assert len(rows) == len(keys) for r in rows: assert r.c == 3 - manager.cql.execute("DROP KEYSPACE test;") - manager.cql.execute("DROP KEYSPACE test_dummy;") + await cql.run_async("DROP KEYSPACE test;") + await cql.run_async("DROP KEYSPACE test_dummy;") @pytest.mark.asyncio async def test_scans(manager: ManagerClient): - manager.cql.execute("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', " + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', " "'replication_factor': 1, 'initial_tablets': 8};") - manager.cql.execute("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") keys = range(100) - await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys]) + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys]) - rows = manager.cql.execute("SELECT count(*) FROM test.test;") - assert rows.one().count == len(keys) + rows = await cql.run_async("SELECT count(*) FROM test.test;") + assert rows[0].count == len(keys) - rows = manager.cql.execute("SELECT * FROM test.test;").all() + rows = await cql.run_async("SELECT * FROM test.test;") assert len(rows) == len(keys) for r in rows: assert r.c == r.pk - manager.cql.execute("DROP KEYSPACE test;") + await cql.run_async("DROP KEYSPACE test;")