Files
scylladb/test/cluster/test_alternator.py
Artsiom Mishuta d1198f8318 test.py: rename topology_custom folder to cluster
rename topology_custom folder to cluster
as it contains not only topology test cases
2025-03-04 10:32:44 +01:00

561 lines
28 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
# Multi-node tests for Alternator.
#
# Please note that most tests for Alternator are single-node tests and can
# be found in the test/alternator directory. Most functional testing of the
# many different syntax features that Alternator provides don't need more
# than a single node to be tested, and should be able to run also on DynamoDB
# - not just on Alternator, which the test/alternator framework allows to do.
# So only the minority of tests that do need a bigger cluster should be here.
import pytest
import asyncio
import logging
import time
import boto3
import botocore
from botocore.exceptions import ClientError
import requests
import json
from cassandra.auth import PlainTextAuthProvider
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.cluster.conftest import skip_mode
logger = logging.getLogger(__name__)
# Convenience function to open a connection to Alternator usable by the
# AWS SDK.
alternator_config = {
'alternator_port': 8000,
'alternator_write_isolation': 'only_rmw_uses_lwt',
'alternator_ttl_period_in_seconds': '0.5',
}
def get_alternator(ip, user='alternator', passwd='secret_pass'):
url = f"http://{ip}:{alternator_config['alternator_port']}"
return boto3.resource('dynamodb', endpoint_url=url,
region_name='us-east-1',
aws_access_key_id=user,
aws_secret_access_key=passwd,
config=botocore.client.Config(
retries={"max_attempts": 0},
read_timeout=300)
)
# Alternator convenience function for fetching the entire result set of a
# query into an array of items.
def full_query(table, ConsistentRead=True, **kwargs):
response = table.query(ConsistentRead=ConsistentRead, **kwargs)
items = response['Items']
while 'LastEvaluatedKey' in response:
response = table.query(ExclusiveStartKey=response['LastEvaluatedKey'],
ConsistentRead=ConsistentRead, **kwargs)
items.extend(response['Items'])
return items
# FIXME: boto3 is NOT async. So all tests that use it are not really async.
# We could use the aioboto3 library to write a really asynchronous test, or
# implement an async wrapper to the boto3 functions ourselves (e.g., run them
# in a separate thread) ourselves.
test_table_prefix = 'alternator_Test_'
def unique_table_name():
current_ms = int(round(time.time() * 1000))
# If unique_table_name() is called twice in the same millisecond...
if unique_table_name.last_ms >= current_ms:
current_ms = unique_table_name.last_ms + 1
unique_table_name.last_ms = current_ms
return test_table_prefix + str(current_ms)
unique_table_name.last_ms = 0
async def test_alternator_ttl_scheduling_group(manager: ManagerClient):
"""A reproducer for issue #18719: The expiration scans and deletions
initiated by the Alternator TTL feature are supposed to run entirely in
the "streaming" scheduling group. But because of a bug in inheritance
of scheduling groups through RPC, some of the work ended up being done
on the "statement" scheduling group.
This test verifies that Alternator TTL work is done on the right
scheduling group.
This test assumes that the cluster is not concurrently busy with
running any other workload - so we won't see any work appearing
in the wrong scheduling group. We can assume this because we don't
run multiple tests in parallel on the same cluster.
"""
servers = await manager.servers_add(3, config=alternator_config)
alternator = get_alternator(servers[0].ip_addr)
table = alternator.create_table(TableName=unique_table_name(),
BillingMode='PAY_PER_REQUEST',
KeySchema=[
{'AttributeName': 'p', 'KeyType': 'HASH' },
],
AttributeDefinitions=[
{'AttributeName': 'p', 'AttributeType': 'N' },
])
# Enable expiration (TTL) on attribute "expiration"
table.meta.client.update_time_to_live(TableName=table.name, TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True})
# Insert N rows, setting them all to expire 3 seconds from now.
N = 100
expiration = int(time.time())+3
with table.batch_writer() as batch:
for p in range(N):
batch.put_item(Item={'p': p, 'expiration': expiration})
# Unfortunately, Alternator has no way of doing the writes above with
# CL=ALL, only CL=QUORUM. So at this point we're not sure all the writes
# above have completed. We want to wait until they are over, so that we
# won't measure any of those writes in the statement scheduling group.
# Let's do it by checking the metrics of background writes and wait for
# them to drop to zero.
ips = [server.ip_addr for server in await manager.running_servers()]
timeout = time.time() + 60
while True:
if time.time() > timeout:
pytest.fail("timed out waiting for background writes to complete")
bg_writes = 0
for ip in ips:
metrics = await manager.metrics.query(ip)
bg_writes += metrics.get('scylla_storage_proxy_coordinator_background_writes')
if bg_writes == 0:
break # done waiting for the background writes to finish
await asyncio.sleep(0.1)
# Get the current amount of work (in CPU ms) done across all nodes and
# shards in different scheduling groups. We expect this to increase
# considerably for the streaming group while expiration scanning is
# proceeding, but not increase at all for the statement group because
# there are no requests being executed.
async def get_cpu_metrics():
ms_streaming = 0
ms_statement = 0
for ip in ips:
metrics = await manager.metrics.query(ip)
ms_streaming += metrics.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
# in enterprise, default execution is in sl:default, not statement
ms_statement += metrics.get('scylla_scheduler_runtime_ms', {'group': 'sl:default'})
return (ms_streaming, ms_statement)
ms_streaming_before, ms_statement_before = await get_cpu_metrics()
# Wait until all rows expire, and get the CPU metrics again. All items
# were set to expire in 3 seconds, and the expiration thread is set up
# in alternator_config to scan the whole table in 0.5 seconds, and the
# whole table is just 100 rows, so we expect all the data to be gone in
# 4 seconds. Let's wait 5 seconds just in case. Even if not all the data
# will have been deleted by then, we do expect some deletions to have
# happened, and certainly several scans, all taking CPU which we expect
# to be in the right scheduling group.
await asyncio.sleep(5)
ms_streaming_after, ms_statement_after = await get_cpu_metrics()
# As a sanity check, verify some of the data really expired, so there
# was some TTL work actually done. We actually expect all of the data
# to have been expired by now, but in some extremely slow builds and
# test machines, this may not be the case.
assert N > table.scan(ConsistentRead=True, Select='COUNT')['Count']
# Between the calls to get_cpu_metrics() above, several expiration scans
# took place (we configured scans to happen every 0.5 seconds), and also
# a lot of deletes when the expiration time was reached. We expect all
# that work to have happened in the streaming group, not statement group,
# so "ratio" calculate below should be tiny, even exactly zero. Before
# issue #18719 was fixed, it was not tiny at all - 0.58.
# Just in case there are other unknown things happening, let's assert it
# is <0.1 instead of zero.
ms_streaming = ms_streaming_after - ms_streaming_before
ms_statement = ms_statement_after - ms_statement_before
ratio = ms_statement / ms_streaming
assert ratio < 0.1
table.delete()
@pytest.mark.asyncio
async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
"""Test that if the "broadcast_rpc_address" of a node is set, the
"/localnodes" request returns not the node's internal IP address,
but rather the one set in broadcast_rpc_address as passed between
nodes via gossip. The case where this parameter is not configured is
tested separately, in test/alternator/test_scylla.py.
Reproduces issue #18711.
"""
# Run two Scylla nodes telling both their broadcast_rpc_address is 127.0.0.0
# (this is silly, but servers_add() doesn't let us use a different config
# per server). We need to run two nodes to check that the node to which
# we send the /localnodes request knows not only its own modified
# address, but also the other node's (which it learnt by gossip).
# This address isn't used for any communication, but it will be
# produced by "/localnodes" and this is what we want to check
# The address "127.0.0.0" is a silly non-existing address which connecting
# to fails immediately (this is useful in the test shutdown - we don't want
# it to hang trying to reach this node, as happened in issue #22744).
config = alternator_config | {
'broadcast_rpc_address': '127.0.0.0'
}
servers = await manager.servers_add(2, config=config)
for server in servers:
# We expect /localnodes to return ["127.0.0.0", "127.0.0.0"]
# (since we configured both nodes with the same broadcast_rpc_address).
# We need the retry loop below because the second node might take a
# bit of time to bootstrap after coming up, and only then will it
# appear on /localnodes (see #19694).
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
timeout = time.time() + 60
while True:
assert time.time() < timeout
response = requests.get(url, verify=False)
j = json.loads(response.content.decode('utf-8'))
if j == ['127.0.0.0', '127.0.0.0']:
break # done
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_localnodes_drained_node(manager: ManagerClient):
"""Test that if in a cluster one node is brought down with "nodetool drain"
a "/localnodes" request should NOT return that node. This test does
NOT reproduce issue #19694 - a DRAINED node is not considered is_alive()
and even before the fix of that issue, "/localnodes" didn't return it.
"""
# Start a cluster with two nodes and verify that at this point,
# "/localnodes" on the first node returns both nodes.
# We the retry loop below because the second node might take a
# bit of time to bootstrap after coming up, and only then will it
# appear on /localnodes (see #19694).
servers = await manager.servers_add(2, config=alternator_config)
localnodes_request = f"http://{servers[0].ip_addr}:{alternator_config['alternator_port']}/localnodes"
async def check_localnodes_two():
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
return True
elif set(j).issubset({servers[0].ip_addr, servers[1].ip_addr}):
return None # try again
else:
return False
assert await wait_for(check_localnodes_two, time.time() + 60)
# Now "nodetool" drain on the second node, leaving the second node
# in DRAINED state.
await manager.api.client.post("/storage_service/drain", host=servers[1].ip_addr)
# After that, "/localnodes" should no longer return the second node.
# It might take a short while until the first node learns what happened
# to node 1, so we may need to retry for a while
async def check_localnodes_one():
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
return None # try again
elif set(j) == {servers[0].ip_addr}:
return True
else:
return False
assert await wait_for(check_localnodes_one, time.time() + 60)
@pytest.mark.asyncio
async def test_localnodes_down_normal_node(manager: ManagerClient):
"""Test that if in a cluster one node reaches "normal" state and then
brought down (so is now in "DN" state), a "/localnodes" request
should NOT return that node. Reproduces issue #21538.
"""
# Start a cluster with two nodes and verify that at this point,
# "/localnodes" on the first node returns both nodes.
# We the retry loop below because the second node might take a
# bit of time to bootstrap after coming up, and only then will it
# appear on /localnodes (see #19694).
servers = await manager.servers_add(2, config=alternator_config)
localnodes_request = f"http://{servers[0].ip_addr}:{alternator_config['alternator_port']}/localnodes"
async def check_localnodes_two():
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
return True
elif set(j).issubset({servers[0].ip_addr, servers[1].ip_addr}):
return None # try again
else:
return False
assert await wait_for(check_localnodes_two, time.time() + 60)
# Now stop the second node abruptly with server_stop(). The server will
# be down, the gossiper on the first node will soon realize it is down,
# but still consider it in a "normal" state - "DN" (down and normal).
# We then want to check that "/localnodes" handles this state correctly.
await manager.server_stop(servers[1].server_id)
# After that, "/localnodes" should no longer return the second node.
# It might take a short while until the first node learns what happened
# to the second, so we may need to retry for a while.
async def check_localnodes_one():
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
return None # try again
elif set(j) == {servers[0].ip_addr}:
return True
else:
return False
assert await wait_for(check_localnodes_one, time.time() + 60)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_localnodes_joining_nodes(manager: ManagerClient):
"""Test that if a cluster is being enlarged and a node is coming up but
not yet responsive, a "/localnodes" request should NOT return that node.
Reproduces issue #19694.
"""
# Start a cluster with one node, and then bring up a second node,
# pausing its bootstrap (with an injection) in JOINING state.
# We need to start the second node in the background, because server_add()
# will wait for the bootstrap to complete - which we don't want to do.
server = await manager.server_add(config=alternator_config)
task = asyncio.create_task(manager.server_add(config=alternator_config | {'error_injections_at_startup': ['delay_bootstrap_120s']}))
# Sleep until the first node knows of the second one as a "live node"
# (we check this with the REST API's /gossiper/endpoint/live.
async def check_two_live_nodes():
j = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
if len(j) == 1:
return None # try again
elif len(j) == 2:
return True
else:
return False
assert await wait_for(check_two_live_nodes, time.time() + 60)
# At this point the second node is live, but hasn't finished bootstrapping
# (we delayed that with the injection). So the "/localnodes" should still
# return just one node - not both. Reproduces #19694 (two nodes used to
# be returned)
localnodes_request = f"http://{server.ip_addr}:{alternator_config['alternator_port']}/localnodes"
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
assert len(j) == 1
# We don't want to wait for the second server to finish its long
# injection-caused bootstrap delay, so we won't check here that when the
# second server finally comes up, both nodes will finally be visible in
# /localnodes. This case is checked in other tests, where bootstrap
# finishes normally, so we don't need to check this case again here.
# But we can't just finish here with "task" unwaited or we'll get a
# warning about an unwaited coroutine, and the ScyllaClusterManager's
# tasks_history will wait for it anyway. For the same reason we can't
# task.cancel() (this will cause ScyllaClusterManager's tasks_history
# to report the ScyllaClusterManager got BROKEN and fail the next test).
# Sadly even abruptly killing the servers (with manager.server_stop())
# (with the intention to then "await task" quickly) doesn't work,
# probably because of a bug in the library. So we "await task"
# anyway, and this test takes 2 minutes :-(
#for server in await manager.all_servers():
# await manager.server_stop(server.server_id)
await task
@pytest.mark.asyncio
async def test_localnodes_multi_dc_multi_rack(manager: ManagerClient):
"""A test for /localnodes on a more general setup, with multiple DCs and
multiple racks - an 8-node setup with two DCs, two racks in each, and
two nodes in each rack.
Test both the default of returning the nodes on DC of the server being
connected - and the "dc" and "rack" options for explicitly choosing a
specific dc and/or rack.
"""
# Start 8 nodes on two different dcs (called "dc1" and "dc2") and two
# different racks ("rack1" and "rack2"), two nodes in each rack.
config = alternator_config | {
'endpoint_snitch': 'GossipingPropertyFileSnitch'
}
servers = {}
for dc in ['dc1', 'dc2']:
for rack in ['rack1', 'rack2']:
servers[dc,rack] = await manager.servers_add(2, config=config, property_file={
'dc': dc, 'rack': rack})
def localnodes_request(server):
return f"http://{server.ip_addr}:{alternator_config['alternator_port']}/localnodes"
# Before we test various variations of the /localnodes request, let's wait
# until all nodes are visible to each other in /localnodes requests. This
# can take time, while nodes finish bootstrapping and gossip to each other
# (see #19694). After this one-time wait_for, the following checks will be
# able to check things immediately - without retries.
for dc in ['dc1', 'dc2']:
for rack in ['rack1', 'rack2']:
for server in servers[dc, rack]:
async def check_localnodes_eight():
for option_dc in ['dc1', 'dc2']:
response = requests.get(localnodes_request(server), {'dc': option_dc})
if len(json.loads(response.content.decode('utf-8'))) < 4:
return None # try again
return True
assert await wait_for(check_localnodes_eight, time.time() + 60)
# Check that the option-less "/localnodes" returns for each of dc1's nodes
# the four dc1 servers, and for each of dc2's nodes, the four dc2 servers:
for dc in ['dc1', 'dc2']:
dc_servers = servers[dc, 'rack1'] + servers[dc, 'rack2']
expected_ips = [server.ip_addr for server in dc_servers]
for server in dc_servers:
response = requests.get(localnodes_request(server))
assert sorted(json.loads(response.content.decode('utf-8'))) == sorted(expected_ips)
# Check that the "dc" option works - it should return the nodes for the
# specified DC, regardless of which node on which DC the request is sent
# to (we test all combinations of one of 8 target nodes and 2 option dcs).
all_servers = sum(servers.values(), [])
for option_dc in ['dc1', 'dc2']:
expected_servers = servers[option_dc, 'rack1'] + servers[option_dc, 'rack2']
expected_ips = [server.ip_addr for server in expected_servers]
for server in all_servers:
response = requests.get(localnodes_request(server), {'dc': option_dc})
assert sorted(json.loads(response.content.decode('utf-8'))) == sorted(expected_ips)
# Check that the "rack" option works (without "dc") - it returns for each of dc1's
# nodes the same two servers from the specified rack in dc1, and for each of dc2's
# nodes, the same two dc2 servers in the specified rack:
for dc in ['dc1', 'dc2']:
dc_servers = servers[dc, 'rack1'] + servers[dc, 'rack2']
for option_rack in ['rack1', 'rack2']:
expected_ips = [server.ip_addr for server in servers[dc, option_rack]]
for server in dc_servers:
response = requests.get(localnodes_request(server), {'rack': option_rack})
assert sorted(json.loads(response.content.decode('utf-8'))) == sorted(expected_ips)
# Check that a combination of the "rack" and "dc" option works - it always returns
# the same two nodes belonging to the given rack and dc, no matter which of the 8
# servers the request is sent to.
for option_dc in ['dc1', 'dc2']:
for option_rack in ['rack1', 'rack2']:
expected_ips = [server.ip_addr for server in servers[option_dc, option_rack]]
for server in all_servers:
response = requests.get(localnodes_request(server), {'dc': option_dc, 'rack': option_rack})
assert sorted(json.loads(response.content.decode('utf-8'))) == sorted(expected_ips)
# We have in test/alternator/test_cql_rbac.py many functional tests for
# CQL-based Role Based Access Control (RBAC) and all those tests use the
# same one-node cluster with authentication and authorization enabled.
# Here in this file we have the opportunity to create clusters with different
# configurations, so we can check how these configuration settings affect RBAC.
@pytest.mark.asyncio
async def test_alternator_enforce_authorization_false(manager: ManagerClient):
"""A basic test for how Alternator authentication and authorization
work when alternator_enfore_authorization is *false* (and CQL's
authenticator/authorizer options are also unset):
1. Username and signature is not checked - a request with a bad
username is accepted.
2. Any user (or even non-existent user) has permissions to do any
operation.
"""
servers = await manager.servers_add(1, config=alternator_config)
# Requests from a non-existent user with garbage password work,
# and can perform privildged operations like CreateTable, etc.
alternator = get_alternator(servers[0].ip_addr, 'nonexistent_user', 'garbage')
table = alternator.create_table(TableName=unique_table_name(),
BillingMode='PAY_PER_REQUEST',
KeySchema=[ {'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ {'AttributeName': 'p', 'AttributeType': 'N' } ])
table.put_item(Item={'p': 42})
table.get_item(Key={'p': 42})
table.delete()
async def test_alternator_enforce_authorization_false2(manager: ManagerClient):
"""A variant of the above test for alternator_enforce_authorization=false
Here we check what happens when CQL's authenticator/authorizer are
enabled (in the previous test they were disabled).
This combination of configuration options isn't very useful (setting
authenticator/authorizer is only needed for RBAC, so why set it when
RBAC is not supposed to be enabled?), but it also shouldn't break
alternator_enfore_authorization=false - requests should be allowed
regardless of how they are signed.
Reproduces issue #20619.
"""
config = alternator_config | {
'alternator_enforce_authorization': False,
'authenticator': 'PasswordAuthenticator',
'authorizer': 'CassandraAuthorizer'
}
servers = await manager.servers_add(1, config=config,
driver_connect_opts={'auth_provider': PlainTextAuthProvider(username='cassandra', password='cassandra')})
# Requests from a non-existent user with garbage password work,
# and can perform privildged operations like CreateTable, etc.
# It is important to exercise CreateTable as well, because it has
# special auto-grant code that we want to check as well.
alternator = get_alternator(servers[0].ip_addr, 'nonexistent_user', 'garbage')
table = alternator.create_table(TableName=unique_table_name(),
BillingMode='PAY_PER_REQUEST',
KeySchema=[ {'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ {'AttributeName': 'p', 'AttributeType': 'N' } ])
table.put_item(Item={'p': 42})
table.get_item(Key={'p': 42})
table.delete()
def get_secret_key(cql, user):
"""The secret key used for a user in Alternator is its role's salted_hash.
This function retrieves it from the system table.
"""
# Newer Scylla places the "roles" table in the "system" keyspace, but
# older versions used "system_auth_v2" or "system_auth"
for ks in ['system', 'system_auth_v2', 'system_auth']:
try:
e = list(cql.execute(f"SELECT salted_hash FROM {ks}.roles WHERE role = '{user}'"))
if e != [] and e[0].salted_hash is not None:
return e[0].salted_hash
except:
pass
pytest.fail(f"Couldn't get secret key for user {user}")
@pytest.mark.skip("flaky, needs to be fixed, see https://github.com/scylladb/scylladb/pull/20135")
@pytest.mark.asyncio
async def test_alternator_enforce_authorization_true(manager: ManagerClient):
"""A basic test for how Alternator authentication and authorization
work when authentication and authorization is enabled in CQL, and
additionally alternator_enfore_authorization is *true*:
1. The username and signature is verified (a request with a bad
username or password is rejected)
2. A new user works, and can do things that don't need permissions
(such as ListTables) but can't perform operations that do need
permissions (e.g., CreateTable).
"""
config = alternator_config | {
'alternator_enforce_authorization': True,
'authenticator': 'PasswordAuthenticator',
'authorizer': 'CassandraAuthorizer'
}
servers = await manager.servers_add(1, config=config,
driver_connect_opts={'auth_provider': PlainTextAuthProvider(username='cassandra', password='cassandra')})
cql = manager.get_cql()
# Any requests from a non-existent user with garbage password is
# rejected - even requests that don't need special permissions
alternator = get_alternator(servers[0].ip_addr, 'nonexistent_user', 'garbage')
with pytest.raises(ClientError, match='UnrecognizedClientException'):
alternator.meta.client.list_tables()
# We know that Scylla is set up with a "cassandra" user. If we retrieve
# its correct secret key, the ListTables will work.
alternator = get_alternator(servers[0].ip_addr, 'cassandra', get_secret_key(cql, 'cassandra'))
alternator.meta.client.list_tables()
# Privileged operations also work for the superuser account "cassandra":
table = alternator.create_table(TableName=unique_table_name(),
BillingMode='PAY_PER_REQUEST',
KeySchema=[ {'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ {'AttributeName': 'p', 'AttributeType': 'N' } ])
table.put_item(Item={'p': 42})
table.get_item(Key={'p': 42})
table.delete()
# Create a new role "user2" and make a new connection "alternator2" with it:
cql.execute("CREATE ROLE user2 WITH PASSWORD = 'user2' AND LOGIN=TRUE")
alternator2 = get_alternator(servers[0].ip_addr, 'user2', get_secret_key(cql, 'user2'))
# In the new role, ListTables works, but other privileged operations
# don't.
alternator2.meta.client.list_tables()
with pytest.raises(ClientError, match='AccessDeniedException'):
alternator2.create_table(TableName=unique_table_name(),
BillingMode='PAY_PER_REQUEST',
KeySchema=[ {'AttributeName': 'p', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ {'AttributeName': 'p', 'AttributeType': 'N' } ])
# We could further test how GRANT works, but this would be unnecessary
# repeating of the tests in test/alternator/test_cql_rbac.py.