As noticed in issue #26079, the Alternator test test_number.py::test_invalid_numbers failed on DynamoDB, because one of the things it did, as a "sanity check", was to check that the number 0e1000 was a valid number. But it turns out it isn't allowed by DynamoDB. So this patch removes 0e1000 from the list of *valid* numbers in test_invalid_numbers, and instead creates a whole new test for the case of 0e1000. It turns out that DynamoDB has a bug (it appears to be a regression, because test_invalid_numbers used to pass on DynamoDB!) where it allows 0.0e1000 (since it's just zero, really!) but forbids 0e1000 which is incorrectly considered to have a too-large magnitude. So we introduce a test that confirms that Alternator correctly allows both 0.0e1000 and 0e1000. DynamoDB fails this test (it allows the first, forbidding the second), making it the first Alternator test tagged as a "dynamodb_bug". Signed-off-by: Nadav Har'El <nyh@scylladb.com>
458 lines
22 KiB
Python
458 lines
22 KiB
Python
# Copyright 2019-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
|
|
# This file contains "test fixtures", a pytest concept described in
|
|
# https://docs.pytest.org/en/latest/fixture.html.
|
|
# A "fixture" is some sort of setup which an individual test requires to run.
|
|
# The fixture has setup code and teardown code, and if multiple tests
|
|
# require the same fixture, it can be set up only once - while still allowing
|
|
# the user to run individual tests and automatically set up the fixtures they need.
|
|
|
|
import pytest
|
|
import boto3
|
|
import requests
|
|
import re
|
|
|
|
from test.alternator.util import create_test_table, is_aws, scylla_log
|
|
from test.cqlpy.conftest import host # add required fixtures
|
|
from test.pylib.runner import testpy_test_fixture_scope
|
|
from test.pylib.suite.python import add_host_option
|
|
from urllib.parse import urlparse
|
|
from functools import cache
|
|
|
|
# Test that the Boto libraries are new enough. These tests want to test a
|
|
# large variety of DynamoDB API features, and to do this we need a new-enough
|
|
# version of the the Boto libraries (boto3 and botocore) so that they can
|
|
# access all these API features.
|
|
# In particular, the BillingMode feature was added in botocore 1.12.54.
|
|
import botocore
|
|
import sys
|
|
from packaging.version import Version
|
|
if (Version(botocore.__version__) < Version('1.12.54')):
|
|
pytest.exit("Your Boto library is too old. Please upgrade it,\ne.g. using:\n sudo pip{} install --upgrade boto3".format(sys.version_info[0]))
|
|
|
|
# We've been seeing Python crashing when shutting down after successfully
|
|
# finishing Alternator tests, and couldn't figure out why (issue #17564).
|
|
# Hopefully this will produce useful debugging information:
|
|
import faulthandler
|
|
faulthandler.enable(all_threads=True)
|
|
|
|
# By default, tests run against a local Scylla installation on localhost:8080/.
|
|
# The "--aws" option can be used to run against Amazon DynamoDB in the us-east-1
|
|
# region.
|
|
def pytest_addoption(parser):
|
|
parser.addoption("--aws", action="store_true",
|
|
help="run against AWS instead of a local Scylla installation")
|
|
parser.addoption("--https", action="store_true",
|
|
help="communicate via HTTPS protocol on port 8043 instead of HTTP when"
|
|
" running against a local Scylla installation")
|
|
parser.addoption("--url", action="store",
|
|
help="communicate with given URL instead of defaults")
|
|
parser.addoption("--runveryslow", action="store_true",
|
|
help="run tests marked veryslow instead of skipping them")
|
|
add_host_option(parser)
|
|
|
|
def pytest_configure(config):
|
|
config.addinivalue_line("markers", "veryslow: mark test as very slow to run")
|
|
|
|
def pytest_collection_modifyitems(config, items):
|
|
if config.getoption("--runveryslow"):
|
|
# --runveryslow given in cli: do not skip veryslow tests
|
|
return
|
|
skip_veryslow = pytest.mark.skip(reason="need --runveryslow option to run")
|
|
for item in items:
|
|
if "veryslow" in item.keywords:
|
|
item.add_marker(skip_veryslow)
|
|
|
|
# When testing Alternator running with --alternator-enforce-authorization=1,
|
|
# we need to find a valid username and secret key to use in the connection.
|
|
# Alternator allows any CQL role as the username any CQL role, and the key
|
|
# is that role's password's salted hash. We can read a valid role/hash
|
|
# from the appropriate system table, but can't do it with Alternator (because
|
|
# we don't know yet the secret key!), so we need to do it with CQL.
|
|
# If this function can't connect to CQL, it will return an arbitrary
|
|
# user/secret pair, and hope it would work if alternator-enforce-authorization
|
|
# is off.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def get_valid_alternator_role():
|
|
from cassandra.cluster import Cluster, NoHostAvailable
|
|
from cassandra.auth import PlainTextAuthProvider
|
|
|
|
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
|
|
|
|
@cache
|
|
def _get_valid_alternator_role(url, role='cassandra'):
|
|
try:
|
|
with (
|
|
Cluster([urlparse(url).hostname], auth_provider=auth_provider,
|
|
connect_timeout = 60, control_connection_timeout = 60) as cluster,
|
|
cluster.connect() as session
|
|
):
|
|
# Newer Scylla places the "roles" table in the "system" keyspace, but
|
|
# older versions used "system_auth_v2" or "system_auth"
|
|
for ks in ['system', 'system_auth_v2', 'system_auth']:
|
|
try:
|
|
# We could have looked for any role/salted_hash pair, but we
|
|
# already know a role "cassandra" exists (we just used it to
|
|
# connect to CQL!), so let's just use that role.
|
|
salted_hash = list(session.execute(f"SELECT salted_hash FROM {ks}.roles WHERE role = '{role}'"))[0].salted_hash
|
|
if salted_hash is None:
|
|
break
|
|
return (role, salted_hash)
|
|
except:
|
|
pass
|
|
except NoHostAvailable:
|
|
# CQL is not available, so we can't find a valid role.
|
|
pass
|
|
# If we couldn't find a valid role, let's hope that
|
|
# alternator-enforce-authorization is not enabled so anything will work
|
|
return ('unknownuser', 'unknownsecret')
|
|
|
|
return _get_valid_alternator_role
|
|
|
|
# "dynamodb" fixture: set up client object for communicating with the DynamoDB
|
|
# API. Currently this chooses either Amazon's DynamoDB in the default region
|
|
# or a local Alternator installation on http://localhost:8080 - depending on the
|
|
# existence of the "--aws" option. In the future we should provide options
|
|
# for choosing other Amazon regions or local installations.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def dynamodb(request, get_valid_alternator_role):
|
|
# Disable boto3's client-side validation of parameters. This validation
|
|
# only makes it impossible for us to test various error conditions,
|
|
# because boto3 checks them before we can get the server to check them.
|
|
boto_config = botocore.client.Config(parameter_validation=False)
|
|
if request.config.getoption('aws'):
|
|
res = boto3.resource('dynamodb', config=boto_config)
|
|
else:
|
|
# Even though we connect to the local installation, Boto3 still
|
|
# requires us to specify dummy region and credential parameters,
|
|
# otherwise the user is forced to properly configure ~/.aws even
|
|
# for local runs.
|
|
if request.config.getoption('url') != None:
|
|
local_url = request.config.getoption('url')
|
|
elif address := request.getfixturevalue("host"):
|
|
# this argument needed for compatibility with PythonTestSuite without modifying the previous behavior
|
|
local_url = f"http://{address}:8000"
|
|
else:
|
|
local_url = 'https://localhost:8043' if request.config.getoption('https') else 'http://localhost:8000'
|
|
# Disable verifying in order to be able to use self-signed TLS certificates
|
|
verify = not request.config.getoption('https')
|
|
user, secret = get_valid_alternator_role(local_url)
|
|
res = boto3.resource('dynamodb', endpoint_url=local_url, verify=verify,
|
|
region_name='us-east-1', aws_access_key_id=user, aws_secret_access_key=secret,
|
|
config=boto_config.merge(botocore.client.Config(retries={"max_attempts": 0}, read_timeout=300)))
|
|
yield res
|
|
res.meta.client.close()
|
|
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def new_dynamodb_session(request, dynamodb, get_valid_alternator_role):
|
|
def _new_dynamodb_session(user='cassandra', password='secret_pass'):
|
|
ses = boto3.Session()
|
|
host = urlparse(dynamodb.meta.client._endpoint.host)
|
|
conf = botocore.client.Config(parameter_validation=False)
|
|
if request.config.getoption('aws'):
|
|
return boto3.resource('dynamodb', config=conf)
|
|
if host.hostname == 'localhost':
|
|
conf = conf.merge(botocore.client.Config(retries={"max_attempts": 0}, read_timeout=300))
|
|
user, secret = get_valid_alternator_role(dynamodb.meta.client._endpoint.host, role=user)
|
|
return ses.resource('dynamodb', endpoint_url=dynamodb.meta.client._endpoint.host, verify=host.scheme != 'http',
|
|
region_name='us-east-1', aws_access_key_id=user, aws_secret_access_key=secret,
|
|
config=conf)
|
|
return _new_dynamodb_session
|
|
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def dynamodbstreams(request, get_valid_alternator_role):
|
|
# Disable boto3's client-side validation of parameters. This validation
|
|
# only makes it impossible for us to test various error conditions,
|
|
# because boto3 checks them before we can get the server to check them.
|
|
boto_config = botocore.client.Config(parameter_validation=False)
|
|
if request.config.getoption('aws'):
|
|
res = boto3.client('dynamodbstreams', config=boto_config)
|
|
else:
|
|
# Even though we connect to the local installation, Boto3 still
|
|
# requires us to specify dummy region and credential parameters,
|
|
# otherwise the user is forced to properly configure ~/.aws even
|
|
# for local runs.
|
|
if request.config.getoption('url') != None:
|
|
local_url = request.config.getoption('url')
|
|
elif address := request.getfixturevalue("host"):
|
|
# this argument needed for compatibility with PythonTestSuite without modifying the previous behavior
|
|
local_url = f"http://{address}:8000"
|
|
else:
|
|
local_url = 'https://localhost:8043' if request.config.getoption('https') else 'http://localhost:8000'
|
|
# Disable verifying in order to be able to use self-signed TLS certificates
|
|
verify = not request.config.getoption('https')
|
|
user, secret = get_valid_alternator_role(local_url)
|
|
res = boto3.client('dynamodbstreams', endpoint_url=local_url, verify=verify,
|
|
region_name='us-east-1', aws_access_key_id=user, aws_secret_access_key=secret,
|
|
config=boto_config.merge(botocore.client.Config(retries={"max_attempts": 0}, read_timeout=300)))
|
|
yield res
|
|
res.close()
|
|
|
|
# A function-scoped autouse=True fixture allows us to test after every test
|
|
# that the server is still alive - and if not report the test which crashed
|
|
# it and stop running any more tests.
|
|
@pytest.fixture(scope="function", autouse=True)
|
|
def dynamodb_test_connection(dynamodb, request, optional_rest_api):
|
|
scylla_log(optional_rest_api, f'test/alternator: Starting {request.node.parent.name}::{request.node.name}', 'info')
|
|
if dynamodb_test_connection.scylla_crashed:
|
|
pytest.skip('Server down')
|
|
yield
|
|
try:
|
|
# We want to run a do-nothing DynamoDB command. The health-check
|
|
# URL is the fastest one.
|
|
url = dynamodb.meta.client._endpoint.host
|
|
response = requests.get(url, verify=False)
|
|
# We don't check response: In Alternator and DynamoDB, we expect
|
|
# response.ok (200), but in recent versions of DynamoDB Local we can
|
|
# get error code 400 because it only allows signed health requests
|
|
# and gives an invalid signature error on an unsigned get().
|
|
# In any case, any HTTP response (as opposed to exception in get())
|
|
# means that the server is still alive.
|
|
except:
|
|
dynamodb_test_connection.scylla_crashed = True
|
|
pytest.fail(f'Scylla appears to have crashed in test {request.node.parent.name}::{request.node.name}')
|
|
scylla_log(optional_rest_api, f'test/alternator: Ended {request.node.parent.name}::{request.node.name}', 'info')
|
|
|
|
dynamodb_test_connection.scylla_crashed = False
|
|
|
|
# "test_table" fixture: Create and return a temporary table to be used in tests
|
|
# that need a table to work on. The table is automatically deleted at the end.
|
|
# This "test_table" creates a table which has a specific key schema: both a
|
|
# partition key and a sort key, and both are strings. Other fixtures (below)
|
|
# can be used to create different types of tables.
|
|
#
|
|
# TODO: Although we are careful about deleting temporary tables when the
|
|
# fixture is torn down, in some cases (e.g., interrupted tests) we can be left
|
|
# with some tables not deleted, and they will never be deleted. Because all
|
|
# our temporary tables have the same test_table_prefix, we can actually find
|
|
# and remove these old tables with this prefix. We can have a fixture, which
|
|
# test_table will require, which on teardown will delete all remaining tables
|
|
# (possibly from an older run). Because the table's name includes the current
|
|
# time, we can also remove just tables older than a particular age. Such
|
|
# mechanism will allow running tests in parallel, without the risk of deleting
|
|
# a parallel run's temporary tables.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def test_table(dynamodb):
|
|
table = create_test_table(dynamodb,
|
|
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
|
|
{ 'AttributeName': 'c', 'KeyType': 'RANGE' }
|
|
],
|
|
AttributeDefinitions=[
|
|
{ 'AttributeName': 'p', 'AttributeType': 'S' },
|
|
{ 'AttributeName': 'c', 'AttributeType': 'S' },
|
|
])
|
|
yield table
|
|
# We get back here when this fixture is torn down. We ask Dynamo to delete
|
|
# this table, but not wait for the deletion to complete. The next time
|
|
# we create a test_table fixture, we'll choose a different table name
|
|
# anyway.
|
|
table.delete()
|
|
|
|
# The following fixtures test_table_* are similar to test_table but create
|
|
# tables with different key schemas.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def test_table_s(dynamodb):
|
|
table = create_test_table(dynamodb,
|
|
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
|
|
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ])
|
|
yield table
|
|
table.delete()
|
|
# test_table_s_2 has exactly the same schema as test_table_s, and is useful
|
|
# for tests which need two different tables with the same schema.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def test_table_s_2(dynamodb):
|
|
table = create_test_table(dynamodb,
|
|
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
|
|
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' } ])
|
|
yield table
|
|
table.delete()
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def test_table_b(dynamodb):
|
|
table = create_test_table(dynamodb,
|
|
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, ],
|
|
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'B' } ])
|
|
yield table
|
|
table.delete()
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def test_table_sb(dynamodb):
|
|
table = create_test_table(dynamodb,
|
|
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
|
|
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'B' } ])
|
|
yield table
|
|
table.delete()
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def test_table_sn(dynamodb):
|
|
table = create_test_table(dynamodb,
|
|
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
|
|
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'N' } ])
|
|
yield table
|
|
table.delete()
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def test_table_ss(dynamodb):
|
|
table = create_test_table(dynamodb,
|
|
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' }, { 'AttributeName': 'c', 'KeyType': 'RANGE' } ],
|
|
AttributeDefinitions=[ { 'AttributeName': 'p', 'AttributeType': 'S' }, { 'AttributeName': 'c', 'AttributeType': 'S' } ])
|
|
yield table
|
|
table.delete()
|
|
|
|
# "filled_test_table" fixture: Create a temporary table to be used in tests
|
|
# that involve reading data - GetItem, Scan, etc. The table is filled with
|
|
# 328 items - each consisting of a partition key, clustering key and two
|
|
# string attributes. 164 of the items are in a single partition (with the
|
|
# partition key 'long') and the 164 other items are each in a separate
|
|
# partition. Finally, a 329th item is added with different attributes.
|
|
# This table is supposed to be read from, not updated nor overwritten.
|
|
# This fixture returns both a table object and the description of all items
|
|
# inserted into it.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def filled_test_table(dynamodb):
|
|
table = create_test_table(dynamodb,
|
|
KeySchema=[ { 'AttributeName': 'p', 'KeyType': 'HASH' },
|
|
{ 'AttributeName': 'c', 'KeyType': 'RANGE' }
|
|
],
|
|
AttributeDefinitions=[
|
|
{ 'AttributeName': 'p', 'AttributeType': 'S' },
|
|
{ 'AttributeName': 'c', 'AttributeType': 'S' },
|
|
])
|
|
count = 164
|
|
items = [{
|
|
'p': str(i),
|
|
'c': str(i),
|
|
'attribute': "x" * 7,
|
|
'another': "y" * 16
|
|
} for i in range(count)]
|
|
items = items + [{
|
|
'p': 'long',
|
|
'c': str(i),
|
|
'attribute': "x" * (1 + i % 7),
|
|
'another': "y" * (1 + i % 16)
|
|
} for i in range(count)]
|
|
items.append({'p': 'hello', 'c': 'world', 'str': 'and now for something completely different'})
|
|
|
|
with table.batch_writer() as batch:
|
|
for item in items:
|
|
batch.put_item(item)
|
|
|
|
yield table, items
|
|
table.delete()
|
|
|
|
# The "scylla_only" fixture can be used by tests for Scylla-only features,
|
|
# which do not exist on AWS DynamoDB. A test using this fixture will be
|
|
# skipped if running with "--aws".
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def scylla_only(dynamodb):
|
|
if is_aws(dynamodb):
|
|
pytest.skip('Scylla-only feature not supported by AWS')
|
|
|
|
# "dynamodb_bug" is similar to "scylla_only", except instead of skipping
|
|
# the test, it is expected to fail (xfail) on AWS DynamoDB. It should be
|
|
# used in rare cases where we consider Alternator's behavior to be the
|
|
# corect one, and DynamoDB's to be the bug. Tests using this fixture should
|
|
# have a prominent comment explaining why we believe this to be a bug in
|
|
# DynamoDB.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def dynamodb_bug(dynamodb):
|
|
if is_aws(dynamodb):
|
|
pytest.xfail('A known bug in AWS DynamoDB')
|
|
|
|
# A fixture allowing to make Scylla-specific REST API requests.
|
|
# If we're not testing Scylla, or the REST API port (10000) is not available,
|
|
# the test using this fixture will be skipped with a message about the REST
|
|
# API not being available.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def rest_api(dynamodb, optional_rest_api):
|
|
if optional_rest_api is None:
|
|
pytest.skip('Cannot connect to Scylla REST API')
|
|
return optional_rest_api
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def optional_rest_api(dynamodb):
|
|
if is_aws(dynamodb):
|
|
return None
|
|
url = dynamodb.meta.client._endpoint.host
|
|
# The REST API is on port 10000, and always http, not https.
|
|
url = re.sub(r':[0-9]+(/|$)', ':10000', url)
|
|
url = re.sub(r'^https:', 'http:', url)
|
|
# Scylla's REST API does not have an official "ping" command,
|
|
# so we just list the keyspaces as a (usually) short operation
|
|
try:
|
|
requests.get(f'{url}/column_family/name/keyspace', timeout=1).raise_for_status()
|
|
except:
|
|
return None
|
|
return url
|
|
|
|
# Fixture to check once whether newly created Alternator tables use the
|
|
# tablet feature. It is used by the xfail_tablets and skip_tablets fixtures
|
|
# below to xfail or skip a test which is known to be failing with tablets.
|
|
# This is a temporary measure - eventually everything in Scylla should work
|
|
# correctly with tablets, and these fixtures can be removed.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def has_tablets(dynamodb, test_table):
|
|
# We rely on some knowledge of Alternator internals:
|
|
# 1. For table with name X, Scylla creates a keyspace called alternator_X
|
|
# 2. We can read a CQL system table using the ".scylla.alternator." prefix.
|
|
info = dynamodb.Table('.scylla.alternator.system_schema.scylla_keyspaces')
|
|
try:
|
|
response = info.query(
|
|
KeyConditions={'keyspace_name': {
|
|
'AttributeValueList': ['alternator_'+test_table.name],
|
|
'ComparisonOperator': 'EQ'}})
|
|
except dynamodb.meta.client.exceptions.ResourceNotFoundException:
|
|
# The internal Scylla table doesn't even exist, either this isn't
|
|
# Scylla or it's older Scylla and doesn't use tablets.
|
|
return False
|
|
if not 'Items' in response or not response['Items']:
|
|
return False
|
|
if 'initial_tablets' in response['Items'][0] and response['Items'][0]['initial_tablets']:
|
|
return True
|
|
return False
|
|
|
|
@pytest.fixture(scope="function")
|
|
def xfail_tablets(request, has_tablets):
|
|
if has_tablets:
|
|
request.node.add_marker(pytest.mark.xfail(reason='Test expected to fail when Alternator tables use tablets'))
|
|
|
|
@pytest.fixture(scope="function")
|
|
def skip_tablets(has_tablets):
|
|
if has_tablets:
|
|
pytest.skip("Test may crash when Alternator tables use tablets")
|
|
|
|
# Alternator tests normally use only the DynamoDB API. However, a few tests
|
|
# need to use CQL to set up Scylla-only features such as service levels or
|
|
# CQL-based RBAC (see test_service_levels.py and test_cql_rbac.py), and
|
|
# the "cql" fixture enables using CQL.
|
|
# If we're not testing Scylla, or the CQL port is not available on the same
|
|
# IP address as the Alternator IP address, a test using this fixture will
|
|
# be skipped with a message about the CQL API not being available.
|
|
@pytest.fixture(scope=testpy_test_fixture_scope)
|
|
def cql(dynamodb):
|
|
from cassandra.auth import PlainTextAuthProvider
|
|
from cassandra.cluster import Cluster, ConsistencyLevel, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
|
|
from cassandra.policies import RoundRobinPolicy
|
|
if is_aws(dynamodb):
|
|
pytest.skip('Scylla-only CQL API not supported by AWS')
|
|
url = dynamodb.meta.client._endpoint.host
|
|
host, = re.search(r'.*://([^:]*):', url).groups()
|
|
profile = ExecutionProfile(
|
|
load_balancing_policy=RoundRobinPolicy(),
|
|
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
|
|
serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
|
|
request_timeout=120)
|
|
cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
|
|
contact_points=[host],
|
|
port=9042,
|
|
protocol_version=4,
|
|
auth_provider=PlainTextAuthProvider(username='cassandra', password='cassandra'),
|
|
connect_timeout=60,
|
|
control_connection_timeout=60
|
|
)
|
|
try:
|
|
ret = cluster.connect()
|
|
# "BEGIN BATCH APPLY BATCH" is the closest to do-nothing I could find
|
|
ret.execute("BEGIN BATCH APPLY BATCH")
|
|
except NoHostAvailable:
|
|
pytest.skip('Could not connect to Scylla-only CQL API')
|
|
yield ret
|
|
cluster.shutdown()
|