Compare commits
11 Commits

Author SHA1 Message Date
Nadav Har'El
97ae704f99 alternator: do not allow authentication with a non-"login" role
Alternator allows authentication into the existing CQL roles, but
roles which have the flag "login=false" should be refused in
authentication, and this patch adds the missing check.

The patch also adds a regression test for this feature in the
test/alternator test framework, in a new test file
test/alternator/cql_rbac.py. This test file will later include more
tests of how the CQL RBAC commands (CREATE ROLE, GRANT, REVOKE)
affect authentication and authorization in Alternator.
In particular, these tests need to use not just the DynamoDB API but
also CQL, so this new test file includes the "cql" fixture that allows
us to run CQL commands, to create roles, to retrieve their secret keys,
and so on.
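The intended check can be sketched as follows (a model only — `UnrecognizedClient` stands in for Alternator's UnrecognizedClientException, and the roles table is stubbed as a plain dict):

```python
class UnrecognizedClient(Exception):
    """Stand-in for Alternator's UnrecognizedClientException."""

def fetch_secret_key(roles, username):
    # A role must both exist and have can_login=True to authenticate.
    # A valid role with login=false is refused (see #19735) - this is
    # the check that was previously missing.
    row = roles.get(username)
    if row is None:
        raise UnrecognizedClient(f"User not found: {username}")
    if not row.get("can_login"):
        raise UnrecognizedClient(
            f"Role {username} has login=false so cannot be used for login")
    salted_hash = row.get("salted_hash")
    if salted_hash is None:
        raise UnrecognizedClient(f"No password found for user: {username}")
    return salted_hash
```

The real implementation does the same thing one column earlier: it adds `can_login` to the query against the roles table and rejects the request before the salted hash is even examined.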

Fixes #19735

(cherry picked from commit 14cd7b5095)

Closes scylladb/scylladb#19863
2024-07-25 12:45:27 +03:00
Nadav Har'El
738e4c3681 alternator: fix "/localnodes" to not return nodes still joining
Alternator's "/localnodes" HTTP request is supposed to return the list of
nodes in the local DC to which the user can send requests.

The existing implementation incorrectly used gossiper::is_alive() to check
for which nodes to return - but "alive" nodes include nodes which are still
joining the cluster and not really usable. These nodes can remain in the
JOINING state for a long time while they are copying data, and an attempt
to send requests to them will fail.

The fix for this bug is trivial: change the call to is_alive() to a call
to is_normal().
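The behavioural difference can be modelled like this (an illustrative sketch with made-up state names; the real check lives in the gossiper):

```python
JOINING = "JOINING"
NORMAL = "NORMAL"

def localnodes_before(node_states):
    # Old behaviour: is_alive() is also true for a node that is still
    # bootstrapping, so a JOINING node leaked into the result.
    return sorted(n for n, s in node_states.items() if s in (JOINING, NORMAL))

def localnodes_after(node_states):
    # Fixed behaviour: only nodes in NORMAL state are returned.
    return sorted(n for n, s in node_states.items() if s == NORMAL)
```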

But the hard part of this fix is the testing:

1. An existing multi-node test for "/localnodes" assumed that right after
   a new node was created, it appears on "/localnodes". But after this
   patch, it may take a bit more time for the bootstrapping to complete
   and the new node to appear in /localnodes - so I had to add a retry loop.

2. I added a test that reproduces the bug fixed here, and verifies its
   fix. The test is in the multi-node topology framework. It adds an
   injection which delays the bootstrap, which leaves a new node in JOINING
   state for a long time. The test then verifies that the new node is
   alive (as checked by the REST API), but is not returned by "/localnodes".

3. The new injection for delaying the bootstrap is unfortunately not
   very pretty - I had to do it in three places because we have several
   code paths of how bootstrap works without repair, with repair, without
   Raft and with Raft - and I wanted to delay all of them.
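The retry loop from point 1 amounts to a small polling helper along these lines (a hypothetical helper, not the actual test code):

```python
import time

def wait_for_value(fetch, expected, timeout_s=10.0, interval_s=0.1):
    """Poll fetch() until it returns expected; fail after timeout_s.

    Needed because a freshly added node only appears in /localnodes
    once its bootstrap completes, which can take a little while.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        value = fetch()
        if value == expected:
            return value
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still got {value!r} after {timeout_s}s")
        time.sleep(interval_s)
```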

Fixes #19694.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 0d1aa399f9)

Closes scylladb/scylladb#19855
2024-07-24 11:04:54 +03:00
Lakshmi Narayanan Sreethar
ee74fe4e0e [Backport 6.1] sstables: do not reload components of unlinked sstables
The SSTable is removed from the reclaimed memory tracking logic only
when its object is deleted. However, there is a risk that the Bloom
filter reloader may attempt to reload the SSTable after it has been
unlinked but before the SSTable object is destroyed. Prevent this by
removing the SSTable from the reclaimed list maintained by the manager
as soon as it is unlinked.

The original logic that updated the memory tracking in
`sstables_manager::deactivate()` is left in place as (a) the variables
have to be updated only when the SSTable object is actually deleted, as
the memory used by the filter is not freed as long as the SSTable is
alive, and (b) the `_reclaimed.erase(*sst)` is still useful during
shutdown, for example, when the SSTable is not unlinked but just
destroyed.
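The resulting bookkeeping can be modelled like this (a Python sketch of the C++ logic; the method names follow the commit, everything else is illustrative):

```python
class SSTablesManager:
    """Model of the reclaimed-list bookkeeping around unlink."""
    def __init__(self):
        self._reclaimed = set()   # ssts whose bloom filter was reclaimed
        self.memory_reclaimed = 0

    def track_reclaimed(self, sst, filter_size):
        self._reclaimed.add(sst)
        self.memory_reclaimed += filter_size

    def on_unlink(self, sst):
        # Called as soon as the sstable is unlinked: drop it from the
        # reclaimed list so the reloader cannot pick it up afterwards.
        self._reclaimed.discard(sst)

    def deactivate(self, sst, filter_size):
        # Runs when the object is finally destroyed. The erase here is
        # still needed for shutdown, where an sstable is destroyed
        # without being unlinked; memory accounting happens only now,
        # since the filter's memory is held as long as the object lives.
        self._reclaimed.discard(sst)
        self.memory_reclaimed -= filter_size

    def reload_candidates(self):
        return set(self._reclaimed)
```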

Fixes https://github.com/scylladb/scylladb/issues/19722

Closes scylladb/scylladb#19717

* github.com:scylladb/scylladb:
  boost/bloom_filter_test: add testcase to verify unlinked sstables are not reloaded
  sstables: do not reload components of unlinked sstables
  sstables/sstables_manager: introduce on_unlink method

(cherry picked from commit 591876b44e)

Backported from #19717 to 6.1

Closes scylladb/scylladb#19828
2024-07-24 09:03:52 +03:00
Jenkins Promoter
b2ea946837 Update ScyllaDB version to: 6.1.0-rc1 2024-07-23 10:33:48 +03:00
Avi Kivity
92e725c467 Merge '[Backport 6.1] Fix lwt semaphore guard accounting' from ScyllaDB
Currently the guard does not correctly account for an ongoing operation if semaphore acquisition fails. It may signal a semaphore that it does not actually hold.

Should be backported to all supported versions.

(cherry picked from commit 87beebeed0)

(cherry picked from commit 4178589826)

 Refs #19699

Closes scylladb/scylladb#19819

* github.com:scylladb/scylladb:
  test: add test to check that coordinator lwt semaphore continues functioning after locking failures
  paxos: do not signal semaphore if it was not acquired
2024-07-22 17:41:30 +03:00
Kamil Braun
e57d48253f Merge '[Backport 6.1] test: raft: fix the flaky test_raft_recovery_stuck' from ScyllaDB
Use the rolling restart to avoid spurious driver reconnects.

This can eventually be reverted once scylladb/python-driver#295 is fixed.

Fixes scylladb/scylladb#19154

(cherry picked from commit ef3393bd36)

(cherry picked from commit a89facbc74)

 Refs #19771

Closes scylladb/scylladb#19820

* github.com:scylladb/scylladb:
  test: raft: fix the flaky `test_raft_recovery_stuck`
  test: raft: code cleanup in `test_raft_recovery_stuck`
2024-07-22 14:12:26 +02:00
Emil Maskovsky
47df9f9b05 test: raft: fix the flaky test_raft_recovery_stuck
Use the rolling restart to avoid spurious driver reconnects.

This can eventually be reverted once scylladb/python-driver#295 is
fixed.

Fixes scylladb/scylladb#19154

(cherry picked from commit a89facbc74)
2024-07-22 09:17:05 +00:00
Emil Maskovsky
193dc87bd0 test: raft: code cleanup in test_raft_recovery_stuck
Cleaning up the imports.

(cherry picked from commit ef3393bd36)
2024-07-22 09:17:04 +00:00
Gleb Natapov
11d1950957 test: add test to check that coordinator lwt semaphore continues functioning after locking failures
(cherry picked from commit 4178589826)
2024-07-22 09:01:34 +00:00
Gleb Natapov
6317325ed5 paxos: do not signal semaphore if it was not acquired
The guard signals a semaphore during destruction if it is marked as
locked, but currently it may be marked as locked even if locking failed.
Fix this by using semaphore_units instead of managing the locked flag
manually.
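The essence of the fix — ownership recorded only on successful acquisition — can be sketched with a Python context manager standing in for `semaphore_units` (illustrative only, not the Seastar API):

```python
import threading
from contextlib import contextmanager

class KeyLock:
    def __init__(self):
        # BoundedSemaphore raises if released more often than acquired,
        # which is exactly the bug class this fix rules out.
        self._sem = threading.BoundedSemaphore(1)

    @contextmanager
    def lock(self, timeout_s):
        # With semaphore_units-style ownership, a failed acquire leaves
        # nothing to release. The old code marked itself "locked" before
        # the wait completed, so on failure the destructor could signal
        # a semaphore it never held.
        if not self._sem.acquire(timeout=timeout_s):
            raise TimeoutError("could not take key lock")
        try:
            yield
        finally:
            self._sem.release()  # reached only if acquire succeeded
```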

Fixes: https://github.com/scylladb/scylladb/issues/19698
(cherry picked from commit 87beebeed0)
2024-07-22 09:01:34 +00:00
Anna Mikhlin
14222ad205 Update ScyllaDB version to: 6.1.0-rc0 2024-07-18 16:05:23 +03:00
16 changed files with 380 additions and 33 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=6.1.0-dev
VERSION=6.1.0-rc1
if test -f version
then

View File

@@ -19,6 +19,7 @@
#include "alternator/executor.hh"
#include "cql3/selection/selection.hh"
#include "cql3/result_set.hh"
#include "types/types.hh"
#include <seastar/core/coroutine.hh>
namespace alternator {
@@ -31,11 +32,12 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
const column_definition* salted_hash_col = schema->get_column_definition(bytes("salted_hash"));
if (!salted_hash_col) {
const column_definition* can_login_col = schema->get_column_definition(bytes("can_login"));
if (!salted_hash_col || !can_login_col) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("Credentials cannot be fetched for: {}", username)));
}
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col});
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id}, selection->get_query_options());
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col, can_login_col});
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id, can_login_col->id}, selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
auto cl = auth::password_authenticator::consistency_for_user(username);
@@ -51,7 +53,14 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
if (result_set->empty()) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("User not found: {}", username)));
}
const managed_bytes_opt& salted_hash = result_set->rows().front().front(); // We only asked for 1 row and 1 column
const auto& result = result_set->rows().front();
bool can_login = result[1] && value_cast<bool>(boolean_type->deserialize(*result[1]));
if (!can_login) {
// This is a valid role name, but has "login=False" so should not be
// usable for authentication (see #19735).
co_await coroutine::return_exception(api_error::unrecognized_client(format("Role {} has login=false so cannot be used for login", username)));
}
const managed_bytes_opt& salted_hash = result.front();
if (!salted_hash) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("No password found for user: {}", username)));
}

View File

@@ -211,7 +211,10 @@ protected:
sstring local_dc = topology.get_datacenter();
std::unordered_set<gms::inet_address> local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc);
for (auto& ip : local_dc_nodes) {
if (_gossiper.is_alive(ip)) {
// Note that it's not enough for the node to be is_alive() - a
// node joining the cluster is also "alive" but not responsive to
// requests. We need the node to be in normal state. See #19694.
if (_gossiper.is_normal(ip)) {
// Use the gossiped broadcast_rpc_address if available instead
// of the internal IP address "ip". See discussion in #18711.
rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(ip)));

View File

@@ -7,6 +7,7 @@
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include "seastar/core/semaphore.hh"
#include "service/paxos/proposal.hh"
#include "log.hh"
#include "utils/digest_algorithm.hh"
@@ -31,6 +32,7 @@ private:
class key_lock_map {
using semaphore = basic_semaphore<semaphore_default_exception_factory, clock_type>;
using semaphore_units = semaphore_units<semaphore_default_exception_factory, clock_type>;
using map = std::unordered_map<dht::token, semaphore>;
semaphore& get_semaphore_for_key(const dht::token& key);
@@ -46,22 +48,15 @@ private:
key_lock_map& _map;
dht::token _key;
clock_type::time_point _timeout;
bool _locked = false;
key_lock_map::semaphore_units _units;
public:
future<> lock() {
auto f = _map.get_semaphore_for_key(_key).wait(_timeout, 1);
_locked = true;
return f;
future<> lock () {
return get_units(_map.get_semaphore_for_key(_key), 1, _timeout).then([this] (auto&& u) { _units = std::move(u); });
}
guard(key_lock_map& map, const dht::token& key, clock_type::time_point timeout) : _map(map), _key(key), _timeout(timeout) {};
guard(guard&& o) noexcept : _map(o._map), _key(std::move(o._key)), _timeout(o._timeout), _locked(o._locked) {
o._locked = false;
}
guard(guard&& o) = default;
~guard() {
if (_locked) {
_map.get_semaphore_for_key(_key).signal(1);
_map.release_semaphore_for_key(_key);
}
_map.release_semaphore_for_key(_key);
}
};

View File

@@ -6283,6 +6283,8 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
auto l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout);
co_await utils::get_local_injector().inject("cas_timeout_after_lock", write_timeout + std::chrono::milliseconds(100));
while (true) {
// Finish the previous PAXOS round, if any, and, as a side effect, compute
// a ballot (round identifier) which is a) unique b) has good chances of being

View File

@@ -1782,6 +1782,8 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
set_mode(mode::JOINING);
co_await utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20));
if (raft_server) { // Raft is enabled. Check if we need to bootstrap ourself using raft
rtlogger.info("topology changes are using raft");
@@ -3820,6 +3822,8 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
// Step 3: Prepare to sync data
ctl.prepare(node_ops_cmd::bootstrap_prepare).get();
utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20)).get();
// Step 5: Sync data for bootstrap
_repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get();
on_streaming_finished();
@@ -5497,6 +5501,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state
co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> {
if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) {
co_await utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20));
co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens);
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),

View File

@@ -2986,6 +2986,7 @@ sstable::unlink(storage::sync_dir sync) noexcept {
co_await std::move(remove_fut);
_stats.on_delete();
_manager.on_unlink(this);
}
thread_local sstables_stats::stats sstables_stats::_shard_stats;

View File

@@ -323,6 +323,11 @@ void sstables_manager::validate_new_keyspace_storage_options(const data_dictiona
}, so.value);
}
void sstables_manager::on_unlink(sstable* sst) {
// Remove the sst from manager's reclaimed list to prevent any attempts to reload its components.
_reclaimed.erase(*sst);
}
sstables_registry::~sstables_registry() = default;
} // namespace sstables

View File

@@ -188,6 +188,9 @@ public:
void validate_new_keyspace_storage_options(const data_dictionary::storage_options&);
// To be called by the sstable to signal its unlinking
void on_unlink(sstable* sst);
private:
void add(sstable* sst);
// Transition the sstable to the "inactive" state. It has no

View File

@@ -0,0 +1,126 @@
# Copyright 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
# Tests for how CQL's Role-Based Access Control (RBAC) commands - CREATE ROLE,
# GRANT, REVOKE, etc., can be used on Alternator for authentication and for
# authorization. For example, if the low-level name of an Alternator table "x"
# is alternator_x.x, and a certain user is not granted permission to "modify"
# keyspace alternator_x, Alternator write requests (PutItem, UpdateItem,
# DeleteItem, BatchWriteItem) by that user will be denied.
#
# Because this file is all about testing the Scylla-only CQL-based RBAC,
# all tests in this file are skipped when running against Amazon DynamoDB.
import pytest
import boto3
from botocore.exceptions import ClientError
import time
from contextlib import contextmanager
from test.alternator.util import is_aws, unique_table_name
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ConsistencyLevel, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
from cassandra.policies import RoundRobinPolicy
import re
# This file is all about testing RBAC as configured via CQL, so we need to
# connect to CQL to set these tests up. The "cql" fixture below enables that.
# If we're not testing Scylla, or the CQL port is not available on the same
# IP address as the Alternator IP address, a test using this fixture will
# be skipped with a message about the CQL API not being available.
@pytest.fixture(scope="module")
def cql(dynamodb):
if is_aws(dynamodb):
pytest.skip('Scylla-only CQL API not supported by AWS')
url = dynamodb.meta.client._endpoint.host
host, = re.search(r'.*://([^:]*):', url).groups()
profile = ExecutionProfile(
load_balancing_policy=RoundRobinPolicy(),
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL)
cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
contact_points=[host],
port=9042,
protocol_version=4,
auth_provider=PlainTextAuthProvider(username='cassandra', password='cassandra'),
)
try:
ret = cluster.connect()
# "BEGIN BATCH APPLY BATCH" is the closest to do-nothing I could find
ret.execute("BEGIN BATCH APPLY BATCH")
except NoHostAvailable:
pytest.skip('Could not connect to Scylla-only CQL API')
yield ret
cluster.shutdown()
# new_role() is a context manager for temporarily creating a new role with
# a unique name and returning its name and the secret key needed to connect
# to it with the DynamoDB API.
# The "login" and "superuser" flags are passed to the CREATE ROLE statement.
@contextmanager
def new_role(cql, login=True, superuser=False):
# The role name is not a table's name but it doesn't matter. Because our
# unique_table_name() uses (deliberately) a non-lower-case character, the
# role name has to be quoted in double quotes when used in CQL below.
role = unique_table_name()
# The password set for the new role is identical to the user name (not
# very secure ;-)) - but we later need to retrieve the "salted hash" of
# this password, which serves in Alternator as the secret key of the role.
cql.execute(f"CREATE ROLE \"{role}\" WITH PASSWORD = '{role}' AND SUPERUSER = {superuser} AND LOGIN = {login}")
# Newer Scylla places the "roles" table in the "system" keyspace, but
# older versions used "system_auth_v2" or "system_auth"
key = None
for ks in ['system', 'system_auth_v2', 'system_auth']:
try:
e = list(cql.execute(f"SELECT salted_hash FROM {ks}.roles WHERE role = '{role}'"))
if e != []:
key = e[0].salted_hash
if key is not None:
break
except:
pass
assert key is not None
try:
yield (role, key)
finally:
cql.execute(f'DROP ROLE "{role}"')
# Create a new DynamoDB API resource (connection object) similar to the
# existing "dynamodb" resource - but authenticating with the given role
# and key.
@contextmanager
def new_dynamodb(dynamodb, role, key):
url = dynamodb.meta.client._endpoint.host
config = dynamodb.meta.client._client_config
verify = not url.startswith('https')
ret = boto3.resource('dynamodb', endpoint_url=url, verify=verify,
aws_access_key_id=role, aws_secret_access_key=key,
region_name='us-east-1', config=config)
try:
yield ret
finally:
ret.meta.client.close()
# A basic test for creating a new role. The ListTables operation is allowed
# to any role, so it should work in the new role when given the right password
# and fail with the wrong password.
def test_new_role(dynamodb, cql):
with new_role(cql) as (role, key):
with new_dynamodb(dynamodb, role, key) as d:
# ListTables should not fail (we don't care what is the result)
d.meta.client.list_tables()
# Trying to use the wrong key for the new role should fail to perform
# any request. The new_dynamodb() function can't detect the error,
# it is detected when attempting to perform a request with it.
with new_dynamodb(dynamodb, role, 'wrongkey') as d:
with pytest.raises(ClientError, match='UnrecognizedClientException'):
d.meta.client.list_tables()
# A role without "login" permissions cannot be used to authenticate requests.
# Reproduces #19735.
def test_login_false(dynamodb, cql):
with new_role(cql, login=False) as (role, key):
with new_dynamodb(dynamodb, role, key) as d:
with pytest.raises(ClientError, match='UnrecognizedClientException.*login=false'):
d.meta.client.list_tables()

View File

@@ -15,6 +15,7 @@
#include "readers/from_mutations_v2.hh"
#include "utils/bloom_filter.hh"
#include "utils/error_injection.hh"
SEASTAR_TEST_CASE(test_sstable_reclaim_memory_from_components_and_reload_reclaimed_components) {
return test_env::do_with_async([] (test_env& env) {
@@ -52,6 +53,11 @@ std::pair<shared_sstable, size_t> create_sstable_with_bloom_filter(test_env& env
return {sst, sst_bf_memory};
}
void dispose_and_stop_tracking_bf_memory(shared_sstable&& sst, test_env_sstables_manager& mgr) {
mgr.remove_sst_from_reclaimed(sst.get());
shared_sstable::dispose(sst.release().release());
}
SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
@@ -89,7 +95,7 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter)
// Test auto reload - disposing sst3 should trigger reload of the
// smallest filter in the reclaimed list, which is sst1's bloom filter.
shared_sstable::dispose(sst3.release().release());
dispose_and_stop_tracking_bf_memory(std::move(sst3), sst_mgr);
REQUIRE_EVENTUALLY_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
// only sst4's bloom filter memory should be reported as reclaimed
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst4_bf_memory);
@@ -154,7 +160,7 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
utils::get_local_injector().enable("reload_reclaimed_components/pause", true);
// dispose sst2 to trigger reload of sst1's bloom filter
shared_sstable::dispose(sst2.release().release());
dispose_and_stop_tracking_bf_memory(std::move(sst2), sst_mgr);
// _total_reclaimable_memory will be updated when the reload begins; wait for it.
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory);
@@ -223,3 +229,57 @@ SEASTAR_TEST_CASE(test_bloom_filters_with_bad_partition_estimate) {
}
});
};
SEASTAR_TEST_CASE(test_bloom_filter_reload_after_unlink) {
return test_env::do_with_async([] (test_env& env) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
return;
#endif
simple_schema ss;
auto schema = ss.schema();
auto mut = mutation(schema, ss.make_pkey(1));
mut.partition().apply_insert(*schema, ss.make_ckey(1), ss.new_timestamp());
// bloom filter will be reclaimed automatically due to low memory
auto sst = make_sstable_containing(env.make_sstable(schema), {mut});
auto& sst_mgr = env.manager();
BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), 0);
auto memory_reclaimed = sst_mgr.get_total_memory_reclaimed();
// manager's reclaimed set has the sst now
auto& reclaimed_set = sst_mgr.get_reclaimed_set();
BOOST_REQUIRE_EQUAL(reclaimed_set.size(), 1);
BOOST_REQUIRE_EQUAL(reclaimed_set.begin()->get_filename(), sst->get_filename());
// hold a copy of shared sst object in async thread to test reload after unlink
utils::get_local_injector().enable("test_bloom_filter_reload_after_unlink");
auto async_sst_holder = seastar::async([sst] {
// do nothing just hold a copy of sst and wait for message signalling test completion
utils::get_local_injector().inject("test_bloom_filter_reload_after_unlink", [] (auto& handler) {
auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5});
return ret;
}).get();
});
// unlink the sst and release the object
sst->unlink().get();
sst.release();
// reclaimed set should be now empty but the total memory reclaimed should
// be still the same as the sst object is not deactivated yet due to a copy
// being alive in the async thread.
BOOST_REQUIRE_EQUAL(sst_mgr.get_reclaimed_set().size(), 0);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), memory_reclaimed);
// message async thread to complete waiting and thus release its copy of sst, triggering deactivation
utils::get_local_injector().receive_message("test_bloom_filter_reload_after_unlink");
async_sst_holder.get();
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
}, {
// set available memory = 0 to force reclaim the bloom filter
.available_memory = 0
});
};

View File

@@ -44,5 +44,7 @@ custom_args:
- '-c1 -m256M'
commitlog_cleanup_test:
- '-c1 -m2G'
bloom_filter_test:
- '-c1'
run_in_debug:
- logalloc_standard_allocator_segment_pool_backend_test

View File

@@ -57,6 +57,14 @@ public:
size_t get_total_reclaimable_memory() {
return _total_reclaimable_memory;
}
void remove_sst_from_reclaimed(sstable* sst) {
_reclaimed.erase(*sst);
}
auto& get_reclaimed_set() {
return _reclaimed;
}
};
class test_env_compaction_manager {

View File

@@ -0,0 +1,37 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import asyncio
import time
from test.pylib.rest_client import inject_error
from test.pylib.util import wait_for_cql_and_get_hosts
import pytest
from cassandra.protocol import WriteTimeout
@pytest.mark.asyncio
async def test_cas_semaphore(manager):
""" This is a regression test for scylladb/scylladb#19698 """
servers = await manager.servers_add(1, cmdline=['--smp', '1', '--write-request-timeout-in-ms', '500'])
host = await wait_for_cql_and_get_hosts(manager.cql, {servers[0]}, time.time() + 60)
await manager.cql.run_async("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
await manager.cql.run_async("CREATE TABLE test.test (a int PRIMARY KEY, b int)")
async with inject_error(manager.api, servers[0].ip_addr, 'cas_timeout_after_lock'):
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
try:
await asyncio.gather(*res)
except WriteTimeout:
pass
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
await asyncio.gather(*res)
metrics = await manager.metrics.query(servers[0].ip_addr)
contention = metrics.get(name="scylla_storage_proxy_coordinator_cas_write_contention_count")
assert contention == None

View File

@@ -12,8 +12,8 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
from test.topology.conftest import skip_mode
from test.topology.util import reconnect_driver, enter_recovery_state, wait_for_upgrade_state, \
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
from test.topology.util import (delete_raft_data_and_upgrade_state, enter_recovery_state, log_run_time,
reconnect_driver, wait_for_upgrade_state, wait_until_upgrade_finishes)
@pytest.mark.asyncio
@@ -77,8 +77,7 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
logging.info(f"Restarting {others}")
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
cql = await reconnect_driver(manager)
await manager.rolling_restart(others)
logging.info(f"{others} restarted, waiting until driver reconnects to them")
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
@@ -100,11 +99,11 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
logging.info(f"Removing {srv1} using {others[0]}")
await manager.remove_node(others[0].server_id, srv1.server_id)
logging.info(f"Deleting Raft data and upgrade state on {hosts} and restarting")
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
cql = await reconnect_driver(manager)
logging.info(f"Restarting {others}")
await manager.rolling_restart(others)
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)

View File

@@ -23,6 +23,8 @@ import requests
import json
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.topology.conftest import skip_mode
logger = logging.getLogger(__name__)
@@ -206,13 +208,102 @@ async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
}
servers = await manager.servers_add(2, config=config)
for server in servers:
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
response = requests.get(url, verify=False)
assert response.ok
j = json.loads(response.content.decode('utf-8'))
# We expect /localnodes to return ["1.2.3.4", "1.2.3.4"]
# (since we configured both nodes with the same broadcast_rpc_address):
assert j == ['1.2.3.4', '1.2.3.4']
# (since we configured both nodes with the same broadcast_rpc_address).
# We need the retry loop below because the second node might take a
# bit of time to bootstrap after coming up, and only then will it
# appear on /localnodes (see #19694).
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
timeout = time.time() + 10
while True:
assert time.time() < timeout
response = requests.get(url, verify=False)
j = json.loads(response.content.decode('utf-8'))
if j == ['1.2.3.4', '1.2.3.4']:
break # done
await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_localnodes_drained_node(manager: ManagerClient):
"""Test that if in a cluster one node is brought down with "nodetool drain"
a "/localnodes" request should NOT return that node. This test does
NOT reproduce issue #19694 - a DRAINED node is not considered is_alive()
and even before the fix of that issue, "/localnodes" didn't return it.
"""
# Start a cluster with two nodes and verify that at this point,
# "/localnodes" on the first node returns both nodes.
# We need the retry loop below because the second node might take a
# bit of time to bootstrap after coming up, and only then will it
# appear on /localnodes (see #19694).
servers = await manager.servers_add(2, config=alternator_config)
localnodes_request = f"http://{servers[0].ip_addr}:{alternator_config['alternator_port']}/localnodes"
async def check_localnodes_two():
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
return True
elif set(j).issubset({servers[0].ip_addr, servers[1].ip_addr}):
return None # try again
else:
return False
assert await wait_for(check_localnodes_two, time.time() + 10)
# Now run "nodetool drain" on the second node, leaving the second node
# in DRAINED state.
await manager.api.client.post("/storage_service/drain", host=servers[1].ip_addr)
# After that, "/localnodes" should no longer return the second node.
# It might take a short while until the first node learns what happened
# to node 1, so we may need to retry for a while.
async def check_localnodes_one():
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
return None # try again
elif set(j) == {servers[0].ip_addr}:
return True
else:
return False
assert await wait_for(check_localnodes_one, time.time() + 10)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_localnodes_joining_nodes(manager: ManagerClient):
"""Test that if a cluster is being enlarged and a node is coming up but
not yet responsive, a "/localnodes" request should NOT return that node.
Reproduces issue #19694.
"""
# Start a cluster with one node, and then bring up a second node,
# pausing its bootstrap (with an injection) in JOINING state.
# We need to start the second node in the background, because server_add()
# will wait for the bootstrap to complete - which we don't want to do.
server = await manager.server_add(config=alternator_config)
task = asyncio.create_task(manager.server_add(config=alternator_config | {'error_injections_at_startup': ['delay_bootstrap_20s']}))
# Sleep until the first node knows of the second one as a "live node"
# (we check this with the REST API's /gossiper/endpoint/live.
async def check_two_live_nodes():
j = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
if len(j) == 1:
return None # try again
elif len(j) == 2:
return True
else:
return False
assert await wait_for(check_two_live_nodes, time.time() + 10)
# At this point the second node is live, but hasn't finished bootstrapping
# (we delayed that with the injection). So the "/localnodes" should still
# return just one node - not both. Reproduces #19694 (two nodes used to
# be returned)
localnodes_request = f"http://{server.ip_addr}:{alternator_config['alternator_port']}/localnodes"
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
assert len(j) == 1
# Ending the test here will kill both servers. We don't wait for the
# second server to finish its long injection-caused bootstrap delay,
# so we don't check here that when the second server finally comes up,
# both nodes will finally be visible in /localnodes. This case is checked
# in other tests, where bootstrap finishes normally - we don't need to
# check this case again here.
task.cancel()
# TODO: add a more thorough test for /localnodes, creating a cluster with
# multiple nodes in multiple data centers, and check that we can get a list