In test_index_requires_rf_rack_valid_keyspace, the create_table call for a plain tablet-based table can fail with 'Unable to reach schema agreement' after the server's 10s timeout is exceeded. This happens when schema gossip propagation across the 4-node cluster takes longer than expected after a sequence of rapid schema changes earlier in the test. Add a retry (up to 2 attempts) on schema agreement errors for this specific create_table call rather than increasing the server-side timeout. Fixes: SCYLLADB-1135 Closes scylladb/scylladb#29132
1396 lines
67 KiB
Python
1396 lines
67 KiB
Python
#
|
|
# Copyright (C) 2024-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
# Multi-node tests for Alternator.
|
|
#
|
|
# Please note that most tests for Alternator are single-node tests and can
|
|
# be found in the test/alternator directory. Most functional testing of the
|
|
# many different syntax features that Alternator provides don't need more
|
|
# than a single node to be tested, and should be able to run also on DynamoDB
|
|
# - not just on Alternator, which the test/alternator framework allows to do.
|
|
# So only the minority of tests that do need a bigger cluster should be here.
|
|
|
|
import pytest
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
import boto3
|
|
import botocore
|
|
from botocore.exceptions import ClientError
|
|
import requests
|
|
import json
|
|
from cassandra.auth import PlainTextAuthProvider
|
|
import threading
|
|
import random
|
|
import re
|
|
|
|
from test.cluster.util import get_replication
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.util import wait_for
|
|
from test.pylib.tablets import get_all_tablet_replicas
|
|
from test.pylib.tablets import get_tablet_replica
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Scylla configuration used by the tests below to enable Alternator; the
# get_alternator() convenience function below opens a connection to
# Alternator usable by the AWS SDK.
|
|
# Node configuration enabling Alternator on port 8000, using LWT only for
# read-modify-write operations, with a fast (0.5s) TTL expiration scan period.
alternator_config = dict(
    alternator_port=8000,
    alternator_write_isolation='only_rmw_uses_lwt',
    alternator_ttl_period_in_seconds='0.5',
)
|
|
def get_alternator(ip, user='alternator', passwd='secret_pass'):
    """Return a boto3 DynamoDB resource talking to the Alternator port of the
    node at `ip`, signing requests as `user` with secret key `passwd`.

    Client-side retries are disabled (max_attempts=0) so that failures surface
    immediately in tests, and the read timeout is 300 seconds.
    """
    endpoint = "http://{}:{}".format(ip, alternator_config['alternator_port'])
    client_config = botocore.client.Config(
        retries={"max_attempts": 0},
        read_timeout=300)
    return boto3.resource('dynamodb',
                          endpoint_url=endpoint,
                          region_name='us-east-1',
                          aws_access_key_id=user,
                          aws_secret_access_key=passwd,
                          config=client_config)
|
|
|
|
# Alternator convenience function for fetching the entire result set of a
|
|
# query into an array of items.
|
|
def full_query(table, ConsistentRead=True, **kwargs):
    """Run table.query() and keep following LastEvaluatedKey until the result
    set is exhausted, returning the items of all pages in one list.

    ConsistentRead and every extra keyword argument are passed unchanged to
    each query() call.
    """
    page = table.query(ConsistentRead=ConsistentRead, **kwargs)
    collected = page['Items']
    while 'LastEvaluatedKey' in page:
        page = table.query(ConsistentRead=ConsistentRead,
                           ExclusiveStartKey=page['LastEvaluatedKey'],
                           **kwargs)
        collected += page['Items']
    return collected
|
|
|
|
# FIXME: boto3 is NOT async. So all tests that use it are not really async.
|
|
# We could use the aioboto3 library to write a really asynchronous test, or
|
|
# implement an async wrapper to the boto3 functions ourselves (e.g., run them
|
|
# in a separate thread).
|
|
|
|
|
|
test_table_prefix = 'alternator_Test_'

def unique_table_name():
    """Return a fresh table name: test_table_prefix followed by the current
    time in milliseconds.

    The numeric suffix is strictly increasing across calls. If two calls land
    in the same millisecond, or the clock moves backwards, the previous value
    plus one is used instead.
    """
    now_ms = int(round(time.time() * 1000))
    now_ms = max(now_ms, unique_table_name.last_ms + 1)
    unique_table_name.last_ms = now_ms
    return test_table_prefix + str(now_ms)

unique_table_name.last_ms = 0
|
|
|
|
|
|
async def test_alternator_ttl_scheduling_group(manager: ManagerClient):
    """A reproducer for issue #18719: The expiration scans and deletions
    initiated by the Alternator TTL feature are supposed to run entirely in
    the "streaming" scheduling group. But because of a bug in inheritance
    of scheduling groups through RPC, some of the work ended up being done
    on the "statement" scheduling group.
    This test verifies that Alternator TTL work is done on the right
    scheduling group.
    This test assumes that the cluster is not concurrently busy with
    running any other workload - so we won't see any work appearing
    in the wrong scheduling group. We can assume this because we don't
    run multiple tests in parallel on the same cluster.
    """
    servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
    alternator = get_alternator(servers[0].ip_addr)
    table = alternator.create_table(TableName=unique_table_name(),
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH' },
        ],
        AttributeDefinitions=[
            {'AttributeName': 'p', 'AttributeType': 'N' },
        ])
    # Enable expiration (TTL) on attribute "expiration"
    table.meta.client.update_time_to_live(TableName=table.name, TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True})

    # Insert N rows, setting them all to expire 3 seconds from now.
    N = 100
    expiration = int(time.time())+3
    with table.batch_writer() as batch:
        for p in range(N):
            batch.put_item(Item={'p': p, 'expiration': expiration})

    # Unfortunately, Alternator has no way of doing the writes above with
    # CL=ALL, only CL=QUORUM. So at this point we're not sure all the writes
    # above have completed. We want to wait until they are over, so that we
    # won't measure any of those writes in the statement scheduling group.
    # Let's do it by checking the metrics of background writes and wait for
    # them to drop to zero.
    ips = [server.ip_addr for server in await manager.running_servers()]
    timeout = time.time() + 60
    while True:
        if time.time() > timeout:
            pytest.fail("timed out waiting for background writes to complete")
        bg_writes = 0
        for ip in ips:
            metrics = await manager.metrics.query(ip)
            bg_writes += metrics.get('scylla_storage_proxy_coordinator_background_writes')
        if bg_writes == 0:
            break # done waiting for the background writes to finish
        await asyncio.sleep(0.1)

    # Get the current amount of work (in CPU ms) done across all nodes and
    # shards in different scheduling groups. We expect this to increase
    # considerably for the streaming group while expiration scanning is
    # proceeding, but not increase at all for the statement group because
    # there are no requests being executed.
    async def get_cpu_metrics():
        ms_streaming = 0
        ms_statement = 0
        for ip in ips:
            metrics = await manager.metrics.query(ip)
            ms_streaming += metrics.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
            # in enterprise, default execution is in sl:default, not statement
            ms_statement += metrics.get('scylla_scheduler_runtime_ms', {'group': 'sl:default'})
        return (ms_streaming, ms_statement)

    ms_streaming_before, ms_statement_before = await get_cpu_metrics()

    # Wait until all rows expire, and get the CPU metrics again. All items
    # were set to expire in 3 seconds, and the expiration thread is set up
    # in alternator_config to scan the whole table in 0.5 seconds, and the
    # whole table is just 100 rows, so we expect all the data to be gone in
    # 4 seconds. Let's wait 5 seconds just in case. Even if not all the data
    # will have been deleted by then, we do expect some deletions to have
    # happened, and certainly several scans, all taking CPU which we expect
    # to be in the right scheduling group.
    await asyncio.sleep(5)
    ms_streaming_after, ms_statement_after = await get_cpu_metrics()

    # As a sanity check, verify some of the data really expired, so there
    # was some TTL work actually done. We actually expect all of the data
    # to have been expired by now, but in some extremely slow builds and
    # test machines, this may not be the case.
    assert N > table.scan(ConsistentRead=True, Select='COUNT')['Count']

    # Between the calls to get_cpu_metrics() above, several expiration scans
    # took place (we configured scans to happen every 0.5 seconds), and also
    # a lot of deletes when the expiration time was reached. We expect all
    # that work to have happened in the streaming group, not statement group,
    # so "ratio" calculated below should be tiny, even exactly zero. Before
    # issue #18719 was fixed, it was not tiny at all - 0.58.
    # Just in case there are other unknown things happening, let's assert it
    # is <0.1 instead of zero.
    ms_streaming = ms_streaming_after - ms_streaming_before
    ms_statement = ms_statement_after - ms_statement_before
    # If the streaming group recorded no CPU time at all, the ratio below is
    # undefined. Fail with an explicit message instead of an opaque
    # ZeroDivisionError.
    assert ms_streaming > 0, \
        f"no CPU time recorded in the streaming scheduling group (statement={ms_statement}ms)"
    ratio = ms_statement / ms_streaming
    assert ratio < 0.1, \
        f"statement/streaming CPU ratio {ratio} (statement={ms_statement}ms, streaming={ms_streaming}ms)"

    table.delete()
|
|
|
|
@pytest.mark.parametrize("with_down_node", [False, True], ids=["all_nodes_up", "one_node_down"])
async def test_alternator_ttl_multinode_expiration(manager: ManagerClient, with_down_node):
    """When the cluster has multiple nodes, different nodes are responsible
    for checking expiration in different token ranges - each node is
    responsible for its "primary ranges". Let's check that this expiration
    really does happen - for the entire token range - by writing many
    partitions that will span the entire token range, and seeing that they
    all expire. We don't check that nodes don't do more work than they
    should - an inefficient implementation where every node scans the
    entire data set will also pass this test.
    When the test is run a second time with with_down_node=True, we verify
    that TTL expiration works correctly even when one of the nodes is
    brought down. This node's TTL scanner is responsible for scanning part
    of the token range, so when this node is down, part of the data might
    not get expired. At that point - other node(s) should take over
    expiring data in that range - and this test verifies that this indeed
    happens. Reproduces issue #9787 and SCYLLADB-777.
    """
    servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
    alternator = get_alternator(servers[0].ip_addr)

    if with_down_node:
        # Bring down one of nodes. Everything we do below, like creating a
        # table, reading and writing, should continue to work with one node
        # down.
        await manager.server_stop_gracefully(servers[2].server_id)

    table = alternator.create_table(TableName=unique_table_name(),
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH' },
        ],
        AttributeDefinitions=[
            {'AttributeName': 'p', 'AttributeType': 'N' },
        ])
    # Set the "expiration" column to mark item's expiration time
    table.meta.client.update_time_to_live(TableName=table.name, TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True})

    # Insert 50 rows, in different partitions, so the murmur3 hash maps them
    # all over the token space so different nodes would be responsible for
    # expiring them. All items are marked to expire 10 seconds in the past,
    # so should all expire as soon as possible, during this test.
    expiration = int(time.time()) - 10
    with table.batch_writer() as batch:
        for p in range(50):
            batch.put_item(Item={'p': p, 'expiration': expiration})
    # Expect that after a short delay, all items in the table will have
    # expired - so a scan should return no responses. This should happen
    # even though one of the nodes is down and not doing its usual
    # expiration-scanning work.
    timeout = time.time() + 60
    items = -1
    while time.time() < timeout:
        response = table.scan(ConsistentRead=True)
        items = len(response['Items'])
        # In theory (though probably not in practice in this test), a scan()
        # can return zero items but have more pages - so we need to be more
        # diligent and scan all pages to check it's completely empty.
        while items == 0 and 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], ConsistentRead=True)
            items += len(response['Items'])
        if items == 0:
            break
        # Yield to the event loop instead of blocking it with time.sleep(),
        # which would stall any other coroutine running in this test process.
        await asyncio.sleep(0.1)
    assert items == 0
|
|
|
|
@pytest.mark.asyncio
async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
    """Test that if the "broadcast_rpc_address" of a node is set, the
    "/localnodes" request returns not the node's internal IP address,
    but rather the one set in broadcast_rpc_address as passed between
    nodes via gossip. The case where this parameter is not configured is
    tested separately, in test/alternator/test_scylla.py.
    Reproduces issue #18711.
    """
    # Start two nodes, both configured with broadcast_rpc_address 127.0.0.0.
    # Using the same address for both is odd, but servers_add() takes a single
    # config for all servers. Two nodes are needed so we can check that the
    # queried node reports not only its own overridden address but also the
    # one it learned about the other node through gossip.
    # The address is never used for communication; it only shows up in the
    # "/localnodes" output, which is what we check. 127.0.0.0 was chosen
    # because connecting to it fails immediately, so test shutdown doesn't
    # hang trying to reach it (as happened in issue #22744).
    config = alternator_config | {'broadcast_rpc_address': '127.0.0.0'}
    servers = await manager.servers_add(2, config=config)
    expected = ['127.0.0.0', '127.0.0.0']
    for srv in servers:
        # Both nodes share one broadcast_rpc_address, so each node should list
        # it twice. Retry because the second node may still be bootstrapping,
        # and it only appears in /localnodes after that (see #19694).
        endpoint = f"http://{srv.ip_addr}:{config['alternator_port']}/localnodes"
        deadline = time.time() + 60
        while True:
            assert time.time() < deadline
            body = requests.get(endpoint, verify=False).content.decode('utf-8')
            if json.loads(body) == expected:
                break  # done
            await asyncio.sleep(0.1)
|
|
|
|
@pytest.mark.asyncio
async def test_localnodes_drained_node(manager: ManagerClient):
    """Test that if in a cluster one node is brought down with "nodetool drain"
    a "/localnodes" request should NOT return that node. This test does
    NOT reproduce issue #19694 - a DRAINED node is not considered is_alive()
    and even before the fix of that issue, "/localnodes" didn't return it.
    """
    # Start a two-node cluster and wait until "/localnodes" on the first node
    # lists both nodes. We need a retry loop because the second node may take
    # a while to bootstrap, and only then does it appear in /localnodes
    # (see #19694).
    servers = await manager.servers_add(2, config=alternator_config)
    first_ip, second_ip = servers[0].ip_addr, servers[1].ip_addr
    url = f"http://{first_ip}:{alternator_config['alternator_port']}/localnodes"

    def listed_nodes():
        # The set of addresses currently reported by the first node's /localnodes.
        reply = requests.get(url)
        return set(json.loads(reply.content.decode('utf-8')))

    async def both_nodes_listed():
        nodes = listed_nodes()
        if nodes == {first_ip, second_ip}:
            return True
        # A strict subset means the second node hasn't shown up yet, so retry.
        # Any other address is unexpected and fails the wait.
        return None if nodes <= {first_ip, second_ip} else False

    assert await wait_for(both_nodes_listed, time.time() + 60)

    # Run "nodetool drain" on the second node, leaving it in DRAINED state.
    await manager.api.client.post("/storage_service/drain", host=second_ip)

    # "/localnodes" should now stop listing the second node. The first node
    # may need a short while to learn what happened to the second node, so
    # keep retrying while both nodes are still listed.
    async def only_first_listed():
        nodes = listed_nodes()
        if nodes == {first_ip, second_ip}:
            return None  # try again
        return nodes == {first_ip}

    assert await wait_for(only_first_listed, time.time() + 60)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_localnodes_down_normal_node(manager: ManagerClient):
    """Test that if in a cluster one node reaches "normal" state and then
    brought down (so is now in "DN" state), a "/localnodes" request
    should NOT return that node. Reproduces issue #21538.
    """
    # Start a two-node cluster and wait until "/localnodes" on the first node
    # lists both nodes. We need a retry loop because the second node may take
    # a while to bootstrap, and only then does it appear in /localnodes
    # (see #19694).
    servers = await manager.servers_add(2, config=alternator_config)
    first_ip, second_ip = servers[0].ip_addr, servers[1].ip_addr
    url = f"http://{first_ip}:{alternator_config['alternator_port']}/localnodes"

    def listed_nodes():
        # The set of addresses currently reported by the first node's /localnodes.
        reply = requests.get(url)
        return set(json.loads(reply.content.decode('utf-8')))

    async def both_nodes_listed():
        nodes = listed_nodes()
        if nodes == {first_ip, second_ip}:
            return True
        # A strict subset means the second node hasn't shown up yet, so retry.
        # Any other address is unexpected and fails the wait.
        return None if nodes <= {first_ip, second_ip} else False

    assert await wait_for(both_nodes_listed, time.time() + 60)

    # Stop the second node abruptly with server_stop(). The first node's
    # gossiper will soon see it as down but still in "normal" state - "DN"
    # (down and normal) - which is the state "/localnodes" must handle.
    await manager.server_stop(servers[1].server_id)

    # "/localnodes" should now stop listing the second node. The first node
    # may need a short while to notice, so keep retrying while both nodes are
    # still listed.
    async def only_first_listed():
        nodes = listed_nodes()
        if nodes == {first_ip, second_ip}:
            return None  # try again
        return nodes == {first_ip}

    assert await wait_for(only_first_listed, time.time() + 60)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.nightly
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_localnodes_joining_nodes(manager: ManagerClient):
    """Test that if a cluster is being enlarged and a node is coming up but
    not yet responsive, a "/localnodes" request should NOT return that node.
    Reproduces issue #19694.
    """
    # Start one node, then bring up a second node whose bootstrap is paused
    # (by an error injection) in JOINING state. The second server_add() runs
    # as a background task because server_add() waits for bootstrap to
    # complete, which we don't want to do here.
    first = await manager.server_add(config=alternator_config)
    delayed_config = alternator_config | {'error_injections_at_startup': ['delay_bootstrap_120s']}
    add_second = asyncio.create_task(manager.server_add(config=delayed_config))

    # Wait until the first node sees the second one as a "live node", checked
    # with the REST API's /gossiper/endpoint/live.
    async def check_two_live_nodes():
        live = await manager.api.client.get_json("/gossiper/endpoint/live", host=first.ip_addr)
        # One live node: retry. Two: success. Anything else: failure.
        return {1: None, 2: True}.get(len(live), False)

    assert await wait_for(check_two_live_nodes, time.time() + 60)

    # The second node is live but, because of the injection, hasn't finished
    # bootstrapping - so "/localnodes" must still list only one node.
    # Reproduces #19694 (both nodes used to be returned).
    url = f"http://{first.ip_addr}:{alternator_config['alternator_port']}/localnodes"
    nodes = json.loads(requests.get(url).content.decode('utf-8'))
    assert len(nodes) == 1

    # We don't wait out the injected bootstrap delay to see both nodes appear
    # in /localnodes - other tests, where bootstrap finishes normally, already
    # cover that. But "add_second" can't be left unawaited (we'd get an
    # unawaited-coroutine warning, and the ScyllaClusterManager's
    # tasks_history would wait for it anyway), nor cancelled (tasks_history
    # would then report the ScyllaClusterManager as BROKEN and fail the next
    # test). Instead, kill (SIGKILL) the still-starting server, so awaiting
    # the task finishes quickly with a "Failed to add server" error. Without
    # this, the test would take 2 minutes to finish.
    for starting in await manager.starting_servers():
        await manager.server_stop(starting.server_id)
    try:
        await add_second
    except Exception as e:
        assert 'Failed to add server' in str(e)
|
|
|
|
@pytest.mark.asyncio
async def test_localnodes_multi_dc_multi_rack(manager: ManagerClient):
    """A test for /localnodes on a more general setup, with multiple DCs and
    multiple racks - an 8-node setup with two DCs, two racks in each, and
    two nodes in each rack.
    Test both the default of returning the nodes on DC of the server being
    connected - and the "dc" and "rack" options for explicitly choosing a
    specific dc and/or rack.
    """
    # Start 8 nodes: two DCs ("dc1", "dc2") x two racks ("rack1", "rack2"),
    # with two nodes in each rack.
    config = alternator_config | {'endpoint_snitch': 'GossipingPropertyFileSnitch'}
    dcs = ['dc1', 'dc2']
    racks = ['rack1', 'rack2']
    servers = {}
    for dc in dcs:
        for rack in racks:
            servers[dc, rack] = await manager.servers_add(2, config=config, property_file={
                'dc': dc, 'rack': rack})
    # Flattened in (dc, rack) insertion order.
    all_servers = [srv for group in servers.values() for srv in group]

    def localnodes_request(server):
        return f"http://{server.ip_addr}:{alternator_config['alternator_port']}/localnodes"

    def get_localnodes(server, params=None):
        # Returns the JSON list from the server's /localnodes, optionally
        # passing "dc"/"rack" query parameters.
        reply = requests.get(localnodes_request(server), params)
        return json.loads(reply.content.decode('utf-8'))

    # First wait, once, until every node sees all the others in /localnodes.
    # This can take a while as nodes finish bootstrapping and gossip with each
    # other (see #19694). Afterwards, the checks below run without retries.
    for server in all_servers:
        async def check_localnodes_eight(server=server):
            for option_dc in dcs:
                if len(get_localnodes(server, {'dc': option_dc})) < 4:
                    return None  # try again
            return True
        assert await wait_for(check_localnodes_eight, time.time() + 60)

    # Without options, each node returns the four servers of its own DC.
    for dc in dcs:
        dc_servers = servers[dc, 'rack1'] + servers[dc, 'rack2']
        expected_ips = sorted(srv.ip_addr for srv in dc_servers)
        for server in dc_servers:
            assert sorted(get_localnodes(server)) == expected_ips

    # With "dc", every node (of either DC) returns the requested DC's nodes.
    for option_dc in dcs:
        chosen = servers[option_dc, 'rack1'] + servers[option_dc, 'rack2']
        expected_ips = sorted(srv.ip_addr for srv in chosen)
        for server in all_servers:
            assert sorted(get_localnodes(server, {'dc': option_dc})) == expected_ips

    # With "rack" alone, each node returns the two servers of that rack in
    # its own DC.
    for dc in dcs:
        dc_servers = servers[dc, 'rack1'] + servers[dc, 'rack2']
        for option_rack in racks:
            expected_ips = sorted(srv.ip_addr for srv in servers[dc, option_rack])
            for server in dc_servers:
                assert sorted(get_localnodes(server, {'rack': option_rack})) == expected_ips

    # With both "dc" and "rack", every one of the 8 nodes returns the same two
    # servers belonging to that rack and DC.
    for option_dc in dcs:
        for option_rack in racks:
            expected_ips = sorted(srv.ip_addr for srv in servers[option_dc, option_rack])
            for server in all_servers:
                params = {'dc': option_dc, 'rack': option_rack}
                assert sorted(get_localnodes(server, params)) == expected_ips
|
|
|
|
|
|
# We have in test/alternator/test_cql_rbac.py many functional tests for
|
|
# CQL-based Role Based Access Control (RBAC) and all those tests use the
|
|
# same one-node cluster with authentication and authorization enabled.
|
|
# Here in this file we have the opportunity to create clusters with different
|
|
# configurations, so we can check how these configuration settings affect RBAC.
|
|
@pytest.mark.asyncio
async def test_alternator_enforce_authorization_false(manager: ManagerClient):
    """A basic test for how Alternator authentication and authorization
    work when alternator_enforce_authorization is *false* (and CQL's
    authenticator/authorizer options are also unset):
    1. Username and signature are not checked - a request with a bad
       username is accepted.
    2. Any user (or even a non-existent user) has permission to do any
       operation.
    """
    servers = await manager.servers_add(1, config=alternator_config)
    # A non-existent user with a garbage password is accepted, and may even
    # perform privileged operations such as CreateTable.
    alternator = get_alternator(servers[0].ip_addr, 'nonexistent_user', 'garbage')
    table = alternator.create_table(
        TableName=unique_table_name(),
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}],
    )
    table.put_item(Item={'p': 42})
    table.get_item(Key={'p': 42})
    table.delete()
|
|
|
|
async def test_alternator_enforce_authorization_false2(manager: ManagerClient):
    """A variant of the above test for alternator_enforce_authorization=false.
    Here we check what happens when CQL's authenticator/authorizer are
    enabled (in the previous test they were disabled).
    This combination of configuration options isn't very useful (setting
    authenticator/authorizer is only needed for RBAC, so why set it when
    RBAC is not supposed to be enabled?), but it also shouldn't break
    alternator_enforce_authorization=false - requests should be allowed
    regardless of how they are signed.
    Reproduces issue #20619.
    """
    config = alternator_config | {
        'alternator_enforce_authorization': False,
        'authenticator': 'PasswordAuthenticator',
        'authorizer': 'CassandraAuthorizer'
    }
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
    servers = await manager.servers_add(1, config=config,
        driver_connect_opts={'auth_provider': auth_provider})
    # A non-existent user with a garbage password is accepted, and may even
    # perform privileged operations. CreateTable in particular must be
    # exercised, since it runs special auto-grant code.
    alternator = get_alternator(servers[0].ip_addr, 'nonexistent_user', 'garbage')
    table = alternator.create_table(
        TableName=unique_table_name(),
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}],
    )
    table.put_item(Item={'p': 42})
    table.get_item(Key={'p': 42})
    table.delete()
|
|
|
|
def get_secret_key(cql, user):
    """Return the secret key Alternator uses for `user`: its role's salted_hash.

    Newer Scylla places the "roles" table in the "system" keyspace, but older
    versions used "system_auth_v2" or "system_auth"; these are tried in that
    order. A keyspace is skipped if its query raises an Exception, or if it
    has no row (or a null salted_hash) for the role. If no keyspace yields a
    key, the calling test fails through pytest.fail().
    """
    for ks in ['system', 'system_auth_v2', 'system_auth']:
        try:
            rows = list(cql.execute(f"SELECT salted_hash FROM {ks}.roles WHERE role = '{user}'"))
        except Exception:
            # This keyspace's roles table is unavailable in this Scylla
            # version - try the next candidate. Catching Exception rather
            # than using a bare except lets KeyboardInterrupt, SystemExit and
            # other BaseExceptions propagate instead of being silently
            # swallowed.
            continue
        if rows and rows[0].salted_hash is not None:
            return rows[0].salted_hash
    pytest.fail(f"Couldn't get secret key for user {user}")
|
|
|
|
# flaky, see https://github.com/scylladb/scylladb/issues/20135
|
|
@pytest.mark.unstable
@pytest.mark.asyncio
async def test_alternator_enforce_authorization_true(manager: ManagerClient):
    """A basic test for how Alternator authentication and authorization
    work when authentication and authorization are enabled in CQL, and
    additionally alternator_enforce_authorization is *true*:
    1. The username and signature are verified (a request with a bad
       username or password is rejected).
    2. A new user works, and can do things that don't need permissions
       (such as ListTables) but can't perform operations that do need
       permissions (e.g., CreateTable).
    """
    config = alternator_config | {
        'alternator_enforce_authorization': True,
        'authenticator': 'PasswordAuthenticator',
        'authorizer': 'CassandraAuthorizer'
    }
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
    servers = await manager.servers_add(1, config=config,
        driver_connect_opts={'auth_provider': auth_provider})
    cql = manager.get_cql()
    ip = servers[0].ip_addr

    def table_schema():
        # Fresh CreateTable arguments (apart from the name) for each call.
        return dict(
            BillingMode='PAY_PER_REQUEST',
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}])

    # A non-existent user with a garbage password is rejected on every
    # request - even ones that need no special permissions.
    bogus = get_alternator(ip, 'nonexistent_user', 'garbage')
    with pytest.raises(ClientError, match='UnrecognizedClientException'):
        bogus.meta.client.list_tables()

    # Scylla is set up with a "cassandra" user; with its real secret key,
    # ListTables works.
    superuser = get_alternator(ip, 'cassandra', get_secret_key(cql, 'cassandra'))
    superuser.meta.client.list_tables()
    # Privileged operations also work for the superuser account "cassandra".
    table = superuser.create_table(TableName=unique_table_name(), **table_schema())
    table.put_item(Item={'p': 42})
    table.get_item(Key={'p': 42})
    table.delete()

    # A new role "user2", connected through its own client: ListTables
    # works, but privileged operations are denied.
    cql.execute("CREATE ROLE user2 WITH PASSWORD = 'user2' AND LOGIN=TRUE")
    user2 = get_alternator(ip, 'user2', get_secret_key(cql, 'user2'))
    user2.meta.client.list_tables()
    with pytest.raises(ClientError, match='AccessDeniedException'):
        user2.create_table(TableName=unique_table_name(), **table_schema())
    # Testing GRANT further would just repeat test/alternator/test_cql_rbac.py.
|
|
|
|
@pytest.mark.asyncio
async def test_index_in_rf_rack_valid_keyspace_does_not_require_rf_rack_flag(manager: ManagerClient):
    """
    With rf_rack_valid_keyspaces=False, index operations must still succeed
    on a keyspace that happens to be RF-rack-valid: creating a table with a
    GSI, creating a table with an LSI, and adding a GSI to an existing table.
    """
    # A single node in a single rack with RF=1 is RF-rack-valid.
    servers = await manager.servers_add(1,
                                        config=alternator_config | {"rf_rack_valid_keyspaces": False},
                                        property_file={'dc': 'dc1', 'rack': 'rack1'})
    alternator = get_alternator(servers[0].ip_addr)

    table_name_gsi = unique_table_name()
    table_name_lsi = unique_table_name()
    table_name_add_gsi = unique_table_name()

    gsi_spec = {
        'IndexName': 'gsi1',
        'KeySchema': [
            {'AttributeName': 'c', 'KeyType': 'HASH'},
            {'AttributeName': 'p', 'KeyType': 'RANGE'},
        ],
        'Projection': {'ProjectionType': 'ALL'},
    }
    lsi_spec = {
        'IndexName': 'lsi1',
        'KeySchema': [
            {'AttributeName': 'p', 'KeyType': 'HASH'},
            {'AttributeName': 'c', 'KeyType': 'RANGE'},
        ],
        'Projection': {'ProjectionType': 'ALL'},
    }

    def make_table(name, **index_args):
        # Every table here uses one tablet and a (p, c) string key; only
        # the optional index arguments differ.
        alternator.create_table(
            TableName=name,
            BillingMode='PAY_PER_REQUEST',
            Tags=[{'Key': 'system:initial_tablets', 'Value': '1'}],
            KeySchema=[
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'c', 'KeyType': 'RANGE'},
            ],
            AttributeDefinitions=[
                {'AttributeName': 'p', 'AttributeType': 'S'},
                {'AttributeName': 'c', 'AttributeType': 'S'},
            ],
            **index_args)

    # Table created together with a GSI
    make_table(table_name_gsi, GlobalSecondaryIndexes=[gsi_spec])
    # Table created together with an LSI
    make_table(table_name_lsi, LocalSecondaryIndexes=[lsi_spec])
    # Table created bare, with a GSI added afterwards
    make_table(table_name_add_gsi)
    alternator.meta.client.update_table(
        TableName=table_name_add_gsi,
        AttributeDefinitions=[
            {'AttributeName': 'c', 'AttributeType': 'S'},
            {'AttributeName': 'p', 'AttributeType': 'S'},
        ],
        GlobalSecondaryIndexUpdates=[{'Create': gsi_spec}],
    )
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_index_requires_rf_rack_valid_keyspace(manager: ManagerClient):
    """
    Verify that creating a table with GSI or LSI and adding GSI to an existing table fails if
    the keyspace is not RF-rack-valid.
    The test is only relevant without the rack list feature. Otherwise, the
    keyspace will always be RF-rack-valid and it won't be possible to get the
    error.
    """
    config = alternator_config | {"rf_rack_valid_keyspaces": False, 'error_injections_at_startup': [{'name': 'suppress_features', 'value': 'RACK_LIST_RF'}]}
    # 4 nodes in 4 racks, default RF=3, therefore not RF-rack-valid
    servers = await manager.servers_add(4, config=config, property_file=[
        {'dc': 'dc1', 'rack': 'rack1'},
        {'dc': 'dc1', 'rack': 'rack2'},
        {'dc': 'dc1', 'rack': 'rack3'},
        {'dc': 'dc1', 'rack': 'rack4'},
    ])
    alternator = get_alternator(servers[0].ip_addr)

    expected_err_create = "GlobalSecondaryIndexes and LocalSecondaryIndexes on a table " \
        "using tablets require the number of racks in the cluster to be either 1 or 3"
    expected_err_update_add_gsi = "GlobalSecondaryIndexes on a table " \
        "using tablets require the number of racks in the cluster to be either 1 or 3"

    def create_table_with_index(alternator, table_name, index_type, initial_tablets):
        """Create table_name with a (p, c) string key and optionally one index.

        index_type is "GSI", "LSI", or None (no index). initial_tablets is the
        value given to the system:initial_tablets tag ('none' is used below
        to create a table without tablets).
        """
        create_table_args = dict(
            TableName=table_name,
            BillingMode='PAY_PER_REQUEST',
            Tags=[{'Key': 'system:initial_tablets', 'Value': initial_tablets}],
            KeySchema=[
                {'AttributeName': 'p', 'KeyType': 'HASH'},
                {'AttributeName': 'c', 'KeyType': 'RANGE'}
            ],
            AttributeDefinitions=[
                {'AttributeName': 'p', 'AttributeType': 'S'},
                {'AttributeName': 'c', 'AttributeType': 'S'}
            ],
        )
        if index_type == "GSI":
            create_table_args["GlobalSecondaryIndexes"] = [
                {
                    'IndexName': 'gsi1',
                    'KeySchema': [
                        {'AttributeName': 'c', 'KeyType': 'HASH'},
                        {'AttributeName': 'p', 'KeyType': 'RANGE'},
                    ],
                    'Projection': {'ProjectionType': 'ALL'}
                }
            ]
        elif index_type == "LSI":
            create_table_args["LocalSecondaryIndexes"] = [
                {
                    'IndexName': 'lsi1',
                    'KeySchema': [
                        {'AttributeName': 'p', 'KeyType': 'HASH'},
                        {'AttributeName': 'c', 'KeyType': 'RANGE'},
                    ],
                    'Projection': {'ProjectionType': 'ALL'}
                }
            ]
        alternator.create_table(**create_table_args)

    # Create a table with tablets and GSI or LSI - should fail because the keyspace is not RF-rack-valid
    for index_type in ["GSI", "LSI"]:
        with pytest.raises(ClientError, match=expected_err_create):
            create_table_with_index(alternator, unique_table_name(), index_type, initial_tablets='1')

    # Now create the table without tablets - should succeed
    for index_type in ["GSI", "LSI"]:
        create_table_with_index(alternator, unique_table_name(), index_type, initial_tablets='none')

    # Create a table with tablets and no indexes, then add a GSI - the update should fail.
    # The server waits 10s for schema agreement after creating a table,
    # which may not be enough after a sequence of rapid schema changes
    # on a multi-node cluster (see SCYLLADB-1135). Retry if needed.
    # A failed attempt may already have created the table (the agreement
    # wait can time out after the schema change itself was applied), so
    # retrying with the same name could fail with ResourceInUseException.
    # Each attempt therefore uses a fresh table name.
    max_attempts = 2
    for attempt in range(max_attempts):
        table_name = unique_table_name()
        try:
            create_table_with_index(alternator, table_name, index_type=None, initial_tablets='1')
            break
        except ClientError as e:
            if 'schema agreement' not in str(e) or attempt == max_attempts - 1:
                raise
    with pytest.raises(ClientError, match=expected_err_update_add_gsi):
        alternator.meta.client.update_table(
            TableName=table_name,
            AttributeDefinitions=[
                {'AttributeName': 'c', 'AttributeType': 'S'},
                {'AttributeName': 'p', 'AttributeType': 'S'},
            ],
            GlobalSecondaryIndexUpdates=[
                {
                    'Create': {
                        'IndexName': 'gsi1',
                        'KeySchema': [
                            {'AttributeName': 'c', 'KeyType': 'HASH'},
                            {'AttributeName': 'p', 'KeyType': 'RANGE'},
                        ],
                        'Projection': {'ProjectionType': 'ALL'}
                    }
                }
            ]
        )
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_rf_rack_flag_enforces_rf_rack_validity(manager: ManagerClient):
    """
    Check that enabling `rf_rack_valid_keyspaces` enforces RF-rack-validity.

    A cluster with 4 racks is started and a table is created with the
    default RF=3. Since RF differs from the number of racks, CreateTable
    must fail with the dedicated error message.

    This only matters when the rack list feature is unavailable (it is
    suppressed below); with rack lists the keyspace would always be
    RF-rack-valid and the error could never be triggered.
    """
    config = alternator_config | {"rf_rack_valid_keyspaces": True, 'error_injections_at_startup': [{'name': 'suppress_features', 'value': 'RACK_LIST_RF'}]}

    # One node in each of rack1..rack4
    rack_layout = [{'dc': 'dc1', 'rack': f'rack{n}'} for n in range(1, 5)]
    servers = await manager.servers_add(len(rack_layout), config=config, property_file=rack_layout)

    alternator = get_alternator(servers[0].ip_addr)

    expected_err = ("the configuration option 'rf_rack_valid_keyspaces' is enabled, "
                    "which enforces that tables using tablets can only be created in clusters "
                    "that have either 1 or 3 racks")

    # Default RF=3 on 4 racks => RF != #racks, so this must be rejected
    with pytest.raises(ClientError, match=expected_err):
        alternator.create_table(
            TableName=unique_table_name(),
            BillingMode='PAY_PER_REQUEST',
            KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
            AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}]
        )
|
|
|
|
# Unfortunately, by default a Python thread prints the exception that kills
# it (e.g., pytest assert failures) but doesn't propagate the exception
# to join() - so the overall test doesn't fail. The following ThreadWrapper
# causes join() to rethrow the exception, so the test will fail.
class ThreadWrapper(threading.Thread):
    """A Thread whose join() re-raises the target's exception, or returns
    the target's return value.

    If join() is given a timeout that expires while the thread is still
    running, join() returns None (use is_alive() to tell the difference),
    matching threading.Thread.join().
    """
    def run(self):
        try:
            # Like threading.Thread.run(), a missing target is a no-op.
            if self._target is not None:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exception = e

    def join(self, timeout=None):
        super().join(timeout)
        if hasattr(self, 'exception'):
            raise self.exception
        # 'ret' is not set yet if join() timed out before the target
        # returned, and is never set if there was no target.
        return getattr(self, 'ret', None)
|
|
|
|
# The following tests reproduce issue #13152, where if two schema changes
|
|
# are attempted concurrently, one of them may fail with:
|
|
# "Internal server error: service::group0_concurrent_modification
|
|
# (Failed to apply group 0 change due to concurrent modification)."
|
|
# We had this problem in six different operations - CreateTable, DeleteTable,
|
|
# UpdateTable, TagResource, UntagResource and UpdateTimeToLive - so we have
|
|
# several tests (the last three can be tested with almost identical code,
|
|
# so they share one parameterized test).
|
|
# Each of these tests checks concurrent invocation of just one operation
|
|
# (e.g., CreateTable), to allow us to reproduce the missing code in that
|
|
# specific operation. We assume that the correct code will use the same
|
|
# lock for all operations, so we don't need to test collision of different
|
|
# operations (e.g., CreateTable and DeleteTable) after we already test that
|
|
# CreateTable and DeleteTable each does the locking and retry correctly.
|
|
#
|
|
# This issue can only be reproduced on a cluster of multiple nodes when
|
|
# the operations are sent to different nodes - because a single node
|
|
# serializes its own schema modifications. This is why these tests must
|
|
# be here, in test/cluster, and not in the single-node test/alternator.
|
|
|
|
async def test_concurrent_createtable(manager: ManagerClient):
    """A reproducer for issue #13152 for the CreateTable operation:
    concurrent CreateTable operations shouldn't fail "due to concurrent
    modification".
    """
    servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
    # In boto3, "resources", the object returned by get_alternator(), are
    # not thread-safe. However, we will create 3 threads each will write to
    # a different alternators[i], so we're fine.
    alternators = [get_alternator(server.ip_addr) for server in servers]

    # Run the CreateTable operation, once, in each thread. There is no point
    # in running multiple CreateTable operations, since only the very first
    # CreateTable operation (before the table exists) will be slow and have
    # an appreciable chance of colliding with another concurrent operation.
    # We'll use a barrier to increase the chance that the 3 threads start
    # together and collide - on my test machine, before #13152 was fixed one
    # attempt here fails around 80% of the time, which is good enough to
    # reproduce the bug and test its fix. Nevertheless, we'll run (below)
    # the whole check "ntries" times in a loop, to bring the number of test
    # false-negatives even closer to zero.
    table_name = unique_table_name()
    barrier = threading.Barrier(len(servers), timeout=120)
    def run_op(dynamodb):
        barrier.wait()
        try:
            dynamodb.create_table(TableName=table_name,
                BillingMode='PAY_PER_REQUEST',
                KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH' }],
                AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N' }])
        # Expect either a success or a ResourceInUseException.
        # Anything else (e.g., InternalServerError) is a bug
        except ClientError as e:
            assert 'ResourceInUseException' in str(e)
    ntries = 5
    for _ in range(ntries):
        threads = [ThreadWrapper(target=run_op, args=[dynamodb]) for dynamodb in alternators]
        for t in threads:
            t.start()
        try:
            for t in threads:
                t.join()
            # If we're here, all the threads were successful, and the
            # test passed. Actually it needs to pass ntries times before
            # we really declare it successful.
        finally:
            barrier.reset()
            # In theory (and in DynamoDB), delete_table() isn't possible
            # until create_table() completed its asynchronous work, so
            # we may need to try delete_table() multiple times.
            timeout = time.time() + 120
            while time.time() < timeout:
                try:
                    alternators[0].meta.client.delete_table(TableName=table_name)
                    break
                except ClientError as ce:
                    if ce.response['Error']['Code'] == 'ResourceInUseException':
                        time.sleep(1)
                        continue
                    elif ce.response['Error']['Code'] == 'ResourceNotFoundException':
                        # The table was never created, probably we had an
                        # exception from the table-creation threads, let's
                        # not add more error messages here.
                        break
                    raise
|
|
|
async def test_concurrent_deletetable(manager: ManagerClient):
    """A reproducer for issue #13152 for the DeleteTable operation:
    concurrent DeleteTable operations shouldn't fail "due to concurrent
    modification".
    """
    servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
    alternators = [get_alternator(server.ip_addr) for server in servers]
    table_name = unique_table_name()
    barrier = threading.Barrier(len(servers), timeout=120)
    def run_op(dynamodb):
        barrier.wait()
        try:
            dynamodb.meta.client.delete_table(TableName=table_name)
        # Expect either a success or a ResourceNotFoundException
        # (indicating another thread deleted the table).
        # Anything else (e.g., InternalServerError) is a bug
        except ClientError as e:
            assert 'ResourceNotFoundException' in str(e)
    ntries = 5
    try:
        for _ in range(ntries):
            alternators[0].create_table(TableName=table_name,
                BillingMode='PAY_PER_REQUEST',
                KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH' }],
                AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N' }])
            alternators[0].meta.client.get_waiter('table_exists').wait(TableName=table_name)
            threads = [ThreadWrapper(target=run_op, args=[dynamodb]) for dynamodb in alternators]
            for t in threads:
                t.start()
            try:
                for t in threads:
                    t.join()
            finally:
                barrier.reset()
            try:
                alternators[0].meta.client.delete_table(TableName=table_name)
            except ClientError as e:
                # If we got ResourceNotFoundException, the table was
                # already deleted by the threads, that's expected.
                if 'ResourceNotFoundException' not in str(e):
                    raise
    finally:
        # Delete the table, if an exception above caused us not to do it.
        try:
            alternators[0].meta.client.delete_table(TableName=table_name)
        except ClientError as e:
            if 'ResourceNotFoundException' not in str(e):
                raise
|
|
|
|
async def test_concurrent_updatetable(manager: ManagerClient):
    """A reproducer for issue #13152 for the UpdateTable operation:
    concurrent UpdateTable operations shouldn't fail "due to concurrent
    modification".
    """
    servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
    alternators = [get_alternator(server.ip_addr) for server in servers]
    table_name = unique_table_name()
    barrier = threading.Barrier(len(servers), timeout=120)
    def run_op(dynamodb):
        barrier.wait()
        try:
            # Pick a slow use case of UpdateTable (adding a GSI) to increase
            # the likelihood of a collision.
            dynamodb.meta.client.update_table(TableName=table_name,
                AttributeDefinitions=[{ 'AttributeName': 'x', 'AttributeType': 'S' }],
                GlobalSecondaryIndexUpdates=[ { 'Create':
                    { 'IndexName': 'hello',
                      'KeySchema': [{ 'AttributeName': 'x', 'KeyType': 'HASH' }],
                      'Projection': { 'ProjectionType': 'ALL' }
                    }}])
        # Expect either a success or an error indicating another thread
        # already added this GSI.
        # Anything else (e.g., InternalServerError) is a bug
        except ClientError as e:
            assert 'GSI hello already exists' in str(e)
    ntries = 5
    try:
        for _ in range(ntries):
            alternators[0].create_table(TableName=table_name,
                BillingMode='PAY_PER_REQUEST',
                KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH' }],
                AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N' }])
            alternators[0].meta.client.get_waiter('table_exists').wait(TableName=table_name)
            threads = [ThreadWrapper(target=run_op, args=[dynamodb]) for dynamodb in alternators]
            for t in threads:
                t.start()
            try:
                for t in threads:
                    t.join()
            finally:
                barrier.reset()
            alternators[0].meta.client.delete_table(TableName=table_name)
    finally:
        # Delete the table, if an exception above caused us not to do it.
        try:
            alternators[0].meta.client.delete_table(TableName=table_name)
        except ClientError as e:
            if 'ResourceNotFoundException' not in str(e):
                raise
|
|
|
|
@pytest.mark.parametrize('op', ['TagResource', 'UntagResource', 'UpdateTimeToLive'])
async def test_concurrent_modify_tags(manager: ManagerClient, op):
    """A reproducer for issue #13152 for the TagResource, UntagResource
    and UpdateTimeToLive operation (each one in a separate parametrization
    of the test). Concurrent operations shouldn't fail "due to concurrent
    modification".
    The name of this test is named after db::modify_tags(), which all
    three of these operations use to implement the change to the table.
    """
    servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
    alternators = [get_alternator(server.ip_addr) for server in servers]
    table_name = unique_table_name()
    barrier = threading.Barrier(len(servers), timeout=120)
    def run_op(dynamodb):
        barrier.wait()
        if op == 'TagResource':
            arn = dynamodb.meta.client.describe_table(TableName=table_name)['Table']['TableArn']
            dynamodb.meta.client.tag_resource(ResourceArn=arn, Tags=[{'Key': 'animal', 'Value': 'dog'}])
        elif op == 'UntagResource':
            arn = dynamodb.meta.client.describe_table(TableName=table_name)['Table']['TableArn']
            dynamodb.meta.client.untag_resource(ResourceArn=arn, TagKeys=['animal'])
        elif op == 'UpdateTimeToLive':
            # For the UpdateTimeToLive operation to actually attempt a write
            # (and possibly notice a collision), we need to set Enabled to
            # the opposite of what it is right now. Let's just pick a random
            # boolean - 50% of the time it will do the right thing and
            # we may see the collision.
            try:
                dynamodb.meta.client.update_time_to_live(TableName=table_name,
                    TimeToLiveSpecification={'AttributeName': 'xxx', 'Enabled': bool(random.getrandbits(1))})
            except ClientError as e:
                if 'TTL is already' not in str(e):
                    raise
        else:
            pytest.fail(f'oops, bad op {op}')
    alternators[0].create_table(TableName=table_name,
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH' }],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N' }])
    alternators[0].meta.client.get_waiter('table_exists').wait(TableName=table_name)
    ntries = 5
    try:
        for _ in range(ntries):
            threads = [ThreadWrapper(target=run_op, args=[dynamodb]) for dynamodb in alternators]
            for t in threads:
                t.start()
            try:
                for t in threads:
                    t.join()
            finally:
                barrier.reset()
    finally:
        alternators[0].meta.client.delete_table(TableName=table_name)
|
|
|
|
async def nodes_with_data(manager, ks, cf, host):
    """Retrieves a set of node uuids which contain *any* data for the given
    table. If the table uses tablets, we use the system.tablets (via
    the convenience function get_all_tablet_replicas()). But if the table
    uses vnodes, we use a REST API request /storage_service/tokens_endpoint
    which returns the primary node for each token (with vnodes, if a node
    has any data at all, it is also primary for some of the data).
    The information is retrieved using requests to the given "host",
    which can be any live node.
    """
    r = await get_all_tablet_replicas(manager, host, ks, cf)
    if r:
        # If table uses tablets it will have a non-empty list of tablets (r)
        # and we return it here.
        return { item[0] for entry in r for item in entry.replicas }
    # Otherwise, the table uses vnodes. Use the REST API that only
    # makes sense with vnodes. Convert the host IP addresses that this
    # API returns to uuids like we have in the system tables.
    j = await manager.api.client.get_json('/storage_service/tokens_endpoint', host=host.ip_addr)
    # The response has one entry per token, so the same IP appears many
    # times. Translate each distinct IP only once instead of issuing one
    # get_host_id() request per token.
    ips = { entry['value'] for entry in j }
    return { await manager.api.get_host_id(ip) for ip in ips }
|
|
|
|
@pytest.mark.parametrize("tablets", [True, False])
@pytest.mark.asyncio
async def test_zero_token_node_load_balancer(manager, tablets):
    """Test that a zero-token node (a.k.a. coordinator-only or proxy node),
    can be used as an Alternator server-side load balancer as proposed in
    issue #6527. We set up a cluster with four ordinary nodes (all in
    datacenter dc1), and a fifth node which doesn't have any data (a
    zero-token node), and make different Alternator requests (CreateTable,
    PutItem, GetItem) to this data-less fifth node, and they should work.
    Finally we verify that the fifth node really does not have any data (and
    wasn't just created as a normal data-holding node).
    Because the implementation of zero-token nodes is very different
    for the tablets and vnodes cases, this test has two parametrized
    versions - tablets=True and tablets=False.
    """
    if tablets:
        tags = [{'Key': 'system:initial_tablets', 'Value': '0'}]
    else:
        tags = [{'Key': 'system:initial_tablets', 'Value': 'none'}]
    # Start a cluster with 4 nodes. Alternator uses RF=3, so with 4 nodes
    # the assignment of data (tablets or vnodes) to nodes isn't trivial,
    # which will allow us to check that non-trivial request forwarding works.
    servers = await manager.servers_add(4, config=alternator_config, auto_rack_dc="dc1")
    # Add a fifth node, with zero tokens (no data), by setting join_ring=false:
    zero_token_server = await manager.server_add(config=alternator_config | {'join_ring': False},
                                                 property_file={'dc': 'dc1', 'rack': 'rack-zero-token'})

    # Get an Alternator connection to the zero-token node:
    alternator = get_alternator(zero_token_server.ip_addr)

    # Create a new table, write 10 different items to it and then read them
    # back - doing all of this through the zero-token node:
    table = alternator.create_table(TableName=unique_table_name(),
        Tags=tags,
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH' },
        ],
        AttributeDefinitions=[
            {'AttributeName': 'p', 'AttributeType': 'N' },
        ])
    items = [{'p': i, 'x': f'hello {i}'} for i in range(10)]
    for item in items:
        table.put_item(Item=item)
    for item in items:
        assert item == table.get_item(Key={'p': item['p']}, ConsistentRead=True)['Item']
    # Verify that the zero-token node is really "zero-token", i.e., does not
    # have any data for our table. The nodes_with_data() function returns
    # the list of node uuids which contain *any* data for the given table -
    # we want this to be just the first four nodes in "servers", not the
    # fifth node zero_token_server.
    ks = f"alternator_{table.name}"
    repl = get_replication(manager.get_cql(), ks)
    if isinstance(repl["dc1"], list):
        # Replication lists explicit racks: expect data only on the
        # servers whose rack is in that list.
        expected = { await manager.get_host_id(s.server_id) for s in servers if s.rack in repl["dc1"] }
    else:
        expected = { await manager.get_host_id(s.server_id) for s in servers }
    got = await nodes_with_data(manager, ks, table.name, zero_token_server)
    assert got == expected
    table.delete()
|
|
|
|
@pytest.mark.xfail(reason="#16261", strict=False)
async def test_alternator_concurrent_rmw_same_partition_different_server(manager: ManagerClient):
    """A reproducer for issue #16261: When sending RMW (read-modify-write)
    operations to the same partition (different item) on different server
    nodes (coordinators), our LWT implementation can reach an
    "uncertainty" situation where it doesn't know whether the update
    succeeded or failed, and returns a failure (InternalServerError)
    almost immediately, not after a cas_contention_timeout_in_ms timeout
    (1 second).
    """
    servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
    alternator = get_alternator(servers[0].ip_addr)
    ips = [server.ip_addr for server in await manager.running_servers()]
    table = alternator.create_table(TableName=unique_table_name(),
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH' },
            {'AttributeName': 'c', 'KeyType': 'RANGE' },
        ],
        AttributeDefinitions=[
            {'AttributeName': 'p', 'AttributeType': 'N' },
            {'AttributeName': 'c', 'AttributeType': 'N' },
        ])

    # All threads write to one partition 1, each to a different item
    # in that partition (its clustering key is the thread's number).
    # Each update gets sent to a random node (we have 3 nodes in ips).
    nthreads = 3
    def run_rmw(i):
        # Per-thread deterministic RNG, seeded with the thread's number.
        rand = random.Random(i)
        # Each thread gets its own boto3 resources (they are not thread-safe).
        node_alternators = [get_alternator(ip) for ip in ips]
        # In about 1/10 runs, just one write from each thread is enough
        # to elicit the error. But if I want to get the error in almost
        # every run, I need to repeat the write more times, until two
        # of the writes collide and cause the bug.
        for _ in range(150):
            alternator_i = rand.randrange(len(node_alternators))
            tbl = node_alternators[alternator_i].Table(table.name)
            start = time.time()
            try:
                tbl.update_item(Key={'p': 1, 'c': i},
                    UpdateExpression='SET v = if_not_exists(v, :init) + :incr',
                    ExpressionAttributeValues={':init': 0, ':incr': 1})
            except ClientError:
                # The "raise" will cause this thread to fail, and eventually
                # the join() and therefore the whole test will fail. We also
                # print the time it took for the failure, because it
                # demonstrates that issue #16261 involves an immediate
                # error (in less than 20ms), NOT a normal timeout.
                print(f"In incrementing 1,{i} on node {alternator_i}: error after {time.time()-start}")
                raise

    threads = [ThreadWrapper(target=run_rmw, args=(i,)) for i in range(nthreads)]
    for t in threads:
        t.start()
    try:
        for t in threads:
            t.join()
    finally:
        table.delete()
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
    """
    Reproducer for issue #27353.

    LWT requires that storage_proxy::cas() is invoked on a valid shard — the one
    returned by sharder.try_get_shard_for_reads() for a tablets-based table.

    The bug: if the current shard is invalid and we jump to the valid shard, that
    new shard may become invalid again by the time we attempt to capture the ERM.
    This leads to a failure of the CAS path.

    The fix: retry the validity check and jump again if the current shard is already
    invalid. We should exit the loop once the shard is valid *and* we hold a strong pointer
    to the ERM — which prevents further tablet movements until the ERM is released.

    This problem is specific to BatchWriteItem; other commands are already handled
    correctly.
    """
    config = alternator_config.copy()
    config['alternator_write_isolation'] = 'always_use_lwt'
    cmdline = [
        '--logger-log-level', 'alternator-executor=trace',
        '--logger-log-level', 'alternator_controller=trace',
        '--logger-log-level', 'paxos=trace'
    ]
    server = await manager.server_add(config=config, cmdline=cmdline)
    alternator = get_alternator(server.ip_addr)

    await manager.disable_tablet_balancing()

    logger.info("Creating alternator test table")
    table = alternator.create_table(TableName=unique_table_name(),
        Tags=[{'Key': 'system:initial_tablets', 'Value': '1'}],
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}])
    table_name = table.name
    ks_name = 'alternator_' + table_name
    last_token = 7  # Any token works since we have only one tablet

    (src_host_id, src_shard) = await get_tablet_replica(manager, server, ks_name, table_name, last_token)
    dst_shard = 0 if src_shard == 1 else 1

    logger.info("Inject 'intranode_migration_streaming_wait'")
    await manager.api.enable_injection(server.ip_addr,
                                       "intranode_migration_streaming_wait",
                                       one_shot=False)

    logger.info("Start tablet migration")
    intranode_migration_task = asyncio.create_task(
        manager.api.move_tablet(server.ip_addr, ks_name, table_name,
                                src_host_id, src_shard,
                                src_host_id, dst_shard, last_token))

    logger.info("Open server logs")
    log = await manager.server_open_log(server.server_id)

    logger.info("Wait for intranode_migration_streaming_wait")
    await log.wait_for("intranode_migration_streaming: waiting")

    logger.info("Inject 'alternator_executor_batch_write_wait'")
    await manager.api.enable_injection(server.ip_addr,
                                       "alternator_executor_batch_write_wait",
                                       one_shot=False,
                                       parameters={
                                           'table': table_name,
                                           'keyspace': ks_name,
                                           'shard': dst_shard
                                       })
    m = await log.mark()

    # Start a background thread, which tries to hit the alternator_executor_batch_write_wait
    # injection on the destination shard.
    logger.info("Start a batch_write thread")
    stop_event = threading.Event()
    def run_batch():
        # The thread uses its own boto3 resources (they are not thread-safe).
        batch_alternator = get_alternator(server.ip_addr)
        batch_table = batch_alternator.Table(table_name)
        while not stop_event.is_set():
            with batch_table.batch_writer() as batch:
                batch.put_item(Item={'p': 1, 'x': 'hellow world'})
    t = ThreadWrapper(target=run_batch)
    t.start()

    try:
        logger.info("Waiting for 'alternator_executor_batch_write_wait: hit'")
        await log.wait_for("alternator_executor_batch_write_wait: hit", from_mark=m)

        # We have a batch request with "streaming" cas_shard on the destination shard.
        # This means we have already made a decision to jump to the src_shard.
        # Now we're releasing the tablet migration so that it reaches write_both_read_new
        # and invalidates this decision.

        m = await log.mark()
        await manager.api.message_injection(server.ip_addr, "intranode_migration_streaming_wait")

        # The next barrier must be for the write_both_read_new, we need a guarantee
        # that the src_shard observed it
        logger.info("Waiting for the next barrier")
        await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
                           from_mark=m)

        # Now we have a guarantee that a new barrier succeeded on the src_shard,
        # this means the src_shard has already transitioned to write_both_read_new,
        # and our batch write will have to jump back to the destination shard.

        logger.info("Release the 'alternator_executor_batch_write_wait'")
        await manager.api.message_injection(server.ip_addr, "alternator_executor_batch_write_wait")

        logger.info("Waiting for migration task to finish")
        await intranode_migration_task
    finally:
        # Always tell the writer loop to stop. Otherwise a failure above would
        # leave the (non-daemon) thread looping indefinitely, which can keep
        # the test process from exiting.
        stop_event.set()

    # Reached only on success: join() re-raises any exception from the thread.
    t.join()
|