Compare commits
11 Commits
debug_form
...
scylla-6.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97ae704f99 | ||
|
|
738e4c3681 | ||
|
|
ee74fe4e0e | ||
|
|
b2ea946837 | ||
|
|
92e725c467 | ||
|
|
e57d48253f | ||
|
|
47df9f9b05 | ||
|
|
193dc87bd0 | ||
|
|
11d1950957 | ||
|
|
6317325ed5 | ||
|
|
14222ad205 |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=6.1.0-dev
|
||||
VERSION=6.1.0-rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "alternator/executor.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "types/types.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
namespace alternator {
|
||||
@@ -31,11 +32,12 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
|
||||
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
|
||||
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
|
||||
const column_definition* salted_hash_col = schema->get_column_definition(bytes("salted_hash"));
|
||||
if (!salted_hash_col) {
|
||||
const column_definition* can_login_col = schema->get_column_definition(bytes("can_login"));
|
||||
if (!salted_hash_col || !can_login_col) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("Credentials cannot be fetched for: {}", username)));
|
||||
}
|
||||
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col});
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id}, selection->get_query_options());
|
||||
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col, can_login_col});
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id, can_login_col->id}, selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
|
||||
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
auto cl = auth::password_authenticator::consistency_for_user(username);
|
||||
@@ -51,7 +53,14 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
|
||||
if (result_set->empty()) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("User not found: {}", username)));
|
||||
}
|
||||
const managed_bytes_opt& salted_hash = result_set->rows().front().front(); // We only asked for 1 row and 1 column
|
||||
const auto& result = result_set->rows().front();
|
||||
bool can_login = result[1] && value_cast<bool>(boolean_type->deserialize(*result[1]));
|
||||
if (!can_login) {
|
||||
// This is a valid role name, but has "login=False" so should not be
|
||||
// usable for authentication (see #19735).
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("Role {} has login=false so cannot be used for login", username)));
|
||||
}
|
||||
const managed_bytes_opt& salted_hash = result.front();
|
||||
if (!salted_hash) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("No password found for user: {}", username)));
|
||||
}
|
||||
|
||||
@@ -211,7 +211,10 @@ protected:
|
||||
sstring local_dc = topology.get_datacenter();
|
||||
std::unordered_set<gms::inet_address> local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc);
|
||||
for (auto& ip : local_dc_nodes) {
|
||||
if (_gossiper.is_alive(ip)) {
|
||||
// Note that it's not enough for the node to be is_alive() - a
|
||||
// node joining the cluster is also "alive" but not responsive to
|
||||
// requests. We need the node to be in normal state. See #19694.
|
||||
if (_gossiper.is_normal(ip)) {
|
||||
// Use the gossiped broadcast_rpc_address if available instead
|
||||
// of the internal IP address "ip". See discussion in #18711.
|
||||
rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(ip)));
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
#pragma once
|
||||
#include "seastar/core/semaphore.hh"
|
||||
#include "service/paxos/proposal.hh"
|
||||
#include "log.hh"
|
||||
#include "utils/digest_algorithm.hh"
|
||||
@@ -31,6 +32,7 @@ private:
|
||||
|
||||
class key_lock_map {
|
||||
using semaphore = basic_semaphore<semaphore_default_exception_factory, clock_type>;
|
||||
using semaphore_units = semaphore_units<semaphore_default_exception_factory, clock_type>;
|
||||
using map = std::unordered_map<dht::token, semaphore>;
|
||||
|
||||
semaphore& get_semaphore_for_key(const dht::token& key);
|
||||
@@ -46,22 +48,15 @@ private:
|
||||
key_lock_map& _map;
|
||||
dht::token _key;
|
||||
clock_type::time_point _timeout;
|
||||
bool _locked = false;
|
||||
key_lock_map::semaphore_units _units;
|
||||
public:
|
||||
future<> lock() {
|
||||
auto f = _map.get_semaphore_for_key(_key).wait(_timeout, 1);
|
||||
_locked = true;
|
||||
return f;
|
||||
future<> lock () {
|
||||
return get_units(_map.get_semaphore_for_key(_key), 1, _timeout).then([this] (auto&& u) { _units = std::move(u); });
|
||||
}
|
||||
guard(key_lock_map& map, const dht::token& key, clock_type::time_point timeout) : _map(map), _key(key), _timeout(timeout) {};
|
||||
guard(guard&& o) noexcept : _map(o._map), _key(std::move(o._key)), _timeout(o._timeout), _locked(o._locked) {
|
||||
o._locked = false;
|
||||
}
|
||||
guard(guard&& o) = default;
|
||||
~guard() {
|
||||
if (_locked) {
|
||||
_map.get_semaphore_for_key(_key).signal(1);
|
||||
_map.release_semaphore_for_key(_key);
|
||||
}
|
||||
_map.release_semaphore_for_key(_key);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -6283,6 +6283,8 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
|
||||
auto l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout);
|
||||
|
||||
co_await utils::get_local_injector().inject("cas_timeout_after_lock", write_timeout + std::chrono::milliseconds(100));
|
||||
|
||||
while (true) {
|
||||
// Finish the previous PAXOS round, if any, and, as a side effect, compute
|
||||
// a ballot (round identifier) which is a) unique b) has good chances of being
|
||||
|
||||
@@ -1782,6 +1782,8 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
|
||||
set_mode(mode::JOINING);
|
||||
|
||||
co_await utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20));
|
||||
|
||||
if (raft_server) { // Raft is enabled. Check if we need to bootstrap ourself using raft
|
||||
rtlogger.info("topology changes are using raft");
|
||||
|
||||
@@ -3820,6 +3822,8 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
// Step 3: Prepare to sync data
|
||||
ctl.prepare(node_ops_cmd::bootstrap_prepare).get();
|
||||
|
||||
utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20)).get();
|
||||
|
||||
// Step 5: Sync data for bootstrap
|
||||
_repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get();
|
||||
on_streaming_finished();
|
||||
@@ -5497,6 +5501,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state
|
||||
co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> {
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) {
|
||||
co_await utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20));
|
||||
|
||||
co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens);
|
||||
} else {
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
|
||||
|
||||
@@ -2986,6 +2986,7 @@ sstable::unlink(storage::sync_dir sync) noexcept {
|
||||
|
||||
co_await std::move(remove_fut);
|
||||
_stats.on_delete();
|
||||
_manager.on_unlink(this);
|
||||
}
|
||||
|
||||
thread_local sstables_stats::stats sstables_stats::_shard_stats;
|
||||
|
||||
@@ -323,6 +323,11 @@ void sstables_manager::validate_new_keyspace_storage_options(const data_dictiona
|
||||
}, so.value);
|
||||
}
|
||||
|
||||
void sstables_manager::on_unlink(sstable* sst) {
|
||||
// Remove the sst from manager's reclaimed list to prevent any attempts to reload its components.
|
||||
_reclaimed.erase(*sst);
|
||||
}
|
||||
|
||||
sstables_registry::~sstables_registry() = default;
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -188,6 +188,9 @@ public:
|
||||
|
||||
void validate_new_keyspace_storage_options(const data_dictionary::storage_options&);
|
||||
|
||||
// To be called by the sstable to signal its unlinking
|
||||
void on_unlink(sstable* sst);
|
||||
|
||||
private:
|
||||
void add(sstable* sst);
|
||||
// Transition the sstable to the "inactive" state. It has no
|
||||
|
||||
126
test/alternator/test_cql_rbac.py
Normal file
126
test/alternator/test_cql_rbac.py
Normal file
@@ -0,0 +1,126 @@
|
||||
# Copyright 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
# Tests for how CQL's Role-Based Access Control (RBAC) commands - CREATE ROLE,
|
||||
# GRANT, REVOKE, etc., can be used on Alternator for authentication and for
|
||||
# authorization. For example if the low-level name of an Alternator table "x"
|
||||
# is alternator_x.x, and a certain user is not granted permission to "modify"
|
||||
# keyspace alternator_x, Alternator write requests (PutItem, UpdateItem,
|
||||
# DeleteItem, BatchWriteItem) by that user will be denied.
|
||||
#
|
||||
# Because this file is all about testing the Scylla-only CQL-based RBAC,
|
||||
# all tests in this file are skipped when running against Amazon DynamoDB.
|
||||
|
||||
import pytest
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from test.alternator.util import is_aws, unique_table_name
|
||||
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.cluster import Cluster, ConsistencyLevel, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
|
||||
from cassandra.policies import RoundRobinPolicy
|
||||
import re
|
||||
|
||||
# This file is all about testing RBAC as configured via CQL, so we need to
|
||||
# connect to CQL to set these tests up. The "cql" fixture below enables that.
|
||||
# If we're not testing Scylla, or the CQL port is not available on the same
|
||||
# IP address as the Alternator IP address, a test using this fixture will
|
||||
# be skipped with a message about the CQL API not being available.
|
||||
@pytest.fixture(scope="module")
|
||||
def cql(dynamodb):
|
||||
if is_aws(dynamodb):
|
||||
pytest.skip('Scylla-only CQL API not supported by AWS')
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
host, = re.search(r'.*://([^:]*):', url).groups()
|
||||
profile = ExecutionProfile(
|
||||
load_balancing_policy=RoundRobinPolicy(),
|
||||
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
|
||||
serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL)
|
||||
cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
|
||||
contact_points=[host],
|
||||
port=9042,
|
||||
protocol_version=4,
|
||||
auth_provider=PlainTextAuthProvider(username='cassandra', password='cassandra'),
|
||||
)
|
||||
try:
|
||||
ret = cluster.connect()
|
||||
# "BEGIN BATCH APPLY BATCH" is the closest to do-nothing I could find
|
||||
ret.execute("BEGIN BATCH APPLY BATCH")
|
||||
except NoHostAvailable:
|
||||
pytest.skip('Could not connect to Scylla-only CQL API')
|
||||
yield ret
|
||||
cluster.shutdown()
|
||||
|
||||
# new_role() is a context manager for temporarily creating a new role with
|
||||
# a unique name and returning its name and the secret key needed to connect
|
||||
# to it with the DynamoDB API.
|
||||
# The "login" and "superuser" flags are passed to the CREATE ROLE statement.
|
||||
@contextmanager
|
||||
def new_role(cql, login=True, superuser=False):
|
||||
# The role name is not a table's name but it doesn't matter. Because our
|
||||
# unique_table_name() uses (deliberately) a non-lower-case character, the
|
||||
# role name has to be quoted in double quotes when used in CQL below.
|
||||
role = unique_table_name()
|
||||
# The password set for the new role is identical to the user name (not
|
||||
# very secure ;-)) - but we later need to retrieve the "salted hash" of
|
||||
# this password, which serves in Alternator as the secret key of the role.
|
||||
cql.execute(f"CREATE ROLE \"{role}\" WITH PASSWORD = '{role}' AND SUPERUSER = {superuser} AND LOGIN = {login}")
|
||||
# Newer Scylla places the "roles" table in the "system" keyspace, but
|
||||
# older versions used "system_auth_v2" or "system_auth"
|
||||
key = None
|
||||
for ks in ['system', 'system_auth_v2', 'system_auth']:
|
||||
try:
|
||||
e = list(cql.execute(f"SELECT salted_hash FROM {ks}.roles WHERE role = '{role}'"))
|
||||
if e != []:
|
||||
key = e[0].salted_hash
|
||||
if key is not None:
|
||||
break
|
||||
except:
|
||||
pass
|
||||
assert key is not None
|
||||
try:
|
||||
yield (role, key)
|
||||
finally:
|
||||
cql.execute(f'DROP ROLE "{role}"')
|
||||
|
||||
# Create a new DynamoDB API resource (connection object) similar to the
|
||||
# existing "dynamodb" resource - but authenticating with the given role
|
||||
# and key.
|
||||
@contextmanager
|
||||
def new_dynamodb(dynamodb, role, key):
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
config = dynamodb.meta.client._client_config
|
||||
verify = not url.startswith('https')
|
||||
ret = boto3.resource('dynamodb', endpoint_url=url, verify=verify,
|
||||
aws_access_key_id=role, aws_secret_access_key=key,
|
||||
region_name='us-east-1', config=config)
|
||||
try:
|
||||
yield ret
|
||||
finally:
|
||||
ret.meta.client.close()
|
||||
|
||||
# A basic test for creating a new role. The ListTables operation is allowed
|
||||
# to any role, so it should work in the new role when given the right password
|
||||
# and fail with the wrong password.
|
||||
def test_new_role(dynamodb, cql):
|
||||
with new_role(cql) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, key) as d:
|
||||
# ListTables should not fail (we don't care what is the result)
|
||||
d.meta.client.list_tables()
|
||||
# Trying to use the wrong key for the new role should fail to perform
|
||||
# any request. The new_dynamodb() function can't detect the error,
|
||||
# it is detected when attempting to perform a request with it.
|
||||
with new_dynamodb(dynamodb, role, 'wrongkey') as d:
|
||||
with pytest.raises(ClientError, match='UnrecognizedClientException'):
|
||||
d.meta.client.list_tables()
|
||||
|
||||
# A role without "login" permissions cannot be used to authenticate requests.
|
||||
# Reproduces #19735.
|
||||
def test_login_false(dynamodb, cql):
|
||||
with new_role(cql, login=False) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, key) as d:
|
||||
with pytest.raises(ClientError, match='UnrecognizedClientException.*login=false'):
|
||||
d.meta.client.list_tables()
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
#include "utils/bloom_filter.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_sstable_reclaim_memory_from_components_and_reload_reclaimed_components) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
@@ -52,6 +53,11 @@ std::pair<shared_sstable, size_t> create_sstable_with_bloom_filter(test_env& env
|
||||
return {sst, sst_bf_memory};
|
||||
}
|
||||
|
||||
void dispose_and_stop_tracking_bf_memory(shared_sstable&& sst, test_env_sstables_manager& mgr) {
|
||||
mgr.remove_sst_from_reclaimed(sst.get());
|
||||
shared_sstable::dispose(sst.release().release());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
@@ -89,7 +95,7 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter)
|
||||
|
||||
// Test auto reload - disposing sst3 should trigger reload of the
|
||||
// smallest filter in the reclaimed list, which is sst1's bloom filter.
|
||||
shared_sstable::dispose(sst3.release().release());
|
||||
dispose_and_stop_tracking_bf_memory(std::move(sst3), sst_mgr);
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
|
||||
// only sst4's bloom filter memory should be reported as reclaimed
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst4_bf_memory);
|
||||
@@ -154,7 +160,7 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
|
||||
utils::get_local_injector().enable("reload_reclaimed_components/pause", true);
|
||||
|
||||
// dispose sst2 to trigger reload of sst1's bloom filter
|
||||
shared_sstable::dispose(sst2.release().release());
|
||||
dispose_and_stop_tracking_bf_memory(std::move(sst2), sst_mgr);
|
||||
// _total_reclaimable_memory will be updated when the reload begins; wait for it.
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory);
|
||||
|
||||
@@ -223,3 +229,57 @@ SEASTAR_TEST_CASE(test_bloom_filters_with_bad_partition_estimate) {
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(test_bloom_filter_reload_after_unlink) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
|
||||
return;
|
||||
#endif
|
||||
simple_schema ss;
|
||||
auto schema = ss.schema();
|
||||
|
||||
auto mut = mutation(schema, ss.make_pkey(1));
|
||||
mut.partition().apply_insert(*schema, ss.make_ckey(1), ss.new_timestamp());
|
||||
|
||||
// bloom filter will be reclaimed automatically due to low memory
|
||||
auto sst = make_sstable_containing(env.make_sstable(schema), {mut});
|
||||
auto& sst_mgr = env.manager();
|
||||
BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), 0);
|
||||
auto memory_reclaimed = sst_mgr.get_total_memory_reclaimed();
|
||||
|
||||
// manager's reclaimed set has the sst now
|
||||
auto& reclaimed_set = sst_mgr.get_reclaimed_set();
|
||||
BOOST_REQUIRE_EQUAL(reclaimed_set.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(reclaimed_set.begin()->get_filename(), sst->get_filename());
|
||||
|
||||
// hold a copy of shared sst object in async thread to test reload after unlink
|
||||
utils::get_local_injector().enable("test_bloom_filter_reload_after_unlink");
|
||||
auto async_sst_holder = seastar::async([sst] {
|
||||
// do nothing just hold a copy of sst and wait for message signalling test completion
|
||||
utils::get_local_injector().inject("test_bloom_filter_reload_after_unlink", [] (auto& handler) {
|
||||
auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5});
|
||||
return ret;
|
||||
}).get();
|
||||
});
|
||||
|
||||
// unlink the sst and release the object
|
||||
sst->unlink().get();
|
||||
sst.release();
|
||||
|
||||
// reclaimed set should be now empty but the total memory reclaimed should
|
||||
// be still the same as the sst object is not deactivated yet due to a copy
|
||||
// being alive in the async thread.
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_reclaimed_set().size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), memory_reclaimed);
|
||||
|
||||
// message async thread to complete waiting and thus release its copy of sst, triggering deactivation
|
||||
utils::get_local_injector().receive_message("test_bloom_filter_reload_after_unlink");
|
||||
async_sst_holder.get();
|
||||
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
|
||||
}, {
|
||||
// set available memory = 0 to force reclaim the bloom filter
|
||||
.available_memory = 0
|
||||
});
|
||||
};
|
||||
|
||||
@@ -44,5 +44,7 @@ custom_args:
|
||||
- '-c1 -m256M'
|
||||
commitlog_cleanup_test:
|
||||
- '-c1 -m2G'
|
||||
bloom_filter_test:
|
||||
- '-c1'
|
||||
run_in_debug:
|
||||
- logalloc_standard_allocator_segment_pool_backend_test
|
||||
|
||||
@@ -57,6 +57,14 @@ public:
|
||||
size_t get_total_reclaimable_memory() {
|
||||
return _total_reclaimable_memory;
|
||||
}
|
||||
|
||||
void remove_sst_from_reclaimed(sstable* sst) {
|
||||
_reclaimed.erase(*sst);
|
||||
}
|
||||
|
||||
auto& get_reclaimed_set() {
|
||||
return _reclaimed;
|
||||
}
|
||||
};
|
||||
|
||||
class test_env_compaction_manager {
|
||||
|
||||
37
test/topology_custom/test_lwt_semaphore.py
Normal file
37
test/topology_custom/test_lwt_semaphore.py
Normal file
@@ -0,0 +1,37 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
import pytest
|
||||
from cassandra.protocol import WriteTimeout
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cas_semaphore(manager):
|
||||
""" This is a regression test for scylladb/scylladb#19698 """
|
||||
servers = await manager.servers_add(1, cmdline=['--smp', '1', '--write-request-timeout-in-ms', '500'])
|
||||
|
||||
host = await wait_for_cql_and_get_hosts(manager.cql, {servers[0]}, time.time() + 60)
|
||||
|
||||
await manager.cql.run_async("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
|
||||
await manager.cql.run_async("CREATE TABLE test.test (a int PRIMARY KEY, b int)")
|
||||
|
||||
async with inject_error(manager.api, servers[0].ip_addr, 'cas_timeout_after_lock'):
|
||||
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
|
||||
try:
|
||||
await asyncio.gather(*res)
|
||||
except WriteTimeout:
|
||||
pass
|
||||
|
||||
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
|
||||
await asyncio.gather(*res)
|
||||
|
||||
metrics = await manager.metrics.query(servers[0].ip_addr)
|
||||
contention = metrics.get(name="scylla_storage_proxy_coordinator_cas_write_contention_count")
|
||||
|
||||
assert contention == None
|
||||
@@ -12,8 +12,8 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.topology.util import reconnect_driver, enter_recovery_state, wait_for_upgrade_state, \
|
||||
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
|
||||
from test.topology.util import (delete_raft_data_and_upgrade_state, enter_recovery_state, log_run_time,
|
||||
reconnect_driver, wait_for_upgrade_state, wait_until_upgrade_finishes)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -77,8 +77,7 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting {others}")
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
|
||||
cql = await reconnect_driver(manager)
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
logging.info(f"{others} restarted, waiting until driver reconnects to them")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
@@ -100,11 +99,11 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
logging.info(f"Removing {srv1} using {others[0]}")
|
||||
await manager.remove_node(others[0].server_id, srv1.server_id)
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts} and restarting")
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
|
||||
cql = await reconnect_driver(manager)
|
||||
logging.info(f"Restarting {others}")
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
|
||||
@@ -23,6 +23,8 @@ import requests
|
||||
import json
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for
|
||||
from test.topology.conftest import skip_mode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -206,13 +208,102 @@ async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
|
||||
}
|
||||
servers = await manager.servers_add(2, config=config)
|
||||
for server in servers:
|
||||
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
|
||||
response = requests.get(url, verify=False)
|
||||
assert response.ok
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
# We expect /localnodes to return ["1.2.3.4", "1.2.3.4"]
|
||||
# (since we configured both nodes with the same broadcast_rpc_address):
|
||||
assert j == ['1.2.3.4', '1.2.3.4']
|
||||
# (since we configured both nodes with the same broadcast_rpc_address).
|
||||
# We need the retry loop below because the second node might take a
|
||||
# bit of time to bootstrap after coming up, and only then will it
|
||||
# appear on /localnodes (see #19694).
|
||||
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
|
||||
timeout = time.time() + 10
|
||||
while True:
|
||||
assert time.time() < timeout
|
||||
response = requests.get(url, verify=False)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if j == ['1.2.3.4', '1.2.3.4']:
|
||||
break # done
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_localnodes_drained_node(manager: ManagerClient):
|
||||
"""Test that if in a cluster one node is brought down with "nodetool drain"
|
||||
a "/localnodes" request should NOT return that node. This test does
|
||||
NOT reproduce issue #19694 - a DRAINED node is not considered is_alive()
|
||||
and even before the fix of that issue, "/localnodes" didn't return it.
|
||||
"""
|
||||
# Start a cluster with two nodes and verify that at this point,
|
||||
# "/localnodes" on the first node returns both nodes.
|
||||
# We the retry loop below because the second node might take a
|
||||
# bit of time to bootstrap after coming up, and only then will it
|
||||
# appear on /localnodes (see #19694).
|
||||
servers = await manager.servers_add(2, config=alternator_config)
|
||||
localnodes_request = f"http://{servers[0].ip_addr}:{alternator_config['alternator_port']}/localnodes"
|
||||
async def check_localnodes_two():
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
|
||||
return True
|
||||
elif set(j).issubset({servers[0].ip_addr, servers[1].ip_addr}):
|
||||
return None # try again
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_localnodes_two, time.time() + 10)
|
||||
# Now "nodetool" drain on the second node, leaving the second node
|
||||
# in DRAINED state.
|
||||
await manager.api.client.post("/storage_service/drain", host=servers[1].ip_addr)
|
||||
# After that, "/localnodes" should no longer return the second node.
|
||||
# It might take a short while until the first node learns what happened
|
||||
# to node 1, so we may need to retry for a while
|
||||
async def check_localnodes_one():
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
|
||||
return None # try again
|
||||
elif set(j) == {servers[0].ip_addr}:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_localnodes_one, time.time() + 10)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_localnodes_joining_nodes(manager: ManagerClient):
|
||||
"""Test that if a cluster is being enlarged and a node is coming up but
|
||||
not yet responsive, a "/localnodes" request should NOT return that node.
|
||||
Reproduces issue #19694.
|
||||
"""
|
||||
# Start a cluster with one node, and then bring up a second node,
|
||||
# pausing its bootstrap (with an injection) in JOINING state.
|
||||
# We need to start the second node in the background, because server_add()
|
||||
# will wait for the bootstrap to complete - which we don't want to do.
|
||||
server = await manager.server_add(config=alternator_config)
|
||||
task = asyncio.create_task(manager.server_add(config=alternator_config | {'error_injections_at_startup': ['delay_bootstrap_20s']}))
|
||||
# Sleep until the first node knows of the second one as a "live node"
|
||||
# (we check this with the REST API's /gossiper/endpoint/live.
|
||||
async def check_two_live_nodes():
|
||||
j = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
|
||||
if len(j) == 1:
|
||||
return None # try again
|
||||
elif len(j) == 2:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_two_live_nodes, time.time() + 10)
|
||||
|
||||
# At this point the second node is live, but hasn't finished bootstrapping
|
||||
# (we delayed that with the injection). So the "/localnodes" should still
|
||||
# return just one node - not both. Reproduces #19694 (two nodes used to
|
||||
# be returned)
|
||||
localnodes_request = f"http://{server.ip_addr}:{alternator_config['alternator_port']}/localnodes"
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
assert len(j) == 1
|
||||
# Ending the test here will kill both servers. We don't wait for the
|
||||
# second server to finish its long injection-caused bootstrap delay,
|
||||
# so we don't check here that when the second server finally comes up,
|
||||
# both nodes will finally be visible in /localnodes. This case is checked
|
||||
# in other tests, where bootstrap finishes normally - we don't need to
|
||||
# check this case again here.
|
||||
task.cancel()
|
||||
|
||||
# TODO: add a more thorough test for /localnodes, creating a cluster with
|
||||
# multiple nodes in multiple data centers, and check that we can get a list
|
||||
|
||||
Reference in New Issue
Block a user