From b6edf056ead2b9b63e89f76d8c12ed34ec0db558 Mon Sep 17 00:00:00 2001 From: Andrei Chekun Date: Thu, 22 Feb 2024 20:39:30 +0100 Subject: [PATCH] Add sanity tests for multi dc Fix writing cassandra-rackdc.properties with correct format data instead of yaml Add a parameter to overwrite RF for specific DC Add the possibility to connect cql to the specific node In this PR 4 tests were added to test multi-DC functionality. One is added from the initial commit where the multi-DC possibility was introduced; however, this test was not committed. Three of them are migrations from dtest that will later be deleted. To be able to execute the migrated tests, additional functionality is added: the ability to connect cql to the specific node in the cluster instead of a pooled connection and the possibility to overwrite the replication factor for the specific DC. To be able to use multi-DC in test.py, the issue with the incorrect format of the properties file is fixed in this PR. Closes scylladb/scylladb#17503 --- test/pylib/manager_client.py | 11 ++- test/pylib/random_tables.py | 11 ++- test/pylib/scylla_cluster.py | 3 +- test/topology_custom/test_multidc.py | 141 +++++++++++++++++++++++++++ 4 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 test/topology_custom/test_multidc.py diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py index 3540a68b85..59d8143e4a 100644 --- a/test/pylib/manager_client.py +++ b/test/pylib/manager_client.py @@ -18,7 +18,9 @@ from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient, ScyllaMe from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, Host from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo from test.pylib.scylla_cluster import ReplaceConfig, ScyllaServer -from cassandra.cluster import Session as CassandraSession # type: ignore # pylint: disable=no-name-in-module +from cassandra.cluster import Session as CassandraSession, \ + ExecutionProfile, EXEC_PROFILE_DEFAULT # type: 
ignore # pylint: disable=no-name-in-module +from cassandra.policies import WhiteListRoundRobinPolicy from cassandra.cluster import Cluster as CassandraCluster # type: ignore # pylint: disable=no-name-in-module from cassandra.auth import AuthProvider import aiohttp @@ -101,6 +103,13 @@ class ManagerClient(): hosts = await wait_for_cql_and_get_hosts(cql, servers, time() + 60) return cql, hosts + @staticmethod + def connect_to_node(server: ServerInfo) -> CassandraSession: + """Connect cql to the specific node in the cluster""" + logger.info(f"Establishing connection to {server.ip_addr}") + profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([server.ip_addr])) + return CassandraCluster([server.ip_addr], execution_profiles={EXEC_PROFILE_DEFAULT: profile}).connect() + # Make driver update endpoints from remote connection def _driver_update(self) -> None: if self.ccluster is not None: diff --git a/test/pylib/random_tables.py b/test/pylib/random_tables.py index b6bbb620ac..a3ae1d9874 100644 --- a/test/pylib/random_tables.py +++ b/test/pylib/random_tables.py @@ -255,17 +255,20 @@ class RandomTable(): class RandomTables(): """A list of managed random tables""" + def __init__(self, test_name: str, manager: ManagerClient, keyspace: str, - replication_factor: int): + replication_factor: int, dc_replication_factor: dict[str, int] = None): + keyspace_query = f"CREATE KEYSPACE {keyspace} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor}}}" self.test_name = test_name self.manager = manager self.keyspace = keyspace self.tables: List[RandomTable] = [] self.removed_tables: List[RandomTable] = [] assert self.manager.cql is not None - self.manager.cql.execute(f"CREATE KEYSPACE {keyspace} WITH REPLICATION = " - "{ 'class' : 'NetworkTopologyStrategy', " - f"'replication_factor' : {replication_factor} }}") + if dc_replication_factor is not None: + for key, val in dc_replication_factor.items(): + keyspace_query = 
keyspace_query[:-1] + f",'{key}': {val}}}" + self.manager.cql.execute(keyspace_query) async def add_tables(self, ntables: int = 1, ncolumns: int = 5, if_not_exists: bool = False) -> None: """Add random tables to the list. diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index 775b171815..183a9ed139 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -642,7 +642,8 @@ class ScyllaServer: yaml.dump(self.config, config_file) if self.property_file: with self.property_filename.open('w') as property_file: - yaml.dump(self.property_file, property_file) + for key, value in self.property_file.items(): + property_file.write(f'{key}={value}\n') class ScyllaCluster: diff --git a/test/topology_custom/test_multidc.py b/test/topology_custom/test_multidc.py new file mode 100644 index 0000000000..2c60fc8689 --- /dev/null +++ b/test/topology_custom/test_multidc.py @@ -0,0 +1,141 @@ +# +# Copyright (C) 2022-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +import logging +import sys + +import pytest + +sys.path.insert(0, sys.path[0] + "/test/cql-pytest") +import nodetool +from cassandra import ConsistencyLevel +from cassandra.query import SimpleStatement +from test.pylib.manager_client import ManagerClient +from test.pylib.random_tables import RandomTables, TextType, Column +from test.pylib.util import unique_name + +logger = logging.getLogger(__name__) +CONFIG = {"endpoint_snitch": "GossipingPropertyFileSnitch"} + + +# Checks a cluster boot/operations in multi-dc environment with 30 nodes each in a separate DC +@pytest.mark.asyncio +@pytest.mark.skip(reason="Too heavy for CI, but OK for local run") +async def test_multidc(request: pytest.FixtureRequest, manager: ManagerClient) -> None: + logger.info("Creating a new cluster") + for i in range(30): + s_info = await manager.server_add( + config=CONFIG, + property_file={'dc': f'dc{i}', 'rack': 'myrack1'} + ) + logger.info(s_info) + random_tables = 
RandomTables(request.node.name, manager, unique_name(), 3) + logger.info("Creating new tables") + await random_tables.add_tables(ntables=3, ncolumns=3) + await random_tables.verify_schema() + +cluster_config = [ + ([1, 2], 1), + ([1, 1, 2, 2], 2) +] + + +# Simple put-get test for 2 DC with a different amount of nodes and different replication factors +@pytest.mark.asyncio +@pytest.mark.parametrize("nodes_list, rf", cluster_config) +async def test_putget_2dc_with_rf( + request: pytest.FixtureRequest, manager: ManagerClient, nodes_list: list[int], rf: int +) -> None: + ks = "test_ks" + cf = "test_cf" + table_name = "test_table_name" + columns = [Column("name", TextType), Column("value", TextType)] + logger.info("Create two servers in different DC's") + for i in nodes_list: + s_info = await manager.server_add( + config=CONFIG, + property_file={"dc": f"dc{i}", "rack": "myrack"}, + ) + logger.info(s_info) + conn = manager.get_cql() + random_tables = RandomTables(request.node.name, manager, ks, rf) + logger.info("Add table") + await random_tables.add_table(ncolumns=2, columns=columns, pks=1, name=table_name) + conn.execute(f"USE {ks}") + conn.execute( + f"CREATE COLUMNFAMILY {cf} ( key varchar, c varchar, v varchar, PRIMARY KEY(key, c)) WITH comment='test cf'" + ) + + logger.info("Perform insert/overwrite") + # create 100 records with values from 0 to 99 + update_query = f"UPDATE {cf} SET v='value%d' WHERE key='k0' AND c='c%02d'" + query_batch = "BEGIN BATCH %s APPLY BATCH" + kvs = [update_query % (i, i) for i in range(100)] + query = SimpleStatement( + query_batch % ";".join(kvs), consistency_level=ConsistencyLevel.QUORUM + ) + conn.execute(query) + nodetool.flush_keyspace(conn, ks) + # overwrite each second value + kvs = [update_query % (i * 4, i * 2) for i in range(50)] + query = SimpleStatement( + query_batch % "; ".join(kvs), consistency_level=ConsistencyLevel.QUORUM + ) + conn.execute(query) + nodetool.flush_keyspace(conn, ks) + # overwrite each fifth value + kvs = 
[update_query % (i * 20, i * 5) for i in range(20)] + query = SimpleStatement( + query_batch % "; ".join(kvs), consistency_level=ConsistencyLevel.QUORUM + ) + conn.execute(query) + nodetool.flush_keyspace(conn, ks) + + logger.info("Check written data is correct") + query = SimpleStatement(f"SELECT * FROM {cf} WHERE key='k0'", consistency_level=ConsistencyLevel.QUORUM) + rows = list(conn.execute(query)) + assert len(rows) == 100 + for i, row in enumerate(rows): + if i % 5 == 0: + assert row[2] == f"value{i * 4}" + elif i % 2 == 0: + assert row[2] == f"value{i * 2}" + else: + assert row[2] == f"value{i}" + + +@pytest.mark.asyncio +async def test_query_dc_with_rf_0_does_not_crash_db(request: pytest.FixtureRequest, manager: ManagerClient): + """Test querying dc with CL=LOCAL_QUORUM when RF=0 for this dc, does not crash the node and returns None + Covers https://github.com/scylladb/scylla/issues/8354""" + servers = [] + ks = "test_ks" + table_name = "test_table_name" + expected = ["k1", "value1"] + dc_replication = {'dc2': 0} + columns = [Column("name", TextType), Column("value", TextType)] + + for i in [1, 2]: + servers.append(await manager.server_add( + config=CONFIG, + property_file={"dc": f"dc{i}", "rack": "myrack"}, + )) + + dc1_connection = manager.connect_to_node(servers[0]) + dc2_connection = manager.connect_to_node(servers[1]) + random_tables = RandomTables(request.node.name, manager, ks, 1, dc_replication) + await random_tables.add_table(ncolumns=2, columns=columns, pks=1, name=table_name) + dc1_connection.execute( + f"INSERT INTO {ks}.{table_name} ({columns[0].name}, {columns[1].name}) VALUES ('{expected[0]}', '{expected[1]}');") + select_query = SimpleStatement(f"SELECT * from {ks}.{table_name};", + consistency_level=ConsistencyLevel.LOCAL_QUORUM) + nodetool.flush(dc1_connection, "{ks}.{table_name}") + first_node_results = list(dc1_connection.execute(select_query).one()) + second_node_result = dc2_connection.execute(select_query).one() + + assert 
first_node_results == expected, \ + f"Expected {expected} from {select_query.query_string}, but got {first_node_results}" + assert second_node_result is None, \ + f"Expected no results from {select_query.query_string}, but got {second_node_result}"