HostRegistry is initialized in several places in the framework, which can lead to overlapping IP addresses; the probability is low, but it is not zero. This PR makes the host registry be initialized only once, for the master thread and pytest. To avoid communication with the workers, each worker gets its own subnet that it can use solely for its own purposes. This simplifies the solution while still providing a way to avoid overlapping IPs. Closes scylladb/scylladb#28520
195 lines
8.2 KiB
Python
195 lines
8.2 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
import logging
|
|
import asyncio
|
|
|
|
from test.pylib.host_registry import HostRegistry
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.internal_types import IPAddress
|
|
from test.cluster.util import new_test_keyspace, new_test_table
|
|
from cassandra.cluster import ConsistencyLevel
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Refs #27429
|
|
# Transposed/adapted from dtest with same name
|
|
async def do_test_internode_compression_between_datacenters(manager: ManagerClient, compression: str, verifier) -> None:
    """
    Run a single insert through byte-counting TCP proxies and let `verifier` judge the traffic.

    Three servers are added (two in dc1, one in dc2). Each server gets a leased address as
    its `broadcast_address`, and a `Proxy` listening on that address forwards to the server's
    real `ip_addr`, so traffic addressed to a node's broadcast address goes through a proxy
    that records how many bytes it relayed.

    After the cluster is up, one row with a highly compressible value of `msg_size` bytes is
    inserted with consistency level ALL, and
    `verifier(msg_size, node1_proxy, node2_proxy, node3_proxy)` is called with the proxies in
    the same order as the servers (dc1, dc1, dc2). The verifier is expected to assert on
    `proxy.stats`.

    :param manager: cluster manager used to add/start/stop servers and obtain the CQL session
    :param compression: value for the `internode_compression` server option
    :param verifier: callable `(msg_size, node1_proxy, node2_proxy, node3_proxy)` doing the asserts
    """

    class Stats:
        """Counters for the data relayed by one proxy (both directions share one instance)."""

        def __init__(self):
            # total number of bytes relayed
            self.bytes_written = 0
            # size of the largest single chunk read from a socket (at most 64 KiB, see pipe())
            self.max_packet_size = 0

        def add(self, n: int):
            """Account for one relayed chunk of `n` bytes."""
            self.bytes_written += n
            self.max_packet_size = max(self.max_packet_size, n)

    # Simple python async io proxy type
    class Proxy:
        """TCP forwarder from `listen_addr:port` to `dest_addr:port` that records traffic in `stats`."""

        # Pause between connect attempts while the destination is not accepting yet,
        # so the retry loop does not spin.
        CONNECT_RETRY_DELAY = 0.05

        def __init__(self, listen_addr: str, dest_addr: str, port: int = 7000):
            self.list_addr = listen_addr
            self.dest_addr = dest_addr
            self.port = port
            self.server = None
            self.stats = Stats()
            # sequence number for accepted connections, only used in log messages
            self.conn_id = 0

        def reset(self):
            """Discard everything counted so far."""
            self.stats = Stats()

        async def start(self):
            """Start listening on `list_addr:port`; connections are accepted as soon as this returns."""

            # IO worker per direction
            async def pipe(reader, writer, src, dst, cid):
                logger.debug("pipe %s %s -> %s:%s", cid, src, dst, self.port)
                try:
                    while not reader.at_eof():
                        buf = await reader.read(64*1024)
                        n = len(buf)
                        if n == 0:
                            break
                        # self.stats is looked up on every chunk, so reset() takes effect
                        # for connections that are already open.
                        self.stats.add(n)
                        writer.write(buf)
                        await writer.drain()

                    # the source is done: propagate the half-close to the other side
                    if writer.can_write_eof():
                        writer.write_eof()
                except Exception as e:
                    logger.debug("pipe error %s %s -> %s:%s, %s (%s)", cid, src, dst, self.port, e, e.__class__)
                finally:
                    logger.debug("end pipe %s %s -> %s:%s", cid, src, dst, self.port)
                    writer.close()
                    await writer.wait_closed()

            # Invoked by asyncio.start_server accept
            async def handle_client(local_reader, local_writer):
                cid = self.conn_id
                self.conn_id += 1
                logger.debug("Connection %s %s -> %s %s", cid, self.list_addr, self.dest_addr, self.port)
                try:
                    while True:
                        try:
                            # Connect can/will fail repeatedly during startup. Since we
                            # need to emulate being a scylla connected to, and we've already
                            # been accepted, we can't really return "nope". So just keep trying...
                            remote_reader, remote_writer = await asyncio.open_connection(self.dest_addr, self.port)
                            break
                        except ConnectionRefusedError as e:
                            # retry!
                            logger.debug("Connection refused %s %s -> %s:%s (%s)", cid, self.list_addr, self.dest_addr, self.port, e)
                        # Give up once the proxy itself has been stopped, otherwise this
                        # handler would keep retrying forever against a destination that is gone.
                        if self.server is not None and not self.server.is_serving():
                            logger.debug("Proxy stopped, dropping connection %s %s -> %s:%s", cid, self.list_addr, self.dest_addr, self.port)
                            return
                        await asyncio.sleep(self.CONNECT_RETRY_DELAY)

                    # one pipe per direction; both account into the same Stats
                    await asyncio.gather(
                        pipe(local_reader, remote_writer, self.list_addr, self.dest_addr, cid),
                        pipe(remote_reader, local_writer, self.dest_addr, self.list_addr, cid),
                    )
                except Exception as e:
                    logger.debug("Connection error %s %s -> %s:%s, %s (%s)", cid, self.list_addr, self.dest_addr, self.port, e, e.__class__)
                finally:
                    # Close the accepted socket exactly once, when this connection is over.
                    # It must stay open while we are still retrying the outgoing connect.
                    local_writer.close()
                    await local_writer.wait_closed()

            # creates an accepting server
            self.server = await asyncio.start_server(handle_client,self.list_addr, self.port, keep_alive=True)

        async def run(self):
            """Block serving connections until the server is closed. Optional: start() already accepts."""
            async with self.server:
                await self.server.serve_forever()

        async def stop(self):
            """Stop listening and wait for the server to close; a no-op if start() was never called."""
            if self.server is not None:
                self.server.close()
                await self.server.wait_closed()

    logger.info("Creating a new cluster of 2 nodes in 1st DC and 1 node in 2nd DC")

    dcs = [('dc1','rack1'), ('dc1', 'rack2'), ('dc2', 'rack3')]
    # Lease every proxy address from ONE registry instance so the three leases are
    # coordinated with each other.
    # NOTE(review): if the test framework exposes a shared registry, leasing from it would
    # also rule out clashes with addresses handed out elsewhere - confirm and switch.
    registry = HostRegistry()
    proxy_addrs = [(await registry.lease_host(), dc, rack) for dc, rack in dcs]
    # the first proxy address is the only seed
    seeds = [proxy_addrs[0][0]]
    config = {"internode_compression": compression, "ssl_storage_port": 0 }

    # create unstarted servers
    # use our proxy addresses as broadcast so we can insert proxies
    servers = [(await manager.server_add(config=config | { 'broadcast_address':addr }, start=False,
                                         property_file={"dc": dc, "rack": rack}), addr)
               for addr, dc, rack in proxy_addrs]

    # now create our proxies and start them
    # (asyncio.start_server begins accepting right away, so no serve_forever task is needed)
    proxies = [Proxy(addr, s.ip_addr) for s,addr in servers]
    for p in proxies:
        await p.start()

    # Now we can start servers
    await asyncio.gather(*[manager.server_start(s.server_id, seeds=seeds, connect_driver=True) for s,_ in servers])
    # And connect the driver.
    await manager.driver_connect(servers[0][0])

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 }") as ks:
        async with new_test_table(manager, ks, "key int PRIMARY KEY, val TEXT") as table:

            msg_size = 8192
            insert_stmt = cql.prepare(f"insert into {table} (key, val) values (?, ?)")
            # ALL: the write must reach every replica before the insert returns
            insert_stmt.consistency_level = ConsistencyLevel.ALL

            # reset all stats. only want IO metrics for the actual insert
            for p in proxies:
                p.reset()

            # insert a compressible row of size ~8 kiB
            def insert_once():
                cql.execute(insert_stmt, [1, "1" * msg_size])

            # need to do non-blocking: the proxies run on this event loop, so a blocking
            # execute() here would stall the very traffic it is waiting for
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, insert_once)

            node1_proxy, node2_proxy, node3_proxy = proxies

            verifier(msg_size, node1_proxy, node2_proxy, node3_proxy)

    # stop the servers first so their connections through the proxies are gone
    await asyncio.gather(*[manager.server_stop(s.server_id) for s,_ in servers])
    await asyncio.gather(*[p.stop() for p in proxies])
|
|
|
|
async def test_internode_compression_compress_packets_between_nodes(request, manager: ManagerClient) -> None:
    """
    With internode_compression set to "all", traffic through every proxy must be compressed.

    The check passes when the largest chunk seen by the dc2 proxy (node3) and the largest
    chunk seen by the dc1 proxies (node1, node2) are both below half the inserted value size.
    """
    def check_expected(msg_size, node1_proxy, node2_proxy, node3_proxy):
        # get the stats
        max_intra_pkg = max(node1_proxy.stats.max_packet_size, node2_proxy.stats.max_packet_size)
        # dc2 has a single node, so its proxy alone represents the cross-DC traffic
        max_dc_pkg = node3_proxy.stats.max_packet_size

        expected = msg_size / 2
        assert max_dc_pkg < expected, f"inter-DC chunk of {max_dc_pkg} bytes is not below {expected}"
        assert max_intra_pkg < expected, f"intra-DC chunk of {max_intra_pkg} bytes is not below {expected}"

    await do_test_internode_compression_between_datacenters(manager, "all", check_expected)
|
|
|
|
async def test_internode_compression_between_datacenters(request, manager: ManagerClient) -> None:
    """
    With internode_compression set to "dc", only traffic through the dc2 proxy must be compressed.

    The check passes when the largest chunk seen by the dc2 proxy (node3) is below half the
    inserted value size, while the largest chunk seen by the dc1 proxies (node1, node2) is
    larger than the inserted value, i.e. the value went through uncompressed.
    """
    def check_expected(msg_size, node1_proxy, node2_proxy, node3_proxy):
        # get the stats
        max_intra_pkg = max(node1_proxy.stats.max_packet_size, node2_proxy.stats.max_packet_size)
        # dc2 has a single node, so its proxy alone represents the cross-DC traffic
        max_dc_pkg = node3_proxy.stats.max_packet_size

        expected = msg_size / 2
        assert max_dc_pkg < expected, f"inter-DC chunk of {max_dc_pkg} bytes is not below {expected}"
        assert max_intra_pkg > msg_size, f"intra-DC chunk of {max_intra_pkg} bytes is not above {msg_size}"

    await do_test_internode_compression_between_datacenters(manager, "dc", check_expected)
|
|
|