test_tablets: use run_async instead of execute

Don't block the thread which prevents concurrent tests from running
during this time. Use the dedicated `run_async`.

Also to silence `mypy` which complains that `manager.cql` is `Optional`
(so in theory might be `None`, e.g. after `driver_close`), use
`manager.get_cql()`.

Closes #14109
This commit is contained in:
Kamil Braun
2023-06-01 12:19:22 +02:00
committed by Tomasz Grabiec
parent 8be69fc3a0
commit 34b60ba82b

View File

@@ -34,17 +34,18 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m
# s0 should miss schema and tablet changes
await manager.server_stop_gracefully(s0)
manager.cql.execute("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 3, 'initial_tablets': 100};")
# force s0 to catch up later from the snapshot and not the raft log
await inject_error_one_shot_on(manager, 'raft_server_force_snapshot', not_s0)
manager.cql.execute("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
keys = range(10)
await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 1);") for k in keys])
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 1);") for k in keys])
rows = manager.cql.execute("SELECT * FROM test.test;")
rows = await cql.run_async("SELECT * FROM test.test;")
assert len(list(rows)) == len(keys)
for r in rows:
assert r.c == 1
@@ -52,16 +53,17 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m
manager.driver_close()
await manager.server_start(s0, wait_others=2)
await manager.driver_connect(server=servers[0])
await wait_for_cql_and_get_hosts(manager.cql, [servers[0]], time.time() + 60)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
# Trigger a schema change to invoke schema agreement waiting to make sure that s0 has the latest schema
manager.cql.execute("CREATE KEYSPACE test_dummy WITH replication = {'class': 'NetworkTopologyStrategy', "
await cql.run_async("CREATE KEYSPACE test_dummy WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 1, 'initial_tablets': 1};")
await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 2);", execution_profile='whitelist')
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 2);", execution_profile='whitelist')
for k in keys])
rows = manager.cql.execute("SELECT * FROM test.test;").all()
rows = await cql.run_async("SELECT * FROM test.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == 2
@@ -70,35 +72,36 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m
for s in servers:
await manager.server_restart(s.server_id, wait_others=2)
await wait_for_cql_and_get_hosts(manager.cql, [servers[0]], time.time() + 60)
await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 3);", execution_profile='whitelist')
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, 3);", execution_profile='whitelist')
for k in keys])
rows = manager.cql.execute("SELECT * FROM test.test;").all()
rows = await cql.run_async("SELECT * FROM test.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == 3
manager.cql.execute("DROP KEYSPACE test;")
manager.cql.execute("DROP KEYSPACE test_dummy;")
await cql.run_async("DROP KEYSPACE test;")
await cql.run_async("DROP KEYSPACE test_dummy;")
@pytest.mark.asyncio
async def test_scans(manager: ManagerClient):
manager.cql.execute("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
"'replication_factor': 1, 'initial_tablets': 8};")
manager.cql.execute("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
keys = range(100)
await asyncio.gather(*[manager.cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
rows = manager.cql.execute("SELECT count(*) FROM test.test;")
assert rows.one().count == len(keys)
rows = await cql.run_async("SELECT count(*) FROM test.test;")
assert rows[0].count == len(keys)
rows = manager.cql.execute("SELECT * FROM test.test;").all()
rows = await cql.run_async("SELECT * FROM test.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
manager.cql.execute("DROP KEYSPACE test;")
await cql.run_async("DROP KEYSPACE test;")