From 480a5837ab732be16969fd00a6e42b5cc83e6b6a Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 19 Jan 2025 08:52:50 +0200 Subject: [PATCH] topology_custom/test_group0_schema_versioning: use new_test_keyspace Signed-off-by: Benny Halevy --- .../test_group0_schema_versioning.py | 347 +++++++++--------- 1 file changed, 173 insertions(+), 174 deletions(-) diff --git a/test/topology_custom/test_group0_schema_versioning.py b/test/topology_custom/test_group0_schema_versioning.py index f1d399e27c..0cf98620fb 100644 --- a/test/topology_custom/test_group0_schema_versioning.py +++ b/test/topology_custom/test_group0_schema_versioning.py @@ -18,7 +18,7 @@ from test.pylib.manager_client import ManagerClient, ServerInfo from test.pylib.util import wait_for_cql_and_get_hosts from test.pylib.log_browsing import ScyllaLogFile from test.topology.util import reconnect_driver, wait_until_upgrade_finishes, \ - enter_recovery_state, delete_raft_data_and_upgrade_state + enter_recovery_state, delete_raft_data_and_upgrade_state, new_test_keyspace logger = logging.getLogger(__name__) @@ -89,15 +89,15 @@ async def verify_table_versions_synced(cql: Session, hs: list[Host], ignore_syst await verify_scylla_tables_versions_synced(cql, hs, ignore_system_tables) -async def verify_in_memory_table_versions(srvs: list[ServerInfo], logs: list[ScyllaLogFile], marks: list[int]): +async def verify_in_memory_table_versions(srvs: list[ServerInfo], logs: list[ScyllaLogFile], marks: list[int], table): """ Assumes that `logs` are log files of servers `srvs`, correspondingly in order. Assumes that `marks` are log markers (obtained by `ScyllaLogFile.mark()`) corresponding to `logs` in order. - Assumes that an 'alter table ks.t ...' statement was performed after obtaining `marks`. - Checks that every server printed the same version in `Altering ks.t...' log message. + Assumes that an 'alter table {table} ...' statement was performed after obtaining `marks`. 
+ Checks that every server printed the same version in `Altering {table}...' log message. """ logger.info("Verifying that in-memory table schema versions are in sync") - matches = [await log.grep("Altering ks.t.*version=(.*)", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep(f"Altering {table}.*version=(.*)", from_mark=mark) for log, mark in zip(logs, marks)] def get_version(srv: ServerInfo, matches: list[tuple[str, re.Match[str]]]): if not matches: @@ -132,155 +132,154 @@ async def test_schema_versioning_with_recovery(manager: ManagerClient): hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) logger.info("Creating keyspace and table") - await cql.run_async("create keyspace ks with replication = " - "{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") - await verify_table_versions_synced(cql, hosts) - await cql.run_async("create table ks.t (pk int primary key)") + async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks_name: + await verify_table_versions_synced(cql, hosts) + table_name = "t" + table = f"{ks_name}.{table_name}" + await cql.run_async(f"create table {table} (pk int primary key)") - logger.info("Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + logger.info("Waiting for driver") + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - await verify_table_versions_synced(cql, hosts) - ks_t_version = await get_scylla_tables_version(cql, hosts[0], 'ks', 't') - assert ks_t_version - - logs = [await manager.server_open_log(srv.server_id) for srv in servers] - marks = [await log.mark() for log in logs] - - logger.info("Altering table") - await cql.run_async("alter table ks.t with comment = ''") - - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) - - new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], 
'ks', 't') - assert new_ks_t_version - assert new_ks_t_version != ks_t_version - ks_t_version = new_ks_t_version - - # We still have a group 0 majority, don't do this at home. - srv1 = servers[0] - logger.info(f"Rebooting {srv1} in RECOVERY mode") - h1 = next(h for h in hosts if h.address == srv1.ip_addr) - await cql.run_async("update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h1) - await manager.server_restart(srv1.server_id) - - cql = await reconnect_driver(manager) - logger.info(f"Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - await verify_table_versions_synced(cql, hosts) - - # We're doing a schema change on RECOVERY node while we have two nodes running in group 0 mode. - # Don't do this at home. - # - # Now, the two nodes are not doing any schema changes right now, so this doesn't actually break anything: - # the RECOVERY node is operating using the old schema change procedure, which means - # that it pushes the schema mutations to other nodes directly with RPC, modifying - # the group 0 state machine on other two nodes. - # - # There is one problem with this however. If the RECOVERY node considers some other node - # as DOWN, it will silently *not* push the schema change, completing the operation - # "successfully" nevertheless (it will return to the driver without error). - # Usually in this case we rely on eventual convergence of schema through gossip, - # which will not happen here, because the group 0 nodes are not doing schema pulls! - # So we need to make sure that the RECOVERY node sees the other nodes as UP before - # we perform the schema change, so it pushes the mutations to them. 
- logger.info(f"Waiting until RECOVERY node ({srv1}) sees other servers as UP") - await manager.server_sees_others(srv1.server_id, 2) - - marks = [await log.mark() for log in logs] - logger.info(f"Altering table on RECOVERY node ({srv1})") - await cql.run_async("alter table ks.t with comment = ''", host=h1) - - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) - - new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], 'ks', 't') - assert not new_ks_t_version - ks_t_version = new_ks_t_version - - logger.info(f"Stopping {srv1} gracefully") - await manager.server_stop_gracefully(srv1.server_id) - - srv2 = servers[1] - logger.info(f"Waiting until {srv2} sees {srv1} as dead") - await manager.server_not_sees_other_server(srv2.ip_addr, srv1.ip_addr) - - # Now we modify schema through group 0 while the RECOVERY node is dead. - # Don't do this at home. - marks = [await log.mark() for log in logs] - h2 = next(h for h in hosts if h.address == srv2.ip_addr) - logger.info(f"Altering table on group 0 node {srv2}") - await cql.run_async("alter table ks.t with comment = ''", host=h2) - - await manager.server_start(srv1.server_id) - cql = await reconnect_driver(manager) - logger.info(f"Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - logger.info(f"Waiting until {srv2} sees {srv1} as UP") - await manager.server_sees_other_server(srv2.ip_addr, srv1.ip_addr) - - # The RECOVERY node will pull schema when it gets a write. - # The other group 0 node will do a barrier so it will also sync schema before the write returns. 
- logger.info("Forcing schema sync through CL=ALL INSERT") - await cql.run_async(SimpleStatement("insert into ks.t (pk) values (0)", consistency_level=ConsistencyLevel.ALL), - host=h2) - - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) - - new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], 'ks', 't') - assert new_ks_t_version - ks_t_version = new_ks_t_version - - srv3 = servers[2] - h3 = next(h for h in hosts if h.address == srv3.ip_addr) - logger.info("Finishing recovery") - for h in [h2, h3]: - await cql.run_async( - "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h) - await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in [srv2, srv3])) - - cql = await reconnect_driver(manager) - logger.info("Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - for h in [h1, h2, h3]: - await delete_raft_data_and_upgrade_state(cql, h) - - logger.info("Restarting servers") - await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers)) - - cql = await reconnect_driver(manager) - logger.info("Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - logging.info(f"Waiting until upgrade finishes") - for h in [h1, h2, h3]: - await wait_until_upgrade_finishes(cql, h, time.time() + 60) - - await verify_table_versions_synced(cql, hosts) - - for change in [ - "alter table ks.t with comment = ''", - "alter table ks.t add v int", - "alter table ks.t alter v type blob"]: + await verify_table_versions_synced(cql, hosts) + ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name) + assert ks_t_version + logs = [await manager.server_open_log(srv.server_id) for srv in servers] marks = [await log.mark() for log in logs] - logger.info(f"Altering table with \"{change}\"") - await cql.run_async(change) - new_ks_t_version = await 
get_scylla_tables_version(cql, hosts[0], 'ks', 't') + logger.info("Altering table") + await cql.run_async(f"alter table {table} with comment = ''") + + await verify_table_versions_synced(cql, hosts) + await verify_in_memory_table_versions(servers, logs, marks, table) + + new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name) assert new_ks_t_version assert new_ks_t_version != ks_t_version ks_t_version = new_ks_t_version - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) + # We still have a group 0 majority, don't do this at home. + srv1 = servers[0] + logger.info(f"Rebooting {srv1} in RECOVERY mode") + h1 = next(h for h in hosts if h.address == srv1.ip_addr) + await cql.run_async("update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h1) + await manager.server_restart(srv1.server_id) - await cql.run_async("drop keyspace ks") + cql = await reconnect_driver(manager) + logger.info(f"Waiting for driver") + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + await verify_table_versions_synced(cql, hosts) + + # We're doing a schema change on RECOVERY node while we have two nodes running in group 0 mode. + # Don't do this at home. + # + # Now, the two nodes are not doing any schema changes right now, so this doesn't actually break anything: + # the RECOVERY node is operating using the old schema change procedure, which means + # that it pushes the schema mutations to other nodes directly with RPC, modifying + # the group 0 state machine on other two nodes. + # + # There is one problem with this however. If the RECOVERY node considers some other node + # as DOWN, it will silently *not* push the schema change, completing the operation + # "successfully" nevertheless (it will return to the driver without error). 
+ # Usually in this case we rely on eventual convergence of schema through gossip, + # which will not happen here, because the group 0 nodes are not doing schema pulls! + # So we need to make sure that the RECOVERY node sees the other nodes as UP before + # we perform the schema change, so it pushes the mutations to them. + logger.info(f"Waiting until RECOVERY node ({srv1}) sees other servers as UP") + await manager.server_sees_others(srv1.server_id, 2) + + marks = [await log.mark() for log in logs] + logger.info(f"Altering table on RECOVERY node ({srv1})") + await cql.run_async(f"alter table {table} with comment = ''", host=h1) + + await verify_table_versions_synced(cql, hosts) + await verify_in_memory_table_versions(servers, logs, marks, table) + + new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name) + assert not new_ks_t_version + ks_t_version = new_ks_t_version + + logger.info(f"Stopping {srv1} gracefully") + await manager.server_stop_gracefully(srv1.server_id) + + srv2 = servers[1] + logger.info(f"Waiting until {srv2} sees {srv1} as dead") + await manager.server_not_sees_other_server(srv2.ip_addr, srv1.ip_addr) + + # Now we modify schema through group 0 while the RECOVERY node is dead. + # Don't do this at home. + marks = [await log.mark() for log in logs] + h2 = next(h for h in hosts if h.address == srv2.ip_addr) + logger.info(f"Altering table on group 0 node {srv2}") + await cql.run_async(f"alter table {table} with comment = ''", host=h2) + + await manager.server_start(srv1.server_id) + cql = await reconnect_driver(manager) + logger.info(f"Waiting for driver") + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + logger.info(f"Waiting until {srv2} sees {srv1} as UP") + await manager.server_sees_other_server(srv2.ip_addr, srv1.ip_addr) + + # The RECOVERY node will pull schema when it gets a write. + # The other group 0 node will do a barrier so it will also sync schema before the write returns. 
+ logger.info("Forcing schema sync through CL=ALL INSERT") + await cql.run_async(SimpleStatement(f"insert into {table} (pk) values (0)", consistency_level=ConsistencyLevel.ALL), + host=h2) + + await verify_table_versions_synced(cql, hosts) + await verify_in_memory_table_versions(servers, logs, marks, table) + + new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name) + assert new_ks_t_version + ks_t_version = new_ks_t_version + + srv3 = servers[2] + h3 = next(h for h in hosts if h.address == srv3.ip_addr) + logger.info("Finishing recovery") + for h in [h2, h3]: + await cql.run_async( + "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h) + await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in [srv2, srv3])) + + cql = await reconnect_driver(manager) + logger.info("Waiting for driver") + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + for h in [h1, h2, h3]: + await delete_raft_data_and_upgrade_state(cql, h) + + logger.info("Restarting servers") + await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers)) + + cql = await reconnect_driver(manager) + logger.info("Waiting for driver") + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + logging.info(f"Waiting until upgrade finishes") + for h in [h1, h2, h3]: + await wait_until_upgrade_finishes(cql, h, time.time() + 60) + + await verify_table_versions_synced(cql, hosts) + + for change in [ + f"alter table {table} with comment = ''", + f"alter table {table} add v int", + f"alter table {table} alter v type blob"]: + + marks = [await log.mark() for log in logs] + logger.info(f"Altering table with \"{change}\"") + await cql.run_async(change) + + new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name) + assert new_ks_t_version + assert new_ks_t_version != ks_t_version + ks_t_version = new_ks_t_version + + await verify_table_versions_synced(cql, 
hosts) + await verify_in_memory_table_versions(servers, logs, marks, table) @pytest.mark.asyncio async def test_upgrade(manager: ManagerClient): @@ -311,42 +310,42 @@ async def test_upgrade(manager: ManagerClient): await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) logger.info("Creating keyspace and table") - await cql.run_async("create keyspace ks with replication = " - "{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") - await verify_table_versions_synced(cql, hosts) - await cql.run_async("create table ks.t (pk int primary key)") + async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks_name: + table = f"{ks_name}.t" + await verify_table_versions_synced(cql, hosts) + await cql.run_async(f"create table {table} (pk int primary key)") - logging.info(f"Deleting Raft data and upgrade state on {hosts}") - await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts)) + logging.info(f"Deleting Raft data and upgrade state on {hosts}") + await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts)) - logging.info(f"Restarting {servers}") - await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers)) - cql = await reconnect_driver(manager) + logging.info(f"Restarting {servers}") + await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers)) + cql = await reconnect_driver(manager) - logger.info("Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + logger.info("Waiting for driver") + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - logging.info(f"Waiting until Raft upgrade procedure finishes") - await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts)) + logging.info(f"Waiting until Raft upgrade procedure finishes") + await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in 
hosts)) - logs = [await manager.server_open_log(srv.server_id) for srv in servers] + logs = [await manager.server_open_log(srv.server_id) for srv in servers] - marks = [await log.mark() for log in logs] - logger.info("Altering table") - await cql.run_async("alter table ks.t with comment = ''") + marks = [await log.mark() for log in logs] + logger.info("Altering table") + await cql.run_async(f"alter table {table} with comment = ''") - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) + await verify_table_versions_synced(cql, hosts) + await verify_in_memory_table_versions(servers, logs, marks, table) - # `group0_schema_version` should be present - # and the version column for `ks.t` should be non-null. - for h in hosts: - logger.info(f"Checking that `group0_schema_version` is set on {h}") - assert (await get_group0_schema_version(cql, h)) is not None + # `group0_schema_version` should be present + # and the version column for `{table}` should be non-null. + for h in hosts: + logger.info(f"Checking that `group0_schema_version` is set on {h}") + assert (await get_group0_schema_version(cql, h)) is not None - for h in hosts: - logger.info(f"Checking that `version` column for `ks.t` is set on {h}") - versions = await get_scylla_tables_versions(cql, h) - for ks, _, v in versions: - if ks == "ks": - assert v is not None + for h in hosts: + logger.info(f"Checking that `version` column for `{table}` is set on {h}") + versions = await get_scylla_tables_versions(cql, h) + for ks, _, v in versions: + if ks == ks_name: + assert v is not None