diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py
index 3540a68b85..59d8143e4a 100644
--- a/test/pylib/manager_client.py
+++ b/test/pylib/manager_client.py
@@ -18,7 +18,9 @@ from test.pylib.rest_client import UnixRESTClient, ScyllaRESTAPIClient, ScyllaMe
 from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, Host
 from test.pylib.internal_types import ServerNum, IPAddress, HostID, ServerInfo
 from test.pylib.scylla_cluster import ReplaceConfig, ScyllaServer
-from cassandra.cluster import Session as CassandraSession  # type: ignore # pylint: disable=no-name-in-module
+from cassandra.cluster import Session as CassandraSession, \
+    ExecutionProfile, EXEC_PROFILE_DEFAULT  # type: ignore # pylint: disable=no-name-in-module
+from cassandra.policies import WhiteListRoundRobinPolicy
 from cassandra.cluster import Cluster as CassandraCluster  # type: ignore # pylint: disable=no-name-in-module
 from cassandra.auth import AuthProvider
 import aiohttp
@@ -101,6 +103,13 @@ class ManagerClient():
         hosts = await wait_for_cql_and_get_hosts(cql, servers, time() + 60)
         return cql, hosts
 
+    @staticmethod
+    def connect_to_node(server: ServerInfo) -> CassandraSession:
+        """Return a new CQL session whose only contact point and whitelisted host is the given server"""
+        logger.info(f"Establishing connection to {server.ip_addr}")
+        profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([server.ip_addr]))
+        return CassandraCluster([server.ip_addr], execution_profiles={EXEC_PROFILE_DEFAULT: profile}).connect()
+
     # Make driver update endpoints from remote connection
     def _driver_update(self) -> None:
         if self.ccluster is not None:
diff --git a/test/pylib/random_tables.py b/test/pylib/random_tables.py
index b6bbb620ac..a3ae1d9874 100644
--- a/test/pylib/random_tables.py
+++ b/test/pylib/random_tables.py
@@ -255,17 +255,20 @@ class RandomTable():
 
 class RandomTables():
     """A list of managed random tables"""
+
     def __init__(self, test_name: str, manager: ManagerClient, keyspace: str,
-                 replication_factor: int):
+                 replication_factor: int, dc_replication_factor: dict[str, int] = None):
+        keyspace_query = f"CREATE KEYSPACE {keyspace} WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : {replication_factor}}}"
         self.test_name = test_name
         self.manager = manager
         self.keyspace = keyspace
         self.tables: List[RandomTable] = []
         self.removed_tables: List[RandomTable] = []
         assert self.manager.cql is not None
-        self.manager.cql.execute(f"CREATE KEYSPACE {keyspace} WITH REPLICATION = "
-                                 "{ 'class' : 'NetworkTopologyStrategy', "
-                                 f"'replication_factor' : {replication_factor} }}")
+        if dc_replication_factor is not None:
+            for key, val in dc_replication_factor.items():
+                keyspace_query = keyspace_query[:-1] + f",'{key}': {val}}}"
+        self.manager.cql.execute(keyspace_query)
 
     async def add_tables(self, ntables: int = 1, ncolumns: int = 5, if_not_exists: bool = False) -> None:
         """Add random tables to the list.
diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py
index 775b171815..183a9ed139 100644
--- a/test/pylib/scylla_cluster.py
+++ b/test/pylib/scylla_cluster.py
@@ -642,7 +642,8 @@ class ScyllaServer:
             yaml.dump(self.config, config_file)
         if self.property_file:
             with self.property_filename.open('w') as property_file:
-                yaml.dump(self.property_file, property_file)
+                for key, value in self.property_file.items():
+                    property_file.write(f'{key}={value}\n')
 
 
 class ScyllaCluster:
diff --git a/test/topology_custom/test_multidc.py b/test/topology_custom/test_multidc.py
new file mode 100644
index 0000000000..2c60fc8689
--- /dev/null
+++ b/test/topology_custom/test_multidc.py
@@ -0,0 +1,141 @@
+#
+# Copyright (C) 2022-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+import logging
+import sys
+
+import pytest
+
+sys.path.insert(0, sys.path[0] + "/test/cql-pytest")  # presumably exposes cql-pytest's nodetool helper imported below
+import nodetool
+from cassandra import ConsistencyLevel
+from cassandra.query import SimpleStatement
+from test.pylib.manager_client import ManagerClient
+from test.pylib.random_tables import RandomTables, TextType, Column
+from test.pylib.util import unique_name
+
+logger = logging.getLogger(__name__)
+CONFIG = {"endpoint_snitch": "GossipingPropertyFileSnitch"}
+
+
+# Checks a cluster boot/operations in multi-dc environment with 30 nodes each in a separate DC
+@pytest.mark.asyncio
+@pytest.mark.skip(reason="Too heavy for CI, but OK for local run")
+async def test_multidc(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
+    logger.info("Creating a new cluster")
+    for i in range(30):
+        s_info = await manager.server_add(
+            config=CONFIG,
+            property_file={'dc': f'dc{i}', 'rack': 'myrack1'}
+        )
+        logger.info(s_info)
+    random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
+    logger.info("Creating new tables")
+    await random_tables.add_tables(ntables=3, ncolumns=3)
+    await random_tables.verify_schema()
+
+cluster_config = [
+    ([1, 2], 1),
+    ([1, 1, 2, 2], 2)
+]
+
+
+# Simple put-get test for 2 DC with a different amount of nodes and different replication factors
+@pytest.mark.asyncio
+@pytest.mark.parametrize("nodes_list, rf", cluster_config)
+async def test_putget_2dc_with_rf(
+    request: pytest.FixtureRequest, manager: ManagerClient, nodes_list: list[int], rf: int
+) -> None:
+    ks = "test_ks"
+    cf = "test_cf"
+    table_name = "test_table_name"
+    columns = [Column("name", TextType), Column("value", TextType)]
+    logger.info("Create two servers in different DC's")
+    for i in nodes_list:
+        s_info = await manager.server_add(
+            config=CONFIG,
+            property_file={"dc": f"dc{i}", "rack": "myrack"},
+        )
+        logger.info(s_info)
+    conn = manager.get_cql()
+    random_tables = RandomTables(request.node.name, manager, ks, rf)
+    logger.info("Add table")
+    await random_tables.add_table(ncolumns=2, columns=columns, pks=1, name=table_name)
+    conn.execute(f"USE {ks}")
+    conn.execute(
+        f"CREATE COLUMNFAMILY {cf} ( key varchar, c varchar, v varchar, PRIMARY KEY(key, c)) WITH comment='test cf'"
+    )
+
+    logger.info("Perform insert/overwrite")
+    # create 100 rows c00..c99 under key k0, holding value0..value99
+    update_query = f"UPDATE {cf} SET v='value%d' WHERE key='k0' AND c='c%02d'"
+    query_batch = "BEGIN BATCH %s APPLY BATCH"
+    kvs = [update_query % (i, i) for i in range(100)]
+    query = SimpleStatement(
+        query_batch % ";".join(kvs), consistency_level=ConsistencyLevel.QUORUM
+    )
+    conn.execute(query)
+    nodetool.flush_keyspace(conn, ks)
+    # overwrite every second row (c = 0, 2, 4, ...) with value{2 * c}
+    kvs = [update_query % (i * 4, i * 2) for i in range(50)]
+    query = SimpleStatement(
+        query_batch % "; ".join(kvs), consistency_level=ConsistencyLevel.QUORUM
+    )
+    conn.execute(query)
+    nodetool.flush_keyspace(conn, ks)
+    # overwrite every fifth row (c = 0, 5, 10, ...) with value{4 * c}
+    kvs = [update_query % (i * 20, i * 5) for i in range(20)]
+    query = SimpleStatement(
+        query_batch % "; ".join(kvs), consistency_level=ConsistencyLevel.QUORUM
+    )
+    conn.execute(query)
+    nodetool.flush_keyspace(conn, ks)
+
+    logger.info("Check written data is correct")
+    query = SimpleStatement(f"SELECT * FROM {cf} WHERE key='k0'", consistency_level=ConsistencyLevel.QUORUM)
+    rows = list(conn.execute(query))
+    assert len(rows) == 100
+    for i, row in enumerate(rows):
+        if i % 5 == 0:
+            assert row[2] == f"value{i * 4}"
+        elif i % 2 == 0:
+            assert row[2] == f"value{i * 2}"
+        else:
+            assert row[2] == f"value{i}"
+
+
+@pytest.mark.asyncio
+async def test_query_dc_with_rf_0_does_not_crash_db(request: pytest.FixtureRequest, manager: ManagerClient):
+    """Test querying dc with CL=LOCAL_QUORUM when RF=0 for this dc, does not crash the node and returns None
+    Covers https://github.com/scylladb/scylla/issues/8354"""
+    servers = []
+    ks = "test_ks"
+    table_name = "test_table_name"
+    expected = ["k1", "value1"]
+    dc_replication = {'dc2': 0}
+    columns = [Column("name", TextType), Column("value", TextType)]
+
+    for i in [1, 2]:
+        servers.append(await manager.server_add(
+            config=CONFIG,
+            property_file={"dc": f"dc{i}", "rack": "myrack"},
+        ))
+
+    dc1_connection = manager.connect_to_node(servers[0])
+    dc2_connection = manager.connect_to_node(servers[1])
+    random_tables = RandomTables(request.node.name, manager, ks, 1, dc_replication)
+    await random_tables.add_table(ncolumns=2, columns=columns, pks=1, name=table_name)
+    dc1_connection.execute(
+        f"INSERT INTO {ks}.{table_name} ({columns[0].name}, {columns[1].name}) VALUES ('{expected[0]}', '{expected[1]}');")
+    select_query = SimpleStatement(f"SELECT * from {ks}.{table_name};",
+                                   consistency_level=ConsistencyLevel.LOCAL_QUORUM)
+    nodetool.flush(dc1_connection, f"{ks}.{table_name}")
+    first_node_results = list(dc1_connection.execute(select_query).one())
+    second_node_result = dc2_connection.execute(select_query).one()
+
+    assert first_node_results == expected, \
+        f"Expected {expected} from {select_query.query_string}, but got {first_node_results}"
+    assert second_node_result is None, \
+        f"Expected no results from {select_query.query_string}, but got {second_node_result}"