Compare commits
17 Commits
debug_form
...
scylla-6.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
abbf0b24a6 | ||
|
|
347857e5e5 | ||
|
|
cd2ca5ef57 | ||
|
|
5a4065ecd5 | ||
|
|
ed4f2ecca4 | ||
|
|
8f80a84e93 | ||
|
|
97ae704f99 | ||
|
|
738e4c3681 | ||
|
|
ee74fe4e0e | ||
|
|
b2ea946837 | ||
|
|
92e725c467 | ||
|
|
e57d48253f | ||
|
|
47df9f9b05 | ||
|
|
193dc87bd0 | ||
|
|
11d1950957 | ||
|
|
6317325ed5 | ||
|
|
14222ad205 |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=6.1.0-dev
|
||||
VERSION=6.1.0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "alternator/executor.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "types/types.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
namespace alternator {
|
||||
@@ -31,11 +32,12 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
|
||||
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
|
||||
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
|
||||
const column_definition* salted_hash_col = schema->get_column_definition(bytes("salted_hash"));
|
||||
if (!salted_hash_col) {
|
||||
const column_definition* can_login_col = schema->get_column_definition(bytes("can_login"));
|
||||
if (!salted_hash_col || !can_login_col) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("Credentials cannot be fetched for: {}", username)));
|
||||
}
|
||||
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col});
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id}, selection->get_query_options());
|
||||
auto selection = cql3::selection::selection::for_columns(schema, {salted_hash_col, can_login_col});
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id, can_login_col->id}, selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
|
||||
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
auto cl = auth::password_authenticator::consistency_for_user(username);
|
||||
@@ -51,7 +53,14 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::serv
|
||||
if (result_set->empty()) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("User not found: {}", username)));
|
||||
}
|
||||
const managed_bytes_opt& salted_hash = result_set->rows().front().front(); // We only asked for 1 row and 1 column
|
||||
const auto& result = result_set->rows().front();
|
||||
bool can_login = result[1] && value_cast<bool>(boolean_type->deserialize(*result[1]));
|
||||
if (!can_login) {
|
||||
// This is a valid role name, but has "login=False" so should not be
|
||||
// usable for authentication (see #19735).
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("Role {} has login=false so cannot be used for login", username)));
|
||||
}
|
||||
const managed_bytes_opt& salted_hash = result.front();
|
||||
if (!salted_hash) {
|
||||
co_await coroutine::return_exception(api_error::unrecognized_client(format("No password found for user: {}", username)));
|
||||
}
|
||||
|
||||
@@ -211,7 +211,10 @@ protected:
|
||||
sstring local_dc = topology.get_datacenter();
|
||||
std::unordered_set<gms::inet_address> local_dc_nodes = topology.get_datacenter_endpoints().at(local_dc);
|
||||
for (auto& ip : local_dc_nodes) {
|
||||
if (_gossiper.is_alive(ip)) {
|
||||
// Note that it's not enough for the node to be is_alive() - a
|
||||
// node joining the cluster is also "alive" but not responsive to
|
||||
// requests. We need the node to be in normal state. See #19694.
|
||||
if (_gossiper.is_normal(ip)) {
|
||||
// Use the gossiped broadcast_rpc_address if available instead
|
||||
// of the internal IP address "ip". See discussion in #18711.
|
||||
rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(ip)));
|
||||
|
||||
@@ -121,7 +121,7 @@ static future<> announce_mutations_with_guard(
|
||||
::service::raft_group0_client& group0_client,
|
||||
std::vector<canonical_mutation> muts,
|
||||
::service::group0_guard group0_guard,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout) {
|
||||
auto group0_cmd = group0_client.prepare_command(
|
||||
::service::write_mutations{
|
||||
@@ -137,7 +137,7 @@ future<> announce_mutations_with_batching(
|
||||
::service::raft_group0_client& group0_client,
|
||||
start_operation_func_t start_operation_func,
|
||||
std::function<::service::mutations_generator(api::timestamp_type t)> gen,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout) {
|
||||
// Account for the command's overhead; it's better to use a smaller threshold than to constantly bounce off the limit.
|
||||
size_t memory_threshold = group0_client.max_command_size() * 0.75;
|
||||
@@ -188,7 +188,7 @@ future<> announce_mutations(
|
||||
::service::raft_group0_client& group0_client,
|
||||
const sstring query_string,
|
||||
std::vector<data_value_or_unset> values,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout) {
|
||||
auto group0_guard = co_await group0_client.start_operation(as, timeout);
|
||||
auto timestamp = group0_guard.write_timestamp();
|
||||
|
||||
@@ -80,7 +80,7 @@ future<> create_legacy_metadata_table_if_missing(
|
||||
// Execute update query via group0 mechanism, mutations will be applied on all nodes.
|
||||
// Use this function when you need to perform a read before a write on a single guard or if
|
||||
// you have more than one mutation and could potentially exceed the single command size limit.
|
||||
using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source*)>;
|
||||
using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source&)>;
|
||||
future<> announce_mutations_with_batching(
|
||||
::service::raft_group0_client& group0_client,
|
||||
// since we can operate also in topology coordinator context where we need stronger
|
||||
@@ -88,7 +88,7 @@ future<> announce_mutations_with_batching(
|
||||
// function here
|
||||
start_operation_func_t start_operation_func,
|
||||
std::function<::service::mutations_generator(api::timestamp_type t)> gen,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout);
|
||||
|
||||
// Execute update query via group0 mechanism, mutations will be applied on all nodes.
|
||||
@@ -97,7 +97,7 @@ future<> announce_mutations(
|
||||
::service::raft_group0_client& group0_client,
|
||||
const sstring query_string,
|
||||
std::vector<data_value_or_unset> values,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout);
|
||||
|
||||
// Appends mutations to a collector, they will be applied later on all nodes via group0 mechanism.
|
||||
|
||||
@@ -136,7 +136,7 @@ future<> password_authenticator::create_default_if_missing() {
|
||||
plogger.info("Created default superuser authentication record.");
|
||||
} else {
|
||||
co_await announce_mutations(_qp, _group0_client, query,
|
||||
{salted_pwd, _superuser}, &_as, ::service::raft_timeout{});
|
||||
{salted_pwd, _superuser}, _as, ::service::raft_timeout{});
|
||||
plogger.info("Created default superuser authentication record.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -681,7 +681,7 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
|
||||
co_await announce_mutations_with_batching(g0,
|
||||
start_operation_func,
|
||||
std::move(gen),
|
||||
&as,
|
||||
as,
|
||||
std::nullopt);
|
||||
}
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ future<> standard_role_manager::create_default_role_if_missing() {
|
||||
{_superuser},
|
||||
cql3::query_processor::cache_internal::no).discard_result();
|
||||
} else {
|
||||
co_await announce_mutations(_qp, _group0_client, query, {_superuser}, &_as, ::service::raft_timeout{});
|
||||
co_await announce_mutations(_qp, _group0_client, query, {_superuser}, _as, ::service::raft_timeout{});
|
||||
}
|
||||
log.info("Created default superuser role '{}'.", _superuser);
|
||||
} catch(const exceptions::unavailable_exception& e) {
|
||||
|
||||
@@ -55,6 +55,7 @@ Make sure that administrative operations will not be running while upgrade is in
|
||||
In particular, you must abstain from:
|
||||
|
||||
* :doc:`Cluster management procedures </operating-scylla/procedures/cluster-management/index>` (adding, replacing, removing, decommissioning nodes etc.).
|
||||
* :doc:`Version upgrade procedures </upgrade/index>`. You must ensure that all Scylla nodes are running the same version before proceeding.
|
||||
* Running :doc:`nodetool repair </operating-scylla/nodetool-commands/repair>`.
|
||||
* Running :doc:`nodetool checkAndRepairCdcStreams </operating-scylla/nodetool-commands/checkandrepaircdcstreams>`.
|
||||
* Any modifications of :doc:`authentication </operating-scylla/security/authentication>` and :doc:`authorization </operating-scylla/security/enable-authorization>` settings.
|
||||
|
||||
@@ -906,7 +906,7 @@ future<> migration_manager::announce_with_raft(std::vector<mutation> schema, gro
|
||||
},
|
||||
guard, std::move(description));
|
||||
|
||||
return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), &_as);
|
||||
return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), _as);
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_without_raft(std::vector<mutation> schema, group0_guard guard) {
|
||||
@@ -993,7 +993,7 @@ future<> migration_manager::announce<topology_change>(std::vector<mutation> sche
|
||||
|
||||
future<group0_guard> migration_manager::start_group0_operation() {
|
||||
assert(this_shard_id() == 0);
|
||||
return _group0_client.start_operation(&_as, raft_timeout{});
|
||||
return _group0_client.start_operation(_as, raft_timeout{});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
#pragma once
|
||||
#include "seastar/core/semaphore.hh"
|
||||
#include "service/paxos/proposal.hh"
|
||||
#include "log.hh"
|
||||
#include "utils/digest_algorithm.hh"
|
||||
@@ -31,6 +32,7 @@ private:
|
||||
|
||||
class key_lock_map {
|
||||
using semaphore = basic_semaphore<semaphore_default_exception_factory, clock_type>;
|
||||
using semaphore_units = semaphore_units<semaphore_default_exception_factory, clock_type>;
|
||||
using map = std::unordered_map<dht::token, semaphore>;
|
||||
|
||||
semaphore& get_semaphore_for_key(const dht::token& key);
|
||||
@@ -46,22 +48,15 @@ private:
|
||||
key_lock_map& _map;
|
||||
dht::token _key;
|
||||
clock_type::time_point _timeout;
|
||||
bool _locked = false;
|
||||
key_lock_map::semaphore_units _units;
|
||||
public:
|
||||
future<> lock() {
|
||||
auto f = _map.get_semaphore_for_key(_key).wait(_timeout, 1);
|
||||
_locked = true;
|
||||
return f;
|
||||
future<> lock () {
|
||||
return get_units(_map.get_semaphore_for_key(_key), 1, _timeout).then([this] (auto&& u) { _units = std::move(u); });
|
||||
}
|
||||
guard(key_lock_map& map, const dht::token& key, clock_type::time_point timeout) : _map(map), _key(key), _timeout(timeout) {};
|
||||
guard(guard&& o) noexcept : _map(o._map), _key(std::move(o._key)), _timeout(o._timeout), _locked(o._locked) {
|
||||
o._locked = false;
|
||||
}
|
||||
guard(guard&& o) = default;
|
||||
~guard() {
|
||||
if (_locked) {
|
||||
_map.get_semaphore_for_key(_key).signal(1);
|
||||
_map.release_semaphore_for_key(_key);
|
||||
}
|
||||
_map.release_semaphore_for_key(_key);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -519,7 +519,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
|
||||
val_binders_str += ", ?";
|
||||
}
|
||||
|
||||
auto guard = co_await group0_client.start_operation(&as);
|
||||
auto guard = co_await group0_client.start_operation(as);
|
||||
|
||||
std::vector<mutation> migration_muts;
|
||||
for (const auto& row: *rows) {
|
||||
@@ -554,7 +554,7 @@ future<> service_level_controller::migrate_to_v2(size_t nodes_count, db::system_
|
||||
.mutations{migration_muts.begin(), migration_muts.end()},
|
||||
};
|
||||
auto group0_cmd = group0_client.prepare_command(change, guard, "migrate service levels to v2");
|
||||
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), &as);
|
||||
co_await group0_client.add_entry(std::move(group0_cmd), std::move(guard), as);
|
||||
}
|
||||
|
||||
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
|
||||
|
||||
@@ -155,7 +155,7 @@ semaphore& raft_group0_client::operation_mutex() {
|
||||
return _operation_mutex;
|
||||
}
|
||||
|
||||
future<> raft_group0_client::add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as,
|
||||
future<> raft_group0_client::add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source& as,
|
||||
std::optional<raft_timeout> timeout)
|
||||
{
|
||||
if (this_shard_id() != 0) {
|
||||
@@ -239,7 +239,7 @@ static utils::UUID generate_group0_state_id(utils::UUID prev_state_id) {
|
||||
return utils::UUID_gen::get_random_time_UUID_from_micros(std::chrono::microseconds{ts});
|
||||
}
|
||||
|
||||
future<group0_guard> raft_group0_client::start_operation(seastar::abort_source* as, std::optional<raft_timeout> timeout) {
|
||||
future<group0_guard> raft_group0_client::start_operation(seastar::abort_source& as, std::optional<raft_timeout> timeout) {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(logger, "start_group0_operation: must run on shard 0");
|
||||
}
|
||||
@@ -251,12 +251,12 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source*
|
||||
auto [upgrade_lock_holder, upgrade_state] = co_await get_group0_upgrade_state();
|
||||
switch (upgrade_state) {
|
||||
case group0_upgrade_state::use_post_raft_procedures: {
|
||||
auto operation_holder = co_await get_units(_operation_mutex, 1);
|
||||
co_await _raft_gr.group0_with_timeouts().read_barrier(as, timeout);
|
||||
auto operation_holder = co_await get_units(_operation_mutex, 1, as);
|
||||
co_await _raft_gr.group0_with_timeouts().read_barrier(&as, timeout);
|
||||
|
||||
// Take `_group0_read_apply_mutex` *after* read barrier.
|
||||
// Read barrier may wait for `group0_state_machine::apply` which also takes this mutex.
|
||||
auto read_apply_holder = co_await hold_read_apply_mutex();
|
||||
auto read_apply_holder = co_await hold_read_apply_mutex(as);
|
||||
|
||||
auto observed_group0_state_id = co_await _sys_ks.get_last_group0_state_id();
|
||||
auto new_group0_state_id = generate_group0_state_id(observed_group0_state_id);
|
||||
@@ -546,7 +546,7 @@ static future<> add_write_mutations_entry(
|
||||
std::string_view description,
|
||||
std::vector<canonical_mutation> muts,
|
||||
::service::group0_guard group0_guard,
|
||||
seastar::abort_source* as,
|
||||
seastar::abort_source& as,
|
||||
std::optional<::service::raft_timeout> timeout) {
|
||||
logger.trace("add_write_mutations_entry: {} mutations with description {}",
|
||||
muts.size(), description);
|
||||
@@ -582,7 +582,7 @@ future<> group0_batch::commit(::service::raft_group0_client& group0_client, seas
|
||||
// when a producer expects a substantial number or size of mutations, it should use a generator
|
||||
if (_generators.size() == 0) {
|
||||
std::vector<canonical_mutation> cmuts = {_muts.begin(), _muts.end()};
|
||||
co_return co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), &as, timeout);
|
||||
co_return co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), as, timeout);
|
||||
}
|
||||
// raft doesn't support streaming so we need to materialize all mutations in memory
|
||||
co_await materialize_mutations();
|
||||
@@ -591,7 +591,7 @@ future<> group0_batch::commit(::service::raft_group0_client& group0_client, seas
|
||||
}
|
||||
std::vector<canonical_mutation> cmuts = {_muts.begin(), _muts.end()};
|
||||
_muts.clear();
|
||||
co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), &as, timeout);
|
||||
co_await add_write_mutations_entry(group0_client, description, std::move(cmuts), std::move(*_guard), as, timeout);
|
||||
}
|
||||
|
||||
future<std::pair<std::vector<mutation>, ::service::group0_guard>> group0_batch::extract() && {
|
||||
|
||||
@@ -109,7 +109,7 @@ public:
|
||||
// Call after `system_keyspace` is initialized.
|
||||
future<> init();
|
||||
|
||||
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source* as, std::optional<raft_timeout> timeout = std::nullopt);
|
||||
future<> add_entry(group0_command group0_cmd, group0_guard guard, seastar::abort_source& as, std::optional<raft_timeout> timeout = std::nullopt);
|
||||
|
||||
future<> add_entry_unguarded(group0_command group0_cmd, seastar::abort_source* as);
|
||||
|
||||
@@ -133,7 +133,7 @@ public:
|
||||
// FIXME?: this is kind of annoying for the user.
|
||||
// we could forward the call to shard 0, have group0_guard keep a foreign_ptr to the internal data structures on shard 0,
|
||||
// and add_entry would again forward to shard 0.
|
||||
future<group0_guard> start_operation(seastar::abort_source* as, std::optional<raft_timeout> timeout = std::nullopt);
|
||||
future<group0_guard> start_operation(seastar::abort_source& as, std::optional<raft_timeout> timeout = std::nullopt);
|
||||
|
||||
template<typename Command>
|
||||
requires std::same_as<Command, broadcast_table_query> || std::same_as<Command, write_mutations>
|
||||
|
||||
@@ -514,11 +514,11 @@ raft_server_with_timeouts::run_with_timeout(Op&& op, const char* op_name,
|
||||
}
|
||||
|
||||
future<> raft_server_with_timeouts::add_entry(raft::command command, raft::wait_type type,
|
||||
seastar::abort_source* as, std::optional<raft_timeout> timeout)
|
||||
seastar::abort_source& as, std::optional<raft_timeout> timeout)
|
||||
{
|
||||
return run_with_timeout([&](abort_source* as) {
|
||||
return _group_server.server->add_entry(std::move(command), type, as);
|
||||
}, "add_entry", as, timeout);
|
||||
}, "add_entry", &as, timeout);
|
||||
}
|
||||
|
||||
future<> raft_server_with_timeouts::modify_config(std::vector<raft::config_member> add, std::vector<raft::server_id> del,
|
||||
|
||||
@@ -92,7 +92,7 @@ class raft_server_with_timeouts {
|
||||
run_with_timeout(Op&& op, const char* op_name, seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
public:
|
||||
raft_server_with_timeouts(raft_server_for_group& group_server, raft_group_registry& registry);
|
||||
future<> add_entry(raft::command command, raft::wait_type type, seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
future<> add_entry(raft::command command, raft::wait_type type, seastar::abort_source& as, std::optional<raft_timeout> timeout);
|
||||
future<> modify_config(std::vector<raft::config_member> add, std::vector<raft::server_id> del, seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
future<bool> trigger_snapshot(seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
future<> read_barrier(seastar::abort_source* as, std::optional<raft_timeout> timeout);
|
||||
|
||||
@@ -6283,6 +6283,8 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
|
||||
auto l = co_await paxos::paxos_state::get_cas_lock(token, write_timeout);
|
||||
|
||||
co_await utils::get_local_injector().inject("cas_timeout_after_lock", write_timeout + std::chrono::milliseconds(100));
|
||||
|
||||
while (true) {
|
||||
// Finish the previous PAXOS round, if any, and, as a side effect, compute
|
||||
// a ballot (round identifier) which is a) unique b) has good chances of being
|
||||
|
||||
@@ -1019,7 +1019,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
||||
|
||||
{
|
||||
// The scope for the guard
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as);
|
||||
auto me = _topology_state_machine._topology.find(server.id());
|
||||
// Recheck that cleanup is needed after the barrier
|
||||
if (!me || me->second.cleanup != cleanup_status::running) {
|
||||
@@ -1056,7 +1056,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
||||
rtlogger.info("cleanup ended");
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as);
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
builder.with_node(server.id()).set("cleanup_status", cleanup_status::clean);
|
||||
|
||||
@@ -1064,7 +1064,7 @@ future<> storage_service::sstable_cleanup_fiber(raft::server& server, sharded<se
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup completed for {}", server.id()));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("cleanup flag clearing: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -1281,7 +1281,7 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
|
||||
}
|
||||
|
||||
rtlogger.info("adding myself as the first node to the topology");
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto insert_join_request_mutations = build_mutation_from_join_params(params, guard);
|
||||
|
||||
@@ -1304,7 +1304,7 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
"bootstrap: adding myself as the first node to the topology");
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("bootstrap: concurrent operation is detected, retrying.");
|
||||
}
|
||||
@@ -1353,7 +1353,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
while (true) {
|
||||
rtlogger.info("refreshing topology to check if it's synchronized with local metadata");
|
||||
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
if (synchronized()) {
|
||||
break;
|
||||
@@ -1391,7 +1391,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
|
||||
std::move(change), guard, ::format("{}: update topology with local metadata", raft_server.id()));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("update topology with local metadata:"
|
||||
" concurrent operation is detected, retrying.");
|
||||
@@ -1428,7 +1428,7 @@ future<> storage_service::start_upgrade_to_raft_topology() {
|
||||
}
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) {
|
||||
co_return;
|
||||
@@ -1441,7 +1441,7 @@ future<> storage_service::start_upgrade_to_raft_topology() {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "upgrade: start");
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("upgrade: concurrent operation is detected, retrying.");
|
||||
@@ -1782,6 +1782,8 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
|
||||
set_mode(mode::JOINING);
|
||||
|
||||
co_await utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20));
|
||||
|
||||
if (raft_server) { // Raft is enabled. Check if we need to bootstrap ourself using raft
|
||||
rtlogger.info("topology changes are using raft");
|
||||
|
||||
@@ -3565,7 +3567,7 @@ future<> storage_service::raft_decommission() {
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto it = _topology_state_machine._topology.find(raft_server.id());
|
||||
if (!it) {
|
||||
@@ -3595,7 +3597,7 @@ future<> storage_service::raft_decommission() {
|
||||
|
||||
request_id = guard.new_group0_state_id();
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("decommission: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -3820,6 +3822,8 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
// Step 3: Prepare to sync data
|
||||
ctl.prepare(node_ops_cmd::bootstrap_prepare).get();
|
||||
|
||||
utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20)).get();
|
||||
|
||||
// Step 5: Sync data for bootstrap
|
||||
_repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get();
|
||||
on_streaming_finished();
|
||||
@@ -3898,7 +3902,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto it = _topology_state_machine._topology.find(id);
|
||||
|
||||
@@ -3955,7 +3959,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
|
||||
try {
|
||||
// Make non voter during request submission for better HA
|
||||
co_await _group0->make_nonvoters(ignored_ids, _group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("removenode: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -4529,7 +4533,7 @@ future<> storage_service::do_cluster_cleanup() {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (curr_req && *curr_req != global_topology_request::cleanup) {
|
||||
@@ -4557,7 +4561,7 @@ future<> storage_service::do_cluster_cleanup() {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("cleanup: cluster cleanup requested"));
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("cleanup: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -4587,11 +4591,11 @@ future<sstring> storage_service::wait_for_topology_request_completion(utils::UUI
|
||||
}
|
||||
|
||||
future<> storage_service::wait_for_topology_not_busy() {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
release_guard(std::move(guard));
|
||||
co_await _topology_state_machine.event.wait();
|
||||
guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4600,7 +4604,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
|
||||
utils::UUID request_id;
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
auto it = _topology_state_machine._topology.find(raft_server.id());
|
||||
if (!it) {
|
||||
@@ -4633,7 +4637,7 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
|
||||
request_id = guard.new_group0_state_id();
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("rebuild: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -4653,7 +4657,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
|
||||
while (true) {
|
||||
rtlogger.info("request check_and_repair_cdc_streams, refreshing topology");
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
|
||||
// FIXME: replace this with a queue
|
||||
@@ -4679,7 +4683,7 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("request check+repair CDC: concurrent operation is detected, retrying.");
|
||||
continue;
|
||||
@@ -5251,7 +5255,7 @@ future<> storage_service::process_tablet_split_candidate(table_id table) {
|
||||
while (!_async_gate.is_closed() && !_group0_as.abort_requested()) {
|
||||
try {
|
||||
// Ensures that latest changes to tablet metadata, in group0, are visible
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as);
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as);
|
||||
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
|
||||
if (!tmap.needs_split()) {
|
||||
release_guard(std::move(guard));
|
||||
@@ -5497,6 +5501,8 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state
|
||||
co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> {
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) {
|
||||
co_await utils::get_local_injector().inject("delay_bootstrap_20s", std::chrono::seconds(20));
|
||||
|
||||
co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens);
|
||||
} else {
|
||||
dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_token_metadata_ptr()->get_my_id(),
|
||||
@@ -6193,7 +6199,7 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
|
||||
|
||||
future<> storage_service::transit_tablet(table_id table, dht::token token, noncopyable_function<std::tuple<std::vector<canonical_mutation>, sstring>(const locator::tablet_map&, api::timestamp_type)> prepare_mutations) {
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
const auto tstate = *_topology_state_machine._topology.tstate;
|
||||
@@ -6204,7 +6210,7 @@ future<> storage_service::transit_tablet(table_id table, dht::token token, nonco
|
||||
rtlogger.debug("transit_tablet(): topology state machine is busy: {}", tstate);
|
||||
release_guard(std::move(guard));
|
||||
co_await _topology_state_machine.event.wait();
|
||||
guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
}
|
||||
|
||||
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
|
||||
@@ -6226,7 +6232,7 @@ future<> storage_service::transit_tablet(table_id table, dht::token token, nonco
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("transit_tablet(): concurrent modification, retrying");
|
||||
@@ -6251,7 +6257,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
|
||||
}
|
||||
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
@@ -6263,7 +6269,7 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
|
||||
@@ -6404,7 +6410,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
||||
}
|
||||
|
||||
while (true) {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
if (const auto *p = _topology_state_machine._topology.find(params.host_id)) {
|
||||
const auto& rs = p->second;
|
||||
@@ -6458,7 +6464,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
|
||||
try {
|
||||
// Make replaced node and ignored nodes non voters earlier for better HA
|
||||
co_await _group0->make_nonvoters(ignored_nodes_from_join_params(params), _group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as, raft_timeout{});
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("join_node_request: concurrent operation is detected, retrying.");
|
||||
|
||||
@@ -294,7 +294,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
};
|
||||
|
||||
future<group0_guard> start_operation() {
|
||||
auto guard = co_await _group0.client().start_operation(&_as);
|
||||
auto guard = co_await _group0.client().start_operation(_as);
|
||||
|
||||
if (_term != _raft.get_current_term()) {
|
||||
throw term_changed_error{};
|
||||
@@ -337,7 +337,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
rtlogger.trace("update_topology_state mutations: {}", updates);
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("race while changing state: {}. Retrying", reason);
|
||||
throw;
|
||||
@@ -829,7 +829,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
mixed_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("handle_global_request(): concurrent modification, retrying");
|
||||
@@ -2604,7 +2604,7 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
|
||||
if (auth_version < db::system_keyspace::auth_version_t::v2) {
|
||||
rtlogger.info("migrating system_auth keyspace data");
|
||||
co_await auth::migrate_to_auth_v2(_sys_ks, _group0.client(),
|
||||
[this] (abort_source*) { return start_operation();}, _as);
|
||||
[this] (abort_source&) { return start_operation();}, _as);
|
||||
}
|
||||
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
|
||||
@@ -2986,6 +2986,7 @@ sstable::unlink(storage::sync_dir sync) noexcept {
|
||||
|
||||
co_await std::move(remove_fut);
|
||||
_stats.on_delete();
|
||||
_manager.on_unlink(this);
|
||||
}
|
||||
|
||||
thread_local sstables_stats::stats sstables_stats::_shard_stats;
|
||||
|
||||
@@ -323,6 +323,11 @@ void sstables_manager::validate_new_keyspace_storage_options(const data_dictiona
|
||||
}, so.value);
|
||||
}
|
||||
|
||||
void sstables_manager::on_unlink(sstable* sst) {
|
||||
// Remove the sst from manager's reclaimed list to prevent any attempts to reload its components.
|
||||
_reclaimed.erase(*sst);
|
||||
}
|
||||
|
||||
sstables_registry::~sstables_registry() = default;
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -188,6 +188,9 @@ public:
|
||||
|
||||
void validate_new_keyspace_storage_options(const data_dictionary::storage_options&);
|
||||
|
||||
// To be called by the sstable to signal its unlinking
|
||||
void on_unlink(sstable* sst);
|
||||
|
||||
private:
|
||||
void add(sstable* sst);
|
||||
// Transition the sstable to the "inactive" state. It has no
|
||||
|
||||
126
test/alternator/test_cql_rbac.py
Normal file
126
test/alternator/test_cql_rbac.py
Normal file
@@ -0,0 +1,126 @@
|
||||
# Copyright 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
# Tests for how CQL's Role-Based Access Control (RBAC) commands - CREATE ROLE,
|
||||
# GRANT, REVOKE, etc., can be used on Alternator for authentication and for
|
||||
# authorization. For example if the low-level name of an Alternator table "x"
|
||||
# is alternator_x.x, and a certain user is not granted permission to "modify"
|
||||
# keyspace alternator_x, Alternator write requests (PutItem, UpdateItem,
|
||||
# DeleteItem, BatchWriteItem) by that user will be denied.
|
||||
#
|
||||
# Because this file is all about testing the Scylla-only CQL-based RBAC,
|
||||
# all tests in this file are skipped when running against Amazon DynamoDB.
|
||||
|
||||
import pytest
|
||||
import boto3
|
||||
from botocore.exceptions import ClientError
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from test.alternator.util import is_aws, unique_table_name
|
||||
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from cassandra.cluster import Cluster, ConsistencyLevel, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
|
||||
from cassandra.policies import RoundRobinPolicy
|
||||
import re
|
||||
|
||||
# This file is all about testing RBAC as configured via CQL, so we need to
|
||||
# connect to CQL to set these tests up. The "cql" fixture below enables that.
|
||||
# If we're not testing Scylla, or the CQL port is not available on the same
|
||||
# IP address as the Alternator IP address, a test using this fixture will
|
||||
# be skipped with a message about the CQL API not being available.
|
||||
@pytest.fixture(scope="module")
|
||||
def cql(dynamodb):
|
||||
if is_aws(dynamodb):
|
||||
pytest.skip('Scylla-only CQL API not supported by AWS')
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
host, = re.search(r'.*://([^:]*):', url).groups()
|
||||
profile = ExecutionProfile(
|
||||
load_balancing_policy=RoundRobinPolicy(),
|
||||
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
|
||||
serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL)
|
||||
cluster = Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
|
||||
contact_points=[host],
|
||||
port=9042,
|
||||
protocol_version=4,
|
||||
auth_provider=PlainTextAuthProvider(username='cassandra', password='cassandra'),
|
||||
)
|
||||
try:
|
||||
ret = cluster.connect()
|
||||
# "BEGIN BATCH APPLY BATCH" is the closest to do-nothing I could find
|
||||
ret.execute("BEGIN BATCH APPLY BATCH")
|
||||
except NoHostAvailable:
|
||||
pytest.skip('Could not connect to Scylla-only CQL API')
|
||||
yield ret
|
||||
cluster.shutdown()
|
||||
|
||||
# new_role() is a context manager for temporarily creating a new role with
|
||||
# a unique name and returning its name and the secret key needed to connect
|
||||
# to it with the DynamoDB API.
|
||||
# The "login" and "superuser" flags are passed to the CREATE ROLE statement.
|
||||
@contextmanager
|
||||
def new_role(cql, login=True, superuser=False):
|
||||
# The role name is not a table's name but it doesn't matter. Because our
|
||||
# unique_table_name() uses (deliberately) a non-lower-case character, the
|
||||
# role name has to be quoted in double quotes when used in CQL below.
|
||||
role = unique_table_name()
|
||||
# The password set for the new role is identical to the user name (not
|
||||
# very secure ;-)) - but we later need to retrieve the "salted hash" of
|
||||
# this password, which serves in Alternator as the secret key of the role.
|
||||
cql.execute(f"CREATE ROLE \"{role}\" WITH PASSWORD = '{role}' AND SUPERUSER = {superuser} AND LOGIN = {login}")
|
||||
# Newer Scylla places the "roles" table in the "system" keyspace, but
|
||||
# older versions used "system_auth_v2" or "system_auth"
|
||||
key = None
|
||||
for ks in ['system', 'system_auth_v2', 'system_auth']:
|
||||
try:
|
||||
e = list(cql.execute(f"SELECT salted_hash FROM {ks}.roles WHERE role = '{role}'"))
|
||||
if e != []:
|
||||
key = e[0].salted_hash
|
||||
if key is not None:
|
||||
break
|
||||
except:
|
||||
pass
|
||||
assert key is not None
|
||||
try:
|
||||
yield (role, key)
|
||||
finally:
|
||||
cql.execute(f'DROP ROLE "{role}"')
|
||||
|
||||
# Create a new DynamoDB API resource (connection object) similar to the
|
||||
# existing "dynamodb" resource - but authenticating with the given role
|
||||
# and key.
|
||||
@contextmanager
|
||||
def new_dynamodb(dynamodb, role, key):
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
config = dynamodb.meta.client._client_config
|
||||
verify = not url.startswith('https')
|
||||
ret = boto3.resource('dynamodb', endpoint_url=url, verify=verify,
|
||||
aws_access_key_id=role, aws_secret_access_key=key,
|
||||
region_name='us-east-1', config=config)
|
||||
try:
|
||||
yield ret
|
||||
finally:
|
||||
ret.meta.client.close()
|
||||
|
||||
# A basic test for creating a new role. The ListTables operation is allowed
|
||||
# to any role, so it should work in the new role when given the right password
|
||||
# and fail with the wrong password.
|
||||
def test_new_role(dynamodb, cql):
|
||||
with new_role(cql) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, key) as d:
|
||||
# ListTables should not fail (we don't care what the result is)
|
||||
d.meta.client.list_tables()
|
||||
# Trying to use the wrong key for the new role should fail to perform
|
||||
# any request. The new_dynamodb() function can't detect the error,
|
||||
# it is detected when attempting to perform a request with it.
|
||||
with new_dynamodb(dynamodb, role, 'wrongkey') as d:
|
||||
with pytest.raises(ClientError, match='UnrecognizedClientException'):
|
||||
d.meta.client.list_tables()
|
||||
|
||||
# A role without "login" permissions cannot be used to authenticate requests.
|
||||
# Reproduces #19735.
|
||||
def test_login_false(dynamodb, cql):
|
||||
with new_role(cql, login=False) as (role, key):
|
||||
with new_dynamodb(dynamodb, role, key) as d:
|
||||
with pytest.raises(ClientError, match='UnrecognizedClientException.*login=false'):
|
||||
d.meta.client.list_tables()
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
#include "utils/bloom_filter.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_sstable_reclaim_memory_from_components_and_reload_reclaimed_components) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
@@ -52,6 +53,11 @@ std::pair<shared_sstable, size_t> create_sstable_with_bloom_filter(test_env& env
|
||||
return {sst, sst_bf_memory};
|
||||
}
|
||||
|
||||
void dispose_and_stop_tracking_bf_memory(shared_sstable&& sst, test_env_sstables_manager& mgr) {
|
||||
mgr.remove_sst_from_reclaimed(sst.get());
|
||||
shared_sstable::dispose(sst.release().release());
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
@@ -89,7 +95,7 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter)
|
||||
|
||||
// Test auto reload - disposing sst3 should trigger reload of the
|
||||
// smallest filter in the reclaimed list, which is sst1's bloom filter.
|
||||
shared_sstable::dispose(sst3.release().release());
|
||||
dispose_and_stop_tracking_bf_memory(std::move(sst3), sst_mgr);
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
|
||||
// only sst4's bloom filter memory should be reported as reclaimed
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst4_bf_memory);
|
||||
@@ -154,7 +160,7 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
|
||||
utils::get_local_injector().enable("reload_reclaimed_components/pause", true);
|
||||
|
||||
// dispose sst2 to trigger reload of sst1's bloom filter
|
||||
shared_sstable::dispose(sst2.release().release());
|
||||
dispose_and_stop_tracking_bf_memory(std::move(sst2), sst_mgr);
|
||||
// _total_reclaimable_memory will be updated when the reload begins; wait for it.
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory);
|
||||
|
||||
@@ -223,3 +229,57 @@ SEASTAR_TEST_CASE(test_bloom_filters_with_bad_partition_estimate) {
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
SEASTAR_TEST_CASE(test_bloom_filter_reload_after_unlink) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
|
||||
return;
|
||||
#endif
|
||||
simple_schema ss;
|
||||
auto schema = ss.schema();
|
||||
|
||||
auto mut = mutation(schema, ss.make_pkey(1));
|
||||
mut.partition().apply_insert(*schema, ss.make_ckey(1), ss.new_timestamp());
|
||||
|
||||
// bloom filter will be reclaimed automatically due to low memory
|
||||
auto sst = make_sstable_containing(env.make_sstable(schema), {mut});
|
||||
auto& sst_mgr = env.manager();
|
||||
BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), 0);
|
||||
auto memory_reclaimed = sst_mgr.get_total_memory_reclaimed();
|
||||
|
||||
// manager's reclaimed set has the sst now
|
||||
auto& reclaimed_set = sst_mgr.get_reclaimed_set();
|
||||
BOOST_REQUIRE_EQUAL(reclaimed_set.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(reclaimed_set.begin()->get_filename(), sst->get_filename());
|
||||
|
||||
// hold a copy of shared sst object in async thread to test reload after unlink
|
||||
utils::get_local_injector().enable("test_bloom_filter_reload_after_unlink");
|
||||
auto async_sst_holder = seastar::async([sst] {
|
||||
// do nothing just hold a copy of sst and wait for message signalling test completion
|
||||
utils::get_local_injector().inject("test_bloom_filter_reload_after_unlink", [] (auto& handler) {
|
||||
auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5});
|
||||
return ret;
|
||||
}).get();
|
||||
});
|
||||
|
||||
// unlink the sst and release the object
|
||||
sst->unlink().get();
|
||||
sst.release();
|
||||
|
||||
// reclaimed set should be now empty but the total memory reclaimed should
|
||||
// be still the same as the sst object is not deactivated yet due to a copy
|
||||
// being alive in the async thread.
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_reclaimed_set().size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), memory_reclaimed);
|
||||
|
||||
// message async thread to complete waiting and thus release its copy of sst, triggering deactivation
|
||||
utils::get_local_injector().receive_message("test_bloom_filter_reload_after_unlink");
|
||||
async_sst_holder.get();
|
||||
|
||||
REQUIRE_EVENTUALLY_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
|
||||
}, {
|
||||
// set available memory = 0 to force reclaim the bloom filter
|
||||
.available_memory = 0
|
||||
});
|
||||
};
|
||||
|
||||
@@ -252,7 +252,7 @@ SEASTAR_TEST_CASE(test_group0_batch) {
|
||||
};
|
||||
|
||||
auto do_transaction = [&] (std::function<future<>(service::group0_batch&)> f) -> future<> {
|
||||
auto guard = co_await rclient.start_operation(&as);
|
||||
auto guard = co_await rclient.start_operation(as);
|
||||
service::group0_batch mc(std::move(guard));
|
||||
co_await f(mc);
|
||||
co_await std::move(mc).commit(rclient, as, ::service::raft_timeout{});
|
||||
@@ -273,7 +273,7 @@ SEASTAR_TEST_CASE(test_group0_batch) {
|
||||
|
||||
// test extract
|
||||
{
|
||||
auto guard = co_await rclient.start_operation(&as);
|
||||
auto guard = co_await rclient.start_operation(as);
|
||||
service::group0_batch mc(std::move(guard));
|
||||
mc.add_mutation(co_await insert_mut(1, 2));
|
||||
mc.add_generator([&] (api::timestamp_type t) -> ::service::mutations_generator {
|
||||
|
||||
@@ -44,5 +44,7 @@ custom_args:
|
||||
- '-c1 -m256M'
|
||||
commitlog_cleanup_test:
|
||||
- '-c1 -m2G'
|
||||
bloom_filter_test:
|
||||
- '-c1'
|
||||
run_in_debug:
|
||||
- logalloc_standard_allocator_segment_pool_backend_test
|
||||
|
||||
@@ -985,7 +985,7 @@ private:
|
||||
config.is_superuser = true;
|
||||
config.can_login = true;
|
||||
|
||||
auto as = &abort_sources.local();
|
||||
auto& as = abort_sources.local();
|
||||
auto guard = group0_client.start_operation(as).get();
|
||||
service::group0_batch mc{std::move(guard)};
|
||||
auth::create_role(
|
||||
@@ -994,7 +994,7 @@ private:
|
||||
config,
|
||||
auth::authentication_options(),
|
||||
mc).get();
|
||||
std::move(mc).commit(group0_client, *as, ::service::raft_timeout{}).get();
|
||||
std::move(mc).commit(group0_client, as, ::service::raft_timeout{}).get();
|
||||
} catch (const auth::role_already_exists&) {
|
||||
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
|
||||
}
|
||||
@@ -1060,7 +1060,7 @@ future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_tes
|
||||
void do_with_mc(cql_test_env& env, std::function<void(service::group0_batch&)> func) {
|
||||
seastar::abort_source as;
|
||||
auto& g0 = env.get_raft_group0_client();
|
||||
auto guard = g0.start_operation(&as).get();
|
||||
auto guard = g0.start_operation(as).get();
|
||||
auto mc = service::group0_batch(std::move(guard));
|
||||
func(mc);
|
||||
std::move(mc).commit(g0, as, std::nullopt).get();
|
||||
|
||||
@@ -57,6 +57,14 @@ public:
|
||||
size_t get_total_reclaimable_memory() {
|
||||
return _total_reclaimable_memory;
|
||||
}
|
||||
|
||||
void remove_sst_from_reclaimed(sstable* sst) {
|
||||
_reclaimed.erase(*sst);
|
||||
}
|
||||
|
||||
auto& get_reclaimed_set() {
|
||||
return _reclaimed;
|
||||
}
|
||||
};
|
||||
|
||||
class test_env_compaction_manager {
|
||||
|
||||
37
test/topology_custom/test_lwt_semaphore.py
Normal file
37
test/topology_custom/test_lwt_semaphore.py
Normal file
@@ -0,0 +1,37 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
import pytest
|
||||
from cassandra.protocol import WriteTimeout
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cas_semaphore(manager):
|
||||
""" This is a regression test for scylladb/scylladb#19698 """
|
||||
servers = await manager.servers_add(1, cmdline=['--smp', '1', '--write-request-timeout-in-ms', '500'])
|
||||
|
||||
host = await wait_for_cql_and_get_hosts(manager.cql, {servers[0]}, time.time() + 60)
|
||||
|
||||
await manager.cql.run_async("CREATE KEYSPACE test WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}")
|
||||
await manager.cql.run_async("CREATE TABLE test.test (a int PRIMARY KEY, b int)")
|
||||
|
||||
async with inject_error(manager.api, servers[0].ip_addr, 'cas_timeout_after_lock'):
|
||||
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
|
||||
try:
|
||||
await asyncio.gather(*res)
|
||||
except WriteTimeout:
|
||||
pass
|
||||
|
||||
res = [manager.cql.run_async(f"INSERT INTO test.test (a) VALUES (0) IF NOT EXISTS", host=host[0]) for r in range(10)]
|
||||
await asyncio.gather(*res)
|
||||
|
||||
metrics = await manager.metrics.query(servers[0].ip_addr)
|
||||
contention = metrics.get(name="scylla_storage_proxy_coordinator_cas_write_contention_count")
|
||||
|
||||
assert contention == None
|
||||
@@ -12,8 +12,8 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.topology.util import reconnect_driver, enter_recovery_state, wait_for_upgrade_state, \
|
||||
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
|
||||
from test.topology.util import (delete_raft_data_and_upgrade_state, enter_recovery_state, log_run_time,
|
||||
reconnect_driver, wait_for_upgrade_state, wait_until_upgrade_finishes)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -77,8 +77,7 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting {others}")
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
|
||||
cql = await reconnect_driver(manager)
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
logging.info(f"{others} restarted, waiting until driver reconnects to them")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
@@ -100,11 +99,11 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
logging.info(f"Removing {srv1} using {others[0]}")
|
||||
await manager.remove_node(others[0].server_id, srv1.server_id)
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts} and restarting")
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in others))
|
||||
cql = await reconnect_driver(manager)
|
||||
logging.info(f"Restarting {others}")
|
||||
await manager.rolling_restart(others)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
|
||||
|
||||
@@ -23,6 +23,8 @@ import requests
|
||||
import json
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for
|
||||
from test.topology.conftest import skip_mode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -206,13 +208,102 @@ async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
|
||||
}
|
||||
servers = await manager.servers_add(2, config=config)
|
||||
for server in servers:
|
||||
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
|
||||
response = requests.get(url, verify=False)
|
||||
assert response.ok
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
# We expect /localnodes to return ["1.2.3.4", "1.2.3.4"]
|
||||
# (since we configured both nodes with the same broadcast_rpc_address):
|
||||
assert j == ['1.2.3.4', '1.2.3.4']
|
||||
# (since we configured both nodes with the same broadcast_rpc_address).
|
||||
# We need the retry loop below because the second node might take a
|
||||
# bit of time to bootstrap after coming up, and only then will it
|
||||
# appear on /localnodes (see #19694).
|
||||
url = f"http://{server.ip_addr}:{config['alternator_port']}/localnodes"
|
||||
timeout = time.time() + 10
|
||||
while True:
|
||||
assert time.time() < timeout
|
||||
response = requests.get(url, verify=False)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if j == ['1.2.3.4', '1.2.3.4']:
|
||||
break # done
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_localnodes_drained_node(manager: ManagerClient):
|
||||
"""Test that if in a cluster one node is brought down with "nodetool drain"
|
||||
a "/localnodes" request should NOT return that node. This test does
|
||||
NOT reproduce issue #19694 - a DRAINED node is not considered is_alive()
|
||||
and even before the fix of that issue, "/localnodes" didn't return it.
|
||||
"""
|
||||
# Start a cluster with two nodes and verify that at this point,
|
||||
# "/localnodes" on the first node returns both nodes.
|
||||
# We need the retry loop below because the second node might take a
|
||||
# bit of time to bootstrap after coming up, and only then will it
|
||||
# appear on /localnodes (see #19694).
|
||||
servers = await manager.servers_add(2, config=alternator_config)
|
||||
localnodes_request = f"http://{servers[0].ip_addr}:{alternator_config['alternator_port']}/localnodes"
|
||||
async def check_localnodes_two():
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
|
||||
return True
|
||||
elif set(j).issubset({servers[0].ip_addr, servers[1].ip_addr}):
|
||||
return None # try again
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_localnodes_two, time.time() + 10)
|
||||
# Now run "nodetool drain" on the second node, leaving the second node
|
||||
# in DRAINED state.
|
||||
await manager.api.client.post("/storage_service/drain", host=servers[1].ip_addr)
|
||||
# After that, "/localnodes" should no longer return the second node.
|
||||
# It might take a short while until the first node learns what happened
|
||||
# to node 1, so we may need to retry for a while
|
||||
async def check_localnodes_one():
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
if set(j) == {servers[0].ip_addr, servers[1].ip_addr}:
|
||||
return None # try again
|
||||
elif set(j) == {servers[0].ip_addr}:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_localnodes_one, time.time() + 10)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_localnodes_joining_nodes(manager: ManagerClient):
|
||||
"""Test that if a cluster is being enlarged and a node is coming up but
|
||||
not yet responsive, a "/localnodes" request should NOT return that node.
|
||||
Reproduces issue #19694.
|
||||
"""
|
||||
# Start a cluster with one node, and then bring up a second node,
|
||||
# pausing its bootstrap (with an injection) in JOINING state.
|
||||
# We need to start the second node in the background, because server_add()
|
||||
# will wait for the bootstrap to complete - which we don't want to do.
|
||||
server = await manager.server_add(config=alternator_config)
|
||||
task = asyncio.create_task(manager.server_add(config=alternator_config | {'error_injections_at_startup': ['delay_bootstrap_20s']}))
|
||||
# Sleep until the first node knows of the second one as a "live node"
|
||||
# (we check this with the REST API's /gossiper/endpoint/live).
|
||||
async def check_two_live_nodes():
|
||||
j = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
|
||||
if len(j) == 1:
|
||||
return None # try again
|
||||
elif len(j) == 2:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
assert await wait_for(check_two_live_nodes, time.time() + 10)
|
||||
|
||||
# At this point the second node is live, but hasn't finished bootstrapping
|
||||
# (we delayed that with the injection). So the "/localnodes" should still
|
||||
# return just one node - not both. Reproduces #19694 (two nodes used to
|
||||
# be returned)
|
||||
localnodes_request = f"http://{server.ip_addr}:{alternator_config['alternator_port']}/localnodes"
|
||||
response = requests.get(localnodes_request)
|
||||
j = json.loads(response.content.decode('utf-8'))
|
||||
assert len(j) == 1
|
||||
# Ending the test here will kill both servers. We don't wait for the
|
||||
# second server to finish its long injection-caused bootstrap delay,
|
||||
# so we don't check here that when the second server finally comes up,
|
||||
# both nodes will finally be visible in /localnodes. This case is checked
|
||||
# in other tests, where bootstrap finishes normally - we don't need to
|
||||
# check this case again here.
|
||||
task.cancel()
|
||||
|
||||
# TODO: add a more thorough test for /localnodes, creating a cluster with
|
||||
# multiple nodes in multiple data centers, and check that we can get a list
|
||||
|
||||
Reference in New Issue
Block a user