Compare commits

...

17 Commits

Author SHA1 Message Date
Jenkins Promoter
abbf0b24a6 Update ScyllaDB version to: 6.1.0 2024-08-04 14:31:47 +03:00
Kamil Braun
347857e5e5 Merge '[Backport 6.1] raft: fix the shutdown phase being stuck' from ScyllaDB
Some of the calls inside the `raft_group0_client::start_operation()` method were missing the abort source parameter. This caused the repair test to get stuck in the shutdown phase - the abort source had been triggered, but the operations were not checking it.

This was in particular the case for operations that try to take ownership of the raft group semaphore (`get_units(semaphore)`) - these waits should be cancelled when the abort source is triggered.

This should fix the following tests, which were failing in roughly 1-3% of dtest runs:
* TestRepairAdditional::test_repair_kill_1
* TestRepairAdditional::test_repair_kill_3

Fixes scylladb/scylladb#19223

(cherry picked from commit 2dbe9ef2f2)

(cherry picked from commit 5dfc50d354)

 Refs #19860

Closes scylladb/scylladb#19970

* github.com:scylladb/scylladb:
  raft: fix the shutdown phase being stuck
  raft: use the abort source reference in raft group0 client interface
2024-08-02 11:24:34 +02:00
Emil Maskovsky
cd2ca5ef57 raft: fix the shutdown phase being stuck
Some of the calls inside the `raft_group0_client::start_operation()`
method were missing the abort source parameter. This caused the repair
test to get stuck in the shutdown phase - the abort source had been
triggered, but the operations were not checking it.

This was in particular the case for operations that try to take
ownership of the raft group semaphore (`get_units(semaphore)`) - these
waits should be cancelled when the abort source is triggered.

This should fix the following tests, which were failing in roughly
1-3% of dtest runs:
* TestRepairAdditional::test_repair_kill_1
* TestRepairAdditional::test_repair_kill_3

Fixes scylladb/scylladb#19223
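The fix above boils down to making blocked semaphore waits observe the abort source. The real code uses Seastar's `get_units(semaphore, units, abort_source)`; the following is a minimal standard-C++ sketch of the same idea (the `abortable_semaphore` and `abort_source` types here are illustrative, not Seastar's), showing how a waiter stuck on an exhausted semaphore is released once the abort is requested:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Illustrative stand-in for seastar::abort_source.
struct abort_source {
    std::atomic<bool> aborted{false};
    void request_abort() { aborted.store(true); }
};

class abortable_semaphore {
    std::mutex _m;
    std::condition_variable _cv;
    std::size_t _count;
public:
    explicit abortable_semaphore(std::size_t count) : _count(count) {}

    // Blocks until a unit is available or the abort source fires.
    // Returns false on abort (the Seastar variant instead resolves the
    // future with an abort_requested_exception).
    bool get_unit(abort_source& as) {
        std::unique_lock<std::mutex> lk(_m);
        _cv.wait(lk, [&] { return _count > 0 || as.aborted.load(); });
        if (as.aborted.load()) {
            return false;           // abort wins: the wait is cancelled
        }
        --_count;
        return true;
    }

    void signal() {
        { std::lock_guard<std::mutex> lk(_m); ++_count; }
        _cv.notify_all();
    }

    // Wake any waiter after an abort; taking the mutex first avoids a
    // lost wakeup against a waiter that is just entering the wait.
    void abort_notify() {
        std::lock_guard<std::mutex> lk(_m);
        _cv.notify_all();
    }
};
```

Without the abort check in the wait predicate, the waiter in the shutdown scenario would block forever - which is exactly the stuck shutdown phase described above.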

(cherry picked from commit 5dfc50d354)
2024-07-31 20:52:23 +00:00
Emil Maskovsky
5a4065ecd5 raft: use the abort source reference in raft group0 client interface
Most callers of the raft group0 client interface already pass a real
abort source instance, so the client interface can take the abort
source by reference. This change makes the code simpler and more
consistent.
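The design choice here can be sketched in isolation (hypothetical signatures, not the real `raft_group0_client` API): a pointer parameter forces every use site to handle null, while a reference encodes "never null" in the type and removes those checks:

```cpp
#include <cassert>

// Illustrative stand-in for seastar::abort_source.
struct abort_source {
    bool aborted = false;
};

// Before: the callee must handle as == nullptr at every use.
int start_operation_ptr(const abort_source* as) {
    if (as && as->aborted) return -1;   // defensive null check everywhere
    return 0;
}

// After: non-null by construction; call sites pass `as` instead of `&as`.
int start_operation_ref(const abort_source& as) {
    if (as.aborted) return -1;          // no null check needed
    return 0;
}
```

Since the raft group0 callers all own a real abort source, the reference form loses nothing and makes the "operations must be abortable" contract explicit.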

(cherry picked from commit 2dbe9ef2f2)
2024-07-31 20:52:23 +00:00
Kamil Braun
ed4f2ecca4 docs: extend "forbidden operations" section for Raft-topology upgrade
The Raft-topology upgrade procedure must not be run concurrently with
a version upgrade.

(cherry picked from commit bb0c3cdc65)

Closes scylladb/scylladb#19836
2024-07-29 16:52:40 +02:00
Jenkins Promoter
8f80a84e93 Update ScyllaDB version to: 6.1.0-rc2 2024-07-29 15:50:26 +03:00
Nadav Har'El
97ae704f99 alternator: do not allow authentication with a non-"login" role
Alternator allows authentication with the existing CQL roles, but
roles that have the flag "login=false" should be refused
authentication; this patch adds the missing check.

The patch also adds a regression test for this feature in the
test/alternator test framework, in a new test file
test/alternator/cql_rbac.py. This test file will later include more
tests of how the CQL RBAC commands (CREATE ROLE, GRANT, REVOKE)
affect authentication and authorization in Alternator.
In particular, these tests need to use not just the DynamoDB API but
also CQL, so this new test file includes the "cql" fixture that allows
us to run CQL commands, to create roles, to retrieve their secret keys,
and so on.
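The shape of the added check can be sketched as follows (illustrative types, not the real Alternator executor code): the credentials query now fetches both the salted hash and the `can_login` flag, and authentication is refused when login is disabled even if a password hash exists:

```cpp
#include <cassert>
#include <optional>
#include <string>

// Illustrative stand-in for the row read from the roles table.
struct role_row {
    std::optional<std::string> salted_hash;  // may be absent for passwordless roles
    bool can_login = false;                  // the flag added to the query
};

enum class auth_result { ok, no_such_user, login_disabled, no_password };

auth_result authenticate(const std::optional<role_row>& row) {
    if (!row) return auth_result::no_such_user;
    // The previously missing check (#19735): a valid role with
    // login=false must not be usable for authentication.
    if (!row->can_login) return auth_result::login_disabled;
    if (!row->salted_hash) return auth_result::no_password;
    return auth_result::ok;
}
```

Note the ordering: the login flag is checked before the password hash, so a non-login role is rejected uniformly regardless of whether credentials happen to be set.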

Fixes #19735

(cherry picked from commit 14cd7b5095)

Closes scylladb/scylladb#19863
2024-07-25 12:45:27 +03:00
Nadav Har'El
738e4c3681 alternator: fix "/localnodes" to not return nodes still joining
Alternator's "/localnodes" HTTP request is supposed to return the list of
nodes in the local DC to which the user can send requests.

The existing implementation incorrectly used gossiper::is_alive() to
decide which nodes to return - but "alive" nodes include nodes that are
still joining the cluster and are not really usable. These nodes can
remain in the JOINING state for a long time while they are copying data,
and an attempt to send requests to them will fail.

The fix for this bug is trivial: change the call to is_alive() to a call
to is_normal().
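The distinction can be sketched with illustrative types (not the real gossiper API): a joining node is reachable, so a liveness check passes, but only nodes in NORMAL state should be advertised to clients:

```cpp
#include <cassert>
#include <vector>

// Illustrative node states; the real gossiper tracks richer status.
enum class node_state { joining, normal, leaving, down };

struct node {
    int id;
    node_state state;
    bool reachable;
};

bool is_alive(const node& n)  { return n.reachable; }  // includes JOINING nodes
bool is_normal(const node& n) {
    return n.reachable && n.state == node_state::normal;
}

// Sketch of the /localnodes filter after the fix.
std::vector<int> local_nodes(const std::vector<node>& dc) {
    std::vector<int> out;
    for (const auto& n : dc) {
        if (is_normal(n)) {       // was: is_alive(n) - the bug (#19694)
            out.push_back(n.id);
        }
    }
    return out;
}
```

A bootstrapping node sits in the `is_alive() && !is_normal()` gap, which is precisely the window the regression test below exercises by delaying bootstrap with an injection.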

But the hard part of this fix is the testing:

1. An existing multi-node test for "/localnodes" assumed that right after
   a new node was created, it appears on "/localnodes". But after this
   patch, it may take a bit more time for the bootstrapping to complete
   and the new node to appear in /localnodes - so I had to add a retry loop.

2. I added a test that reproduces the bug fixed here, and verifies its
   fix. The test is in the multi-node topology framework. It adds an
   injection which delays the bootstrap, which leaves a new node in JOINING
   state for a long time. The test then verifies that the new node is
   alive (as checked by the REST API), but is not returned by "/localnodes".

3. The new injection for delaying the bootstrap is unfortunately not
   very pretty - I had to do it in three places, because we have several
   code paths for bootstrap: without repair, with repair, without Raft,
   and with Raft - and I wanted to delay all of them.

Fixes #19694.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 0d1aa399f9)

Closes scylladb/scylladb#19855
2024-07-24 11:04:54 +03:00
Lakshmi Narayanan Sreethar
ee74fe4e0e [Backport 6.1] sstables: do not reload components of unlinked sstables
The SSTable is removed from the reclaimed memory tracking logic only
when its object is deleted. However, there is a risk that the Bloom
filter reloader may attempt to reload the SSTable after it has been
unlinked but before the SSTable object is destroyed. Prevent this by
removing the SSTable from the reclaimed list maintained by the manager
as soon as it is unlinked.

The original logic that updated the memory tracking in
`sstables_manager::deactivate()` is left in place as (a) the variables
have to be updated only when the SSTable object is actually deleted, as
the memory used by the filter is not freed as long as the SSTable is
alive, and (b) the `_reclaimed.erase(*sst)` is still useful during
shutdown, for example, when the SSTable is not unlinked but just
destroyed.
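The race and its fix can be sketched with illustrative names (not the real `sstables_manager` API): the manager tracks reclaimed SSTables in a set the reloader consults, so erasing the entry at unlink time closes the window between unlink and object destruction:

```cpp
#include <cassert>
#include <set>

// Illustrative sketch; SSTables are identified by an int id here.
struct sstables_manager_sketch {
    // SSTables whose Bloom filter memory was reclaimed and may be reloaded.
    std::set<int> reclaimed;

    void on_reclaim(int sst) { reclaimed.insert(sst); }

    // The added hook: an unlinked SSTable must never be reloaded, even
    // though its object may live on for a while.
    void on_unlink(int sst)  { reclaimed.erase(sst); }

    // Existing cleanup path, kept for SSTables destroyed without being
    // unlinked (e.g. during shutdown); erase() is idempotent.
    void on_destroy(int sst) { reclaimed.erase(sst); }

    // The reloader only touches SSTables still in the reclaimed set.
    bool may_reload(int sst) const { return reclaimed.count(sst) != 0; }
};
```

With only the destroy-time erase, a reload could fire in the unlink-to-destruction window; the unlink-time erase removes that window while the destroy-time path still covers the shutdown case described above.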

Fixes https://github.com/scylladb/scylladb/issues/19722

Closes scylladb/scylladb#19717

* github.com:scylladb/scylladb:
  boost/bloom_filter_test: add testcase to verify unlinked sstables are not reloaded
  sstables: do not reload components of unlinked sstables
  sstables/sstables_manager: introduce on_unlink method

(cherry picked from commit 591876b44e)

Backported from #19717 to 6.1

Closes scylladb/scylladb#19828
2024-07-24 09:03:52 +03:00
Jenkins Promoter
b2ea946837 Update ScyllaDB version to: 6.1.0-rc1 2024-07-23 10:33:48 +03:00
Avi Kivity
92e725c467 Merge '[Backport 6.1] Fix lwt semaphore guard accounting' from ScyllaDB
Currently the guard does not correctly account for an ongoing operation when semaphore acquisition fails: it may signal a semaphore that it does not hold.

Should be backported to all supported versions.

(cherry picked from commit 87beebeed0)

(cherry picked from commit 4178589826)

 Refs #19699

Closes scylladb/scylladb#19819

* github.com:scylladb/scylladb:
  test: add test to check that coordinator lwt semaphore continues functioning after locking failures
  paxos: do not signal semaphore if it was not acquired
2024-07-22 17:41:30 +03:00
Kamil Braun
e57d48253f Merge '[Backport 6.1] test: raft: fix the flaky test_raft_recovery_stuck' from ScyllaDB
Use the rolling restart to avoid spurious driver reconnects.

This can eventually be reverted once scylladb/python-driver#295 is fixed.

Fixes scylladb/scylladb#19154

(cherry picked from commit ef3393bd36)

(cherry picked from commit a89facbc74)

 Refs #19771

Closes scylladb/scylladb#19820

* github.com:scylladb/scylladb:
  test: raft: fix the flaky `test_raft_recovery_stuck`
  test: raft: code cleanup in `test_raft_recovery_stuck`
2024-07-22 14:12:26 +02:00
Emil Maskovsky
47df9f9b05 test: raft: fix the flaky test_raft_recovery_stuck
Use the rolling restart to avoid spurious driver reconnects.

This can eventually be reverted once scylladb/python-driver#295 is
fixed.

Fixes scylladb/scylladb#19154

(cherry picked from commit a89facbc74)
2024-07-22 09:17:05 +00:00
Emil Maskovsky
193dc87bd0 test: raft: code cleanup in test_raft_recovery_stuck
Cleaning up the imports.

(cherry picked from commit ef3393bd36)
2024-07-22 09:17:04 +00:00
Gleb Natapov
11d1950957 test: add test to check that coordinator lwt semaphore continues functioning after locking failures
(cherry picked from commit 4178589826)
2024-07-22 09:01:34 +00:00
Gleb Natapov
6317325ed5 paxos: do not signal semaphore if it was not acquired
The guard signals a semaphore during destruction if it is marked as
locked, but currently it may be marked as locked even if locking failed.
Fix this by using semaphore_units instead of managing the locked flag
manually.
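The bug class and the fix can be sketched with illustrative types (not Scylla's `semaphore_units`): a manually managed "locked" flag can be set even when acquisition throws, so the destructor signals a unit that was never taken; RAII units that only come into existence on successful acquisition cannot get this wrong:

```cpp
#include <cassert>
#include <stdexcept>
#include <utility>

// Illustrative synchronous semaphore.
struct counting_sem {
    int count;
    void acquire() {
        if (count == 0) throw std::runtime_error("semaphore timed out");
        --count;
    }
    void signal() { ++count; }
};

// Stand-in for semaphore_units: owns exactly the units it acquired.
class units {
    counting_sem* _sem = nullptr;
public:
    units() = default;
    explicit units(counting_sem& s) : _sem(&s) {}  // only after a successful acquire
    units(const units&) = delete;
    units(units&& o) noexcept : _sem(std::exchange(o._sem, nullptr)) {}
    units& operator=(units&& o) noexcept {
        if (_sem) _sem->signal();                  // release anything we held
        _sem = std::exchange(o._sem, nullptr);
        return *this;
    }
    ~units() { if (_sem) _sem->signal(); }         // signal only what was acquired
};

units get_units(counting_sem& s) {
    s.acquire();        // may throw; no units object exists yet, so
    return units(s);    // a failed acquisition can never be "released"
}
```

This is the same invariant the patch restores in `key_lock_map::guard`: ownership of the unit is tied to an object that exists only when the acquisition actually succeeded.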

Fixes: https://github.com/scylladb/scylladb/issues/19698
(cherry picked from commit 87beebeed0)
2024-07-22 09:01:34 +00:00
Anna Mikhlin
14222ad205 Update ScyllaDB version to: 6.1.0-rc0 2024-07-18 16:05:23 +03:00
31 changed files with 445 additions and 97 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=6.1.0-dev
VERSION=6.1.0
if test -f version
then

View File

@@ -19,6 +19,7 @@
#include "alternator/executor.hh"
#include "cql3/selection/selection.hh"
#include "cql3/result_set.hh"
#include "types/types.hh"
#include <seastar/core/coroutine.hh>
namespace alternator {
@@ -31,11 +32,12 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
const column_definition* salted_hash_col = schema->get_column_definition(bytes("salted_hash"));
if (!salted_hash_col) {
const column_definition* can_login_col = schema->get_column_definition(bytes("can_login"));
if (!salted_hash_col || !can_login_col) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("Credentials cannot be fetched for: {}", username)));
}
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col});
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id}, selection->get_query_options());
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col, can_login_col});
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id, can_login_col->id}, selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
auto cl = auth::password_authenticator::consistency_for_user(username);
@@ -51,7 +53,14 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
if (result_set->empty()) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("User not found: {}", username)));
}
const managed_bytes_opt& salted_hash = result_set->rows().front().front(); // We only asked for 1 row and 1 column
const auto& result = result_set->rows().front();
bool can_login = result[1] && value_cast<bool>(boolean_type->deserialize(*result[1]));
if (!can_login) {
// This is a valid role name, but has "login=False" so should not be
// usable for authentication (see #19735).
co_await coroutine::return_exception(api_error::unrecognized_client(format("Role {} has login=false so cannot be used for login", username)));
}
const managed_bytes_opt& salted_hash = result.front();
if (!salted_hash) {
co_await coroutine::return_exception(api_error::unrecognized_client(format("No password found for user: {}", username)));
}

View File

@@ -211,7 +211,10 @@ protected:
sstring local_dc = topology.get_datacenter();
std::unordered_set<gms::inet_address> local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc);
for (auto& ip : local_dc_nodes) {
if (_gossiper.is_alive(ip)) {
// Note that it's not enough for the node to be is_alive() - a
// node joining the cluster is also "alive" but not responsive to
// requests. We need the node to be in normal state. See #19694.
if (_gossiper.is_normal(ip)) {
// Use the gossiped broadcast_rpc_address if available instead
// of the internal IP address "ip". See discussion in #18711.
rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(ip)));

View File

@@ -121,7 +121,7 @@ static future<> announce_mutations_with_guard(
::service::raft_group0_client& group0_client,
std::vector<canonical_mutation> muts,
::service::group0_guard group0_guard,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout) {
auto group0_cmd = group0_client.prepare_command(
::service::write_mutations{
@@ -137,7 +137,7 @@ future<> announce_mutations_with_batching(
::service::raft_group0_client& group0_client,
start_operation_func_t start_operation_func,
std::function<::service::mutations_generator(api::timestamp_type t)> gen,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout) {
// account for command's overhead, it's better to use smaller threshold than constantly bounce off the limit
size_t memory_threshold = group0_client.max_command_size() * 0.75;
@@ -188,7 +188,7 @@ future<> announce_mutations(
::service::raft_group0_client& group0_client,
const sstring query_string,
std::vector<data_value_or_unset> values,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout) {
auto group0_guard = co_await group0_client.start_operation(as, timeout);
auto timestamp = group0_guard.write_timestamp();

View File

@@ -80,7 +80,7 @@ future<> create_legacy_metadata_table_if_missing(
// Execute update query via group0 mechanism, mutations will be applied on all nodes.
// Use this function when need to perform read before write on a single guard or if
// you have more than one mutation and potentially exceed single command size limit.
using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source*)>;
using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source&)>;
future<> announce_mutations_with_batching(
::service::raft_group0_client& group0_client,
// since we can operate also in topology coordinator context where we need stronger
@@ -88,7 +88,7 @@ future<> announce_mutations_with_batching(
// function here
start_operation_func_t start_operation_func,
std::function<::service::mutations_generator(api::timestamp_type t)> gen,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout);
// Execute update query via group0 mechanism, mutations will be applied on all nodes.
@@ -97,7 +97,7 @@ future<> announce_mutations(
::service::raft_group0_client& group0_client,
const sstring query_string,
std::vector<data_value_or_unset> values,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout);
// Appends mutations to a collector, they will be applied later on all nodes via group0 mechanism.

View File

@@ -136,7 +136,7 @@ future<> password_authenticator::create_default_if_missing() {
plogger.info("Created default superuser authentication record.");
} else {
co_await announce_mutations(_qp, _group0_client, query,
{salted_pwd, _superuser}, &_as, ::service::raft_timeout{});
{salted_pwd, _superuser}, _as, ::service::raft_timeout{});
plogger.info("Created default superuser authentication record.");
}
}

View File

@@ -681,7 +681,7 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
co_await announce_mutations_with_batching(g0,
start_operation_func,
std::move(gen),
&as,
as,
std::nullopt);
}

View File

@@ -192,7 +192,7 @@ future<> standard_role_manager::create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
} else {
co_await announce_mutations(_qp, _group0_client, query, {_superuser}, &_as, ::service::raft_timeout{});
co_await announce_mutations(_qp, _group0_client, query, {_superuser}, _as, ::service::raft_timeout{});
}
log.info("Created default superuser role '{}'.", _superuser);
} catch(const exceptions::unavailable_exception& e) {

View File

@@ -55,6 +55,7 @@ Make sure that administrative operations will not be running while upgrade is in
In particular, you must abstain from:
* :doc:`Cluster management procedures </operating-scylla/procedures/cluster-management/index>` (adding, replacing, removing, decommissioning nodes etc.).
* :doc:`Version upgrade procedures </upgrade/index>`. You must ensure that all Scylla nodes are running the same version before proceeding.
* Running :doc:`nodetool repair </operating-scylla/nodetool-commands/repair>`.
* Running :doc:`nodetool checkAndRepairCdcStreams </operating-scylla/nodetool-commands/checkandrepaircdcstreams>`.
* Any modifications of :doc:`authentication </operating-scylla/security/authentication>` and :doc:`authorization </operating-scylla/security/enable-authorization>` settings.

View File

@@ -906,7 +906,7 @@ future<> migration_manager::announce_with_raft(std::vector<mutation> schema, gro
},
guard, std::move(description));
return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), &_as);
return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), _as);
}
future<> migration_manager::announce_without_raft(std::vector<mutation> schema, group0_guard guard) {
@@ -993,7 +993,7 @@ future<> migration_manager::announce<topology_change>(std::vector<mutation> sche
future<group0_guard> migration_manager::start_group0_operation() {
assert(this_shard_id() == 0);
return _group0_client.start_operation(&_as, raft_timeout{});
return _group0_client.start_operation(_as, raft_timeout{});
}
/**

View File

@@ -7,6 +7,7 @@
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include "seastar/core/semaphore.hh"
#include "service/paxos/proposal.hh"
#include "log.hh"
#include "utils/digest_algorithm.hh"
@@ -31,6 +32,7 @@ private:
class key_lock_map {
using semaphore = basic_semaphore<semaphore_default_exception_factory, clock_type>;
using semaphore_units = semaphore_units<semaphore_default_exception_factory, clock_type>;
using map = std::unordered_map<dht::token, semaphore>;
semaphore& get_semaphore_for_key(const dht::token& key);
@@ -46,22 +48,15 @@ private:
key_lock_map& _map;
dht::token _key;
clock_type::time_point _timeout;
bool _locked = false;
key_lock_map::semaphore_units _units;
public:
future<> lock() {
auto f = _map.get_semaphore_for_key(_key).wait(_timeout, 1);
_locked = true;
return f;
future<> lock () {
return get_units(_map.get_semaphore_for_key(_key), 1, _timeout).then([this] (auto&& u) { _units = std::move(u); });
}
guard(key_lock_map& map, const dht::token& key, clock_type::time_point timeout) : _map(map), _key(key), _timeout(timeout) {};
guard(guard&& o) noexcept : _map(o._map), _key(std::move(o._key)), _timeout(o._timeout), _locked(o._locked) {
o._locked = false;
}
guard(guard&& o) = default;
~guard() {
if (_locked) {
_map.get_semaphore_for_key(_key).signal(1);
_map.release_semaphore_for_key(_key);
}
_map.release_semaphore_for_key(_key);
}
};

View File

@@ -519,7 +519,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
val_binders_str += ", ?";
}
auto guard = co_await group0_client.start_operation(&as);
auto guard = co_await group0_client.start_operation(as);
std::vector<mutation> migration_muts;
for (const auto& row: *rows) {
@@ -554,7 +554,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
.mutations{migration_muts.begin(), migration_muts.end()},
};
auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), &as);
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
}
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {

View File

@@ -155,7 +155,7 @@ semaphore& raft_group0_client::operation_mutex() {
return _operation_mutex;
}
future<> raft_group0_client::add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as,
future<> raft_group0_client::add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source& as,
std::optional<raft_timeout> timeout)
{
if (this_shard_id() != 0) {
@@ -239,7 +239,7 @@ static utils::UUID generate_group0_state_id(utils::UUID prev_state_id) {
return utils::UUID_gen::get_random_time_UUID_from_micros(std::chrono::microseconds{ts});
}
future<group0_guard> raft_group0_client::start_operation(seastar::abort_source* as, std::optional<raft_timeout> timeout) {
future<group0_guard> raft_group0_client::start_operation(seastar::abort_source& as, std::optional<raft_timeout> timeout) {
if (this_shard_id() != 0) {
on_internal_error(logger, "start_group0_operation: must run on shard 0");
}
@@ -251,12 +251,12 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source*
auto [upgrade_lock_holder, upgrade_state] = co_await get_group0_upgrade_state();
switch (upgrade_state) {
case group0_upgrade_state::use_post_raft_procedures: {
auto operation_holder = co_await get_units(_operation_mutex, 1);
co_await _raft_gr.group0_with_timeouts().read_barrier(as, timeout);
auto operation_holder = co_await get_units(_operation_mutex, 1, as);
co_await _raft_gr.group0_with_timeouts().read_barrier(&as, timeout);
// Take `_group0_read_apply_mutex` *after* read barrier.
// Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
auto read_apply_holder = co_await hold_read_apply_mutex();
auto read_apply_holder = co_await hold_read_apply_mutex(as);
auto observed_group0_state_id = co_await _sys_ks.get_last_group0_state_id();
auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
@@ -546,7 +546,7 @@ static future<> add_write_mutations_entry(
std::string_view description,
std::vector<canonical_mutation> muts,
::service::group0_guard group0_guard,
seastar::abort_source* as,
seastar::abort_source& as,
std::optional<::service::raft_timeout> timeout) {
logger.trace("add_write_mutations_entry: {} mutations with description {}",
muts.size(), description);
@@ -582,7 +582,7 @@ future<> group0_batch::commit(::service::raft_group0_client& group0_client, seas
// when producer expects substantial number or size of mutations it should use generator
if (_generators.size() == 0) {
std::vector<canonical_mutation> cmuts = {_muts.begin(), _muts.end()};
co_return co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), &as, timeout);
co_return co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), as, timeout);
}
// raft doesn't support streaming so we need to materialize all mutations in memory
co_await materialize_mutations();
@@ -591,7 +591,7 @@ future<> group0_batch::commit(::service::raft_group0_client& group0_client, seas
}
std::vector<canonical_mutation> cmuts = {_muts.begin(), _muts.end()};
_muts.clear();
co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), &as, timeout);
co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), as, timeout);
}
future<std::pair<std::vector<mutation>, ::service::group0_guard>> group0_batch::extract() && {

View File

@@ -109,7 +109,7 @@ public:
// Call after `system_keyspace` is initialized.
future<> init();
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as, std::optional<raft_timeout> timeout = std::nullopt);
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source& as, std::optional<raft_timeout> timeout = std::nullopt);
future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as);
@@ -133,7 +133,7 @@ public:
// FIXME?: this is kind of annoying for the user.
// we could forward the call to shard 0, have group0_guard keep a foreign_ptr to the internal data structures on shard 0,
// and add_entry would again forward to shard 0.
future<group0_guard> start_operation(seastar::abort_source* as, std::optional<raft_timeout> timeout = std::nullopt);
future<group0_guard> start_operation(seastar::abort_source& as, std::optional<raft_timeout> timeout = std::nullopt);
template<typename Command>
requires std::same_as<Command, broadcast_table_query> || std::same_as<Command, write_mutations>

View File

@@ -514,11 +514,11 @@ raft_server_with_timeouts::run_with_timeout(Op&& op, const char* op_name,
}
future<> raft_server_with_timeouts::add_entry(raft::command command, raft::wait_type type,
seastar::abort_source* as, std::optional<raft_timeout> timeout)
seastar::abort_source& as, std::optional<raft_timeout> timeout)
{
return run_with_timeout([&](abort_source* as) {
return _group_server.server->add_entry(std::move(command), type, as);
}, "add_entry", as, timeout);
}, "add_entry", &as, timeout);
}
future<> raft_server_with_timeouts::modify_config(std::vector<raft::config_member> add, std::vector<raft::server_id> del,

View File

@@ -92,7 +92,7 @@ class raft_server_with_timeouts {
run_with_timeout(Op&& op, const char* op_name, seastar::abort_source* as, std::optional<raft_timeout> timeout);
public:
raft_server_with_timeouts(raft_server_for_group& group_server, raft_group_registry& registry);
future<> add_entry(raft::command command, raft::wait_type type, seastar::abort_source* as, std::optional<raft_timeout> timeout);
future<> add_entry(raft::command command, raft::wait_type type, seastar::abort_source& as, std::optional<raft_timeout> timeout);
future<> modify_config(std::vector<raft::config_member> add, std::vector<raft::server_id> del, seastar::abort_source* as, std::optional<raft_timeout> timeout);
future<bool> trigger_snapshot(seastar::abort_source* as, std::optional<raft_timeout> timeout);
future<> read_barrier(seastar::abort_source* as, std::optional<raft_timeout> timeout);

View File

@@ -6283,6 +6283,8 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
auto l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout);
co_await utils::get_local_injector().inject("cas_timeout_after_lock", write_timeout + std::chrono::milliseconds(100));
while (true) {
// Finish the previous PAXOS round, if any, and, as a side effect, compute
// a ballot (round identifier) which is a) unique b) has good chances of being

View File

@@ -1019,7 +1019,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
{
// The scope for the guard
auto guard = co_await _group0->client().start_operation(&_group0_as);
auto guard = co_await _group0->client().start_operation(_group0_as);
auto me = _topology_state_machine._topology.find(server.id());
// Recheck that cleanup is needed after the barrier
if (!me || me->second.cleanup != cleanup_status::running) {
@@ -1056,7 +1056,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
rtlogger.info("cleanup ended");
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
auto guard = co_await _group0->client().start_operation(_group0_as);
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
@@ -1064,7 +1064,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup completed for {}", server.id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as);
} catch (group0_concurrent_modification&) {
rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying.");
continue;
@@ -1281,7 +1281,7 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
}
rtlogger.info("adding myself as the first node to the topology");
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto insert_join_request_mutations = build_mutation_from_join_params(params, guard);
@@ -1304,7 +1304,7 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
"bootstrap: adding myself as the first node to the topology");
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("bootstrap: concurrent operation is detected, retrying.");
}
@@ -1353,7 +1353,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
while (true) {
rtlogger.info("refreshing topology to check if it's synchronized with local metadata");
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
if (synchronized()) {
break;
@@ -1391,7 +1391,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("update topology with local metadata:"
" concurrent operation is detected, retrying.");
@@ -1428,7 +1428,7 @@ future<> storage_service::start_upgrade_to_raft_topology() {
}
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) {
co_return;
@@ -1441,7 +1441,7 @@ future<> storage_service::start_upgrade_to_raft_topology() {
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "upgrade: start");
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
rtlogger.info("upgrade: concurrent operation is detected, retrying.");
@@ -1782,6 +1782,8 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
set_mode(mode::JOINING);
co_await utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20));
if (raft_server) { // Raft is enabled. Check if we need to bootstrap ourself using raft
rtlogger.info("topology changes are using raft");
@@ -3565,7 +3567,7 @@ future<> storage_service::raft_decommission() {
utils::UUID request_id;
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto it = _topology_state_machine._topology.find(raft_server.id());
if (!it) {
@@ -3595,7 +3597,7 @@ future<> storage_service::raft_decommission() {
request_id = guard.new_group0_state_id();
try {
-co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
+co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("decommission: concurrent operation is detected, retrying.");
continue;
@@ -3820,6 +3822,8 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
// Step 3: Prepare to sync data
ctl.prepare(node_ops_cmd::bootstrap_prepare).get();
utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20)).get();
// Step 5: Sync data for bootstrap
_repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get();
on_streaming_finished();
@@ -3898,7 +3902,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
utils::UUID request_id;
while (true) {
-auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto it = _topology_state_machine._topology.find(id);
@@ -3955,7 +3959,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
try {
// Make non voter during request submission for better HA
co_await _group0->make_nonvoters(ignored_ids, _group0_as, raft_timeout{});
-co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
+co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("removenode: concurrent operation is detected, retrying.");
continue;
@@ -4529,7 +4533,7 @@ future<> storage_service::do_cluster_cleanup() {
auto& raft_server = _group0->group0_server();
while (true) {
-auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto curr_req = _topology_state_machine._topology.global_request;
if (curr_req && *curr_req != global_topology_request::cleanup) {
@@ -4557,7 +4561,7 @@ future<> storage_service::do_cluster_cleanup() {
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup: cluster cleanup requested"));
try {
-co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
+co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("cleanup: concurrent operation is detected, retrying.");
continue;
@@ -4587,11 +4591,11 @@ future<sstring> storage_service::wait_for_topology_request_completion(utils::UUI
}
future<> storage_service::wait_for_topology_not_busy() {
-auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
while (_topology_state_machine._topology.is_busy()) {
release_guard(std::move(guard));
co_await _topology_state_machine.event.wait();
-guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
}
}
@@ -4600,7 +4604,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
utils::UUID request_id;
while (true) {
-auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto it = _topology_state_machine._topology.find(raft_server.id());
if (!it) {
@@ -4633,7 +4637,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
request_id = guard.new_group0_state_id();
try {
-co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
+co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("rebuild: concurrent operation is detected, retrying.");
continue;
@@ -4653,7 +4657,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
while (true) {
rtlogger.info("request check_and_repair_cdc_streams, refreshing topology");
-auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
auto curr_req = _topology_state_machine._topology.global_request;
if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
// FIXME: replace this with a queue
@@ -4679,7 +4683,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
try {
-co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
+co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
continue;
@@ -5251,7 +5255,7 @@ future<> storage_service::process_tablet_split_candidate(table_id table) {
while (!_async_gate.is_closed() && !_group0_as.abort_requested()) {
try {
// Ensures that latest changes to tablet metadata, in group0, are visible
-auto guard = co_await _group0->client().start_operation(&_group0_as);
+auto guard = co_await _group0->client().start_operation(_group0_as);
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
if (!tmap.needs_split()) {
release_guard(std::move(guard));
@@ -5497,6 +5501,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state
co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> {
if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) {
co_await utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20));
co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens);
} else {
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
@@ -6193,7 +6199,7 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
future<> storage_service::transit_tablet(table_id table, dht::token token, noncopyable_function<std::tuple<std::vector<canonical_mutation>, sstring>(const locator::tablet_map&, api::timestamp_type)> prepare_mutations) {
while (true) {
-auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
while (_topology_state_machine._topology.is_busy()) {
const auto tstate = *_topology_state_machine._topology.tstate;
@@ -6204,7 +6210,7 @@ future<> storage_service::transit_tablet(table_id table, dht::token token, nonco
rtlogger.debug("transit_tablet(): topology state machine is busy: {}", tstate);
release_guard(std::move(guard));
co_await _topology_state_machine.event.wait();
-guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
}
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
@@ -6226,7 +6232,7 @@ future<> storage_service::transit_tablet(table_id table, dht::token token, nonco
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
try {
-co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
+co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
rtlogger.debug("transit_tablet(): concurrent modification, retrying");
@@ -6251,7 +6257,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
}
while (true) {
-group0_guard guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
std::vector<canonical_mutation> updates;
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
@@ -6263,7 +6269,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
try {
-co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
+co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
@@ -6404,7 +6410,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
}
while (true) {
-auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
+auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
if (const auto *p = _topology_state_machine._topology.find(params.host_id)) {
const auto& rs = p->second;
@@ -6458,7 +6464,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
try {
// Make replaced node and ignored nodes non voters earlier for better HA
co_await _group0->make_nonvoters(ignored_nodes_from_join_params(params), _group0_as, raft_timeout{});
-co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
+co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
rtlogger.info("join_node_request: concurrent operation is detected, retrying.");
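The diff above mechanically replaces a nullable `abort_source*` with an `abort_source&` in `start_operation()`/`add_entry()`, so every wait on the group0 guard or semaphore must observe shutdown. A toy model of the bug class, in plain Python rather than Seastar C++ (all names here are illustrative, not Seastar's API):

```python
# Sketch: an abortable wait on a semaphore. Before the fix, some waits
# ignored the abort source and blocked forever during shutdown.
import threading

class AbortSource:
    def __init__(self):
        self._aborted = threading.Event()
    def request_abort(self):
        self._aborted.set()
    def abort_requested(self):
        return self._aborted.is_set()

class AbortRequested(Exception):
    pass

def get_units(sem: threading.Semaphore, abort: AbortSource, poll: float = 0.01):
    # Abortable wait: instead of blocking forever on the semaphore,
    # re-check the abort source between short acquisition attempts.
    while not sem.acquire(timeout=poll):
        if abort.abort_requested():
            raise AbortRequested("wait cancelled by abort source")

abort = AbortSource()
busy = threading.Semaphore(0)   # semaphore held elsewhere, never released
abort.request_abort()           # shutdown triggers the abort source
try:
    get_units(busy, abort)
    hung = True                 # only reached if the wait returned
except AbortRequested:
    hung = False                # the wait was cancelled instead of hanging
assert not hung
```

Making the parameter a reference (here: a required argument) removes the option of silently passing nothing and skipping the check.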

@@ -294,7 +294,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
};
future<group0_guard> start_operation() {
-auto guard = co_await _group0.client().start_operation(&_as);
+auto guard = co_await _group0.client().start_operation(_as);
if (_term != _raft.get_current_term()) {
throw term_changed_error{};
@@ -337,7 +337,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.trace("update_topology_state mutations: {}", updates);
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
-co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as);
+co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
} catch (group0_concurrent_modification&) {
rtlogger.info("race while changing state: {}. Retrying", reason);
throw;
@@ -829,7 +829,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
mixed_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
try {
-co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as);
+co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
break;
} catch (group0_concurrent_modification&) {
rtlogger.info("handle_global_request(): concurrent modification, retrying");
@@ -2604,7 +2604,7 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
if (auth_version < db::system_keyspace::auth_version_t::v2) {
rtlogger.info("migrating system_auth keyspace data");
co_await auth::migrate_to_auth_v2(_sys_ks, _group0.client(),
-[this] (abort_source*) { return start_operation();}, _as);
+[this] (abort_source&) { return start_operation();}, _as);
}
auto tmptr = get_token_metadata_ptr();

@@ -2986,6 +2986,7 @@ sstable::unlink(storage::sync_dir sync) noexcept {
co_await std::move(remove_fut);
_stats.on_delete();
_manager.on_unlink(this);
}
thread_local sstables_stats::stats sstables_stats::_shard_stats;

@@ -323,6 +323,11 @@ void sstables_manager::validate_new_keyspace_storage_options(const data_dictiona
}, so.value);
}
void sstables_manager::on_unlink(sstable* sst) {
// Remove the sst from manager's reclaimed list to prevent any attempts to reload its components.
_reclaimed.erase(*sst);
}
sstables_registry::~sstables_registry() = default;
} // namespace sstables

@@ -188,6 +188,9 @@ public:
void validate_new_keyspace_storage_options(const data_dictionary::storage_options&);
// To be called by the sstable to signal its unlinking
void on_unlink(sstable* sst);
private:
void add(sstable* sst);
// Transition the sstable to the "inactive" state. It has no

@@ -0,0 +1,126 @@
# Copyright 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
# Tests for how CQL's Role-Based Access Control (RBAC) commands - CREATE ROLE,
# GRANT, REVOKE, etc., can be used on Alternator for authentication and for
# authorization. For example if the low-level name of an Alternator table "x"
# is alternator_x.x, and a certain user is not granted permission to "modify"
# keyspace alternator_x, Alternator write requests (PutItem, UpdateItem,
# DeleteItem, BatchWriteItem) by that user will be denied.
#
# Because this file is all about testing the Scylla-only CQL-based RBAC,
# all tests in this file are skipped when running against Amazon DynamoDB.
import pytest
import boto3
from botocore.exceptions import ClientError
import time
from contextlib import contextmanager
from test.alternator.util import is_aws, unique_table_name
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ConsistencyLevel, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
from cassandra.policies import RoundRobinPolicy
import re
# This file is all about testing RBAC as configured via CQL, so we need to
# connect to CQL to set these tests up. The "cql" fixture below enables that.
# If we're not testing Scylla, or the CQL port is not available on the same
# IP address as the Alternator IP address, a test using this fixture will
# be skipped with a message about the CQL API not being available.
@pytest.fixture(scope="module")
def cql(dynamodb):
if is_aws(dynamodb):
pytest.skip('Scylla-only CQL API not supported by AWS')
url = dynamodb.meta.client._endpoint.host
host, = re.search(r'.*://([^:]*):', url).groups()
profile = ExecutionProfile(
load_balancing_policy=RoundRobinPolicy(),
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL)
cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
contact_points=[host],
port=9042,
protocol_version=4,
auth_provider=PlainTextAuthProvider(username='cassandra', password='cassandra'),
)
try:
ret = cluster.connect()
# "BEGIN BATCH APPLY BATCH" is the closest to do-nothing I could find
ret.execute("BEGIN BATCH APPLY BATCH")
except NoHostAvailable:
pytest.skip('Could not connect to Scylla-only CQL API')
yield ret
cluster.shutdown()
# new_role() is a context manager for temporarily creating a new role with
# a unique name and returning its name and the secret key needed to connect
# to it with the DynamoDB API.
# The "login" and "superuser" flags are passed to the CREATE ROLE statement.
@contextmanager
def new_role(cql, login=True, superuser=False):
# The role name is not a table's name but it doesn't matter. Because our
# unique_table_name() uses (deliberately) a non-lower-case character, the
# role name has to be quoted in double quotes when used in CQL below.
role = unique_table_name()
# The password set for the new role is identical to the user name (not
# very secure ;-)) - but we later need to retrieve the "salted hash" of
# this password, which serves in Alternator as the secret key of the role.
cql.execute(f"CREATE ROLE \"{role}\" WITH PASSWORD = '{role}' AND SUPERUSER = {superuser} AND LOGIN = {login}")
# Newer Scylla places the "roles" table in the "system" keyspace, but
# older versions used "system_auth_v2" or "system_auth"
key = None
for ks in ['system', 'system_auth_v2', 'system_auth']:
try:
e = list(cql.execute(f"SELECT salted_hash FROM {ks}.roles WHERE role = '{role}'"))
if e != []:
key = e[0].salted_hash
if key is not None:
break
except:
pass
assert key is not None
try:
yield (role, key)
finally:
cql.execute(f'DROP ROLE "{role}"')
# Create a new DynamoDB API resource (connection object) similar to the
# existing "dynamodb" resource - but authenticating with the given role
# and key.
@contextmanager
def new_dynamodb(dynamodb, role, key):
url = dynamodb.meta.client._endpoint.host
config = dynamodb.meta.client._client_config
verify = not url.startswith('https')
ret = boto3.resource('dynamodb', endpoint_url=url, verify=verify,
aws_access_key_id=role, aws_secret_access_key=key,
region_name='us-east-1', config=config)
try:
yield ret
finally:
ret.meta.client.close()
# A basic test for creating a new role. The ListTables operation is allowed
# to any role, so it should work in the new role when given the right password
# and fail with the wrong password.
def test_new_role(dynamodb, cql):
with new_role(cql) as (role, key):
with new_dynamodb(dynamodb, role, key) as d:
# ListTables should not fail (we don't care what the result is)
d.meta.client.list_tables()
# Trying to use the wrong key for the new role should fail to perform
# any request. The new_dynamodb() function can't detect the error,
# it is detected when attempting to perform a request with it.
with new_dynamodb(dynamodb, role, 'wrongkey') as d:
with pytest.raises(ClientError, match='UnrecognizedClientException'):
d.meta.client.list_tables()
# A role without "login" permissions cannot be used to authenticate requests.
# Reproduces #19735.
def test_login_false(dynamodb, cql):
with new_role(cql, login=False) as (role, key):
with new_dynamodb(dynamodb, role, key) as d:
with pytest.raises(ClientError, match='UnrecognizedClientException.*login=false'):
d.meta.client.list_tables()
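The `new_role()` helper above is an instance of a common fixture pattern: create a uniquely named resource, yield it to the test body, and guarantee cleanup in `finally` even when the body fails. A generic, self-contained sketch of that pattern (names are illustrative, not from the test suite):

```python
from contextlib import contextmanager
import uuid

created = []
dropped = []

def execute(stmt):
    # stand-in for cql.execute(); just records the statements issued
    if stmt.startswith("CREATE"):
        created.append(stmt)
    elif stmt.startswith("DROP"):
        dropped.append(stmt)

@contextmanager
def temporary_role():
    # unique, non-lower-case name, so it must be double-quoted in CQL,
    # like unique_table_name() in the tests above
    role = f"R_{uuid.uuid4().hex[:8]}"
    execute(f'CREATE ROLE "{role}"')
    try:
        yield role
    finally:
        execute(f'DROP ROLE "{role}"')   # cleanup runs even if the body raises

try:
    with temporary_role():
        raise RuntimeError("simulated test-body failure")
except RuntimeError:
    pass
assert len(created) == 1 and len(dropped) == 1
```

The `finally` clause is what keeps a failing test from leaking roles into later tests.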

@@ -15,6 +15,7 @@
#include "readers/from_mutations_v2.hh"
#include "utils/bloom_filter.hh"
#include "utils/error_injection.hh"
SEASTAR_TEST_CASE(test_sstable_reclaim_memory_from_components_and_reload_reclaimed_components) {
return test_env::do_with_async([] (test_env& env) {
@@ -52,6 +53,11 @@ std::pair<shared_sstable, size_t> create_sstable_with_bloom_filter(test_env& env
return {sst, sst_bf_memory};
}
void dispose_and_stop_tracking_bf_memory(shared_sstable&& sst, test_env_sstables_manager& mgr) {
mgr.remove_sst_from_reclaimed(sst.get());
shared_sstable::dispose(sst.release().release());
}
SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
@@ -89,7 +95,7 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter)
// Test auto reload - disposing sst3 should trigger reload of the
// smallest filter in the reclaimed list, which is sst1's bloom filter.
-shared_sstable::dispose(sst3.release().release());
+dispose_and_stop_tracking_bf_memory(std::move(sst3), sst_mgr);
REQUIRE_EVENTUALLY_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
// only sst4's bloom filter memory should be reported as reclaimed
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst4_bf_memory);
@@ -154,7 +160,7 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
utils::get_local_injector().enable("reload_reclaimed_components/pause", true);
// dispose sst2 to trigger reload of sst1's bloom filter
-shared_sstable::dispose(sst2.release().release());
+dispose_and_stop_tracking_bf_memory(std::move(sst2), sst_mgr);
// _total_reclaimable_memory will be updated when the reload begins; wait for it.
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory);
@@ -223,3 +229,57 @@ SEASTAR_TEST_CASE(test_bloom_filters_with_bad_partition_estimate) {
}
});
};
SEASTAR_TEST_CASE(test_bloom_filter_reload_after_unlink) {
return test_env::do_with_async([] (test_env& env) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
return;
#endif
simple_schema ss;
auto schema = ss.schema();
auto mut = mutation(schema, ss.make_pkey(1));
mut.partition().apply_insert(*schema, ss.make_ckey(1), ss.new_timestamp());
// bloom filter will be reclaimed automatically due to low memory
auto sst = make_sstable_containing(env.make_sstable(schema), {mut});
auto& sst_mgr = env.manager();
BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), 0);
auto memory_reclaimed = sst_mgr.get_total_memory_reclaimed();
// manager's reclaimed set has the sst now
auto& reclaimed_set = sst_mgr.get_reclaimed_set();
BOOST_REQUIRE_EQUAL(reclaimed_set.size(), 1);
BOOST_REQUIRE_EQUAL(reclaimed_set.begin()->get_filename(), sst->get_filename());
// hold a copy of shared sst object in async thread to test reload after unlink
utils::get_local_injector().enable("test_bloom_filter_reload_after_unlink");
auto async_sst_holder = seastar::async([sst] {
// do nothing just hold a copy of sst and wait for message signalling test completion
utils::get_local_injector().inject("test_bloom_filter_reload_after_unlink", [] (auto& handler) {
auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5});
return ret;
}).get();
});
// unlink the sst and release the object
sst->unlink().get();
sst.release();
// The reclaimed set should now be empty, but the total memory reclaimed
// should still be the same, as the sst object is not deactivated yet due
// to a copy being alive in the async thread.
BOOST_REQUIRE_EQUAL(sst_mgr.get_reclaimed_set().size(), 0);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), memory_reclaimed);
// message async thread to complete waiting and thus release its copy of sst, triggering deactivation
utils::get_local_injector().receive_message("test_bloom_filter_reload_after_unlink");
async_sst_holder.get();
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
}, {
// set available memory = 0 to force reclaim the bloom filter
.available_memory = 0
});
};
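The test above checks the invariant that unlinking an sstable removes it from the manager's reclaimed set immediately, while the reclaimed-memory counter only drops once the last reference to the sstable dies. A rough Python model of that bookkeeping (hypothetical names, not Scylla's real classes):

```python
# Toy model of the manager-side accounting exercised by
# test_bloom_filter_reload_after_unlink above.
class Manager:
    def __init__(self):
        self.reclaimed = set()          # ssts whose bloom filters were evicted
        self.total_memory_reclaimed = 0

    def reclaim(self, sst, filter_size):
        self.reclaimed.add(sst)
        self.total_memory_reclaimed += filter_size

    def on_unlink(self, sst):
        # mirrors sstables_manager::on_unlink(): forget the sstable so no
        # reload of its now-deleted components is ever attempted
        self.reclaimed.discard(sst)

    def on_deactivate(self, sst, filter_size):
        # runs when the last reference to the sst goes away
        self.total_memory_reclaimed -= filter_size

mgr = Manager()
sst = object()
mgr.reclaim(sst, 1024)
assert len(mgr.reclaimed) == 1 and mgr.total_memory_reclaimed == 1024
mgr.on_unlink(sst)              # unlinked, but a copy is still alive
assert len(mgr.reclaimed) == 0 and mgr.total_memory_reclaimed == 1024
mgr.on_deactivate(sst, 1024)    # last reference dropped
assert mgr.total_memory_reclaimed == 0
```

This separation is why the test holds a copy of the sstable in an async thread before signalling it to release.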

@@ -252,7 +252,7 @@ SEASTAR_TEST_CASE(test_group0_batch) {
};
auto do_transaction = [&] (std::function<future<>(service::group0_batch&)> f) -> future<> {
-auto guard = co_await rclient.start_operation(&as);
+auto guard = co_await rclient.start_operation(as);
service::group0_batch mc(std::move(guard));
co_await f(mc);
co_await std::move(mc).commit(rclient, as, ::service::raft_timeout{});
@@ -273,7 +273,7 @@ SEASTAR_TEST_CASE(test_group0_batch) {
// test extract
{
-auto guard = co_await rclient.start_operation(&as);
+auto guard = co_await rclient.start_operation(as);
service::group0_batch mc(std::move(guard));
mc.add_mutation(co_await insert_mut(1, 2));
mc.add_generator([&] (api::timestamp_type t) -> ::service::mutations_generator {

@@ -44,5 +44,7 @@ custom_args:
- '-c1 -m256M'
commitlog_cleanup_test:
- '-c1 -m2G'
bloom_filter_test:
- '-c1'
run_in_debug:
- logalloc_standard_allocator_segment_pool_backend_test

@@ -985,7 +985,7 @@ private:
config.is_superuser = true;
config.can_login = true;
-auto as = &abort_sources.local();
+auto& as = abort_sources.local();
auto guard = group0_client.start_operation(as).get();
service::group0_batch mc{std::move(guard)};
auth::create_role(
@@ -994,7 +994,7 @@ private:
config,
auth::authentication_options(),
mc).get();
-std::move(mc).commit(group0_client, *as, ::service::raft_timeout{}).get();
+std::move(mc).commit(group0_client, as, ::service::raft_timeout{}).get();
} catch (const auth::role_already_exists&) {
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
}
@@ -1060,7 +1060,7 @@ future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_tes
void do_with_mc(cql_test_env& env, std::function<void(service::group0_batch&)> func) {
seastar::abort_source as;
auto& g0 = env.get_raft_group0_client();
-auto guard = g0.start_operation(&as).get();
+auto guard = g0.start_operation(as).get();
auto mc = service::group0_batch(std::move(guard));
func(mc);
std::move(mc).commit(g0, as, std::nullopt).get();

@@ -57,6 +57,14 @@ public:
size_t get_total_reclaimable_memory() {
return _total_reclaimable_memory;
}
void remove_sst_from_reclaimed(sstable* sst) {
_reclaimed.erase(*sst);
}
auto& get_reclaimed_set() {
return _reclaimed;
}
};
class test_env_compaction_manager {

@@ -0,0 +1,37 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import asyncio
import time
from test.pylib.rest_client import inject_error
from test.pylib.util import wait_for_cql_and_get_hosts
import pytest
from cassandra.protocol import WriteTimeout
@pytest.mark.asyncio
async def test_cas_semaphore(manager):
""" This is a regression test for scylladb/scylladb#19698 """
servers = await manager.servers_add(1, cmdline=['--smp', '1', '--write-request-timeout-in-ms', '500'])
host = await wait_for_cql_and_get_hosts(manager.cql, {servers[0]}, time.time() + 60)
await manager.cql.run_async("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
await manager.cql.run_async("CREATE TABLE test.test (a int PRIMARY KEY, b int)")
async with inject_error(manager.api, servers[0].ip_addr, 'cas_timeout_after_lock'):
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
try:
await asyncio.gather(*res)
except WriteTimeout:
pass
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
await asyncio.gather(*res)
metrics = await manager.metrics.query(servers[0].ip_addr)
contention = metrics.get(name="scylla_storage_proxy_coordinator_cas_write_contention_count")
assert contention == None

@@ -12,8 +12,8 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
from test.topology.conftest import skip_mode
-from test.topology.util import reconnect_driver, enter_recovery_state, wait_for_upgrade_state, \
-wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
+from test.topology.util import (delete_raft_data_and_upgrade_state, enter_recovery_state, log_run_time,
+reconnect_driver, wait_for_upgrade_state, wait_until_upgrade_finishes)
@pytest.mark.asyncio
@@ -77,8 +77,7 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
logging.info(f"Restarting {others}")
-await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
-cql = await reconnect_driver(manager)
+await manager.rolling_restart(others)
logging.info(f"{others} restarted, waiting until driver reconnects to them")
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
@@ -100,11 +99,11 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
logging.info(f"Removing {srv1} using {others[0]}")
await manager.remove_node(others[0].server_id, srv1.server_id)
-logging.info(f"Deleting Raft data and upgrade state on {hosts} and restarting")
+logging.info(f"Deleting Raft data and upgrade state on {hosts}")
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
-await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
-cql = await reconnect_driver(manager)
+logging.info(f"Restarting {others}")
+await manager.rolling_restart(others)
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)

@@ -23,6 +23,8 @@ import requests
import json
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.topology.conftest import skip_mode
logger = logging.getLogger(__name__)
@@ -206,13 +208,102 @@ async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
}
servers = await manager.servers_add(2, config=config)
for server in servers:
-url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
-response = requests.get(url, verify=False)
-assert response.ok
-j = json.loads(response.content.decode('utf-8'))
# We expect /localnodes to return ["1.2.3.4", "1.2.3.4"]
-# (since we configured both nodes with the same broadcast_rpc_address):
-assert j == ['1.2.3.4', '1.2.3.4']
+# (since we configured both nodes with the same broadcast_rpc_address).
+# We need the retry loop below because the second node might take a
+# bit of time to bootstrap after coming up, and only then will it
+# appear on /localnodes (see #19694).
+url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
+timeout = time.time() + 10
+while True:
+assert time.time() < timeout
+response = requests.get(url, verify=False)
+j = json.loads(response.content.decode('utf-8'))
+if j == ['1.2.3.4', '1.2.3.4']:
+break # done
+await asyncio.sleep(0.1)
@pytest.mark.asyncio
async def test_localnodes_drained_node(manager: ManagerClient):
"""Test that if one node in a cluster is brought down with "nodetool drain",
a "/localnodes" request should NOT return that node. This test does
NOT reproduce issue #19694 - a DRAINED node is not considered is_alive(),
so even before the fix of that issue, "/localnodes" didn't return it.
"""
# Start a cluster with two nodes and verify that at this point,
# "/localnodes" on the first node returns both nodes.
# We need the retry loop below because the second node might take a
# bit of time to bootstrap after coming up, and only then will it
# appear on /localnodes (see #19694).
servers = await manager.servers_add(2, config=alternator_config)
localnodes_request = f"http://{servers[0].ip_addr}:{alternator_config['alternator_port']}/localnodes"
async def check_localnodes_two():
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
return True
elif set(j).issubset({servers[0].ip_addr, servers[1].ip_addr}):
return None # try again
else:
return False
assert await wait_for(check_localnodes_two, time.time() + 10)
# Now run "nodetool drain" on the second node, leaving the second node
# in DRAINED state.
await manager.api.client.post("/storage_service/drain", host=servers[1].ip_addr)
# After that, "/localnodes" should no longer return the second node.
# It might take a short while until the first node learns what happened
# to the second node, so we may need to retry for a while.
async def check_localnodes_one():
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
return None # try again
elif set(j) == {servers[0].ip_addr}:
return True
else:
return False
assert await wait_for(check_localnodes_one, time.time() + 10)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_localnodes_joining_nodes(manager: ManagerClient):
"""Test that if a cluster is being enlarged and a node is coming up but
not yet responsive, a "/localnodes" request should NOT return that node.
Reproduces issue #19694.
"""
# Start a cluster with one node, and then bring up a second node,
# pausing its bootstrap (with an injection) in JOINING state.
# We need to start the second node in the background, because server_add()
# will wait for the bootstrap to complete - which we don't want to do.
server = await manager.server_add(config=alternator_config)
task = asyncio.create_task(manager.server_add(config=alternator_config | {'error_injections_at_startup': ['delay_bootstrap_20s']}))
# Sleep until the first node knows of the second one as a "live node"
# (we check this with the REST API's /gossiper/endpoint/live).
async def check_two_live_nodes():
j = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
if len(j) == 1:
return None # try again
elif len(j) == 2:
return True
else:
return False
assert await wait_for(check_two_live_nodes, time.time() + 10)
# At this point the second node is live, but hasn't finished bootstrapping
# (we delayed that with the injection). So the "/localnodes" should still
# return just one node - not both. Reproduces #19694 (two nodes used to
# be returned).
localnodes_request = f"http://{server.ip_addr}:{alternator_config['alternator_port']}/localnodes"
response = requests.get(localnodes_request)
j = json.loads(response.content.decode('utf-8'))
assert len(j) == 1
# Ending the test here will kill both servers. We don't wait for the
# second server to finish its long injection-caused bootstrap delay,
# so we don't check here that when the second server finally comes up,
# both nodes will finally be visible in /localnodes. This case is checked
# in other tests, where bootstrap finishes normally - we don't need to
# check this case again here.
task.cancel()
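The tests above all poll a predicate until it succeeds or a deadline passes, via `wait_for()` from `test.pylib.util`. A minimal stand-alone sketch of that pattern (an approximation, not the real helper): the predicate returns None for "not ready yet, try again" and any other value to finish.

```python
# Poll pred() until it returns a non-None value or the deadline passes.
import asyncio
import time

async def wait_for(pred, deadline, period=0.1):
    while True:
        res = await pred()
        if res is not None:
            return res
        if time.time() > deadline:
            raise TimeoutError("predicate not satisfied before deadline")
        await asyncio.sleep(period)

calls = {"n": 0}

async def ready_on_third_call():
    # simulates a cluster state that only converges after a few polls
    calls["n"] += 1
    return "ok" if calls["n"] >= 3 else None

result = asyncio.run(wait_for(ready_on_third_call, time.time() + 5, period=0.01))
assert result == "ok" and calls["n"] == 3
```

Returning None (rather than False) for "try again" lets the predicate also distinguish a definitive failure, as `check_localnodes_one()` above does.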
# TODO: add a more thorough test for /localnodes, creating a cluster with
# multiple nodes in multiple data centers, and check that we can get a list