Compare commits

..

7 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
6ce6c33b65 test/alternator: fix copyright year to 2026
Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2026-03-03 16:28:02 +00:00
copilot-swe-agent[bot]
fe77675455 test/alternator: rename fixture, split x/y attrs, reorder tests, fix index naming check
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-03 15:46:23 +00:00
copilot-swe-agent[bot]
2fd5383bd0 test/alternator: add GSI/LSI naming and index key encoding tests
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-03 15:28:34 +00:00
copilot-swe-agent[bot]
c7969f7a46 test/alternator: minor cleanup in test_encoding.py per review
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-03 15:16:11 +00:00
copilot-swe-agent[bot]
2c17c90825 test/alternator: address review feedback on test_encoding.py
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-03 15:00:56 +00:00
copilot-swe-agent[bot]
e00dbfa334 test/alternator: add test_encoding.py to test Alternator's on-disk data encoding
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-03 11:07:10 +00:00
copilot-swe-agent[bot]
744034eec6 Initial plan 2026-03-03 10:51:13 +00:00
211 changed files with 1813 additions and 6963 deletions

View File

@@ -9,6 +9,6 @@ jobs:
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
with:
# Comma-separated list of Jira project keys
jira_project_keys: "SCYLLADB,CUSTOMER,SMI,RELENG"
jira_project_keys: "SCYLLADB,CUSTOMER,SMI"
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -14,20 +14,14 @@ jobs:
steps:
- name: Verify Org Membership
id: verify_author
env:
EVENT_NAME: ${{ github.event_name }}
PR_AUTHOR: ${{ github.event.pull_request.user.login }}
PR_ASSOCIATION: ${{ github.event.pull_request.author_association }}
COMMENT_AUTHOR: ${{ github.event.comment.user.login }}
COMMENT_ASSOCIATION: ${{ github.event.comment.author_association }}
shell: bash
run: |
if [[ "$EVENT_NAME" == "pull_request_target" ]]; then
AUTHOR="$PR_AUTHOR"
ASSOCIATION="$PR_ASSOCIATION"
if [[ "${{ github.event_name }}" == "pull_request_target" ]]; then
AUTHOR="${{ github.event.pull_request.user.login }}"
ASSOCIATION="${{ github.event.pull_request.author_association }}"
else
AUTHOR="$COMMENT_AUTHOR"
ASSOCIATION="$COMMENT_ASSOCIATION"
AUTHOR="${{ github.event.comment.user.login }}"
ASSOCIATION="${{ github.event.comment.author_association }}"
fi
if [[ "$ASSOCIATION" == "MEMBER" || "$ASSOCIATION" == "OWNER" ]]; then
echo "member=true" >> $GITHUB_OUTPUT
@@ -39,11 +33,13 @@ jobs:
- name: Validate Comment Trigger
if: github.event_name == 'issue_comment'
id: verify_comment
env:
COMMENT_BODY: ${{ github.event.comment.body }}
shell: bash
run: |
CLEAN_BODY=$(echo "$COMMENT_BODY" | grep -v '^[[:space:]]*>')
BODY=$(cat << 'EOF'
${{ github.event.comment.body }}
EOF
)
CLEAN_BODY=$(echo "$BODY" | grep -v '^[[:space:]]*>')
if echo "$CLEAN_BODY" | grep -qi '@scylladbbot' && echo "$CLEAN_BODY" | grep -qi 'trigger-ci'; then
echo "trigger=true" >> $GITHUB_OUTPUT

View File

@@ -25,7 +25,6 @@ target_sources(scylla_auth
service.cc
standard_role_manager.cc
transitional.cc
maintenance_socket_authenticator.cc
maintenance_socket_role_manager.cc)
target_include_directories(scylla_auth
PUBLIC

View File

@@ -9,9 +9,19 @@
#include "auth/allow_all_authenticator.hh"
#include "service/migration_manager.hh"
#include "utils/class_registrator.hh"
namespace auth {
constexpr std::string_view allow_all_authenticator_name("org.apache.cassandra.auth.AllowAllAuthenticator");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authenticator,
allow_all_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -9,9 +9,18 @@
#include "auth/allow_all_authorizer.hh"
#include "auth/common.hh"
#include "utils/class_registrator.hh"
namespace auth {
constexpr std::string_view allow_all_authorizer_name("org.apache.cassandra.auth.AllowAllAuthorizer");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authorizer,
allow_all_authorizer,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthorizer");
}

View File

@@ -13,11 +13,14 @@
#include <boost/regex.hpp>
#include <fmt/ranges.h>
#include "utils/class_registrator.hh"
#include "utils/to_string.hh"
#include "data_dictionary/data_dictionary.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
static const auto CERT_AUTH_NAME = "com.scylladb.auth.CertificateAuthenticator";
const std::string_view auth::certificate_authenticator_name(CERT_AUTH_NAME);
static logging::logger clogger("certificate_authenticator");
@@ -27,6 +30,13 @@ static const std::string cfg_query_attr = "query";
static const std::string cfg_source_subject = "SUBJECT";
static const std::string cfg_source_altname = "ALTNAME";
static const class_registrator<auth::authenticator
, auth::certificate_authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
@@ -89,7 +99,7 @@ future<> auth::certificate_authenticator::stop() {
}
std::string_view auth::certificate_authenticator::qualified_java_name() const {
return "com.scylladb.auth.CertificateAuthenticator";
return certificate_authenticator_name;
}
bool auth::certificate_authenticator::require_authentication() const {

View File

@@ -27,6 +27,8 @@ namespace auth {
class cache;
extern const std::string_view certificate_authenticator_name;
class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;

View File

@@ -26,6 +26,7 @@ extern "C" {
#include "cql3/untyped_result_set.hh"
#include "exceptions/exceptions.hh"
#include "utils/log.hh"
#include "utils/class_registrator.hh"
namespace auth {
@@ -39,6 +40,14 @@ static constexpr std::string_view PERMISSIONS_NAME = "permissions";
static logging::logger alogger("default_authorizer");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authorizer,
default_authorizer,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.CassandraAuthorizer");
default_authorizer::default_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: _qp(qp)
, _migration_manager(mm) {

View File

@@ -24,6 +24,7 @@
#include "exceptions/exceptions.hh"
#include "seastarx.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
#include "db/config.hh"
#include "utils/exponential_backoff_retry.hh"
@@ -71,10 +72,20 @@ std::vector<sstring> get_attr_values(LDAP* ld, LDAPMessage* res, const char* att
return values;
}
const char* ldap_role_manager_full_name = "com.scylladb.auth.LDAPRoleManager";
} // anonymous namespace
namespace auth {
static const class_registrator<
role_manager,
ldap_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(ldap_role_manager_full_name);
ldap_role_manager::ldap_role_manager(
std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
uint32_t permissions_update_interval_in_ms,
@@ -104,7 +115,7 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
}
std::string_view ldap_role_manager::qualified_java_name() const noexcept {
return "com.scylladb.auth.LDAPRoleManager";
return ldap_role_manager_full_name;
}
const resource_set& ldap_role_manager::protected_resources() const {

View File

@@ -57,7 +57,8 @@ class ldap_role_manager : public role_manager {
cache& cache ///< Passed to standard_role_manager.
);
/// Retrieves LDAP configuration entries from qp and invokes the other constructor.
/// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
/// class_registrator<role_manager>.
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);
/// Thrown when query-template parsing fails.

View File

@@ -1,31 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "auth/maintenance_socket_authenticator.hh"
namespace auth {
maintenance_socket_authenticator::~maintenance_socket_authenticator() {
}
future<> maintenance_socket_authenticator::start() {
return make_ready_future<>();
}
future<> maintenance_socket_authenticator::ensure_superuser_is_created() const {
return make_ready_future<>();
}
bool maintenance_socket_authenticator::require_authentication() const {
return false;
}
} // namespace auth

View File

@@ -1,36 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include <seastar/core/shared_future.hh>
#include "password_authenticator.hh"
namespace auth {
// maintenance_socket_authenticator is used for clients connecting to the
// maintenance socket. It does not require authentication,
// while still allowing the managing of roles and their credentials.
class maintenance_socket_authenticator : public password_authenticator {
public:
using password_authenticator::password_authenticator;
virtual ~maintenance_socket_authenticator();
virtual future<> start() override;
virtual future<> ensure_superuser_is_created() const override;
bool require_authentication() const override;
};
} // namespace auth

View File

@@ -13,48 +13,23 @@
#include <string_view>
#include "auth/cache.hh"
#include "cql3/description.hh"
#include "utils/log.hh"
#include "utils/on_internal_error.hh"
#include "utils/class_registrator.hh"
namespace auth {
static logging::logger log("maintenance_socket_role_manager");
constexpr std::string_view maintenance_socket_role_manager_name = "com.scylladb.auth.MaintenanceSocketRoleManager";
future<> maintenance_socket_role_manager::ensure_role_operations_are_enabled() {
if (_is_maintenance_mode) {
on_internal_error(log, "enabling role operations not allowed in maintenance mode");
}
static const class_registrator<
role_manager,
maintenance_socket_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(sstring{maintenance_socket_role_manager_name});
if (_std_mgr.has_value()) {
on_internal_error(log, "role operations are already enabled");
}
_std_mgr.emplace(_qp, _group0_client, _migration_manager, _cache);
return _std_mgr->start();
}
void maintenance_socket_role_manager::set_maintenance_mode() {
if (_std_mgr.has_value()) {
on_internal_error(log, "cannot enter maintenance mode after role operations have been enabled");
}
_is_maintenance_mode = true;
}
maintenance_socket_role_manager::maintenance_socket_role_manager(
cql3::query_processor& qp,
::service::raft_group0_client& rg0c,
::service::migration_manager& mm,
cache& c)
: _qp(qp)
, _group0_client(rg0c)
, _migration_manager(mm)
, _cache(c)
, _std_mgr(std::nullopt)
, _is_maintenance_mode(false) {
}
std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {
return "com.scylladb.auth.MaintenanceSocketRoleManager";
return maintenance_socket_role_manager_name;
}
const resource_set& maintenance_socket_role_manager::protected_resources() const {
@@ -68,161 +43,81 @@ future<> maintenance_socket_role_manager::start() {
}
future<> maintenance_socket_role_manager::stop() {
return _std_mgr ? _std_mgr->stop() : make_ready_future<>();
}
future<> maintenance_socket_role_manager::ensure_superuser_is_created() {
return _std_mgr ? _std_mgr->ensure_superuser_is_created() : make_ready_future<>();
}
template<typename T = void>
future<T> operation_not_available_in_maintenance_mode_exception(std::string_view operation) {
return make_exception_future<T>(
std::runtime_error(fmt::format("role manager: {} operation not available through maintenance socket in maintenance mode", operation)));
}
template<typename T = void>
future<T> manager_not_ready_exception(std::string_view operation) {
return make_exception_future<T>(
std::runtime_error(fmt::format("role manager: {} operation not available because manager not ready yet (role operations not enabled)", operation)));
}
future<> maintenance_socket_role_manager::validate_operation(std::string_view name) const {
if (_is_maintenance_mode) {
return operation_not_available_in_maintenance_mode_exception(name);
}
if (!_std_mgr) {
return manager_not_ready_exception(name);
}
return make_ready_future<>();
}
future<> maintenance_socket_role_manager::create(std::string_view role_name, const role_config& c, ::service::group0_batch& mc) {
auto f = validate_operation("CREATE");
if (f.failed()) {
return f;
}
return _std_mgr->create(role_name, c, mc);
future<> maintenance_socket_role_manager::ensure_superuser_is_created() {
return make_ready_future<>();
}
template<typename T = void>
future<T> operation_not_supported_exception(std::string_view operation) {
return make_exception_future<T>(
std::runtime_error(fmt::format("role manager: {} operation not supported through maintenance socket", operation)));
}
future<> maintenance_socket_role_manager::create(std::string_view role_name, const role_config&, ::service::group0_batch&) {
return operation_not_supported_exception("CREATE");
}
future<> maintenance_socket_role_manager::drop(std::string_view role_name, ::service::group0_batch& mc) {
auto f = validate_operation("DROP");
if (f.failed()) {
return f;
}
return _std_mgr->drop(role_name, mc);
return operation_not_supported_exception("DROP");
}
future<> maintenance_socket_role_manager::alter(std::string_view role_name, const role_config_update& u, ::service::group0_batch& mc) {
auto f = validate_operation("ALTER");
if (f.failed()) {
return f;
}
return _std_mgr->alter(role_name, u, mc);
future<> maintenance_socket_role_manager::alter(std::string_view role_name, const role_config_update&, ::service::group0_batch&) {
return operation_not_supported_exception("ALTER");
}
future<> maintenance_socket_role_manager::grant(std::string_view grantee_name, std::string_view role_name, ::service::group0_batch& mc) {
auto f = validate_operation("GRANT");
if (f.failed()) {
return f;
}
return _std_mgr->grant(grantee_name, role_name, mc);
return operation_not_supported_exception("GRANT");
}
future<> maintenance_socket_role_manager::revoke(std::string_view revokee_name, std::string_view role_name, ::service::group0_batch& mc) {
auto f = validate_operation("REVOKE");
if (f.failed()) {
return f;
}
return _std_mgr->revoke(revokee_name, role_name, mc);
return operation_not_supported_exception("REVOKE");
}
future<role_set> maintenance_socket_role_manager::query_granted(std::string_view grantee_name, recursive_role_query m) {
auto f = validate_operation("QUERY GRANTED");
if (f.failed()) {
return make_exception_future<role_set>(f.get_exception());
}
return _std_mgr->query_granted(grantee_name, m);
future<role_set> maintenance_socket_role_manager::query_granted(std::string_view grantee_name, recursive_role_query) {
return operation_not_supported_exception<role_set>("QUERY GRANTED");
}
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state& qs) {
auto f = validate_operation("QUERY ALL DIRECTLY GRANTED");
if (f.failed()) {
return make_exception_future<role_to_directly_granted_map>(f.get_exception());
}
return _std_mgr->query_all_directly_granted(qs);
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state&) {
return operation_not_supported_exception<role_to_directly_granted_map>("QUERY ALL DIRECTLY GRANTED");
}
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state& qs) {
auto f = validate_operation("QUERY ALL");
if (f.failed()) {
return make_exception_future<role_set>(f.get_exception());
}
return _std_mgr->query_all(qs);
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state&) {
return operation_not_supported_exception<role_set>("QUERY ALL");
}
future<bool> maintenance_socket_role_manager::exists(std::string_view role_name) {
auto f = validate_operation("EXISTS");
if (f.failed()) {
return make_exception_future<bool>(f.get_exception());
}
return _std_mgr->exists(role_name);
return operation_not_supported_exception<bool>("EXISTS");
}
future<bool> maintenance_socket_role_manager::is_superuser(std::string_view role_name) {
auto f = validate_operation("IS SUPERUSER");
if (f.failed()) {
return make_exception_future<bool>(f.get_exception());
}
return _std_mgr->is_superuser(role_name);
return make_ready_future<bool>(true);
}
future<bool> maintenance_socket_role_manager::can_login(std::string_view role_name) {
auto f = validate_operation("CAN LOGIN");
if (f.failed()) {
return make_exception_future<bool>(f.get_exception());
}
return _std_mgr->can_login(role_name);
return make_ready_future<bool>(true);
}
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
auto f = validate_operation("GET ATTRIBUTE");
if (f.failed()) {
return make_exception_future<std::optional<sstring>>(f.get_exception());
}
return _std_mgr->get_attribute(role_name, attribute_name, qs);
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) {
return operation_not_supported_exception<std::optional<sstring>>("GET ATTRIBUTE");
}
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) {
auto f = validate_operation("QUERY ATTRIBUTE FOR ALL");
if (f.failed()) {
return make_exception_future<role_manager::attribute_vals>(f.get_exception());
}
return _std_mgr->query_attribute_for_all(attribute_name, qs);
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) {
return operation_not_supported_exception<role_manager::attribute_vals>("QUERY ATTRIBUTE FOR ALL");
}
future<> maintenance_socket_role_manager::set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) {
auto f = validate_operation("SET ATTRIBUTE");
if (f.failed()) {
return f;
}
return _std_mgr->set_attribute(role_name, attribute_name, attribute_value, mc);
return operation_not_supported_exception("SET ATTRIBUTE");
}
future<> maintenance_socket_role_manager::remove_attribute(std::string_view role_name, std::string_view attribute_name, ::service::group0_batch& mc) {
auto f = validate_operation("REMOVE ATTRIBUTE");
if (f.failed()) {
return f;
}
return _std_mgr->remove_attribute(role_name, attribute_name, mc);
return operation_not_supported_exception("REMOVE ATTRIBUTE");
}
future<std::vector<cql3::description>> maintenance_socket_role_manager::describe_role_grants() {
auto f = validate_operation("DESCRIBE ROLE GRANTS");
if (f.failed()) {
return make_exception_future<std::vector<cql3::description>>(f.get_exception());
}
return _std_mgr->describe_role_grants();
return operation_not_supported_exception<std::vector<cql3::description>>("DESCRIBE ROLE GRANTS");
}
} // namespace auth

View File

@@ -11,7 +11,6 @@
#include "auth/cache.hh"
#include "auth/resource.hh"
#include "auth/role_manager.hh"
#include "auth/standard_role_manager.hh"
#include <seastar/core/future.hh>
namespace cql3 {
@@ -25,26 +24,13 @@ class raft_group0_client;
namespace auth {
// This role manager is used by the maintenance socket. It has disabled all role management operations
// in maintenance mode. In normal mode it delegates all operations to a standard_role_manager,
// which is created on demand when the node joins the cluster.
extern const std::string_view maintenance_socket_role_manager_name;
// This role manager is used by the maintenance socket. It has disabled all role management operations to not depend on
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
class maintenance_socket_role_manager final : public role_manager {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
std::optional<standard_role_manager> _std_mgr;
bool _is_maintenance_mode;
public:
void set_maintenance_mode() override;
// Ensures role management operations are enabled.
// It must be called once the node has joined the cluster.
// In the meantime all role management operations will fail.
future<> ensure_role_operations_are_enabled() override;
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}
virtual std::string_view qualified_java_name() const noexcept override;
@@ -56,21 +42,21 @@ public:
virtual future<> ensure_superuser_is_created() override;
virtual future<> create(std::string_view role_name, const role_config& c, ::service::group0_batch& mc) override;
virtual future<> create(std::string_view role_name, const role_config&, ::service::group0_batch&) override;
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override;
virtual future<> alter(std::string_view role_name, const role_config_update& u, ::service::group0_batch& mc) override;
virtual future<> alter(std::string_view role_name, const role_config_update&, ::service::group0_batch&) override;
virtual future<> grant(std::string_view grantee_name, std::string_view role_name, ::service::group0_batch& mc) override;
virtual future<> revoke(std::string_view revokee_name, std::string_view role_name, ::service::group0_batch& mc) override;
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query m) override;
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state& qs) override;
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
virtual future<role_set> query_all(::service::query_state& qs) override;
virtual future<role_set> query_all(::service::query_state&) override;
virtual future<bool> exists(std::string_view role_name) override;
@@ -78,19 +64,15 @@ public:
virtual future<bool> can_login(std::string_view role_name) override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;
virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name, ::service::group0_batch& mc) override;
virtual future<std::vector<cql3::description>> describe_role_grants() override;
private:
future<> validate_operation(std::string_view name) const;
};
}

View File

@@ -26,6 +26,7 @@
#include "cql3/untyped_result_set.hh"
#include "utils/log.hh"
#include "service/migration_manager.hh"
#include "utils/class_registrator.hh"
#include "replica/database.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
@@ -36,18 +37,27 @@ constexpr std::string_view password_authenticator_name("org.apache.cassandra.aut
// name of the hash column.
static constexpr std::string_view SALTED_HASH = "salted_hash";
static constexpr std::string_view DEFAULT_USER_NAME = meta::DEFAULT_SUPERUSER_NAME;
static const sstring DEFAULT_USER_PASSWORD = sstring(meta::DEFAULT_SUPERUSER_NAME);
static logging::logger plogger("password_authenticator");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authenticator,
password_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
std::string password_authenticator::default_superuser(cql3::query_processor& qp) {
if (legacy_mode(qp)) {
return std::string(meta::DEFAULT_SUPERUSER_NAME);
}
return qp.db().get_config().auth_superuser_name();
static std::string_view get_config_value(std::string_view value, std::string_view def) {
return value.empty() ? def : value;
}
std::string password_authenticator::default_superuser(const db::config& cfg) {
return std::string(get_config_value(cfg.auth_superuser_name(), DEFAULT_USER_NAME));
}
password_authenticator::~password_authenticator() {
@@ -59,6 +69,7 @@ password_authenticator::password_authenticator(cql3::query_processor& qp, ::serv
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
{}
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
@@ -112,14 +123,11 @@ future<> password_authenticator::migrate_legacy_metadata() const {
}
future<> password_authenticator::legacy_create_default_if_missing() {
if (_superuser.empty()) {
on_internal_error(plogger, "Legacy auth default superuser name is empty");
}
const auto exists = co_await legacy::default_role_row_satisfies(_qp, &has_salted_hash, _superuser);
if (exists) {
co_return;
}
std::string salted_pwd(_qp.db().get_config().auth_superuser_salted_password());
std::string salted_pwd(get_config_value(_qp.db().get_config().auth_superuser_salted_password(), ""));
if (salted_pwd.empty()) {
salted_pwd = passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt, _scheme);
}
@@ -139,9 +147,6 @@ future<> password_authenticator::legacy_create_default_if_missing() {
future<> password_authenticator::maybe_create_default_password() {
auto needs_password = [this] () -> future<bool> {
if (_superuser.empty()) {
co_return false;
}
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE is_superuser = true ALLOW FILTERING", get_auth_ks_name(_qp), meta::roles_table::name);
auto results = co_await _qp.execute_internal(query,
db::consistency_level::LOCAL_ONE,
@@ -173,9 +178,9 @@ future<> password_authenticator::maybe_create_default_password() {
co_return;
}
// Set default superuser's password.
std::string salted_pwd(_qp.db().get_config().auth_superuser_salted_password());
std::string salted_pwd(get_config_value(_qp.db().get_config().auth_superuser_salted_password(), ""));
if (salted_pwd.empty()) {
co_return;
salted_pwd = passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt, _scheme);
}
const auto update_query = update_row_query();
co_await collect_mutations(_qp, batch, update_query, {salted_pwd, _superuser});
@@ -205,8 +210,6 @@ future<> password_authenticator::maybe_create_default_password_with_retries() {
future<> password_authenticator::start() {
return once_among_shards([this] {
_superuser = default_superuser(_qp);
// Verify that at least one hashing scheme is supported.
passwords::detail::verify_scheme(_scheme);
plogger.info("Using password hashing scheme: {}", passwords::detail::prefix_for_scheme(_scheme));
@@ -214,9 +217,6 @@ future<> password_authenticator::start() {
_stopped = do_after_system_ready(_as, [this] {
return async([this] {
if (legacy_mode(_qp)) {
if (_superuser.empty()) {
on_internal_error(plogger, "Legacy auth default superuser name is empty");
}
if (!_superuser_created_promise.available()) {
// Counterintuitively, we mark promise as ready before any startup work
// because wait_for_schema_agreement() below will block indefinitely
@@ -251,9 +251,6 @@ future<> password_authenticator::start() {
});
if (legacy_mode(_qp)) {
if (_superuser.empty()) {
on_internal_error(plogger, "Legacy auth default superuser name is empty");
}
static const sstring create_roles_query = fmt::format(
"CREATE TABLE {}.{} ("
" {} text PRIMARY KEY,"
@@ -283,7 +280,7 @@ future<> password_authenticator::stop() {
db::consistency_level password_authenticator::consistency_for_user(std::string_view role_name) {
// TODO: this is plain dung. Why treat hardcoded default special, but for example a user-created
// super user uses plain LOCAL_ONE?
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
if (role_name == DEFAULT_USER_NAME) {
return db::consistency_level::QUORUM;
}
return db::consistency_level::LOCAL_ONE;

View File

@@ -51,7 +51,7 @@ class password_authenticator : public authenticator {
public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(cql3::query_processor& qp);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);

View File

@@ -112,11 +112,6 @@ public:
virtual future<> stop() = 0;
///
/// Notify that the maintenance mode is starting.
///
virtual void set_maintenance_mode() {}
///
/// Ensure that superuser role exists.
///
@@ -124,11 +119,6 @@ public:
///
virtual future<> ensure_superuser_is_created() = 0;
///
/// Ensure role management operations are enabled. Some role managers may defer initialization.
///
virtual future<> ensure_role_operations_are_enabled() { return make_ready_future<>(); }
///
/// \returns an exceptional future with \ref role_already_exists for a role that has previously been created.
///

View File

@@ -22,11 +22,21 @@
#include "db/config.hh"
#include "utils/log.hh"
#include "seastarx.hh"
#include "utils/class_registrator.hh"
namespace auth {
static logging::logger mylog("saslauthd_authenticator");
// To ensure correct initialization order, we unfortunately need to use a string literal.
static const class_registrator<
authenticator,
saslauthd_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -16,8 +16,6 @@
#include <algorithm>
#include <chrono>
#include <boost/algorithm/string.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
@@ -25,17 +23,8 @@
#include "auth/allow_all_authenticator.hh"
#include "auth/allow_all_authorizer.hh"
#include "auth/certificate_authenticator.hh"
#include "auth/common.hh"
#include "auth/default_authorizer.hh"
#include "auth/ldap_role_manager.hh"
#include "auth/maintenance_socket_authenticator.hh"
#include "auth/maintenance_socket_role_manager.hh"
#include "auth/password_authenticator.hh"
#include "auth/role_or_anonymous.hh"
#include "auth/saslauthd_authenticator.hh"
#include "auth/standard_role_manager.hh"
#include "auth/transitional.hh"
#include "cql3/functions/functions.hh"
#include "cql3/query_processor.hh"
#include "cql3/description.hh"
@@ -54,6 +43,7 @@
#include "service/raft/raft_group0_client.hh"
#include "mutation/timestamp.hh"
#include "utils/assert.hh"
#include "utils/class_registrator.hh"
#include "locator/abstract_replication_strategy.hh"
#include "data_dictionary/keyspace_metadata.hh"
#include "service/storage_service.hh"
@@ -186,9 +176,8 @@ service::service(
cql3::query_processor& qp,
::service::raft_group0_client& g0,
::service::migration_notifier& mn,
authorizer_factory authorizer_factory,
authenticator_factory authenticator_factory,
role_manager_factory role_manager_factory,
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache)
: service(
@@ -196,9 +185,9 @@ service::service(
qp,
g0,
mn,
authorizer_factory(),
authenticator_factory(),
role_manager_factory(),
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
used_by_maintenance_socket) {
}
@@ -318,10 +307,6 @@ future<permission_set> service::get_permissions(const role_or_anonymous& maybe_r
return _cache.get_permissions(maybe_role, r);
}
void service::set_maintenance_mode() {
_role_manager->set_maintenance_mode();
}
future<bool> service::has_superuser(std::string_view role_name, const role_set& roles) const {
for (const auto& role : roles) {
if (co_await _role_manager->is_superuser(role)) {
@@ -357,10 +342,6 @@ static void validate_authentication_options_are_supported(
}
}
future<> service::ensure_role_operations_are_enabled() {
return _role_manager->ensure_role_operations_are_enabled();
}
future<> service::create_role(std::string_view name,
const role_config& config,
const authentication_options& options,
@@ -678,10 +659,6 @@ future<std::vector<cql3::description>> service::describe_auth(bool with_hashed_p
// Free functions.
//
void set_maintenance_mode(service& ser) {
ser.set_maintenance_mode();
}
future<bool> has_superuser(const service& ser, const authenticated_user& u) {
if (is_anonymous(u)) {
return make_ready_future<bool>(false);
@@ -690,10 +667,6 @@ future<bool> has_superuser(const service& ser, const authenticated_user& u) {
return ser.has_superuser(*u.name);
}
future<> ensure_role_operations_are_enabled(service& ser) {
return ser.underlying_role_manager().ensure_role_operations_are_enabled();
}
future<role_set> get_roles(const service& ser, const authenticated_user& u) {
if (is_anonymous(u)) {
return make_ready_future<role_set>();
@@ -955,111 +928,4 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
std::nullopt);
}
namespace {
std::string_view get_short_name(std::string_view name) {
auto pos = name.find_last_of('.');
if (pos == std::string_view::npos) {
return name;
}
return name.substr(pos + 1);
}
} // anonymous namespace
authorizer_factory make_authorizer_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm) {
std::string_view short_name = get_short_name(name);
if (boost::iequals(short_name, "AllowAllAuthorizer")) {
return [&qp, &g0, &mm] {
return std::make_unique<allow_all_authorizer>(qp.local(), g0, mm.local());
};
} else if (boost::iequals(short_name, "CassandraAuthorizer")) {
return [&qp, &g0, &mm] {
return std::make_unique<default_authorizer>(qp.local(), g0, mm.local());
};
} else if (boost::iequals(short_name, "TransitionalAuthorizer")) {
return [&qp, &g0, &mm] {
return std::make_unique<transitional_authorizer>(qp.local(), g0, mm.local());
};
}
throw std::invalid_argument(fmt::format("Unknown authorizer: {}", name));
}
authenticator_factory make_authenticator_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& auth_cache) {
std::string_view short_name = get_short_name(name);
if (boost::iequals(short_name, "AllowAllAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<allow_all_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "PasswordAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<password_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "CertificateAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<certificate_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "SaslauthdAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<saslauthd_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "TransitionalAuthenticator")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<transitional_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
}
throw std::invalid_argument(fmt::format("Unknown authenticator: {}", name));
}
role_manager_factory make_role_manager_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& auth_cache) {
std::string_view short_name = get_short_name(name);
if (boost::iequals(short_name, "CassandraRoleManager")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<standard_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
};
} else if (boost::iequals(short_name, "LDAPRoleManager")) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<ldap_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
};
}
throw std::invalid_argument(fmt::format("Unknown role manager: {}", name));
}
authenticator_factory make_maintenance_socket_authenticator_factory(
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& auth_cache) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<maintenance_socket_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
};
}
role_manager_factory make_maintenance_socket_role_manager_factory(
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& auth_cache) {
return [&qp, &g0, &mm, &auth_cache] {
return std::make_unique<maintenance_socket_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
};
}
}

View File

@@ -44,10 +44,11 @@ namespace auth {
class role_or_anonymous;
/// Factory function types for creating auth module instances on each shard.
using authorizer_factory = std::function<std::unique_ptr<authorizer>()>;
using authenticator_factory = std::function<std::unique_ptr<authenticator>()>;
using role_manager_factory = std::function<std::unique_ptr<role_manager>()>;
struct service_config final {
sstring authorizer_java_name;
sstring authenticator_java_name;
sstring role_manager_java_name;
};
///
/// Due to poor (in this author's opinion) decisions of Apache Cassandra, certain choices of one role-manager,
@@ -107,16 +108,15 @@ public:
///
/// This constructor is intended to be used when the class is sharded via \ref seastar::sharded. In that case, the
/// arguments must be copyable, which is why we delay construction with instance-construction factories instead
/// arguments must be copyable, which is why we delay construction with instance-construction instructions instead
/// of the instances themselves.
///
service(
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_notifier&,
authorizer_factory,
authenticator_factory,
role_manager_factory,
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&);
@@ -138,11 +138,6 @@ public:
///
future<permission_set> get_uncached_permissions(const role_or_anonymous&, const resource&) const;
///
/// Notify the service that the node is entering maintenance mode.
///
void set_maintenance_mode();
///
/// Query whether the named role has been granted a role that is a superuser.
///
@@ -152,11 +147,6 @@ public:
///
future<bool> has_superuser(std::string_view role_name) const;
///
/// Ensure that the role operations are enabled. Some role managers defer initialization.
///
future<> ensure_role_operations_are_enabled();
///
/// Create a role with optional authentication information.
///
@@ -218,12 +208,8 @@ private:
future<std::vector<cql3::description>> describe_permissions() const;
};
void set_maintenance_mode(service&);
future<bool> has_superuser(const service&, const authenticated_user&);
future<> ensure_role_operations_are_enabled(service&);
future<role_set> get_roles(const service&, const authenticated_user&);
future<permission_set> get_permissions(const service&, const authenticated_user&, const resource&);
@@ -410,52 +396,4 @@ future<> commit_mutations(service& ser, ::service::group0_batch&& mc);
// Migrates data from old keyspace to new one which supports linearizable writes via raft.
future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_client& g0, start_operation_func_t start_operation_func, abort_source& as);
///
/// Factory helper functions for creating auth module instances.
/// These are intended for use with sharded<service>::start() where copyable arguments are required.
/// The returned factories capture the sharded references and call .local() when invoked on each shard.
///
/// Creates an authorizer factory for config-selectable authorizer types.
/// @param name The authorizer class name (e.g., "CassandraAuthorizer", "AllowAllAuthorizer")
authorizer_factory make_authorizer_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm);
/// Creates an authenticator factory for config-selectable authenticator types.
/// @param name The authenticator class name (e.g., "PasswordAuthenticator", "AllowAllAuthenticator")
authenticator_factory make_authenticator_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& cache);
/// Creates a role_manager factory for config-selectable role manager types.
/// @param name The role manager class name (e.g., "CassandraRoleManager")
role_manager_factory make_role_manager_factory(
std::string_view name,
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& cache);
/// Creates a factory for the maintenance socket authenticator.
/// This authenticator is not config-selectable and is only used for the maintenance socket.
authenticator_factory make_maintenance_socket_authenticator_factory(
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& cache);
/// Creates a factory for the maintenance socket role manager.
/// This role manager is not config-selectable and is only used for the maintenance socket.
role_manager_factory make_maintenance_socket_role_manager_factory(
sharded<cql3::query_processor>& qp,
::service::raft_group0_client& g0,
sharded<::service::migration_manager>& mm,
sharded<cache>& cache);
}

View File

@@ -34,6 +34,7 @@
#include <seastar/core/loop.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
#include "service/migration_manager.hh"
#include "password_authenticator.hh"
#include "utils/managed_string.hh"
@@ -43,6 +44,14 @@ namespace auth {
static logging::logger log("standard_role_manager");
static const class_registrator<
role_manager,
standard_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
static db::consistency_level consistency_for_role(std::string_view role_name) noexcept {
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
return db::consistency_level::QUORUM;
@@ -114,6 +123,7 @@ standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::servic
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
{}
std::string_view standard_role_manager::qualified_java_name() const noexcept {
@@ -176,9 +186,6 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
}
future<> standard_role_manager::legacy_create_default_role_if_missing() {
if (_superuser.empty()) {
on_internal_error(log, "Legacy auth default superuser name is empty");
}
try {
const auto exists = co_await legacy::default_role_row_satisfies(_qp, &has_can_login, _superuser);
if (exists) {
@@ -202,9 +209,6 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
}
future<> standard_role_manager::maybe_create_default_role() {
if (_superuser.empty()) {
co_return;
}
auto has_superuser = [this] () -> future<bool> {
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE is_superuser = true ALLOW FILTERING", get_auth_ks_name(_qp), meta::roles_table::name);
auto results = co_await _qp.execute_internal(query, db::consistency_level::LOCAL_ONE,
@@ -296,8 +300,6 @@ future<> standard_role_manager::migrate_legacy_metadata() {
future<> standard_role_manager::start() {
return once_among_shards([this] () -> future<> {
_superuser = password_authenticator::default_superuser(_qp);
if (legacy_mode(_qp)) {
co_await create_legacy_metadata_tables_if_missing();
}

View File

@@ -8,200 +8,244 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "auth/transitional.hh"
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/authorizer.hh"
#include "auth/default_authorizer.hh"
#include "auth/password_authenticator.hh"
#include "auth/cache.hh"
#include "auth/permission.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
namespace auth {
transitional_authenticator::transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
static const sstring PACKAGE_NAME("com.scylladb.auth.");
static const sstring& transitional_authenticator_name() {
static const sstring name = PACKAGE_NAME + "TransitionalAuthenticator";
return name;
}
transitional_authenticator::transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
static const sstring& transitional_authorizer_name() {
static const sstring name = PACKAGE_NAME + "TransitionalAuthorizer";
return name;
}
future<> transitional_authenticator::start() {
return _authenticator->start();
}
class transitional_authenticator : public authenticator {
std::unique_ptr<authenticator> _authenticator;
future<> transitional_authenticator::stop() {
return _authenticator->stop();
}
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
std::string_view transitional_authenticator::qualified_java_name() const {
return "com.scylladb.auth.TransitionalAuthenticator";
}
bool transitional_authenticator::require_authentication() const {
return true;
}
authentication_option_set transitional_authenticator::supported_options() const {
return _authenticator->supported_options();
}
authentication_option_set transitional_authenticator::alterable_options() const {
return _authenticator->alterable_options();
}
future<authenticated_user> transitional_authenticator::authenticate(const credentials_map& credentials) const {
auto i = credentials.find(authenticator::USERNAME_KEY);
if ((i == credentials.end() || i->second.empty())
&& (!credentials.contains(PASSWORD_KEY) || credentials.at(PASSWORD_KEY).empty())) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
}
return make_ready_future().then([this, &credentials] {
return _authenticator->authenticate(credentials);
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
}
virtual future<> start() override {
return _authenticator->start();
}
virtual future<> stop() override {
return _authenticator->stop();
}
virtual std::string_view qualified_java_name() const override {
return transitional_authenticator_name();
}
virtual bool require_authentication() const override {
return true;
}
virtual authentication_option_set supported_options() const override {
return _authenticator->supported_options();
}
virtual authentication_option_set alterable_options() const override {
return _authenticator->alterable_options();
}
virtual future<authenticated_user> authenticate(const credentials_map& credentials) const override {
auto i = credentials.find(authenticator::USERNAME_KEY);
if ((i == credentials.end() || i->second.empty())
&& (!credentials.contains(PASSWORD_KEY) || credentials.at(PASSWORD_KEY).empty())) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
});
}
future<> transitional_authenticator::create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) {
return _authenticator->create(role_name, options, mc);
}
future<> transitional_authenticator::alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) {
return _authenticator->alter(role_name, options, mc);
}
future<> transitional_authenticator::drop(std::string_view role_name, ::service::group0_batch& mc) {
return _authenticator->drop(role_name, mc);
}
future<custom_options> transitional_authenticator::query_custom_options(std::string_view role_name) const {
return _authenticator->query_custom_options(role_name);
}
bool transitional_authenticator::uses_password_hashes() const {
return _authenticator->uses_password_hashes();
}
future<std::optional<sstring>> transitional_authenticator::get_password_hash(std::string_view role_name) const {
return _authenticator->get_password_hash(role_name);
}
const resource_set& transitional_authenticator::protected_resources() const {
return _authenticator->protected_resources();
}
::shared_ptr<sasl_challenge> transitional_authenticator::new_sasl_challenge() const {
class sasl_wrapper : public sasl_challenge {
public:
sasl_wrapper(::shared_ptr<sasl_challenge> sasl)
: _sasl(std::move(sasl)) {
}
virtual bytes evaluate_response(bytes_view client_response) override {
return make_ready_future().then([this, &credentials] {
return _authenticator->authenticate(credentials);
}).handle_exception([](auto ep) {
try {
return _sasl->evaluate_response(client_response);
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
_complete = true;
return {};
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
}
});
}
virtual bool is_complete() const override {
return _complete || _sasl->is_complete();
}
virtual future<> create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override {
return _authenticator->create(role_name, options, mc);
}
virtual future<authenticated_user> get_authenticated_user() const override {
return futurize_invoke([this] {
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
virtual future<> alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override {
return _authenticator->alter(role_name, options, mc);
}
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override {
return _authenticator->drop(role_name, mc);
}
virtual future<custom_options> query_custom_options(std::string_view role_name) const override {
return _authenticator->query_custom_options(role_name);
}
virtual bool uses_password_hashes() const override {
return _authenticator->uses_password_hashes();
}
virtual future<std::optional<sstring>> get_password_hash(std::string_view role_name) const override {
return _authenticator->get_password_hash(role_name);
}
virtual const resource_set& protected_resources() const override {
return _authenticator->protected_resources();
}
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override {
class sasl_wrapper : public sasl_challenge {
public:
sasl_wrapper(::shared_ptr<sasl_challenge> sasl)
: _sasl(std::move(sasl)) {
}
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (const exceptions::authentication_exception&) {
_complete = true;
return {};
}
}
virtual bool is_complete() const override {
return _complete || _sasl->is_complete();
}
virtual future<authenticated_user> get_authenticated_user() const override {
return futurize_invoke([this] {
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
});
});
});
}
}
const sstring& get_username() const override {
return _sasl->get_username();
}
const sstring& get_username() const override {
return _sasl->get_username();
}
private:
::shared_ptr<sasl_challenge> _sasl;
private:
::shared_ptr<sasl_challenge> _sasl;
bool _complete = false;
};
return ::make_shared<sasl_wrapper>(_authenticator->new_sasl_challenge());
}
bool _complete = false;
};
return ::make_shared<sasl_wrapper>(_authenticator->new_sasl_challenge());
}
future<> transitional_authenticator::ensure_superuser_is_created() const {
return _authenticator->ensure_superuser_is_created();
}
virtual future<> ensure_superuser_is_created() const override {
return _authenticator->ensure_superuser_is_created();
}
};
transitional_authorizer::transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: transitional_authorizer(std::make_unique<default_authorizer>(qp, g0, mm)) {
}
class transitional_authorizer : public authorizer {
std::unique_ptr<authorizer> _authorizer;
transitional_authorizer::transitional_authorizer(std::unique_ptr<authorizer> a)
: _authorizer(std::move(a)) {
}
public:
transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: transitional_authorizer(std::make_unique<default_authorizer>(qp, g0, mm)) {
}
transitional_authorizer(std::unique_ptr<authorizer> a)
: _authorizer(std::move(a)) {
}
transitional_authorizer::~transitional_authorizer() {
}
~transitional_authorizer() {
}
future<> transitional_authorizer::start() {
return _authorizer->start();
}
virtual future<> start() override {
return _authorizer->start();
}
future<> transitional_authorizer::stop() {
return _authorizer->stop();
}
virtual future<> stop() override {
return _authorizer->stop();
}
std::string_view transitional_authorizer::qualified_java_name() const {
return "com.scylladb.auth.TransitionalAuthorizer";
}
virtual std::string_view qualified_java_name() const override {
return transitional_authorizer_name();
}
future<permission_set> transitional_authorizer::authorize(const role_or_anonymous&, const resource&) const {
static const permission_set transitional_permissions =
permission_set::of<
permission::CREATE,
permission::ALTER,
permission::DROP,
permission::SELECT,
permission::MODIFY>();
virtual future<permission_set> authorize(const role_or_anonymous&, const resource&) const override {
static const permission_set transitional_permissions =
permission_set::of<
permission::CREATE,
permission::ALTER,
permission::DROP,
permission::SELECT,
permission::MODIFY>();
return make_ready_future<permission_set>(transitional_permissions);
}
return make_ready_future<permission_set>(transitional_permissions);
}
future<> transitional_authorizer::grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) {
return _authorizer->grant(s, std::move(ps), r, mc);
}
virtual future<> grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override {
return _authorizer->grant(s, std::move(ps), r, mc);
}
future<> transitional_authorizer::revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) {
return _authorizer->revoke(s, std::move(ps), r, mc);
}
virtual future<> revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override {
return _authorizer->revoke(s, std::move(ps), r, mc);
}
future<std::vector<permission_details>> transitional_authorizer::list_all() const {
return _authorizer->list_all();
}
virtual future<std::vector<permission_details>> list_all() const override {
return _authorizer->list_all();
}
future<> transitional_authorizer::revoke_all(std::string_view s, ::service::group0_batch& mc) {
return _authorizer->revoke_all(s, mc);
}
virtual future<> revoke_all(std::string_view s, ::service::group0_batch& mc) override {
return _authorizer->revoke_all(s, mc);
}
future<> transitional_authorizer::revoke_all(const resource& r, ::service::group0_batch& mc) {
return _authorizer->revoke_all(r, mc);
}
virtual future<> revoke_all(const resource& r, ::service::group0_batch& mc) override {
return _authorizer->revoke_all(r, mc);
}
const resource_set& transitional_authorizer::protected_resources() const {
return _authorizer->protected_resources();
}
virtual const resource_set& protected_resources() const override {
return _authorizer->protected_resources();
}
};
}
//
// To ensure correct initialization order, we unfortunately need to use string literals.
//
static const class_registrator<
auth::authenticator,
auth::transitional_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<
auth::authorizer,
auth::transitional_authorizer,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&> transitional_authorizer_reg(auth::PACKAGE_NAME + "TransitionalAuthorizer");

View File

@@ -1,81 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "auth/authenticator.hh"
#include "auth/authorizer.hh"
#include "auth/cache.hh"
namespace cql3 {
class query_processor;
}
namespace service {
class raft_group0_client;
class migration_manager;
}
namespace auth {
///
/// Transitional authenticator that allows anonymous access when credentials are not provided
/// or authentication fails. Used for migration scenarios.
///
class transitional_authenticator : public authenticator {
std::unique_ptr<authenticator> _authenticator;
public:
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache);
transitional_authenticator(std::unique_ptr<authenticator> a);
virtual future<> start() override;
virtual future<> stop() override;
virtual std::string_view qualified_java_name() const override;
virtual bool require_authentication() const override;
virtual authentication_option_set supported_options() const override;
virtual authentication_option_set alterable_options() const override;
virtual future<authenticated_user> authenticate(const credentials_map& credentials) const override;
virtual future<> create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override;
virtual future<> alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override;
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override;
virtual future<custom_options> query_custom_options(std::string_view role_name) const override;
virtual bool uses_password_hashes() const override;
virtual future<std::optional<sstring>> get_password_hash(std::string_view role_name) const override;
virtual const resource_set& protected_resources() const override;
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override;
virtual future<> ensure_superuser_is_created() const override;
};
///
/// Transitional authorizer that grants a fixed set of permissions to all users.
/// Used for migration scenarios.
///
class transitional_authorizer : public authorizer {
std::unique_ptr<authorizer> _authorizer;
public:
transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm);
transitional_authorizer(std::unique_ptr<authorizer> a);
~transitional_authorizer();
virtual future<> start() override;
virtual future<> stop() override;
virtual std::string_view qualified_java_name() const override;
virtual future<permission_set> authorize(const role_or_anonymous&, const resource&) const override;
virtual future<> grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override;
virtual future<> revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override;
virtual future<std::vector<permission_details>> list_all() const override;
virtual future<> revoke_all(std::string_view s, ::service::group0_batch& mc) override;
virtual future<> revoke_all(const resource& r, ::service::group0_batch& mc) override;
virtual const resource_set& protected_resources() const override;
};
} // namespace auth

View File

@@ -618,7 +618,7 @@ static void set_default_properties_log_table(schema_builder& b, const schema& s,
b.set_caching_options(caching_options::get_disabled_caching_options());
auto rs = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, false));
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, db.get_token_metadata(), false));
b.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
}

View File

@@ -598,7 +598,8 @@ protected:
// Garbage collected sstables that were added to SSTable set and should be eventually removed from it.
std::vector<sstables::shared_sstable> _used_garbage_collected_sstables;
utils::observable<> _stop_request_observable;
tombstone_gc_state _tombstone_gc_state;
// optional tombstone_gc_state that is used when gc has to check only the compacting sstables to collect tombstones.
std::optional<tombstone_gc_state> _tombstone_gc_state_with_commitlog_check_disabled;
int64_t _output_repaired_at = 0;
private:
// Keeps track of monitors for input sstable.
@@ -648,12 +649,9 @@ protected:
, _owned_ranges(std::move(descriptor.owned_ranges))
, _sharder(descriptor.sharder)
, _owned_ranges_checker(_owned_ranges ? std::optional<dht::incremental_owned_ranges_checker>(*_owned_ranges) : std::nullopt)
, _tombstone_gc_state(_table_s.get_tombstone_gc_state())
, _tombstone_gc_state_with_commitlog_check_disabled(descriptor.gc_check_only_compacting_sstables ? std::make_optional(_table_s.get_tombstone_gc_state().with_commitlog_check_disabled()) : std::nullopt)
, _progress_monitor(progress_monitor)
{
if (descriptor.gc_check_only_compacting_sstables) {
_tombstone_gc_state = _tombstone_gc_state.with_commitlog_check_disabled();
}
std::unordered_set<sstables::run_id> ssts_run_ids;
_contains_multi_fragment_runs = std::any_of(_sstables.begin(), _sstables.end(), [&ssts_run_ids] (sstables::shared_sstable& sst) {
return !ssts_run_ids.insert(sst->run_identifier()).second;
@@ -851,8 +849,8 @@ private:
return _table_s.get_compaction_strategy().make_sstable_set(_table_s);
}
tombstone_gc_state get_tombstone_gc_state() const {
return _tombstone_gc_state;
const tombstone_gc_state& get_tombstone_gc_state() const {
return _tombstone_gc_state_with_commitlog_check_disabled ? _tombstone_gc_state_with_commitlog_check_disabled.value() : _table_s.get_tombstone_gc_state();
}
future<> setup() {
@@ -1052,7 +1050,7 @@ private:
return can_never_purge;
}
return [this] (const dht::decorated_key& dk, is_shadowable is_shadowable) {
return get_max_purgeable_timestamp(_table_s, *_selector, _compacting_for_max_purgeable_func, dk, _bloom_filter_checks, _compacting_max_timestamp, !_tombstone_gc_state.is_commitlog_check_enabled(), is_shadowable);
return get_max_purgeable_timestamp(_table_s, *_selector, _compacting_for_max_purgeable_func, dk, _bloom_filter_checks, _compacting_max_timestamp, _tombstone_gc_state_with_commitlog_check_disabled.has_value(), is_shadowable);
};
}

View File

@@ -54,7 +54,7 @@ public:
virtual future<> on_compaction_completion(compaction_completion_desc desc, sstables::offstrategy offstrategy) = 0;
virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0;
virtual bool tombstone_gc_enabled() const noexcept = 0;
virtual tombstone_gc_state get_tombstone_gc_state() const noexcept = 0;
virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept = 0;
virtual compaction_backlog_tracker& get_backlog_tracker() = 0;
virtual const std::string get_group_id() const noexcept = 0;
virtual seastar::condition_variable& get_staging_done_condition() noexcept = 0;

View File

@@ -778,7 +778,6 @@ compaction_manager::get_incremental_repair_read_lock(compaction::compaction_grou
cmlog.debug("Get get_incremental_repair_read_lock for {} started", reason);
}
compaction::compaction_state& cs = get_compaction_state(&t);
auto gh = cs.gate.hold();
auto ret = co_await cs.incremental_repair_lock.hold_read_lock();
if (!reason.empty()) {
cmlog.debug("Get get_incremental_repair_read_lock for {} done", reason);
@@ -792,7 +791,6 @@ compaction_manager::get_incremental_repair_write_lock(compaction::compaction_gro
cmlog.debug("Get get_incremental_repair_write_lock for {} started", reason);
}
compaction::compaction_state& cs = get_compaction_state(&t);
auto gh = cs.gate.hold();
auto ret = co_await cs.incremental_repair_lock.hold_write_lock();
if (!reason.empty()) {
cmlog.debug("Get get_incremental_repair_write_lock for {} done", reason);
@@ -1042,7 +1040,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
_compaction_controller.set_max_shares(max_shares);
}))
, _strategy_control(std::make_unique<strategy_control>(*this))
{
, _tombstone_gc_state(_shared_tombstone_gc_state) {
tm.register_module(_task_manager_module->get_name(), _task_manager_module);
register_metrics();
// Bandwidth throttling is node-wide, updater is needed on single shard
@@ -1066,7 +1064,7 @@ compaction_manager::compaction_manager(tasks::task_manager& tm)
, _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer()))
, _compaction_max_shares_observer(_cfg.max_shares.observe([] (const float& max_shares) {}))
, _strategy_control(std::make_unique<strategy_control>(*this))
{
, _tombstone_gc_state(_shared_tombstone_gc_state) {
tm.register_module(_task_manager_module->get_name(), _task_manager_module);
// No metric registration because this constructor is supposed to be used only by the testing
// infrastructure.
@@ -2389,8 +2387,6 @@ future<> compaction_manager::remove(compaction_group_view& t, sstring reason) no
if (!c_state.gate.is_closed()) {
auto close_gate = c_state.gate.close();
co_await stop_ongoing_compactions(reason, &t);
// Wait for users of incremental repair lock (can be either repair itself or maintenance compactions).
co_await c_state.incremental_repair_lock.write_lock();
co_await std::move(close_gate);
}

View File

@@ -167,6 +167,10 @@ private:
std::unique_ptr<strategy_control> _strategy_control;
shared_tombstone_gc_state _shared_tombstone_gc_state;
// TODO: tombstone_gc_state should now have value semantics, but the code
// still uses it with reference semantics (inconsistently though).
// Drop this member, once the code is converted into using value semantics.
tombstone_gc_state _tombstone_gc_state;
utils::disk_space_monitor::subscription _out_of_space_subscription;
private:
@@ -452,6 +456,10 @@ public:
compaction::strategy_control& get_strategy_control() const noexcept;
const tombstone_gc_state& get_tombstone_gc_state() const noexcept {
return _tombstone_gc_state;
};
shared_tombstone_gc_state& get_shared_tombstone_gc_state() noexcept {
return _shared_tombstone_gc_state;
};

View File

@@ -639,7 +639,7 @@ strict_is_not_null_in_views: true
# * workdir: the node will open the maintenance socket on the path <scylla's workdir>/cql.m,
# where <scylla's workdir> is a path defined by the workdir configuration option,
# * <socket path>: the node will open the maintenance socket on the path <socket path>.
maintenance_socket: workdir
maintenance_socket: ignore
# If set to true, configuration parameters defined with LiveUpdate option can be updated in runtime with CQL
# by updating system.config virtual table. If we don't want any configuration parameter to be changed in runtime
@@ -648,9 +648,10 @@ maintenance_socket: workdir
# e.g. for cloud users, for whom scylla's configuration should be changed only by support engineers.
# live_updatable_config_params_changeable_via_cql: true
#
# Guardrails options
#
# ****************
# * GUARDRAILS *
# ****************
# Guardrails to warn or fail when Replication Factor is smaller/greater than the threshold.
# Please note that the value of 0 is always allowed,
# which means that having no replication at all, i.e. RF = 0, is always valid.
@@ -660,27 +661,6 @@ maintenance_socket: workdir
# minimum_replication_factor_warn_threshold: 3
# maximum_replication_factor_warn_threshold: -1
# maximum_replication_factor_fail_threshold: -1
#
# Guardrails to warn about or disallow creating a keyspace with specific replication strategy.
# Each of these 2 settings is a list storing replication strategies considered harmful.
# The replication strategies to choose from are:
# 1) SimpleStrategy,
# 2) NetworkTopologyStrategy,
# 3) LocalStrategy,
# 4) EverywhereStrategy
#
# replication_strategy_warn_list:
# - SimpleStrategy
# replication_strategy_fail_list:
#
# Guardrail to enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE.
# enable_create_table_with_compact_storage: false
#
# Guardrails to limit usage of selected consistency levels for writes.
# Adding a warning to a CQL query response can significantly increase network
# traffic and decrease overall throughput.
# write_consistency_levels_warned: []
# write_consistency_levels_disallowed: []
#
# System information encryption settings
@@ -858,6 +838,21 @@ maintenance_socket: workdir
# key_namespace: <kmip key namespace> (optional)
#
# Guardrails to warn about or disallow creating a keyspace with specific replication strategy.
# Each of these 2 settings is a list storing replication strategies considered harmful.
# The replication strategies to choose from are:
# 1) SimpleStrategy,
# 2) NetworkTopologyStrategy,
# 3) LocalStrategy,
# 4) EverywhereStrategy
#
# replication_strategy_warn_list:
# - SimpleStrategy
# replication_strategy_fail_list:
# Guardrail to enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE.
# enable_create_table_with_compact_storage: false
# Control tablets for new keyspaces.
# Can be set to: disabled|enabled|enforced
#

View File

@@ -1204,7 +1204,6 @@ scylla_core = (['message/messaging_service.cc',
'gms/application_state.cc',
'gms/inet_address.cc',
'dht/i_partitioner.cc',
'dht/fixed_shard.cc',
'dht/token.cc',
'dht/murmur3_partitioner.cc',
'dht/boot_strapper.cc',
@@ -1276,7 +1275,6 @@ scylla_core = (['message/messaging_service.cc',
'auth/resource.cc',
'auth/roles-metadata.cc',
'auth/passwords.cc',
'auth/maintenance_socket_authenticator.cc',
'auth/password_authenticator.cc',
'auth/permission.cc',
'auth/service.cc',
@@ -1342,7 +1340,6 @@ scylla_core = (['message/messaging_service.cc',
'service/strong_consistency/groups_manager.cc',
'service/strong_consistency/coordinator.cc',
'service/strong_consistency/state_machine.cc',
'service/strong_consistency/raft_groups_storage.cc',
'service/raft/group0_state_id_handler.cc',
'service/raft/group0_state_machine.cc',
'service/raft/group0_state_machine_merger.cc',

View File

@@ -10,9 +10,8 @@
#include "types/types.hh"
#include "types/vector.hh"
#include "exceptions/exceptions.hh"
#include <bit>
#include <span>
#include <seastar/core/byteorder.hh>
#include <bit>
namespace cql3 {
namespace functions {
@@ -31,10 +30,14 @@ std::vector<float> extract_float_vector(const bytes_opt& param, vector_dimension
expected_size, dimension, param->size()));
}
std::vector<float> result(dimension);
const char* p = reinterpret_cast<const char*>(param->data());
std::vector<float> result;
result.reserve(dimension);
bytes_view view(*param);
for (size_t i = 0; i < dimension; ++i) {
result[i] = std::bit_cast<float>(consume_be<uint32_t>(p));
// read_simple handles network byte order (big-endian) conversion
uint32_t raw = read_simple<uint32_t>(view);
result.push_back(std::bit_cast<float>(raw));
}
return result;
@@ -52,14 +55,13 @@ namespace {
// You should only use this function if you need to preserve the original vectors and cannot normalize
// them in advance.
float compute_cosine_similarity(std::span<const float> v1, std::span<const float> v2) {
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
float dot_product = 0.0;
float squared_norm_a = 0.0;
float squared_norm_b = 0.0;
double dot_product = 0.0;
double squared_norm_a = 0.0;
double squared_norm_b = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
float a = v1[i];
float b = v2[i];
double a = v1[i];
double b = v2[i];
dot_product += a * b;
squared_norm_a += a * a;
@@ -77,14 +79,13 @@ float compute_cosine_similarity(std::span<const float> v1, std::span<const float
}
float compute_euclidean_similarity(std::span<const float> v1, std::span<const float> v2) {
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
float sum = 0.0;
double sum = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
float a = v1[i];
float b = v2[i];
double a = v1[i];
double b = v2[i];
float diff = a - b;
double diff = a - b;
sum += diff * diff;
}
@@ -97,12 +98,11 @@ float compute_euclidean_similarity(std::span<const float> v1, std::span<const fl
// Assumes that both vectors are L2-normalized.
// This similarity is intended as an optimized way to perform cosine similarity calculation.
float compute_dot_product_similarity(std::span<const float> v1, std::span<const float> v2) {
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
float dot_product = 0.0;
double dot_product = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
float a = v1[i];
float b = v2[i];
double a = v1[i];
double b = v2[i];
dot_product += a * b;
}

View File

@@ -91,11 +91,7 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
, _authorized_prepared_cache_validity_in_ms_observer(_db.get_config().permissions_validity_in_ms.observe(_auth_prepared_cache_cfg_cb))
, _lang_manager(langm)
, _write_consistency_levels_warned_observer(_db.get_config().write_consistency_levels_warned.observe([this](const auto& v) { _write_consistency_levels_warned = to_consistency_level_set(v); }))
, _write_consistency_levels_disallowed_observer(_db.get_config().write_consistency_levels_disallowed.observe([this](const auto& v) { _write_consistency_levels_disallowed = to_consistency_level_set(v); }))
{
_write_consistency_levels_warned = to_consistency_level_set(_db.get_config().write_consistency_levels_warned());
_write_consistency_levels_disallowed = to_consistency_level_set(_db.get_config().write_consistency_levels_disallowed());
namespace sm = seastar::metrics;
namespace stm = statements;
using clevel = db::consistency_level;
@@ -512,32 +508,6 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
"i.e. attempts to set a forbidden replication strategy in a keyspace via CREATE/ALTER KEYSPACE.")).set_skip_when_empty(),
});
std::vector<sm::metric_definition> cql_cl_group;
for (auto cl = size_t(clevel::MIN_VALUE); cl <= size_t(clevel::MAX_VALUE); ++cl) {
cql_cl_group.push_back(
sm::make_counter(
"writes_per_consistency_level",
_cql_stats.writes_per_consistency_level[cl],
sm::description("Counts the number of writes for each consistency level."),
{cl_label(clevel(cl)), basic_level}).set_skip_when_empty());
}
_metrics.add_group("cql", cql_cl_group);
_metrics.add_group("cql", {
sm::make_counter(
"write_consistency_levels_disallowed_violations",
_cql_stats.write_consistency_levels_disallowed_violations,
sm::description("Counts the number of write_consistency_levels_disallowed guardrail violations, "
"i.e. attempts to write with a forbidden consistency level."),
{basic_level}),
sm::make_counter(
"write_consistency_levels_warned_violations",
_cql_stats.write_consistency_levels_warned_violations,
sm::description("Counts the number of write_consistency_levels_warned guardrail violations, "
"i.e. attempts to write with a discouraged consistency level."),
{basic_level}),
});
_mnotifier.register_listener(_migration_subscriber.get());
}
@@ -1263,14 +1233,6 @@ shared_ptr<cql_transport::messages::result_message> query_processor::bounce_to_s
return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard, std::move(cached_fn_calls));
}
query_processor::consistency_level_set query_processor::to_consistency_level_set(const query_processor::cl_option_list& levels) {
query_processor::consistency_level_set result;
for (const auto& opt : levels) {
result.set(static_cast<db::consistency_level>(opt));
}
return result;
}
void query_processor::update_authorized_prepared_cache_config() {
utils::loading_cache_config cfg;
cfg.max_size = _mcfg.authorized_prepared_cache_size;

View File

@@ -34,9 +34,6 @@
#include "service/raft/raft_group0_client.hh"
#include "types/types.hh"
#include "db/auth_version.hh"
#include "db/consistency_level_type.hh"
#include "db/config.hh"
#include "utils/enum_option.hh"
#include "service/storage_proxy_fwd.hh"
@@ -145,30 +142,6 @@ private:
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;
lang::manager& _lang_manager;
using cl_option_list = std::vector<enum_option<db::consistency_level_restriction_t>>;
/// Efficient bitmask-based set of consistency levels.
using consistency_level_set = enum_set<super_enum<db::consistency_level,
db::consistency_level::ANY,
db::consistency_level::ONE,
db::consistency_level::TWO,
db::consistency_level::THREE,
db::consistency_level::QUORUM,
db::consistency_level::ALL,
db::consistency_level::LOCAL_QUORUM,
db::consistency_level::EACH_QUORUM,
db::consistency_level::SERIAL,
db::consistency_level::LOCAL_SERIAL,
db::consistency_level::LOCAL_ONE>>;
consistency_level_set _write_consistency_levels_warned;
consistency_level_set _write_consistency_levels_disallowed;
utils::observer<cl_option_list> _write_consistency_levels_warned_observer;
utils::observer<cl_option_list> _write_consistency_levels_disallowed_observer;
static consistency_level_set to_consistency_level_set(const cl_option_list& levels);
public:
static const sstring CQL_VERSION;
@@ -520,21 +493,6 @@ public:
int32_t page_size = -1,
service::node_local_only node_local_only = service::node_local_only::no) const;
enum class write_consistency_guardrail_state { NONE, WARN, FAIL };
inline write_consistency_guardrail_state check_write_consistency_levels_guardrail(db::consistency_level cl) {
_cql_stats.writes_per_consistency_level[size_t(cl)]++;
if (_write_consistency_levels_disallowed.contains(cl)) [[unlikely]] {
_cql_stats.write_consistency_levels_disallowed_violations++;
return write_consistency_guardrail_state::FAIL;
}
if (_write_consistency_levels_warned.contains(cl)) [[unlikely]] {
_cql_stats.write_consistency_levels_warned_violations++;
return write_consistency_guardrail_state::WARN;
}
return write_consistency_guardrail_state::NONE;
}
private:
// Keep the holder until you stop using the `remote` services.
std::pair<std::reference_wrapper<remote>, gate::holder> remote();

View File

@@ -212,20 +212,11 @@ public:
}
virtual uint32_t add_column_for_post_processing(const column_definition& c) override {
auto it = std::find_if(_selectors.begin(), _selectors.end(), [&c](const expr::expression& e) {
auto col = expr::as_if<expr::column_value>(&e);
return col && col->col == &c;
});
if (it != _selectors.end()) {
return std::distance(_selectors.begin(), it);
}
add_column(c);
get_result_metadata()->add_non_serialized_column(c.column_specification);
uint32_t index = selection::add_column_for_post_processing(c);
_selectors.push_back(expr::column_value(&c));
if (_inner_loop.empty()) {
// Simple case: no aggregation
return _selectors.size() - 1;
return index;
} else {
// Complex case: aggregation, must pass through temporary
auto first_func = cql3::functions::aggregate_fcts::make_first_function(c.type);
@@ -479,21 +470,10 @@ std::vector<const column_definition*> selection::wildcard_columns(schema_ptr sch
return simple_selection::make(schema, std::move(columns), false);
}
selection::add_column_result selection::add_column(const column_definition& c) {
auto index = index_of(c);
if (index != -1) {
return {index, false};
}
_columns.push_back(&c);
return {_columns.size() - 1, true};
}
uint32_t selection::add_column_for_post_processing(const column_definition& c) {
auto col = add_column(c);
if (col.added) {
_metadata->add_non_serialized_column(c.column_specification);
}
return col.index;
_columns.push_back(&c);
_metadata->add_non_serialized_column(c.column_specification);
return _columns.size() - 1;
}
::shared_ptr<selection> selection::from_selectors(data_dictionary::database db, schema_ptr schema, const sstring& ks, const std::vector<prepared_selector>& prepared_selectors) {

View File

@@ -130,14 +130,6 @@ public:
virtual std::vector<shared_ptr<functions::function>> used_functions() const { return {}; }
query::partition_slice::option_set get_query_options();
protected:
// Result of add_column: index in _columns and whether it was added now (or existed already).
struct add_column_result {
uint32_t index;
bool added;
};
// Adds a column to the _columns if not already present, returns add_column_result.
add_column_result add_column(const column_definition& c);
private:
static bool processes_selection(const std::vector<prepared_selector>& prepared_selectors);

View File

@@ -259,15 +259,6 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
if (options.getSerialConsistency() == null)
throw new InvalidRequestException("Invalid empty serial consistency level");
#endif
const auto cl = options.get_consistency();
const query_processor::write_consistency_guardrail_state guardrail_state = qp.check_write_consistency_levels_guardrail(cl);
if (guardrail_state == query_processor::write_consistency_guardrail_state::FAIL) {
return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(
exceptions::invalid_request_exception(
format("Consistency level {} is not allowed for write operations", cl)));
}
for (size_t i = 0; i < _statements.size(); ++i) {
_statements[i].statement->restrictions().validate_primary_key(options.for_statement(i));
}
@@ -275,31 +266,23 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
if (_has_conditions) {
++_stats.cas_batches;
_stats.statements_in_cas_batches += _statements.size();
return execute_with_conditions(qp, options, query_state).then([guardrail_state, cl] (auto result) {
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
}
return result;
});
return execute_with_conditions(qp, options, query_state);
}
++_stats.batches;
_stats.statements_in_batches += _statements.size();
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
return get_mutations(qp, options, timeout, local, now, query_state).then([this, &qp, cl, timeout, tr_state = query_state.get_trace_state(),
return get_mutations(qp, options, timeout, local, now, query_state).then([this, &qp, &options, timeout, tr_state = query_state.get_trace_state(),
permit = query_state.get_permit()] (utils::chunked_vector<mutation> ms) mutable {
return execute_without_conditions(qp, std::move(ms), cl, timeout, std::move(tr_state), std::move(permit));
}).then([guardrail_state, cl] (coordinator_result<> res) {
return execute_without_conditions(qp, std::move(ms), options.get_consistency(), timeout, std::move(tr_state), std::move(permit));
}).then([] (coordinator_result<> res) {
if (!res) {
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
seastar::make_shared<cql_transport::messages::result_message::exception>(std::move(res).assume_error()));
}
auto result = make_shared<cql_transport::messages::result_message::void_message>();
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
}
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(result));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
make_shared<cql_transport::messages::result_message::void_message>());
});
}

View File

@@ -659,7 +659,8 @@ future<std::vector<std::vector<managed_bytes_opt>>> schema_describe_statement::d
auto& auth_service = *client_state.get_auth_service();
if (config.with_hashed_passwords) {
if (!co_await client_state.has_superuser()) {
const auto maybe_user = client_state.user();
if (!maybe_user || !co_await auth::has_superuser(auth_service, *maybe_user)) {
co_await coroutine::return_exception(exceptions::unauthorized_exception(
"DESCRIBE SCHEMA WITH INTERNALS AND PASSWORDS can only be issued by a superuser"));
}

View File

@@ -49,7 +49,7 @@ future<> cql3::statements::list_permissions_statement::check_access(query_proces
const auto& as = *state.get_auth_service();
const auto user = state.user();
return state.has_superuser().then([this, &as, user](bool has_super) {
return auth::has_superuser(as, *user).then([this, &as, user](bool has_super) {
if (has_super) {
return make_ready_future<>();
}

View File

@@ -74,7 +74,7 @@ cql3::statements::list_users_statement::execute(query_processor& qp, service::qu
const auto& cs = state.get_client_state();
const auto& as = *cs.get_auth_service();
return cs.has_superuser().then([&cs, &as, make_results = std::move(make_results)](bool has_superuser) mutable {
return auth::has_superuser(as, *cs.user()).then([&cs, &as, make_results = std::move(make_results)](bool has_superuser) mutable {
if (has_superuser) {
return as.underlying_role_manager().query_all().then([&as, make_results = std::move(make_results)](std::unordered_set<sstring> roles) mutable {
return make_results(as, std::move(roles));

View File

@@ -268,22 +268,10 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
inc_cql_stats(qs.get_client_state().is_internal());
const auto cl = options.get_consistency();
const query_processor::write_consistency_guardrail_state guardrail_state = qp.check_write_consistency_levels_guardrail(cl);
if (guardrail_state == query_processor::write_consistency_guardrail_state::FAIL) {
co_return coroutine::exception(
std::make_exception_ptr(exceptions::invalid_request_exception(
format("Consistency level {} is not allowed for write operations", cl))));
}
_restrictions->validate_primary_key(options);
if (has_conditions()) {
auto result = co_await execute_with_condition(qp, qs, options);
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
}
co_return result;
co_return co_await execute_with_condition(qp, qs, options);
}
json_cache_opt json_cache = maybe_prepare_json_cache(options);
@@ -302,9 +290,6 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
}
auto result = seastar::make_shared<cql_transport::messages::result_message::void_message>();
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
}
if (keys_size_one) {
auto&& table = s->table();
if (_may_use_token_aware_routing && table.uses_tablets() && qs.get_client_state().is_protocol_extension_set(cql_transport::cql_protocol_extension::TABLETS_ROUTING_V1)) {

View File

@@ -94,7 +94,7 @@ future<> create_role_statement::check_access(query_processor& qp, const service:
return;
}
const bool has_superuser = state.has_superuser().get();
const bool has_superuser = auth::has_superuser(*state.get_auth_service(), *state.user()).get();
if (_options.hashed_password && !has_superuser) {
throw exceptions::unauthorized_exception("Only superusers can create a role with a hashed password.");
@@ -213,7 +213,7 @@ future<> alter_role_statement::check_access(query_processor& qp, const service::
auto& as = *state.get_auth_service();
const auto& user = *state.user();
const bool user_is_superuser = state.has_superuser().get();
const bool user_is_superuser = auth::has_superuser(as, user).get();
if (_options.is_superuser) {
if (!user_is_superuser) {
@@ -306,7 +306,7 @@ future<> drop_role_statement::check_access(query_processor& qp, const service::c
auto& as = *state.get_auth_service();
const bool user_is_superuser = state.has_superuser().get();
const bool user_is_superuser = auth::has_superuser(as, *state.user()).get();
const bool role_has_superuser = [this, &as] {
try {
@@ -442,7 +442,7 @@ list_roles_statement::execute(query_processor& qp, service::query_state& state,
const auto& cs = state.get_client_state();
const auto& as = *cs.get_auth_service();
return cs.has_superuser().then([this, &cs, &as, make_results = std::move(make_results)](bool super) mutable {
return auth::has_superuser(as, *cs.user()).then([this, &cs, &as, make_results = std::move(make_results)](bool super) mutable {
auto& rm = as.underlying_role_manager();
const auto& a = as.underlying_authenticator();
const auto query_mode = _recursive ? auth::recursive_role_query::yes : auth::recursive_role_query::no;

View File

@@ -2757,7 +2757,11 @@ select_statement::ordering_comparator_type select_statement::get_ordering_compar
// even if we don't
// ultimately ship them to the client (CASSANDRA-4911).
for (auto&& [column_def, is_descending] : orderings) {
auto index = selection.add_column_for_post_processing(*column_def);
auto index = selection.index_of(*column_def);
if (index < 0) {
index = selection.add_column_for_post_processing(*column_def);
}
sorters.emplace_back(index, column_def->type);
}
@@ -2860,7 +2864,9 @@ void select_statement::ensure_filtering_columns_retrieval(data_dictionary::datab
selection::selection& selection,
const restrictions::statement_restrictions& restrictions) {
for (auto&& cdef : restrictions.get_column_defs_for_filtering(db)) {
selection.add_column_for_post_processing(*cdef);
if (!selection.has_column(*cdef)) {
selection.add_column_for_post_processing(*cdef);
}
}
}

View File

@@ -11,7 +11,6 @@
#pragma once
#include "cql3/statements/statement_type.hh"
#include "db/consistency_level_type.hh"
#include <cstdint>
@@ -88,9 +87,6 @@ struct cql_stats {
uint64_t replication_strategy_warn_list_violations = 0;
uint64_t replication_strategy_fail_list_violations = 0;
uint64_t writes_per_consistency_level[size_t(db::consistency_level::MAX_VALUE) + 1] = {};
uint64_t write_consistency_levels_disallowed_violations = 0;
uint64_t write_consistency_levels_warned_violations = 0;
private:
uint64_t _unpaged_select_queries[(size_t)ks_selector::SIZE] = {0ul};

View File

@@ -199,9 +199,18 @@ class cache_mutation_reader final : public mutation_reader::impl {
return *_snp->schema();
}
gc_clock::time_point get_read_time() {
return _read_context.tombstone_gc_state() ? gc_clock::now() : gc_clock::time_point::min();
}
gc_clock::time_point get_gc_before() {
if (!_gc_before.has_value()) {
_gc_before = _read_context.tombstone_gc_state().with_commitlog_check_disabled().get_gc_before_for_key(_schema, _dk, _read_time);
auto gc_state = _read_context.tombstone_gc_state();
if (gc_state) {
_gc_before = gc_state->with_commitlog_check_disabled().get_gc_before_for_key(_schema, _dk, _read_time);
} else {
_gc_before = gc_clock::time_point::min();
}
}
return *_gc_before;
}
@@ -233,7 +242,7 @@ public:
, _read_context_holder()
, _read_context(ctx) // ctx is owned by the caller, who's responsible for closing it.
, _next_row(*_schema, *_snp, false, _read_context.is_reversed())
, _read_time(gc_clock::now())
, _read_time(get_read_time())
{
clogger.trace("csm {}: table={}.{}, dk={}, reversed={}, snap={}",
fmt::ptr(this),
@@ -792,7 +801,7 @@ void cache_mutation_reader::copy_from_cache_to_buffer() {
if (_next_row_in_range) {
bool remove_row = false;
if (_read_context.tombstone_gc_state().is_gc_enabled() // do not compact rows when set to no_gc() (used in some unit tests)
if (_read_context.tombstone_gc_state() // do not compact rows when tombstone_gc_state is not set (used in some unit tests)
&& !_next_row.dummy()
&& _snp->at_latest_version()
&& _snp->at_oldest_version()) {

View File

@@ -266,13 +266,6 @@ const config_type& config_type_for<std::vector<enum_option<db::replication_strat
return ct;
}
template <>
const config_type& config_type_for<std::vector<enum_option<db::consistency_level_restriction_t>>>() {
static config_type ct(
"consistency level list", printable_vector_to_json<enum_option<db::consistency_level_restriction_t>>);
return ct;
}
template <>
const config_type& config_type_for<enum_option<db::tri_mode_restriction_t>>() {
static config_type ct(
@@ -422,23 +415,6 @@ public:
}
};
template <>
class convert<enum_option<db::consistency_level_restriction_t>> {
public:
static bool decode(const Node& node, enum_option<db::consistency_level_restriction_t>& rhs) {
std::string name;
if (!convert<std::string>::decode(node, name)) {
return false;
}
try {
std::istringstream(name) >> rhs;
} catch (boost::program_options::invalid_option_value&) {
return false;
}
return true;
}
};
template <>
class convert<enum_option<db::tri_mode_restriction_t>> {
public:
@@ -1090,7 +1066,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Enable or disable the native transport server. Uses the same address as the rpc_address, but the port is different from the rpc_port. See native_transport_port.")
, native_transport_port(this, "native_transport_port", "cql_port", value_status::Used, 9042,
"Port on which the CQL native transport listens for clients.")
, maintenance_socket(this, "maintenance_socket", value_status::Used, "workdir",
, maintenance_socket(this, "maintenance_socket", value_status::Used, "ignore",
"The Unix Domain Socket the node uses for maintenance socket.\n"
"The possible options are:\n"
"\tignore the node will not open the maintenance socket.\n"
@@ -1539,15 +1515,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Ignored if authentication tables already contain a super user password.")
, auth_certificate_role_queries(this, "auth_certificate_role_queries", value_status::Used, { { { "source", "SUBJECT" }, {"query", "CN=([^,]+)" } } },
"Regular expression used by CertificateAuthenticator to extract role name from an accepted transport authentication certificate subject info.")
, enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.")
, minimum_replication_factor_fail_threshold(this, "minimum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, minimum_replication_factor_warn_threshold(this, "minimum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, 3, "")
, maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, maximum_replication_factor_warn_threshold(this, "maximum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
, replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
, write_consistency_levels_disallowed(this, "write_consistency_levels_disallowed", liveness::LiveUpdate, value_status::Used, {}, "A list of consistency levels that are not allowed for write operations. Requests using these levels will fail.")
, write_consistency_levels_warned(this, "write_consistency_levels_warned", liveness::LiveUpdate, value_status::Used, {}, "A list of consistency levels that will trigger a warning when used in write operations. Requests using these levels will contain a warning in the query response.")
, maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
, tablets_initial_scale_factor(this, "tablets_initial_scale_factor", liveness::LiveUpdate, value_status::Used, 10,
"Minimum average number of tablet replicas per shard per table. Suppressed by tablet options in table's schema: min_per_shard_tablet_count and min_tablet_count")
, tablets_per_shard_goal(this, "tablets_per_shard_goal", liveness::LiveUpdate, value_status::Used, 100,
@@ -1560,6 +1531,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Maximum number of tablets which may be leaving a shard at the same time. Effecting only on topology coordinator. Set to the same value on all nodes.")
, tablet_streaming_write_concurrency_per_shard(this, "tablet_streaming_write_concurrency_per_shard", liveness::LiveUpdate, value_status::Used, 2,
"Maximum number of tablets which may be pending on a shard at the same time. Effecting only on topology coordinator. Set to the same value on all nodes.")
, replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
, replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
, service_levels_interval(this, "service_levels_interval_ms", liveness::LiveUpdate, value_status::Used, 10000, "Controls how often service levels module polls configuration table")
, audit(this, "audit", value_status::Used, "table",
@@ -1597,6 +1570,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, disk_space_monitor_high_polling_interval_in_seconds(this, "disk_space_monitor_high_polling_interval_in_seconds", value_status::Used, 1, "Disk-space polling interval at or above polling threshold")
, disk_space_monitor_polling_interval_threshold(this, "disk_space_monitor_polling_interval_threshold", value_status::Used, 0.9, "Disk-space polling threshold. Polling interval is increased when disk utilization is greater than or equal to this threshold")
, critical_disk_utilization_level(this, "critical_disk_utilization_level", liveness::LiveUpdate, value_status::Used, 0.98, "Disk utilization level above which mechanisms preventing a node getting out of space are activated")
, enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.")
, rf_rack_valid_keyspaces(this, "rf_rack_valid_keyspaces", liveness::MustRestart, value_status::Used, false,
"Enforce RF-rack-valid keyspaces. Additionally, if there are existing RF-rack-invalid "
"keyspaces, attempting to start a node with this option ON will fail. "
@@ -1869,30 +1843,6 @@ std::unordered_map<sstring, locator::replication_strategy_type> db::replication_
{"EverywhereStrategy", locator::replication_strategy_type::everywhere_topology}};
}
std::unordered_map<sstring, db::consistency_level> db::consistency_level_restriction_t::map() {
using cl = db::consistency_level;
std::unordered_map<sstring, cl> result = {
{"ANY", cl::ANY},
{"ONE", cl::ONE},
{"TWO", cl::TWO},
{"THREE", cl::THREE},
{"QUORUM", cl::QUORUM},
{"ALL", cl::ALL},
{"LOCAL_QUORUM", cl::LOCAL_QUORUM},
{"EACH_QUORUM", cl::EACH_QUORUM},
{"SERIAL", cl::SERIAL},
{"LOCAL_SERIAL", cl::LOCAL_SERIAL},
{"LOCAL_ONE", cl::LOCAL_ONE},
};
constexpr auto expected_size = static_cast<size_t>(cl::MAX_VALUE) - static_cast<size_t>(cl::MIN_VALUE) + 1;
if (result.size() != expected_size) {
on_internal_error_noexcept(dblog, format("consistency_level_option::map() has {} entries but expected {}", result.size(), expected_size));
}
return result;
}
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
std::vector<enum_option<db::experimental_features_t>> ret;
for (const auto& f : db::experimental_features_t::map()) {

View File

@@ -24,7 +24,6 @@
#include "utils/error_injection.hh"
#include "message/dict_trainer.hh"
#include "message/advanced_rpc_compressor.hh"
#include "db/consistency_level_type.hh"
#include "db/tri_mode_restriction.hh"
#include "sstables/compressor.hh"
@@ -127,10 +126,6 @@ struct replication_strategy_restriction_t {
static std::unordered_map<sstring, locator::replication_strategy_type> map(); // for enum_option<>
};
struct consistency_level_restriction_t {
static std::unordered_map<sstring, db::consistency_level> map(); // for enum_option<>
};
constexpr unsigned default_murmur3_partitioner_ignore_msb_bits = 12;
struct tablets_mode_t {
@@ -539,16 +534,10 @@ public:
named_value<std::vector<std::unordered_map<sstring, sstring>>> auth_certificate_role_queries;
// guardrails options
named_value<bool> enable_create_table_with_compact_storage;
named_value<int> minimum_replication_factor_fail_threshold;
named_value<int> minimum_replication_factor_warn_threshold;
named_value<int> maximum_replication_factor_fail_threshold;
named_value<int> maximum_replication_factor_warn_threshold;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_fail_list;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_warn_list;
named_value<std::vector<enum_option<consistency_level_restriction_t>>> write_consistency_levels_disallowed;
named_value<std::vector<enum_option<consistency_level_restriction_t>>> write_consistency_levels_warned;
named_value<int> maximum_replication_factor_fail_threshold;
named_value<double> tablets_initial_scale_factor;
named_value<unsigned> tablets_per_shard_goal;
@@ -556,6 +545,9 @@ public:
named_value<unsigned> tablet_streaming_read_concurrency_per_shard;
named_value<unsigned> tablet_streaming_write_concurrency_per_shard;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_warn_list;
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_fail_list;
named_value<uint32_t> service_levels_interval;
named_value<sstring> audit;
@@ -606,6 +598,8 @@ public:
named_value<float> disk_space_monitor_polling_interval_threshold;
named_value<float> critical_disk_utilization_level;
named_value<bool> enable_create_table_with_compact_storage;
named_value<bool> rf_rack_valid_keyspaces;
named_value<bool> enforce_rack_list;

View File

@@ -154,10 +154,7 @@ hint_sender::~hint_sender() {
future<> hint_sender::stop(drain should_drain) noexcept {
seastar::thread_attributes attr;
attr.sched_group = _hints_cpu_sched_group;
return seastar::async(std::move(attr), [this, should_drain] {
return seastar::async([this, should_drain] {
set_stopping();
_stop_as.request_abort();
_stopped.get();

View File

@@ -125,7 +125,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
tracing::trace_state_ptr _trace_state;
mutation_reader::forwarding _fwd_mr;
bool _range_query;
tombstone_gc_state _tombstone_gc_state;
const tombstone_gc_state* _tombstone_gc_state;
max_purgeable_fn _get_max_purgeable;
// When reader enters a partition, it must be set up for reading that
// partition from the underlying mutation source (_underlying) in one of two ways:
@@ -149,7 +149,7 @@ public:
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tombstone_gc_state gc_state,
const tombstone_gc_state* gc_state,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
@@ -161,7 +161,7 @@ public:
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr)
, _range_query(!query::is_single_partition(range))
, _tombstone_gc_state(std::move(gc_state))
, _tombstone_gc_state(gc_state)
, _get_max_purgeable(std::move(get_max_purgeable))
, _underlying(_cache, *this)
{
@@ -197,7 +197,7 @@ public:
bool partition_exists() const { return _partition_exists; }
void on_underlying_created() { ++_underlying_created; }
bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
const tombstone_gc_state& tombstone_gc_state() const { return _tombstone_gc_state; }
const tombstone_gc_state* tombstone_gc_state() const { return _tombstone_gc_state; }
max_purgeable get_max_purgeable(const dht::decorated_key& dk, is_shadowable is) const { return _get_max_purgeable(dk, is); }
public:
future<> ensure_underlying() {

View File

@@ -775,7 +775,7 @@ row_cache::make_reader_opt(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tombstone_gc_state gc_state,
const tombstone_gc_state* gc_state,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,

View File

@@ -373,7 +373,7 @@ public:
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no,
tombstone_gc_state gc_state = tombstone_gc_state::no_gc(),
const tombstone_gc_state* gc_state = nullptr,
max_purgeable_fn get_max_purgeable = can_never_purge) {
if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(get_max_purgeable), std::move(trace_state), fwd, fwd_mr)) {
return std::move(*reader_opt);
@@ -386,7 +386,7 @@ public:
reader_permit permit,
const dht::partition_range&,
const query::partition_slice&,
tombstone_gc_state,
const tombstone_gc_state*,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
@@ -395,7 +395,7 @@ public:
mutation_reader make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range = query::full_partition_range,
tombstone_gc_state gc_state = tombstone_gc_state::no_gc(),
const tombstone_gc_state* gc_state = nullptr,
max_purgeable_fn get_max_purgeable = can_never_purge) {
auto& full_slice = s->full_slice();
return make_reader(std::move(s), std::move(permit), range, full_slice, nullptr,

View File

@@ -1139,17 +1139,14 @@ future<> schema_applier::finalize_tables_and_views() {
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
for (auto& dropped_view : diff.tables_and_views.local().views.dropped) {
auto s = dropped_view.get();
co_await _ss.local().on_cleanup_for_drop_table(s->id());
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
}
for (auto& dropped_table : diff.tables_and_views.local().tables.dropped) {
auto s = dropped_table.get();
co_await _ss.local().on_cleanup_for_drop_table(s->id());
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
}
for (auto& dropped_cdc : diff.tables_and_views.local().cdc.dropped) {
auto s = dropped_cdc.get();
co_await _ss.local().on_cleanup_for_drop_table(s->id());
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
}

View File

@@ -105,7 +105,7 @@ namespace {
schema_builder::register_schema_initializer([](schema_builder& builder) {
if (builder.ks_name() == schema_tables::NAME) {
// all schema tables are group0 tables
builder.set_is_group0_table();
builder.set_is_group0_table(true);
}
});
}
@@ -2840,15 +2840,20 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks.query_processor().execute_internal(
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
GET_COLUMN_MAPPING_QUERY,
db::consistency_level::LOCAL_ONE,
{table_id.uuid(), version.uuid()},
cql3::query_processor::cache_internal::no
);
if (results->empty()) {
co_return std::nullopt;
// If we don't have a stored column_mapping for an obsolete schema version
// then it means it's way too old and been cleaned up already.
// Fail the whole learn stage in this case.
co_await coroutine::return_exception(std::runtime_error(
format("Failed to look up column mapping for schema version {}",
version)));
}
std::vector<column_definition> static_columns, regular_columns;
for (const auto& row : *results) {
@@ -2876,18 +2881,6 @@ future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_ke
co_return std::move(cm);
}
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
auto cm_opt = co_await schema_tables::get_column_mapping_if_exists(sys_ks, table_id, version);
if (!cm_opt) {
// If we don't have a stored column_mapping for an obsolete schema version
// then it means it's way too old and been cleaned up already.
co_await coroutine::return_exception(std::runtime_error(
format("Failed to look up column mapping for schema version {}",
version)));
}
co_return std::move(*cm_opt);
}
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
GET_COLUMN_MAPPING_QUERY,

View File

@@ -320,8 +320,6 @@ std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const ss
future<> store_column_mapping(sharded<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
/// Query column mapping for a given version of the table locally.
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Returns the same result as `get_column_mapping()` wrapped in optional and returns nullopt if the mapping doesn't exist.
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Check that column mapping exists for a given version of the table
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table

View File

@@ -87,15 +87,31 @@ namespace {
static const std::unordered_set<sstring> tables = {
schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY,
system_keyspace::BROADCAST_KV_STORE,
system_keyspace::CDC_GENERATIONS_V3,
system_keyspace::RAFT,
system_keyspace::RAFT_SNAPSHOTS,
system_keyspace::RAFT_SNAPSHOT_CONFIG,
system_keyspace::GROUP0_HISTORY,
system_keyspace::DISCOVERY,
system_keyspace::TABLETS,
system_keyspace::TOPOLOGY,
system_keyspace::TOPOLOGY_REQUESTS,
system_keyspace::LOCAL,
system_keyspace::PEERS,
system_keyspace::SCYLLA_LOCAL,
system_keyspace::COMMITLOG_CLEANUPS,
system_keyspace::SERVICE_LEVELS_V2,
system_keyspace::VIEW_BUILD_STATUS_V2,
system_keyspace::CDC_STREAMS_STATE,
system_keyspace::CDC_STREAMS_HISTORY,
system_keyspace::ROLES,
system_keyspace::ROLE_MEMBERS,
system_keyspace::ROLE_ATTRIBUTES,
system_keyspace::ROLE_PERMISSIONS,
system_keyspace::CDC_LOCAL,
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
system_keyspace::CLIENT_ROUTES,
};
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
builder.enable_schema_commitlog();
@@ -127,7 +143,7 @@ namespace {
system_keyspace::REPAIR_TASKS,
};
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
builder.set_is_group0_table();
builder.set_is_group0_table(true);
}
});
}
@@ -400,7 +416,26 @@ schema_ptr system_keyspace::cdc_streams_history() {
}
schema_ptr system_keyspace::raft() {
static thread_local auto schema = replica::make_raft_schema(db::system_keyspace::RAFT, true);
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, RAFT);
return schema_builder(NAME, RAFT, std::optional(id))
.with_column("group_id", timeuuid_type, column_kind::partition_key)
// raft log part
.with_column("index", long_type, column_kind::clustering_key)
.with_column("term", long_type)
.with_column("data", bytes_type) // decltype(raft::log_entry::data) - serialized variant
// persisted term and vote
.with_column("vote_term", long_type, column_kind::static_column)
.with_column("vote", uuid_type, column_kind::static_column)
// id of the most recent persisted snapshot
.with_column("snapshot_id", uuid_type, column_kind::static_column)
.with_column("commit_idx", long_type, column_kind::static_column)
.set_comment("Persisted RAFT log, votes and snapshot info")
.with_hash_version()
.set_caching_options(caching_options::get_disabled_caching_options())
.build();
}();
return schema;
}
@@ -408,32 +443,35 @@ schema_ptr system_keyspace::raft() {
// on user-provided state machine and could be stored anywhere else in any other form.
// This should be seen as a snapshot descriptor, instead.
schema_ptr system_keyspace::raft_snapshots() {
static thread_local auto schema = replica::make_raft_snapshots_schema(db::system_keyspace::RAFT_SNAPSHOTS, true);
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, RAFT_SNAPSHOTS);
return schema_builder(NAME, RAFT_SNAPSHOTS, std::optional(id))
.with_column("group_id", timeuuid_type, column_kind::partition_key)
.with_column("snapshot_id", uuid_type)
// Index and term of last entry in the snapshot
.with_column("idx", long_type)
.with_column("term", long_type)
.set_comment("Persisted RAFT snapshot descriptors info")
.with_hash_version()
.build();
}();
return schema;
}
schema_ptr system_keyspace::raft_snapshot_config() {
static thread_local auto schema = replica::make_raft_snapshot_config_schema(db::system_keyspace::RAFT_SNAPSHOT_CONFIG, true);
return schema;
}
static thread_local auto schema = [] {
auto id = generate_legacy_id(system_keyspace::NAME, RAFT_SNAPSHOT_CONFIG);
return schema_builder(system_keyspace::NAME, RAFT_SNAPSHOT_CONFIG, std::optional(id))
.with_column("group_id", timeuuid_type, column_kind::partition_key)
.with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS'
.with_column("server_id", uuid_type, column_kind::clustering_key)
.with_column("can_vote", boolean_type)
// Raft tables for strongly consistent tablets.
// These tables have partition keys of the form (shard, group_id), allowing the data
// to be co-located with the tablet replica that owns the raft group.
// The raft_groups_partitioner creates tokens that map to the specified shard.
schema_ptr system_keyspace::raft_groups() {
static thread_local auto schema = replica::make_raft_schema(db::system_keyspace::RAFT_GROUPS, false);
return schema;
}
schema_ptr system_keyspace::raft_groups_snapshots() {
static thread_local auto schema = replica::make_raft_snapshots_schema(db::system_keyspace::RAFT_GROUPS_SNAPSHOTS, false);
return schema;
}
schema_ptr system_keyspace::raft_groups_snapshot_config() {
static thread_local auto schema = replica::make_raft_snapshot_config_schema(db::system_keyspace::RAFT_GROUPS_SNAPSHOT_CONFIG, false);
.set_comment("RAFT configuration for the latest snapshot descriptor")
.with_hash_version()
.build();
}();
return schema;
}
@@ -2278,29 +2316,21 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
r.insert(r.end(), {sstables_registry()});
}
if (cfg.check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES)) {
r.insert(r.end(), {raft_groups(), raft_groups_snapshots(), raft_groups_snapshot_config()});
}
return r;
}
static bool maybe_write_in_user_memory(schema_ptr s, replica::database& db) {
bool strongly_consistent = db.get_config().check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES);
static bool maybe_write_in_user_memory(schema_ptr s) {
return (s.get() == system_keyspace::batchlog().get())
|| (s.get() == system_keyspace::batchlog_v2().get())
|| (s.get() == system_keyspace::paxos().get())
|| s == system_keyspace::scylla_views_builds_in_progress()
|| (strongly_consistent && s == system_keyspace::raft_groups())
|| (strongly_consistent && s == system_keyspace::raft_groups_snapshots())
|| (strongly_consistent && s == system_keyspace::raft_groups_snapshot_config());
|| s == system_keyspace::scylla_views_builds_in_progress();
}
future<> system_keyspace::make(
locator::effective_replication_map_factory& erm_factory,
replica::database& db) {
for (auto&& table : system_keyspace::all_tables(db.get_config())) {
co_await db.create_local_system_table(table, maybe_write_in_user_memory(table, db), erm_factory);
co_await db.create_local_system_table(table, maybe_write_in_user_memory(table), erm_factory);
co_await db.find_column_family(table).init_storage();
}

View File

@@ -191,9 +191,6 @@ public:
static constexpr auto RAFT = "raft";
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
static constexpr auto RAFT_GROUPS = "raft_groups";
static constexpr auto RAFT_GROUPS_SNAPSHOTS = "raft_groups_snapshots";
static constexpr auto RAFT_GROUPS_SNAPSHOT_CONFIG = "raft_groups_snapshot_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static constexpr auto REPAIR_TASKS = "repair_tasks";
static constexpr auto GROUP0_HISTORY = "group0_history";
@@ -247,9 +244,6 @@ public:
static schema_ptr scylla_local();
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr raft_groups();
static schema_ptr raft_groups_snapshots();
static schema_ptr raft_groups_snapshot_config();
static schema_ptr repair_history();
static schema_ptr repair_tasks();
static schema_ptr group0_history();

View File

@@ -9,7 +9,6 @@
#include "exceptions/exceptions.hh"
#include "db/tablet_options.hh"
#include <seastar/core/bitops.hh>
#include "utils/log.hh"
extern logging::logger dblog;
@@ -24,11 +23,6 @@ tablet_options::tablet_options(const map_type& map) {
min_tablet_count.emplace(value);
}
break;
case tablet_option_type::max_tablet_count:
if (auto value = std::atol(value_str.c_str())) {
max_tablet_count.emplace(value);
}
break;
case tablet_option_type::min_per_shard_tablet_count:
if (auto value = std::atof(value_str.c_str())) {
min_per_shard_tablet_count.emplace(value);
@@ -46,7 +40,6 @@ tablet_options::tablet_options(const map_type& map) {
sstring tablet_options::to_string(tablet_option_type hint) {
switch (hint) {
case tablet_option_type::min_tablet_count: return "min_tablet_count";
case tablet_option_type::max_tablet_count: return "max_tablet_count";
case tablet_option_type::min_per_shard_tablet_count: return "min_per_shard_tablet_count";
case tablet_option_type::expected_data_size_in_gb: return "expected_data_size_in_gb";
}
@@ -55,8 +48,6 @@ sstring tablet_options::to_string(tablet_option_type hint) {
tablet_option_type tablet_options::from_string(sstring hint_desc) {
if (hint_desc == "min_tablet_count") {
return tablet_option_type::min_tablet_count;
} else if (hint_desc == "max_tablet_count") {
return tablet_option_type::max_tablet_count;
} else if (hint_desc == "min_per_shard_tablet_count") {
return tablet_option_type::min_per_shard_tablet_count;
} else if (hint_desc == "expected_data_size_in_gb") {
@@ -71,9 +62,6 @@ std::map<sstring, sstring> tablet_options::to_map() const {
if (min_tablet_count) {
res[to_string(tablet_option_type::min_tablet_count)] = fmt::to_string(*min_tablet_count);
}
if (max_tablet_count) {
res[to_string(tablet_option_type::max_tablet_count)] = fmt::to_string(*max_tablet_count);
}
if (min_per_shard_tablet_count) {
res[to_string(tablet_option_type::min_per_shard_tablet_count)] = fmt::to_string(*min_per_shard_tablet_count);
}
@@ -84,23 +72,11 @@ std::map<sstring, sstring> tablet_options::to_map() const {
}
void tablet_options::validate(const map_type& map) {
std::optional<ssize_t> min_tablets;
std::optional<ssize_t> max_tablets;
for (auto& [key, value_str] : map) {
switch (tablet_options::from_string(key)) {
case tablet_option_type::min_tablet_count:
if (auto value = std::atol(value_str.c_str()); value < 0) {
throw exceptions::configuration_exception(format("Invalid value '{}' for min_tablet_count", value));
} else {
min_tablets = value;
}
break;
case tablet_option_type::max_tablet_count:
if (auto value = std::atol(value_str.c_str()); value <= 0) {
throw exceptions::configuration_exception(format("Invalid value '{}' for max_tablet_count", value));
} else {
max_tablets = value;
}
break;
case tablet_option_type::min_per_shard_tablet_count:
@@ -115,20 +91,6 @@ void tablet_options::validate(const map_type& map) {
break;
}
}
if (min_tablets && max_tablets) {
auto effective_min = 1u << log2ceil(static_cast<size_t>(*min_tablets));
auto effective_max = 1u << log2floor(static_cast<size_t>(*max_tablets));
if (effective_min > effective_max) {
throw exceptions::configuration_exception(
format("Invalid tablet count range: min_tablet_count={} (effective: {}) and max_tablet_count={} (effective: {}) "
"result in conflicting constraints after rounding to powers of 2. "
"Since tablet counts must be powers of 2, min_tablet_count rounds up and max_tablet_count rounds down"
"Please adjust the values so that the smallest power of 2 greater than min_tablet_count is <= the largest power of 2 <= max_tablet_count.",
*min_tablets, effective_min, *max_tablets, effective_max));
}
}
}
} // namespace db

View File

@@ -18,7 +18,6 @@ namespace db {
// Per-table tablet options
enum class tablet_option_type {
min_tablet_count,
max_tablet_count,
min_per_shard_tablet_count,
expected_data_size_in_gb,
};
@@ -27,7 +26,6 @@ struct tablet_options {
using map_type = std::map<sstring, sstring>;
std::optional<ssize_t> min_tablet_count;
std::optional<ssize_t> max_tablet_count;
std::optional<double> min_per_shard_tablet_count;
std::optional<ssize_t> expected_data_size_in_gb;
@@ -35,7 +33,7 @@ struct tablet_options {
explicit tablet_options(const map_type& map);
operator bool() const noexcept {
return min_tablet_count || max_tablet_count || min_per_shard_tablet_count || expected_data_size_in_gb;
return min_tablet_count || min_per_shard_tablet_count || expected_data_size_in_gb;
}
map_type to_map() const;

View File

@@ -932,7 +932,8 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update,
const row& existing_row = existing.cells();
const row& updated_row = update.cells();
return std::ranges::all_of(_base->regular_columns(), [this, &updated_row, &existing_row] (const column_definition& cdef) {
const bool base_has_nonexpiring_marker = update.marker().is_live() && !update.marker().is_expiring();
return std::ranges::all_of(_base->regular_columns(), [this, &updated_row, &existing_row, base_has_nonexpiring_marker] (const column_definition& cdef) {
const auto view_it = _view->columns_by_name().find(cdef.name());
const bool column_is_selected = view_it != _view->columns_by_name().end();
@@ -940,7 +941,7 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update,
// as part of its PK, there are NO virtual columns corresponding to the unselected columns in the view.
// Because of that, we don't generate view updates when the value in an unselected column is created
// or changes.
if (!column_is_selected) {
if (!column_is_selected && _base_info.has_base_non_pk_columns_in_view_pk) {
return true;
}
@@ -949,20 +950,40 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update,
return false;
}
// We cannot skip if the value was created or deleted
// We cannot skip if the value was created or deleted, unless we have a non-expiring marker
const auto* existing_cell = existing_row.find_cell(cdef.id);
const auto* updated_cell = updated_row.find_cell(cdef.id);
if (existing_cell == nullptr || updated_cell == nullptr) {
return existing_cell == updated_cell;
return existing_cell == updated_cell || (!column_is_selected && base_has_nonexpiring_marker);
}
atomic_cell_view existing_cell_view = existing_cell->as_atomic_cell(cdef);
atomic_cell_view updated_cell_view = updated_cell->as_atomic_cell(cdef);
// We cannot skip when a selected column is changed
if (view_it->second->is_view_virtual()) {
return atomic_cells_liveness_equal(existing_cell_view, updated_cell_view);
if (column_is_selected) {
if (view_it->second->is_view_virtual()) {
return atomic_cells_liveness_equal(existing_cell_view, updated_cell_view);
}
return compare_atomic_cell_for_merge(existing_cell_view, updated_cell_view) == 0;
}
return compare_atomic_cell_for_merge(existing_cell_view, updated_cell_view) == 0;
// With non-expiring row marker, liveness checks below are not relevant
if (base_has_nonexpiring_marker) {
return true;
}
if (existing_cell_view.is_live() != updated_cell_view.is_live()) {
return false;
}
// We cannot skip if the change updates TTL
const bool existing_has_ttl = existing_cell_view.is_live_and_has_ttl();
const bool updated_has_ttl = updated_cell_view.is_live_and_has_ttl();
if (existing_has_ttl || updated_has_ttl) {
return existing_has_ttl == updated_has_ttl && existing_cell_view.expiry() == updated_cell_view.expiry();
}
return true;
});
}
@@ -1439,7 +1460,7 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
}
auto dk = dht::decorate_key(*_schema, _key);
const auto gc_state = _base.get_tombstone_gc_state();
const auto& gc_state = _base.get_compaction_manager().get_tombstone_gc_state();
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
// We allow existing to be disengaged, which we treat the same as an empty row.
@@ -1468,7 +1489,7 @@ void view_update_builder::generate_update(static_row&& update, const tombstone&
}
auto dk = dht::decorate_key(*_schema, _key);
const auto gc_state = _base.get_tombstone_gc_state();
const auto& gc_state = _base.get_compaction_manager().get_tombstone_gc_state();
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
// We allow existing to be disengaged, which we treat the same as an empty row.
@@ -3300,7 +3321,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
step.pslice,
batch_size,
query::max_partitions,
tombstone_gc_state::no_gc());
tombstone_gc_state(nullptr));
auto consumer = compact_for_query<view_builder::consumer>(compaction_state, view_builder::consumer{*this, _vug.shared_from_this(), step, now});
auto built = step.reader.consume_in_thread(std::move(consumer));
if (auto ds = std::move(*compaction_state).detach_state()) {

View File

@@ -610,7 +610,7 @@ future<> view_building_worker::do_build_range(table_id base_id, std::vector<tabl
slice,
query::max_rows,
query::max_partitions,
base_cf->get_tombstone_gc_state());
base_cf->get_compaction_manager().get_tombstone_gc_state());
auto consumer = compact_for_query<view_building_worker::consumer>(compaction_state, view_building_worker::consumer(
_db,
views_ids,

View File

@@ -834,10 +834,7 @@ class clients_table : public streaming_virtual_table {
auto& clients = cd_map[dip.ip];
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
if (a->port != b->port) {
return a->port < b->port;
}
return a->client_type_str() < b->client_type_str();
return a->port < b->port || a->client_type_str() < b->client_type_str();
});
for (const auto& cd : clients) {

View File

@@ -4,7 +4,6 @@ add_library(scylla_dht STATIC)
target_sources(scylla_dht
PRIVATE
boot_strapper.cc
fixed_shard.cc
i_partitioner.cc
murmur3_partitioner.cc
range_streamer.cc

View File

@@ -1,156 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/on_internal_error.hh>
#include "dht/fixed_shard.hh"
#include "dht/token.hh"
#include "schema/schema.hh"
#include "sstables/key.hh"
#include "utils/class_registrator.hh"
#include "keys/keys.hh"
#include "keys/compound_compat.hh"
#include "utils/murmur_hash.hh"
#include "utils/log.hh"
namespace dht {
static logging::logger fslog("fixed_shard");
const sstring fixed_shard_partitioner::classname = "com.scylladb.dht.FixedShardPartitioner";
const sstring fixed_shard_partitioner::name() const {
return classname;
}
dht::token fixed_shard_partitioner::token_for_shard(uint16_t shard, uint64_t hash_bits) {
int64_t token_value = (static_cast<int64_t>(shard) << shard_shift) | static_cast<int64_t>(hash_bits & hash_mask);
return dht::token(token_value);
}
unsigned fixed_shard_partitioner::shard_of(dht::token token) {
uint64_t token_bits = static_cast<uint64_t>(token.raw());
return static_cast<unsigned>(token_bits >> shard_shift);
}
// Called with the bytes of the first partition key component, representing the shard.
static uint16_t compute_shard(managed_bytes_view mb) {
if (mb.size() != sizeof(uint16_t)) {
on_internal_error(fslog, format("Invalid shard value size: expected {}, got {}", sizeof(uint16_t), mb.size()));
}
// No need to linearize, 2 bytes are represented as a single fragment
auto shard_bytes = mb.current_fragment();
uint16_t shard_value = net::ntoh(read_unaligned<uint16_t>(shard_bytes.begin()));
if (shard_value > fixed_shard_partitioner::max_shard) {
on_internal_error(fslog, format("Shard value {} exceeds maximum allowed shard {}", shard_value, fixed_shard_partitioner::max_shard));
}
return shard_value;
}
dht::token fixed_shard_partitioner::get_token(const schema& s, partition_key_view key) const {
uint16_t shard_value = compute_shard(*key.begin());
std::array<uint64_t, 2> hash;
auto&& legacy = key.legacy_form(s);
utils::murmur_hash::hash3_x64_128(legacy.begin(), legacy.size(), 0, hash);
auto token = fixed_shard_partitioner::token_for_shard(shard_value, hash[0]);
fslog.trace("get_token: shard={}, token={}", shard_value, token);
return token;
}
dht::token fixed_shard_partitioner::get_token(const sstables::key_view& key) const {
return key.with_linearized([&](bytes_view v) {
auto comp = composite_view(v, true);
uint16_t shard_value = compute_shard(comp.begin()->first);
std::array<uint64_t, 2> hash;
utils::murmur_hash::hash3_x64_128(v, 0, hash);
auto token = fixed_shard_partitioner::token_for_shard(shard_value, hash[0]);
fslog.trace("get_token: shard={}, token={}", shard_value, token);
return token;
});
}
using registry = class_registrator<dht::i_partitioner, fixed_shard_partitioner>;
static registry registrator(fixed_shard_partitioner::classname);
static registry registrator_short_name("FixedShardPartitioner");
fixed_shard_sharder& fixed_shard_sharder::instance() {
static thread_local fixed_shard_sharder sharder;
return sharder;
}
fixed_shard_sharder::fixed_shard_sharder()
: static_sharder(smp::count, 0)
{
}
unsigned fixed_shard_sharder::shard_of(const dht::token& t) const {
if (t.is_minimum()) {
return dht::token::shard_of_minimum_token();
}
if (t.is_maximum()) {
return shard_count() - 1;
}
auto shard = fixed_shard_partitioner::shard_of(t);
fslog.trace("shard_of({}) = {}", t, std::min(shard, shard_count() - 1));
return std::min(shard, shard_count() - 1);
}
std::optional<unsigned> fixed_shard_sharder::try_get_shard_for_reads(const dht::token& t) const {
return shard_of(t);
}
dht::shard_replica_set fixed_shard_sharder::shard_for_writes(const dht::token& t, std::optional<dht::write_replica_set_selector>) const {
// We don't support migrations of the data in raft tables for strongly consistent tables.
// When migrating a strongly consistent tablet, we'll need to move its metadata
// explicitly to the new shard along with its raft group data.
auto shard = try_get_shard_for_reads(t);
if (!shard) {
return {};
}
return { *shard };
}
dht::token fixed_shard_sharder::token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans) const {
return token_for_next_shard_for_reads(t, shard, spans);
}
dht::token fixed_shard_sharder::token_for_next_shard_for_reads(const dht::token& t, shard_id shard, unsigned spans) const {
// With the fixed_shard_partitioner, there's only one token range per shard, so spans > 1 always overflows.
if (spans > 1 || shard >= shard_count() || t.is_maximum()) {
return dht::maximum_token();
}
int64_t token_value = t.is_minimum() ? 0 : t.raw();
int64_t start = static_cast<int64_t>(shard) << fixed_shard_partitioner::shard_shift;
if (token_value < start) {
return dht::token(start);
}
return dht::maximum_token();
}
std::optional<dht::shard_and_token> fixed_shard_sharder::next_shard(const dht::token& t) const {
auto shard = try_get_shard_for_reads(t);
if (!shard || *shard + 1 >= shard_count()) {
return std::nullopt;
}
auto next_shard = *shard + 1;
auto next_token = token_for_next_shard_for_reads(t, next_shard);
if (next_token.is_maximum()) {
return std::nullopt;
}
return dht::shard_and_token{next_shard, next_token};
}
std::optional<dht::shard_and_token> fixed_shard_sharder::next_shard_for_reads(const dht::token& t) const {
return next_shard(t);
}
} // namespace dht

View File

@@ -1,93 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "dht/i_partitioner.hh"
#include "dht/token-sharding.hh"
class schema;
namespace sstables {
class key_view;
}
namespace dht {
/// A partitioner mainly for Raft metadata tables for strongly consistent tables
/// (raft_groups, raft_groups_snapshots, raft_groups_snapshot_config).
///
/// These tables have partition keys with the shard as the first column.
/// The partitioner creates tokens that will be assigned to the shard specified
/// in the partition key's first column.
///
/// Token encoding:
/// [shard: 16 bits][hash: 48 bits]
///
/// To skip converting between signed and unsigned tokens (biasing), the top bit
/// is always 0, so we can effectively use only 15 bits for the shard.
/// This correlates with the limit enforced by the Raft tables' schema which uses
/// a signed int (smallint) for the shard column, where allowing only positive
/// shards allows up to 32767 (1 << 15 - 1) shards.
///
/// The lower 48 bits is a hash of the entire partition key.
///
/// This encoding is shard-count independent - the shard can be extracted by simple
/// bit shifting regardless of how many shards exist in the cluster.
struct fixed_shard_partitioner final : public dht::i_partitioner {
static constexpr unsigned shard_bits = 16;
static constexpr unsigned shard_shift = 64 - shard_bits;
static constexpr uint16_t max_shard = std::numeric_limits<int16_t>::max();
static constexpr uint64_t hash_mask = (uint64_t(1) << shard_shift) - 1;
static const sstring classname;
fixed_shard_partitioner() = default;
virtual const sstring name() const override;
virtual dht::token get_token(const schema& s, partition_key_view key) const override;
virtual dht::token get_token(const sstables::key_view& key) const override;
static dht::token token_for_shard(uint16_t shard, uint64_t hash_bits);
static unsigned shard_of(dht::token token);
};
/// A sharder for Raft metadata tables for strongly consistent tables (raft_groups,
/// raft_groups_snapshots, raft_groups_snapshot_config).
///
/// These tables store raft group state for all tablets of strongly consistent tables.
/// The sharder allows specifying the shard where the metadata should be located by
/// including the shard id in the partition key.
///
/// The shard is encoded in the token by fixed_shard_partitioner. The sharder extracts
/// the shard by decoding the token bits used for shard encoding.
///
/// We inherit from static_sharder because that's what we use for system tables.
class fixed_shard_sharder : public dht::static_sharder {
public:
/// Singleton instance for the raft tablet sharder.
static fixed_shard_sharder& instance();
fixed_shard_sharder();
virtual ~fixed_shard_sharder() = default;
/// Returns the shard for a token by extracting it from the token's high bits.
/// This overrides static_sharder::shard_of to use the bit-based encoding.
virtual unsigned shard_of(const dht::token& t) const override;
virtual std::optional<unsigned> try_get_shard_for_reads(const dht::token& t) const override;
virtual dht::shard_replica_set shard_for_writes(const dht::token& t, std::optional<dht::write_replica_set_selector> sel) const override;
virtual dht::token token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans = 1) const override;
virtual dht::token token_for_next_shard_for_reads(const dht::token& t, shard_id shard, unsigned spans = 1) const override;
virtual std::optional<dht::shard_and_token> next_shard(const dht::token& t) const override;
virtual std::optional<dht::shard_and_token> next_shard_for_reads(const dht::token& t) const override;
};
} // namespace dht

View File

@@ -31,23 +31,6 @@ EOS
sysctl -p /etc/sysctl.d/99-scylla-perfevent.conf
fi
# Tune tcp_mem to max out at 3% of total system memory.
# Seastar defaults to allocating 93% of physical memory. The kernel's default
# allocation for TCP is ~9%. This adds up to 102%. Reduce the TCP allocation
# to 3% to avoid OOM.
PAGE_SIZE=$(getconf PAGE_SIZE)
TOTAL_MEM_KB=$(sed -n 's/^MemTotal:[[:space:]]*\([0-9]*\).*/\1/p' /proc/meminfo)
TOTAL_MEM_BYTES=$((TOTAL_MEM_KB * 1024))
TCP_MEM_MAX=$((TOTAL_MEM_BYTES * 3 / 100))
TCP_MEM_MAX_PAGES=$((TCP_MEM_MAX / PAGE_SIZE))
TCP_MEM_MID_PAGES=$((TCP_MEM_MAX * 2 / 3 / PAGE_SIZE))
TCP_MEM_MIN_PAGES=$((TCP_MEM_MAX / 2 / PAGE_SIZE))
cat << EOS > /etc/sysctl.d/99-scylla-tcp.conf
# Scylla: limit TCP memory to 3% of total system memory
net.ipv4.tcp_mem = $TCP_MEM_MIN_PAGES $TCP_MEM_MID_PAGES $TCP_MEM_MAX_PAGES
EOS
sysctl -p /etc/sysctl.d/99-scylla-tcp.conf || :
if [ ! -d /run/systemd/system ]; then
exit 0
fi

View File

@@ -39,7 +39,7 @@ Description: debugging symbols for %{product}-server
Package: %{product}-kernel-conf
Architecture: any
Depends: procps, sed
Depends: procps
Replaces: scylla-enterprise-kernel-conf (<< 2025.1.0~)
Breaks: scylla-enterprise-kernel-conf (<< 2025.1.0~)
Description: Scylla kernel tuning configuration

View File

@@ -6,7 +6,6 @@ case "$1" in
purge|remove)
if [ "$1" = "purge" ]; then
rm -f /etc/sysctl.d/99-scylla-perfevent.conf
rm -f /etc/sysctl.d/99-scylla-tcp.conf
fi
;;
esac

View File

@@ -186,7 +186,7 @@ This package contains the main scylla configuration file.
%package kernel-conf
Group: Applications/Databases
Summary: Scylla configuration package for the Linux kernel
Requires: kmod sed
Requires: kmod
# tuned overwrites our sysctl settings
Obsoletes: tuned >= 2.11.0
Provides: scylla-enterprise-kernel-conf = %{version}-%{release}
@@ -220,7 +220,6 @@ fi
%{_unitdir}/scylla-tune-sched.service
/opt/scylladb/kernel_conf/*
%ghost /etc/sysctl.d/99-scylla-perfevent.conf
%ghost /etc/sysctl.d/99-scylla-tcp.conf
%package node-exporter

View File

@@ -316,17 +316,6 @@ experimental:
example, a single PutItem is represented by a REMOVE + MODIFY event,
instead of just a single MODIFY or INSERT.
<https://github.com/scylladb/scylla/issues/6930>
* Alternator Streams cannot always distinguish between INSERT and MODIFY
events - the distinction depends on whether the item existed before the
change. Alternator Streams may also produce spurious REMOVE or MODIFY
events when a non-existent item is deleted or when an item is set to the
same value it already had.
This incompatibility can be resolved by setting the configuration option
``alternator_streams_increased_compatibility=true``, but this comes with
a performance penalty because Alternator needs to read the old value of
the item during data-modifying operations on tables with Alternator
Streams enabled. By default (``alternator_streams_increased_compatibility=false``),
this incompatibility remains.
<https://github.com/scylladb/scylla/issues/6918>
* In GetRecords responses, Alternator sets `eventSource` to
`scylladb:alternator`, rather than `aws:dynamodb`, and doesn't set the

View File

@@ -106,8 +106,6 @@ The computed number of tablets a table will have is based on several parameters
tablets on average. See :ref:`Per-table tablet options <cql-per-table-tablet-options>` for details.
* Table-level option ``'min_tablet_count'``. This option sets the minimal number of tablets for the given table.
See :ref:`Per-table tablet options <cql-per-table-tablet-options>` for details.
* Table-level option ``'max_tablet_count'``. This option sets the maximum number of tablets for the given table
See :ref:`Per-table tablet options <cql-per-table-tablet-options>` for details.
* Config option ``'tablets_initial_scale_factor'``. This option sets the minimal number of tablets per shard
per table globally. This option can be overridden by the table-level option: ``'min_per_shard_tablet_count'``.
``'tablets_initial_scale_factor'`` is ignored if either the keyspace option ``'initial'`` or table-level
@@ -121,18 +119,6 @@ half the target tablet size, it will be merged (the number of tablets will be ha
Each of these factors is taken into consideration, and the one producing the largest number of tablets wins, and
will be used as the number of tablets for the given table.
.. note::
When both ``'min_tablet_count'`` and ``'max_tablet_count'`` are set together, ScyllaDB validates the
combination by computing **effective** bounds:
* The **effective minimum** is the smallest power of 2 that is greater than or equal to ``min_tablet_count``.
* The **effective maximum** is the largest power of 2 that is less than or equal to ``max_tablet_count``.
ScyllaDB validates that the effective minimum does not exceed the effective maximum. If it does,
the ``CREATE TABLE`` statement will be rejected with an error. To avoid ambiguity, it is recommended
to use power-of-2 values for both options.
As the last step, in order to avoid having too many tablets per shard, which could potentially lead to overload
and performance degradation, ScyllaDB will run the following algorithm to respect the ``tablets_per_shard_goal``
config option:

View File

@@ -1075,9 +1075,9 @@ The following tombstone gc modes are available:
* - Mode
- Description
* - ``timeout``
- Tombstone GC is performed after the wait time specified with ``gc_grace_seconds``.
- Tombstone GC is performed after the wait time specified with ``gc_grace_seconds`` (default).
* - ``repair``
- Tombstone GC is performed after repair is run (default).
- Tombstone GC is performed after repair is run.
* - ``disabled``
- Tombstone GC is never performed. This mode may be useful when loading data to the database, to avoid tombstone GC when part of the data is not yet available.
* - ``immediate``
@@ -1085,9 +1085,6 @@ The following tombstone gc modes are available:
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
The default tombstone-gc mode is ``repair`` for all tables that use ``NetworkTopologyStrategy`` or ``SimpleStrategy``, except for :term:`Colocated Tables <Colocated Table>`.
Tables which have a single replica (RF=1) don't need repair (and cannot be repaired either). For such tables, tombstone-gc mode ``repair`` acts the same as ``immediate`` mode would: all tombstones are immediately collectible.
.. _cql-per-table-tablet-options:
Per-table tablet options
@@ -1131,13 +1128,6 @@ if its data size, or performance requirements are known in advance.
function of the tablet count, the replication factor in the datacenter, and the number
of nodes and shards in the datacenter. It is recommended to use higher-level options
such as ``expected_data_size_in_gb`` or ``min_per_shard_tablet_count`` instead.
``max_tablet_count`` 0 Sets the maximum number of tablets for the table. When set, the tablet count
will not exceed this value, even if the table grows large enough to normally
trigger tablet splits. This option is mainly intended for use during restore
operations, to ensure that each SSTable fits entirely within a single tablet.
This enables efficient file-based streaming during restore. Setting both
``min_tablet_count`` and ``max_tablet_count`` to the same value fixes the
tablet count for the table.
=============================== =============== ===================================================================================
When allocating tablets for a new table, ScyllaDB uses the maximum of the ``initial`` tablets configured for the keyspace

View File

@@ -147,7 +147,7 @@ CREATE TABLE ks.t_scylla_cdc_log (
AND comment = 'CDC log for ks.t'
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '60', 'compaction_window_unit': 'MINUTES', 'expired_sstable_check_frequency_seconds': '1800'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND tablets = {'expected_data_size_in_gb': '250', 'min_per_shard_tablet_count': '0.8', 'min_tablet_count': '1', 'max_tablet_count': '8'}
AND tablets = {'expected_data_size_in_gb': '250', 'min_per_shard_tablet_count': '0.8', 'min_tablet_count': '1'}
AND crc_check_chance = 1
AND default_time_to_live = 0
AND gc_grace_seconds = 0

View File

@@ -1,69 +0,0 @@
# Introduction
This document describes the implementation details and design choices for the
strongly-consistent tables feature
The feature is heavily based on the existing implementation of Raft in scylla, which
is described in [docs/dev/raft-in-scylla.md](raft-in-scylla.md).
# Raft metadata persistence
## Group0 persistence context
The Raft groups for strongly consistent tables differ from Raft group0 particularly
in the extend of where their Raft group members can be located. For group0, all
group members (Raft servers) are on shard 0. For groups for strongly consistent tablets,
the group members may be located on any shard. In the future, they will even be able
to move alongside their corresponding tablets.
That's why, when adding the Raft metadata persistence layer for strongly consistent tables,
we can't reuse the existing approach for group 0. Group0's persistence stores all Raft state
on shard 0. This approach can't be used for strongly consistent tables, because raft groups
for strongly consistent tables can occupy many different shards and their metadata may be
updated often. Storing all data on a single shard would at the same time make this shard
a bottleneck and it would require performing cross-shard operations for most strongly
consistent writes, which would also diminish their performance on its own.
Instead, we want to store the metadata for a Raft group on the same shard where this group's
server is located, avoiding any cross-shard operations and evenly distributing the work
related to writing metadata to all shards.
## Strongly consistent table persistence
We introduce a separate set of Raft system tables for strongly consistent tablets:
- `system.raft_groups`
- `system.raft_groups_snapshots`
- `system.raft_groups_snapshot_config`
These tables mirror the logical contents of the existing `system.raft`, `system.raft_snapshots`,
`system.raft_snapshot_config` tables, but their partition key is a composite `(shard, group_id)`
rather than just `group_id`.
To make “(shard, group_id) belongs to shard X” true at the storage layer, we use:
- a dedicated partitioner (`service::strong_consistency::raft_groups_partitioner`)
which encodes the shard into the token, and
- a dedicated sharder (`service::strong_consistency::raft_groups_sharder`) which extracts
that shard from the token.
As a result, reads and writes for a given groups persistence are routed to the same shard
where the Raft server instance runs.
## Token encoding
The partitioner encodes the destination shard in the tokens high bits:
- token layout: `[shard: 16 bits][group_id_hash: 48 bits]`
- the shard value is constrained to fit the `smallint` column used in the schema.
it also needs to be non-negative, so it's effectively limited to range `[0, 32767]`
- the lower 48 bits are derived by hashing the `group_id` (timeuuid)
The key property is that shard extraction is a pure bit operation and does not depend on
the clusters shard count.
## No direct migration support
`raft_groups_sharder::shard_for_writes()` returns up to one shard - it does not support
migrations using double writes. Instead, for a given Raft group, when a tablet is migrated,
the Raft metadata needs to be erased from the former location and added in the new location.

File diff suppressed because it is too large Load Diff

View File

@@ -15,35 +15,39 @@ Ensure your platform is supported by the ScyllaDB version you want to install.
See `OS Support <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
for information about supported Linux distributions and versions.
Note that if you're on CentOS 7, only root offline installation is supported.
Download and Install
-----------------------
#. Download the latest tar.gz file for ScyllaDB version (x86 or ARM) from ``https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-<version>/``.
**Example** for version 2025.1:
- Go to https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-2025.1/
- Download the ``scylla-unified`` file for the patch version you want to
install. For example, to install 2025.1.9 (x86), download
``scylla-unified-2025.1.9-0.20251010.6c539463bbda.x86_64.tar.gz``.
Example for version 6.1: https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-6.1/
#. Uncompress the downloaded package.
**Example** for version 2025.1.9 (x86) (downloaded in the previous step):
The following example shows the package for ScyllaDB 6.1.1 (x86):
.. code::
.. code:: console
tar xvfz scylla-unified-2025.1.9-0.20251010.6c539463bbda.x86_64.tar.gz
tar xvfz scylla-unified-6.1.1-0.20240814.8d90b817660a.x86_64.tar.gz
#. (Root offline installation only) For root offline installation on Debian-like
systems, two additional packages, ``xfsprogs`` and ``mdadm``, should be
installed to be used in RAID setup.
#. Install OpenJDK 8 or 11.
The following example shows Java installation on a CentOS-like system:
.. code:: console
sudo yum install -y java-11-openjdk-headless
For root offline installation on Debian-like systems, two additional packages, ``xfsprogs``
and ``mdadm``, should be installed to be used in RAID setup.
#. Install ScyllaDB as a user with non-root privileges:
.. code:: console
./install.sh --nonroot
./install.sh --nonroot --python3 ~/scylladb/python3/bin/python3
Configure and Run ScyllaDB
----------------------------
@@ -73,14 +77,19 @@ Run nodetool:
.. code:: console
~/scylladb/bin/nodetool nodetool status
~/scylladb/share/cassandra/bin/nodetool status
Run cqlsh:
.. code:: console
~/scylladb/bin/cqlsh
~/scylladb/share/cassandra/bin/cqlsh
Run cassandra-stress:
.. code:: console
~/scylladb/share/cassandra/bin/cassandra-stress write
.. note::
@@ -111,7 +120,7 @@ Nonroot install
./install.sh --upgrade --nonroot
.. note:: The installation script does not upgrade scylla-tools. You will have to upgrade them separately.
.. note:: The installation script does not upgrade scylla-jmx and scylla-tools. You will have to upgrade them separately.
Uninstall
===========
@@ -141,4 +150,4 @@ Next Steps
* Manage your clusters with `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_
* Monitor your cluster and data with `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_
* Get familiar with ScyllaDBs :doc:`command line reference guide </operating-scylla/nodetool>`.
* Learn about ScyllaDB at `ScyllaDB University <https://university.scylladb.com/>`_
* Learn about ScyllaDB at `ScyllaDB University <https://university.scylladb.com/>`_

View File

@@ -19,15 +19,21 @@ Procedure
authenticator: PasswordAuthenticator
#. Restart ScyllaDB.
#. Restart ScyllaDB.
.. include:: /rst_include/scylla-commands-restart-index.rst
#. Start cqlsh over the maintenance socket and create a new superuser. See :ref:`Setting Up a Superuser Using the Maintenance Socket <create-superuser-using-maintenance-socket>` for instructions.
#. Start cqlsh with the default superuser username and password.
.. code-block:: cql
cqlsh <maintenance_socket_path>
cqlsh -u cassandra -p cassandra
.. note::
Before proceeding to the next step, we recommend creating a custom superuser
to improve security.
See :doc:`Creating a Custom Superuser </operating-scylla/security/create-superuser/>` for instructions.
#. If you want to create users and roles, continue to :doc:`Enable Authorization </operating-scylla/security/enable-authorization>`.

View File

@@ -1,100 +1,111 @@
================================
Creating a Superuser
Creating a Custom Superuser
================================
There is no default superuser role in ScyllaDB.
Users with a superuser role have full access to the database and can run
The default ScyllaDB superuser role is ``cassandra`` with password ``cassandra``.
Users with the ``cassandra`` role have full access to the database and can run
any CQL command on the database resources.
There are two ways you can create a superuser in ScyllaDB:
To improve security, we recommend creating a custom superuser. You should:
- :ref:`Using the ScyllaDB Maintenance Socket to create a superuser role <create-superuser-using-maintenance-socket>`
- :ref:`Using an existing superuser account to create a new superuser role <create-superuser-using-existing-superuser>`
#. Use the default ``cassandra`` superuser to log in.
#. Create a custom superuser.
#. Log in as the custom superuser.
#. Remove the ``cassandra`` role.
When setting up a new cluster, use the maintenance socket approach to create the first superuser.
In the above procedure, you only need to use the ``cassandra`` superuser once, during
the initial RBAC set up.
To completely eliminate the need to use ``cassandra``, you can :ref:`configure the initial
custom superuser in the scylla.yaml configuration file <create-superuser-in-config-file>`.
.. _create-superuser-procedure:
.. _create-superuser-using-maintenance-socket:
Procedure
-----------
Setting Up a Superuser Using the Maintenance Socket
#. Start cqlsh with the default superuser settings:
.. code::
cqlsh -u cassandra -p cassandra
#. Create a new superuser:
.. code::
CREATE ROLE <custom_superuser name> WITH SUPERUSER = true AND LOGIN = true and PASSWORD = '<custom_superuser_password>';
For example:
.. code::
:class: hide-copy-button
CREATE ROLE dba WITH SUPERUSER = true AND LOGIN = true and PASSWORD = '39fksah!';
.. warning::
You must set a PASSWORD when creating a role with LOGIN privileges.
Otherwise, you will not be able to log in to the database using that role.
#. Exit cqlsh:
.. code::
EXIT;
#. Log in as the new superuser:
.. code::
cqlsh -u <custom_superuser name> -p <custom_superuser_password>
For example:
.. code::
:class: hide-copy-button
cqlsh -u dba -p 39fksah!
#. Show all the roles to verify that the new superuser was created:
.. code::
LIST ROLES;
#. Remove the cassandra superuser:
.. code::
DROP ROLE cassandra;
#. Show all the roles to verify that the cassandra role was deleted:
.. code::
LIST ROLES;
.. _create-superuser-in-config-file:
Setting Custom Superuser Credentials in scylla.yaml
------------------------------------------------------
If no superuser account exists in the cluster, which is the case for new clusters, you can create a superuser using the ScyllaDB Maintenance Socket.
In order to do that, the node must have the maintenance socket enabled.
See :doc:`Admin Tools: Maintenance Socket </operating-scylla/admin-tools/maintenance-socket/>`.
Operating ScyllaDB using the default superuser ``cassandra`` with password ``cassandra``
is insecure and impacts performance. For this reason, the default should be used only once -
to create a custom superuser role, following the CQL :ref:`procedure <create-superuser-procedure>` above.
To create a superuser using the maintenance socket, you should:
To avoid executing with the default credentials for the period before you can make
the CQL modifications, you can configure the custom superuser name and password
in the ``scylla.yaml`` configuration file:
1. Connect to the node using ``cqlsh`` over the maintenance socket.
.. code-block:: yaml
auth_superuser_name: <superuser name>
auth_superuser_salted_password: <superuser salted password as processed by mkpassword or similar - cleartext is not allowed>
.. code-block:: shell
.. caution::
cqlsh <maintenance_socket_path>
The superuser credentials in the ``scylla.yaml`` file will be ignored:
Replace ``<maintenance_socket_path>`` with the socket path configured in ``scylla.yaml``.
2. Create new superuser role using ``CREATE ROLE`` command.
.. code-block:: cql
CREATE ROLE <new_superuser> WITH SUPERUSER = true AND LOGIN = true and PASSWORD = '<new_superuser_password>';
3. Verify that you can log in to your node using ``cqlsh`` command with the new password.
.. code-block:: shell
cqlsh -u <new_superuser>
Password:
.. note::
Enter the value of `<new_superuser_password>` password when prompted. The input is not displayed.
4. Show all the roles to verify that the new superuser was created:
.. code-block:: cql
LIST ROLES;
.. _create-superuser-using-existing-superuser:
Setting Up a Superuser Using an Existing Superuser Account
-------------------------------------------------------------
To create a superuser using an existing superuser account, you should:
1. Log in to cqlsh using an existing superuser account.
.. code-block:: shell
cqlsh -u <existing_superuser>
Password:
.. note::
Enter the value of `<existing_superuser_password>` password when prompted. The input is not displayed.
2. Create a new superuser.
.. code-block:: cql
CREATE ROLE <new_superuser> WITH SUPERUSER = true AND LOGIN = true and PASSWORD = '<new_superuser_password>';
3. Verify that you can log in to your node using ``cqlsh`` command with the new password.
.. code-block:: shell
cqlsh -u <new_superuser>
Password:
.. note::
Enter the value of `<new_superuser_password>` password when prompted. The input is not displayed.
4. Show all the roles to verify that the new superuser was created:
.. code-block:: cql
LIST ROLES;
* If any superuser other than ``cassandra`` is already defined in the cluster.
* After you create a custom superuser with the CQL :ref:`procedure <create-superuser-procedure>`.

View File

@@ -28,6 +28,7 @@ The following assumes that Authentication has already been enabled via the proce
2. Set your credentials as the `superuser`_
3. Login to cqlsh as the superuser and set `roles`_ and privileges for your users
4. Confirm users can `access`_ the client with their new credentials.
5. `Remove`_ Cassandra default user / passwords
.. _authorizer:
@@ -50,11 +51,19 @@ It is highly recommended to perform this action on a node that is not processing
.. _superuser:
Create a Superuser
Set a Superuser
.........................
There is no default superuser in ScyllaDB. You should create a superuser before creating additional roles.
See :doc:`Creating a Superuser </operating-scylla/security/create-superuser/>` for instructions.
The default ScyllaDB superuser role is ``cassandra`` with password ``cassandra``. Using the default
superuser is unsafe and may significantly impact performance.
If you haven't created a custom superuser while enabling authentication, you should create a custom superuser
before creating additional roles.
See :doc:`Creating a Custom Superuser </operating-scylla/security/create-superuser/>` for instructions.
.. note::
We recommend creating a custom superuser to improve security.
.. _roles:
@@ -71,12 +80,7 @@ Validate you have the credentials for the superuser for your system for yourself
.. code-block:: none
cqlsh -u dba
Password:
.. note::
Enter the password when prompted. The input is not displayed.
cqlsh -u dba -p 39fksah!
2. Configure the appropriate access privileges for clients using :ref:`GRANT PERMISSION <grant-permission-statement>` statements. For additional examples, consult the :doc:`RBAC example </operating-scylla/security/rbac-usecase>`.
@@ -106,8 +110,30 @@ The following should be noted:
* When initiating a connection, clients will need to use the user name and password that you assign
* Confirm all clients can connect before removing the Cassandra default password and user.
2. To remove permission from any role or user, see :ref:`REVOKE PERMISSION <revoke-permission-statement>`.
.. _remove:
Remove Cassandra Default Password and User
..........................................
To prevent others from entering with the old superuser password, you can and should delete it.
.. code-block:: cql
DROP ROLE [ IF EXISTS ] 'old-username';
For example
.. code-block:: cql
DROP ROLE [ IF EXISTS ] 'cassandra';
Additional References
---------------------

View File

@@ -38,7 +38,7 @@ Security
* :doc:`Enable Authentication </operating-scylla/security/authentication/>`
* :doc:`Enable and Disable Authentication Without Downtime </operating-scylla/security/runtime-authentication/>`
* :doc:`Creating a Superuser </operating-scylla/security/create-superuser/>`
* :doc:`Creating a Custom Superuser </operating-scylla/security/create-superuser/>`
* :doc:`Generate a cqlshrc File <gen-cqlsh-file>`
* :doc:`Enable Authorization</operating-scylla/security/enable-authorization/>`
* :doc:`Role Based Access Control (RBAC) </operating-scylla/security/rbac-usecase/>`

View File

@@ -25,19 +25,20 @@ Procedure
.. include:: /rst_include/scylla-commands-restart-index.rst
#. Login over the maintenance socket and create an authenticated user with strong password.
#. Login with the default superuser credentials and create an authenticated user with strong password.
See :ref:`Setting Up a Superuser Using the Maintenance Socket <create-superuser-using-maintenance-socket>` for instructions.
For example:
.. code-block:: cql
cqlsh /path/to/maintenance/socket/cql.m
cqlsh -ucassandra -pcassandra
cassandra@cqlsh> CREATE ROLE scylla WITH PASSWORD = '123456' AND LOGIN = true AND SUPERUSER = true;
cassandra@cqlsh> LIST ROLES;
name |super
----------+-------
cassandra |True
scylla |True
Optionally, assign the role to your user. For example:
@@ -46,6 +47,20 @@ Procedure
cassandra@cqlsh> GRANT scylla TO myuser
#. Login with the new user created and drop the superuser cassandra.
.. code-block:: cql
cqlsh -u scylla -p 123456
scylla@cqlsh> DROP ROLE cassandra;
scylla@cqlsh> LIST ROLES;
name |super
----------+-------
scylla |True
#. Update the ``authenticator`` parameter in ``scylla.yaml`` for all the nodes in the cluster: Change ``authenticator: com.scylladb.auth.TransitionalAuthenticator`` to ``authenticator: PasswordAuthenticator``.
.. code-block:: yaml

View File

@@ -1,59 +1,37 @@
Reset Authenticator Password
============================
This procedure describes what to do when a user loses their password and cannot reset it with a superuser role.
If a node has maintenance socket available, there is no node or cluster downtime required.
If a node does not have maintenance socket available, node has to be restarted with maintenance socket enabled, but no cluster downtime is required.
Prerequisites
.............
Enable maintenance socket on the node. If already done, skip to the ``Procedure`` section.
To check if the maintenance socket is enabled, see :doc:`Admin Tools: Maintenance Socket </operating-scylla/admin-tools/maintenance-socket/>` for details.
1. Stop the ScyllaDB node.
.. code-block:: shell
sudo systemctl stop scylla-server
2. Edit ``/etc/scylla/scylla.yaml`` file and configure the maintenance socket.
See :doc:`Admin Tools: Maintenance Socket </operating-scylla/admin-tools/maintenance-socket/>` for details.
3. Start the ScyllaDB node.
.. code-block:: shell
sudo systemctl start scylla-server
This procedure describes what to do when a user loses their password and cannot reset it with a superuser role.
The procedure requires cluster downtime and as a result, all auth data is deleted.
Procedure
.........
1. Connect to the node using ``cqlsh`` over the maintenance socket.
| 1. Stop ScyllaDB nodes (**Stop all the nodes in the cluster**).
.. code-block:: shell
.. code-block:: shell
cqlsh <maintenance_socket_path>
sudo systemctl stop scylla-server
Replace ``<maintenance_socket_path>`` with the socket path configured in ``scylla.yaml``.
| 2. Remove system tables starting with ``role`` prefix from ``/var/lib/scylla/data/system`` directory.
2. Reset the password for the user using ``ALTER ROLE`` command.
.. code-block:: shell
rm -rf /var/lib/scylla/data/system/role*
| 3. Start ScyllaDB nodes.
.. code-block:: shell
sudo systemctl start scylla-server
| 4. Verify that you can log in to your node using ``cqlsh`` command.
| Access is only possible using the ScyllaDB superuser.
.. code-block:: cql
cqlsh -u cassandra -p cassandra
ALTER ROLE username WITH PASSWORD '<new_password>';
3. Verify that you can log in to your node using ``cqlsh`` command with the new password.
.. code-block:: shell
cqlsh -u username
Password:
.. note::
Enter the value of `<new_password>` password when prompted. The input is not displayed.
| 5. Recreate the users
.. include:: /troubleshooting/_common/ts-return.rst

View File

@@ -154,7 +154,6 @@ fedora_packages=(
podman
buildah
slirp4netns
# for cassandra-stress
java-openjdk-headless

View File

@@ -327,6 +327,9 @@ future<tablet_map> network_topology_strategy::reallocate_tablets(schema_ptr s, t
auto tinfo = tablets.get_tablet_info(tb);
tinfo.replicas = co_await reallocate_tablets(s, tm, load, tablets, tb);
if (tablets.has_raft_info()) {
for (auto& r: tinfo.replicas) {
r.shard = 0;
}
if (!tablets.get_tablet_raft_info(tb).group_id) {
tablets.set_tablet_raft_info(tb, tablet_raft_info {
.group_id = raft::group_id{utils::UUID_gen::get_time_UUID()}

47
main.cc
View File

@@ -19,7 +19,6 @@
#include "gms/inet_address.hh"
#include "auth/allow_all_authenticator.hh"
#include "auth/allow_all_authorizer.hh"
#include "auth/maintenance_socket_authenticator.hh"
#include "auth/maintenance_socket_role_manager.hh"
#include <seastar/core/future.hh>
#include <seastar/core/signal.hh>
@@ -1831,7 +1830,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
checkpoint(stop_signal, "initializing strongly consistent groups manager");
sharded<service::strong_consistency::groups_manager> groups_manager;
groups_manager.start(std::ref(messaging), std::ref(raft_gr), std::ref(qp),
std::ref(db), std::ref(mm), std::ref(sys_ks), std::ref(feature_service)).get();
std::ref(db), std::ref(feature_service)).get();
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [&] {
groups_manager.stop().get();
});
@@ -2094,15 +2093,17 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
if (cfg->maintenance_socket() != "ignore") {
checkpoint(stop_signal, "starting maintenance auth service");
maintenance_auth_service.start(std::ref(qp), std::ref(group0_client), std::ref(mm_notifier),
auth::make_authorizer_factory(auth::allow_all_authorizer_name, qp, group0_client, mm),
auth::make_maintenance_socket_authenticator_factory(qp, group0_client, mm, auth_cache),
auth::make_maintenance_socket_role_manager_factory(qp, group0_client, mm, auth_cache),
maintenance_socket_enabled::yes, std::ref(auth_cache)).get();
auth::service_config maintenance_auth_config;
maintenance_auth_config.authorizer_java_name = sstring{auth::allow_all_authorizer_name};
maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};
maintenance_auth_service.start(std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache)).get();
cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group);
start_auth_service(maintenance_auth_service, stop_maintenance_auth_service, "maintenance auth service");
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
}
checkpoint(stop_signal, "starting REST API");
@@ -2132,14 +2133,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
if (cfg->maintenance_mode()) {
checkpoint(stop_signal, "entering maintenance mode");
// Notify maintenance auth service that maintenance mode is starting
maintenance_auth_service.invoke_on_all([](auth::service& svc) {
auth::set_maintenance_mode(svc);
return make_ready_future<>();
}).get();
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
ss.local().start_maintenance_mode().get();
seastar::set_abort_on_ebadf(cfg->abort_on_ebadf());
@@ -2267,15 +2260,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();
stop_signal.ready(false);
if (cfg->maintenance_socket() != "ignore") {
// Enable role operations now that node joined the cluster
maintenance_auth_service.invoke_on_all([](auth::service& svc) {
return auth::ensure_role_operations_are_enabled(svc);
}).get();
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
}
// At this point, `locator::topology` should be stable, i.e. we should have complete information
// about the layout of the cluster (= list of nodes along with the racks/DCs).
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
@@ -2364,6 +2348,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
vb.init_virtual_table();
}).get();
const qualified_name qualified_authorizer_name(auth::meta::AUTH_PACKAGE_NAME, cfg->authorizer());
const qualified_name qualified_authenticator_name(auth::meta::AUTH_PACKAGE_NAME, cfg->authenticator());
const qualified_name qualified_role_manager_name(auth::meta::AUTH_PACKAGE_NAME, cfg->role_manager());
// Reproducer of scylladb/scylladb#24792.
auto i24792_reproducer = defer([] {
if (utils::get_local_injector().enter("reload_service_level_cache_after_auth_service_is_stopped")) {
@@ -2372,11 +2360,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
});
checkpoint(stop_signal, "starting auth service");
auth_service.start(std::ref(qp), std::ref(group0_client), std::ref(mm_notifier),
auth::make_authorizer_factory(cfg->authorizer(), qp, group0_client, mm),
auth::make_authenticator_factory(cfg->authenticator(), qp, group0_client, mm, auth_cache),
auth::make_role_manager_factory(cfg->role_manager(), qp, group0_client, mm, auth_cache),
maintenance_socket_enabled::no, std::ref(auth_cache)).get();
auth::service_config auth_config;
auth_config.authorizer_java_name = qualified_authorizer_name;
auth_config.authenticator_java_name = qualified_authenticator_name;
auth_config.role_manager_java_name = qualified_role_manager_name;
auth_service.start(std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache)).get();
std::any stop_auth_service;
// Has to be called after node joined the cluster (join_cluster())

View File

@@ -180,7 +180,7 @@ mutation mutation::sliced(const query::clustering_row_ranges& ranges) const {
mutation mutation::compacted() const {
auto m = *this;
m.partition().compact_for_compaction(*schema(), always_gc, m.decorated_key(), gc_clock::time_point::min(), tombstone_gc_state::no_gc());
m.partition().compact_for_compaction(*schema(), always_gc, m.decorated_key(), gc_clock::time_point::min(), tombstone_gc_state(nullptr));
return m;
}

View File

@@ -1344,7 +1344,7 @@ mutation_partition::compact_for_query(
bool drop_tombstones_unconditionally = false;
// Replicas should only send non-purgeable tombstones already,
// so we can expect to not have to actually purge any tombstones here.
return do_compact(s, dk, query_time, row_ranges, always_return_static_content, row_limit, always_gc, drop_tombstones_unconditionally, tombstone_gc_state::no_gc());
return do_compact(s, dk, query_time, row_ranges, always_return_static_content, row_limit, always_gc, drop_tombstones_unconditionally, tombstone_gc_state(nullptr));
}
void mutation_partition::compact_for_compaction(const schema& s,
@@ -1368,7 +1368,7 @@ void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally(
};
bool drop_tombstones_unconditionally = true;
auto compaction_time = gc_clock::time_point::max();
do_compact(s, dk, compaction_time, all_rows, true, query::partition_max_rows, always_gc, drop_tombstones_unconditionally, tombstone_gc_state::gc_all());
do_compact(s, dk, compaction_time, all_rows, true, query::partition_max_rows, always_gc, drop_tombstones_unconditionally, tombstone_gc_state(nullptr));
}
// Returns true if the mutation_partition represents no writes.
@@ -2157,7 +2157,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
// This result was already built with a limit, don't apply another one.
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::max_tombstones);
auto consumer = compact_for_query<query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
max_partitions, tombstone_gc_state::no_gc(), query_result_builder(*s, builder));
max_partitions, tombstone_gc_state(nullptr), query_result_builder(*s, builder));
auto compaction_state = consumer.get_state();
frozen_mutation_consumer_adaptor adaptor(s, consumer);
for (const partition& p : r.partitions()) {
@@ -2176,7 +2176,7 @@ query::result
query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) {
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::max_tombstones);
auto consumer = compact_for_query<query_result_builder>(*m.schema(), now, slice, row_limit,
query::max_partitions, tombstone_gc_state::no_gc(), query_result_builder(*m.schema(), builder));
query::max_partitions, tombstone_gc_state(nullptr), query_result_builder(*m.schema(), builder));
auto compaction_state = consumer.get_state();
std::move(m).consume(consumer, consume_in_reverse::no);
return builder.build(compaction_state->current_full_position());
@@ -2405,7 +2405,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
auto r_a_r = std::make_unique<range_and_reader>(s, source, std::move(permit), dk, slice, std::move(trace_ptr));
auto cwqrb = counter_write_query_result_builder(*s);
auto cfq = compact_for_query<counter_write_query_result_builder>(
*s, gc_clock::now(), slice, query::max_rows, query::max_partitions, tombstone_gc_state::no_gc(), std::move(cwqrb));
*s, gc_clock::now(), slice, query::max_rows, query::max_partitions, tombstone_gc_state(nullptr), std::move(cwqrb));
auto f = r_a_r->reader.consume(std::move(cfq));
return f.finally([r_a_r = std::move(r_a_r)] {
return r_a_r->reader.close();

View File

@@ -8,7 +8,6 @@
#include "fsm.hh"
#include <random>
#include <seastar/core/coroutine.hh>
#include "raft/raft.hh"
#include "utils/assert.hh"
#include "utils/error_injection.hh"
@@ -206,11 +205,6 @@ void fsm::become_follower(server_id leader) {
}
void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
if (utils::get_local_injector().enter("avoid_being_raft_leader")) {
become_follower(server_id{});
return;
}
if (!std::holds_alternative<candidate>(_state)) {
_output.state_changed = true;
}
@@ -1114,14 +1108,6 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
return std::make_pair(id, _commit_idx);
}
void fsm::maybe_update_commit_idx_for_read(index_t read_idx) {
// read_idx from the leader might not be replicated to the local node yet.
const bool in_local_log = read_idx <= _log.last_idx();
if (in_local_log && log_term_for(read_idx) == get_current_term()) {
advance_commit_idx(read_idx);
}
}
void fsm::stop() {
if (is_leader()) {
// Become follower to stop accepting requests

View File

@@ -480,15 +480,6 @@ public:
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
// Update the commit index to the read index (a read barrier result from the leader) if the local entry with the
// read index belongs to the current term.
//
// Satisfying the condition above guarantees that the local log matches the current leader's log up to the read
// index (the Log Matching Property), so the current leader won't drop the local entry with the read index.
// Moreover, this entry has been committed by the leader, so future leaders also won't drop it (the Leader
// Completeness Property). Hence, updating the commit index is safe.
void maybe_update_commit_idx_for_read(index_t read_idx);
size_t in_memory_log_size() const {
return _log.in_memory_size();
}

View File

@@ -167,7 +167,7 @@ public:
// and non matching term is in second
std::pair<bool, term_t> match_term(index_t idx, term_t term) const;
// Return term number of the entry matching the index. If the
// entry is in the truncated part of the log and does not match snapshot index,
// entry is not in the log and does not match snapshot index,
// return an empty optional.
// Used to validate the log matching rule.
std::optional<term_t> term_for(index_t idx) const;

View File

@@ -505,7 +505,7 @@ using add_entry_reply = std::variant<entry_id, transient_error, commit_status_un
// std::monostate {} if the leader cannot execute the barrier because
// it did not commit any entries yet
// raft::not_a_leader if the node is not a leader
// index_t index that is safe to read once applied without breaking linearizability
// index_t index that is safe to read without breaking linearizability
using read_barrier_reply = std::variant<std::monostate, index_t, raft::not_a_leader>;
using rpc_message = std::variant<append_request,

View File

@@ -436,8 +436,6 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
}
future<> server_impl::wait_for_leader(seastar::abort_source* as) {
check_not_aborted();
if (_fsm->current_leader()) {
co_return;
}
@@ -456,8 +454,6 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) {
}
future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
check_not_aborted();
if (!_state_change_promise) {
_state_change_promise.emplace();
}
@@ -752,8 +748,6 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_
}
_stats.add_command++;
check_not_aborted();
logger.trace("[{}] an entry is submitted", id());
if (!_config.enable_forwarding) {
if (const auto leader = _fsm->current_leader(); leader != _id) {
@@ -864,8 +858,6 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
}
future<> server_impl::modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) {
check_not_aborted();
utils::get_local_injector().inject("raft/throw_commit_status_unknown_in_modify_config", [] {
throw raft::commit_status_unknown();
});
@@ -1561,7 +1553,6 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
co_return stop_iteration::no;
}
read_idx = std::get<index_t>(res);
_fsm->maybe_update_commit_idx_for_read(read_idx);
co_return stop_iteration::yes;
});

View File

@@ -77,7 +77,7 @@ public:
// committed locally means simply that the commit index is beyond this entry's index.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// If it passes nullptr, the operation is unabortable.
// If it passes nullptr, the operation is unabortable.
//
// Successful `add_entry` with `wait_type::committed` does not guarantee that `state_machine::apply` will be called
// locally for this entry. Between the commit and the application we may receive a snapshot containing this entry,
@@ -125,7 +125,7 @@ public:
// returned even in case of a successful config change.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// If it passes nullptr, the operation is unabortable.
// If it passes nullptr, the operation is unabortable.
//
// Exceptions:
// raft::conf_change_in_progress
@@ -206,7 +206,7 @@ public:
// future has resolved successfully.
//
// The caller may pass a pointer to an abort_source to make the operation abortable.
// If it passes nullptr, the operation is unabortable.
// If it passes nullptr, the operation is unabortable.
//
// Exceptions:
// raft::request_aborted
@@ -251,11 +251,9 @@ public:
// the call as before, but term should be different.
//
// The caller may pass a pointer to an abort_source to make the function abortable.
// If it passes nullptr, the function is unabortable.
// If it passes nullptr, the function is unabortable.
//
// Exceptions:
// raft::stopped_error
// Thrown if abort() was called on the server instance.
// raft::request_aborted
// Thrown if abort is requested before the operation finishes.
virtual future<> wait_for_state_change(seastar::abort_source* as) = 0;
@@ -267,11 +265,9 @@ public:
// `raft::server_id`.
//
// The caller may pass a pointer to an abort_source to make the function abortable.
// If it passes nullptr, the function is unabortable.
// It it passes nullptr, the function is unabortable.
//
// Exceptions:
// raft::stopped_error
// Thrown if abort() was called on the server instance.
// raft::request_aborted
// Thrown if abort is requested before the operation finishes.
virtual future<> wait_for_leader(seastar::abort_source* as) = 0;

View File

@@ -1211,7 +1211,6 @@ private:
}
co_await utils::get_local_injector().inject("incremental_repair_prepare_wait", utils::wait_for_message(60s));
rlogger.debug("Disabling compaction for range={} for incremental repair", _range);
auto reenablers_and_holders = co_await table.get_compaction_reenablers_and_lock_holders_for_repair(_db.local(), _frozen_topology_guard, _range);
for (auto& lock_holder : reenablers_and_holders.lock_holders) {
_rs._repair_compaction_locks[gid].push_back(std::move(lock_holder));
@@ -1241,8 +1240,6 @@ private:
// compaction.
reenablers_and_holders.cres.clear();
rlogger.info("Re-enabled compaction for range={} for incremental repair", _range);
co_await utils::get_local_injector().inject("wait_after_prepare_sstables_for_incremental_repair", utils::wait_for_message(5min));
}
// Read rows from sstable until the size of rows exceeds _max_row_buf_size - current_size
@@ -3956,19 +3953,3 @@ future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_ta
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
co_return progress;
}
void repair_service::on_cleanup_for_drop_table(const table_id& id) {
// Prevent repair lock from being leaked in repair_service when table is dropped midway.
// The RPC verb that removes the lock on success path will not be called by coordinator after table was dropped.
// We also cannot move the lock from repair_service to repair_meta, since the lock must outlive the latter.
// Since tablet metadata has been erased at this point, we can simply erase all instances for the dropped table.
rlogger.debug("Cleaning up state for dropped table {}", id);
for (auto it = _repair_compaction_locks.begin(); it != _repair_compaction_locks.end();) {
auto& [global_tid, _] = *it;
if (global_tid.table == id) {
it = _repair_compaction_locks.erase(it);
} else {
it++;
}
}
}

View File

@@ -318,8 +318,6 @@ public:
future<uint32_t> get_next_repair_meta_id();
void on_cleanup_for_drop_table(const table_id& id);
friend class repair::user_requested_repair_task_impl;
friend class repair::data_sync_repair_task_impl;
friend class repair::tablet_repair_task_impl;

View File

@@ -448,7 +448,6 @@ public:
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) = 0;
virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;
virtual future<> wait_for_background_tablet_resize_work() = 0;
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;
};

View File

@@ -2415,7 +2415,9 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t
if (!s->is_synced()) {
on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
}
return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{});
return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, tr_state = std::move(tr_state), timeout] () mutable {
return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{});
});
}
keyspace::config

View File

@@ -971,8 +971,6 @@ public:
[[gnu::always_inline]] bool uses_tablets() const;
int64_t calculate_tablet_count() const;
private:
void update_tombstone_gc_rf_one();
future<> clear_inactive_reads_for_tablet(database& db, storage_group& sg);
future<> stop_compaction_groups(storage_group& sg);
future<> flush_compaction_groups(storage_group& sg);
@@ -1358,8 +1356,6 @@ public:
// If leave_unsealead is set, all the destination sstables will be left unsealed.
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed);
tombstone_gc_state get_tombstone_gc_state() const;
friend class compaction_group;
friend class compaction::compaction_task_impl;
@@ -1370,6 +1366,8 @@ public:
future<compaction_reenablers_and_lock_holders> get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
const service::frozen_topology_guard& guard, dht::token_range range);
future<uint64_t> estimated_partitions_in_range(dht::token_range tr) const;
private:
future<std::vector<compaction::compaction_group_view*>> get_compaction_group_views_for_repair(dht::token_range range);
};
lw_shared_ptr<sstables::sstable_set> make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&);

View File

@@ -778,7 +778,7 @@ future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> do_query
auto erm = table.get_effective_replication_map();
auto ctx = seastar::make_shared<read_context>(db, s, erm, cmd, ranges, trace_state, timeout);
tombstone_gc_state gc_state = tombstone_gc_enabled ? table.get_tombstone_gc_state() : tombstone_gc_state::no_gc();
tombstone_gc_state gc_state = tombstone_gc_enabled ? table.get_compaction_manager().get_tombstone_gc_state() : tombstone_gc_state::no_gc();
// Use nested coroutine so each step can fail without exceptions using try_future.
auto f = co_await coroutine::as_future(std::invoke([&] -> future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> {

View File

@@ -609,7 +609,7 @@ future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
auto accounter = co_await db.local().get_result_memory_limiter().new_data_read(permit.max_result_size(), short_read_allowed);
query_state qs(output_schema, cmd, opts, prs, std::move(accounter));
auto compaction_state = make_lw_shared<compact_for_query_state>(*output_schema, qs.cmd.timestamp, qs.cmd.slice, qs.remaining_rows(), qs.remaining_partitions(), tombstone_gc_state::no_gc());
auto compaction_state = make_lw_shared<compact_for_query_state>(*output_schema, qs.cmd.timestamp, qs.cmd.slice, qs.remaining_rows(), qs.remaining_partitions(), tombstone_gc_state(nullptr));
auto partition_key_generator = make_partition_key_generator(db, underlying_schema, prs, ts, timeout);
auto dk_opt = co_await partition_key_generator();

View File

@@ -257,7 +257,7 @@ table::make_mutation_reader(schema_ptr s,
const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
if (cache_enabled() && !bypass_cache) {
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, get_tombstone_gc_state(),
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(),
get_max_purgeable_fn_for_cache_underlying_reader(), std::move(trace_state), fwd, fwd_mr)) {
readers.emplace_back(std::move(*reader_opt));
}
@@ -286,7 +286,7 @@ sstables::shared_sstable table::make_streaming_staging_sstable() {
return newtab;
}
static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, tombstone_gc_state gc_state,
static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, const compaction::compaction_manager& cm,
gc_clock::time_point compaction_time, bool compaction_enabled, bool compaction_can_gc) {
utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_enabled", fmt::to_string(compaction_enabled));
utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_can_gc", fmt::to_string(compaction_can_gc));
@@ -297,7 +297,7 @@ static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, t
std::move(underlying),
compaction_time,
compaction_can_gc ? can_always_purge : can_never_purge,
gc_state,
cm.get_tombstone_gc_state(),
streamed_mutation::forwarding::no);
}
@@ -320,7 +320,7 @@ table::make_streaming_reader(schema_ptr s, reader_permit permit,
return maybe_compact_for_streaming(
make_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, nullptr, mutation_reader::forwarding::no),
get_tombstone_gc_state(),
get_compaction_manager(),
compaction_time,
_config.enable_compacting_data_for_streaming_and_repair(),
_config.enable_tombstone_gc_for_streaming_and_repair());
@@ -339,7 +339,7 @@ mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit pe
std::move(trace_state), fwd, fwd_mr, sstables::default_sstable_predicate(), sstables::integrity_check::yes));
return maybe_compact_for_streaming(
make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr),
get_tombstone_gc_state(),
get_compaction_manager(),
compaction_time,
_config.enable_compacting_data_for_streaming_and_repair(),
_config.enable_tombstone_gc_for_streaming_and_repair());
@@ -354,7 +354,7 @@ mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit pe
return maybe_compact_for_streaming(
sstables->make_range_sstable_reader(std::move(schema), std::move(permit), range, slice,
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), sstables::integrity_check::yes),
get_tombstone_gc_state(),
get_compaction_manager(),
compaction_time,
_config.enable_compacting_data_for_streaming_and_repair(),
_config.enable_tombstone_gc_for_streaming_and_repair());
@@ -750,7 +750,6 @@ public:
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
}
dht::token_range get_token_range_after_split(const dht::token&) const noexcept override { return dht::token_range(); }
future<> wait_for_background_tablet_resize_work() override { return make_ready_future<>(); }
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
return get_compaction_group().make_sstable_set();
@@ -769,13 +768,6 @@ class tablet_storage_group_manager final : public storage_group_manager {
locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
future<> _merge_completion_fiber;
condition_variable _merge_completion_event;
// Ensures that processes such as incremental repair will wait for pending work from
// merge fiber before proceeding. This guarantees stability on the compaction groups.
// NOTE: it's important that we don't await on the barrier with any compaction group
// gate held, since merge fiber will stop groups that in turn await on gate,
// potentially causing an ABBA deadlock.
utils::phased_barrier _merge_fiber_barrier;
std::optional<utils::phased_barrier::operation> _pending_merge_fiber_work;
// Holds compaction reenabler which disables compaction temporarily during tablet merge
std::vector<compaction::compaction_reenabler> _compaction_reenablers_for_merging;
private:
@@ -864,7 +856,6 @@ public:
, _my_host_id(erm.get_token_metadata().get_my_id())
, _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id()))
, _merge_completion_fiber(merge_completion_fiber())
, _merge_fiber_barrier(format("[table {}.{}] merge_fiber_barrier", _t.schema()->ks_name(), _t.schema()->cf_name()))
{
storage_group_map ret;
@@ -917,10 +908,6 @@ public:
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
return tablet_map().get_token_range_after_split(token);
}
future<> wait_for_background_tablet_resize_work() override {
co_await _merge_fiber_barrier.advance_and_await();
co_return;
}
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
// FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time.
@@ -2130,31 +2117,33 @@ compaction_group::update_repaired_at_for_merge() {
});
}
// Collects the unrepaired-data compaction group views for every storage
// group that intersects the given token range. Yields to the reactor
// between storage groups so large tables do not stall the shard.
future<std::vector<compaction::compaction_group_view*>> table::get_compaction_group_views_for_repair(dht::token_range range) {
    std::vector<compaction::compaction_group_view*> views;
    auto groups = storage_groups_for_token_range(range);
    for (auto& group : groups) {
        co_await coroutine::maybe_yield();
        group->for_each_compaction_group([&views] (const compaction_group_ptr& cg_ptr) {
            views.push_back(&cg_ptr->view_for_unrepaired_data());
        });
    }
    co_return views;
}
future<compaction_reenablers_and_lock_holders> table::get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
const service::frozen_topology_guard& guard, dht::token_range range) {
auto ret = compaction_reenablers_and_lock_holders();
// Waits for background tablet resize work like merge that might destroy compaction groups,
// providing stability. Essentially, serializes tablet merge completion handling with
// the start of incremental repair, from the replica side.
co_await _sg_manager->wait_for_background_tablet_resize_work();
for (auto sg : storage_groups_for_token_range(range)) {
// FIXME: indentation
auto cgs = sg->compaction_groups_immediate();
for (auto& cg : cgs) {
auto gate_holder = cg->async_gate().hold();
auto& view = cg->view_for_unrepaired_data();
auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(view);
auto views = co_await get_compaction_group_views_for_repair(range);
for (auto view : views) {
auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(*view);
tlogger.info("Disabled compaction for range={} session_id={} for incremental repair", range, guard);
ret.cres.push_back(std::make_unique<compaction::compaction_reenabler>(std::move(cre)));
// This lock prevents the unrepaired compaction started by major compaction to run in parallel with repair.
// The unrepaired compaction started by minor compaction does not need to take the lock since it ignores
// sstables being repaired, so it can run in parallel with repair.
auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(view, "row_level_repair");
auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(*view, "row_level_repair");
tlogger.info("Got unrepaired compaction and repair lock for range={} session_id={} for incremental repair", range, guard);
ret.lock_holders.push_back(std::move(lock_holder));
}
}
co_return ret;
}
@@ -2758,8 +2747,8 @@ public:
bool tombstone_gc_enabled() const noexcept override {
return _t.tombstone_gc_enabled() && _cg.tombstone_gc_enabled();
}
tombstone_gc_state get_tombstone_gc_state() const noexcept override {
return _t.get_tombstone_gc_state();
const tombstone_gc_state& get_tombstone_gc_state() const noexcept override {
return _t.get_compaction_manager().get_tombstone_gc_state();
}
compaction::compaction_backlog_tracker& get_backlog_tracker() override {
return _cg.get_backlog_tracker();
@@ -2917,8 +2906,6 @@ table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_optio
recalculate_tablet_count_stats();
set_metrics();
update_tombstone_gc_rf_one();
}
void table::on_flush_timer() {
@@ -3028,7 +3015,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
while (!_t.async_gate().is_closed()) {
try {
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(5min));
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(60s));
auto ks_name = schema()->ks_name();
auto cf_name = schema()->cf_name();
// Enable compaction after merge is done.
@@ -3062,7 +3049,6 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
utils::get_local_injector().inject("replica_merge_completion_wait", [] () {
tlogger.info("Merge completion fiber finished, about to sleep");
});
_pending_merge_fiber_work.reset();
co_await _merge_completion_event.wait();
tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name());
}
@@ -3121,7 +3107,6 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator:
new_storage_groups[new_tid] = std::move(new_sg);
}
_storage_groups = std::move(new_storage_groups);
_pending_merge_fiber_work = _merge_fiber_barrier.start();
_merge_completion_event.signal();
}
@@ -3138,9 +3123,6 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
} else if (new_tablet_count < old_tablet_count) {
tlogger.info0("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets",
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
if (utils::get_local_injector().is_enabled("tablet_force_tablet_count_decrease_once")) {
utils::get_local_injector().disable("tablet_force_tablet_count_decrease");
}
handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map);
}
@@ -3234,17 +3216,6 @@ void table::update_effective_replication_map(locator::effective_replication_map_
}
recalculate_tablet_count_stats();
update_tombstone_gc_rf_one();
}
// Registers this table's replication-factor class (RF==1 vs RF>1) with the
// shared tombstone-GC state owned by the compaction manager, based on the
// current effective replication map (if any).
void table::update_tombstone_gc_rf_one() {
    auto& gc_state = _compaction_manager.get_shared_tombstone_gc_state();
    const bool rf_is_one = _erm && _erm->get_replication_factor() == 1;
    if (rf_is_one) {
        gc_state.set_table_rf_one(_schema->id());
    } else {
        gc_state.set_table_rf_n(_schema->id());
    }
}
void table::recalculate_tablet_count_stats() {
@@ -3306,7 +3277,7 @@ table::sstables_as_snapshot_source() {
std::move(reader),
gc_clock::now(),
get_max_purgeable_fn_for_cache_underlying_reader(),
get_tombstone_gc_state().with_commitlog_check_disabled(),
_compaction_manager.get_tombstone_gc_state().with_commitlog_check_disabled(),
fwd);
}, [this, sst_set] {
return make_partition_presence_checker(sst_set);
@@ -3316,10 +3287,9 @@ table::sstables_as_snapshot_source() {
// define in .cc, since sstable is forward-declared in .hh
table::~table() {
    // Remove this table from the shared tombstone-GC RF registry so the
    // compaction manager's state does not retain entries for dropped tables.
    _compaction_manager.get_shared_tombstone_gc_state().remove_table_from_rf_registry(_schema->id());
}
logalloc::occupancy_stats table::occupancy() const {
logalloc::occupancy_stats res;
for_each_compaction_group([&] (const compaction_group& cg) {
@@ -4499,7 +4469,7 @@ table::query(schema_ptr query_schema,
if (!querier_opt) {
querier_base::querier_config conf(_config.tombstone_warn_threshold);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, get_tombstone_gc_state(), conf);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, get_compaction_manager().get_tombstone_gc_state(), conf);
}
auto& q = *querier_opt;
@@ -4551,7 +4521,7 @@ table::mutation_query(schema_ptr query_schema,
querier_opt = std::move(*saved_querier);
}
if (!querier_opt) {
auto tombstone_gc_state = tombstone_gc_enabled ? get_tombstone_gc_state() : tombstone_gc_state::no_gc();
auto tombstone_gc_state = tombstone_gc_enabled ? get_compaction_manager().get_tombstone_gc_state() : tombstone_gc_state::no_gc();
querier_base::querier_config conf(_config.tombstone_warn_threshold);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, tombstone_gc_state, conf);
}
@@ -5127,8 +5097,4 @@ future<uint64_t> table::estimated_partitions_in_range(dht::token_range tr) const
co_return partition_count;
}
// Returns a tombstone_gc_state constructed over the shared state held by the
// compaction manager. NOTE(review): presumably a lightweight handle/view over
// the shared state rather than a deep copy — confirm tombstone_gc_state's
// constructor semantics before treating the result as an independent snapshot.
tombstone_gc_state table::get_tombstone_gc_state() const {
    return tombstone_gc_state(_compaction_manager.get_shared_tombstone_gc_state());
}
} // namespace replica

View File

@@ -25,7 +25,6 @@
#include "dht/token.hh"
#include "mutation/async_utils.hh"
#include "compaction/compaction_manager.hh"
#include "dht/fixed_shard.hh"
namespace replica {
@@ -98,103 +97,6 @@ schema_ptr make_tablets_schema() {
.build();
}
// Builds the schema for a persisted RAFT log table.
//
// When is_group0 is false the table backs strongly consistent tablets:
// it additionally gets a "shard" partition-key column, the fixed-shard
// partitioner/sharder, and requires the feature flag to be enabled.
schema_ptr make_raft_schema(sstring name, bool is_group0) {
    auto table_id = generate_legacy_id(db::system_keyspace::NAME, name);
    auto b = schema_builder(db::system_keyspace::NAME, name, std::optional(table_id));
    if (!is_group0) {
        if (!strongly_consistent_tables_enabled) {
            on_internal_error(tablet_logger, "Can't create raft table for strongly consistent tablets when the feature is disabled");
        }
        b.with_column("shard", short_type, column_kind::partition_key);
    }
    b
        .with_column("group_id", timeuuid_type, column_kind::partition_key)
        // RAFT log entries
        .with_column("index", long_type, column_kind::clustering_key)
        .with_column("term", long_type)
        .with_column("data", bytes_type) // decltype(raft::log_entry::data) - serialized variant
        // persisted term and vote
        .with_column("vote_term", long_type, column_kind::static_column)
        .with_column("vote", uuid_type, column_kind::static_column)
        // id of the most recent persisted snapshot
        .with_column("snapshot_id", uuid_type, column_kind::static_column)
        .with_column("commit_idx", long_type, column_kind::static_column)
        .with_hash_version()
        .set_caching_options(caching_options::get_disabled_caching_options());
    if (!is_group0) {
        return b
            .set_comment("Persisted RAFT log, votes and snapshot info for strongly consistent tablets")
            .with_partitioner(dht::fixed_shard_partitioner::classname)
            .with_sharder(dht::fixed_shard_sharder::instance())
            .build();
    }
    return b
        .set_comment("Persisted RAFT log, votes and snapshot info")
        .build();
}
// Builds the schema for a persisted RAFT snapshot-descriptors table.
//
// When is_group0 is false the table backs strongly consistent tablets: it
// additionally gets a "shard" partition-key column, the fixed-shard
// partitioner/sharder, and requires the feature flag to be enabled.
schema_ptr make_raft_snapshots_schema(sstring name, bool is_group0) {
    auto id = generate_legacy_id(db::system_keyspace::NAME, name);
    auto builder = schema_builder(db::system_keyspace::NAME, name, std::optional(id));
    if (!is_group0) {
        if (!strongly_consistent_tables_enabled) {
            on_internal_error(tablet_logger, "Can't create raft snapshots table for strongly consistent tablets when the feature is disabled");
        }
        builder.with_column("shard", short_type, column_kind::partition_key);
    }
    builder
        .with_column("group_id", timeuuid_type, column_kind::partition_key)
        .with_column("snapshot_id", uuid_type)
        // Index and term of last entry in the snapshot
        .with_column("idx", long_type)
        .with_column("term", long_type)
        .with_hash_version();
    if (is_group0) {
        return builder
            // Fixed: the group0 comment previously said "for strongly
            // consistent tablets", which belongs to the non-group0 variant
            // only (matching make_raft_schema / make_raft_snapshot_config_schema).
            .set_comment("Persisted RAFT snapshots info")
            .build();
    } else {
        return builder
            .set_comment("Persisted RAFT snapshot descriptors info for strongly consistent tablets")
            .with_partitioner(dht::fixed_shard_partitioner::classname)
            .with_sharder(dht::fixed_shard_sharder::instance())
            .build();
    }
}
// Builds the schema for the RAFT snapshot-configuration table, which stores
// the cluster membership (server ids and voting rights) associated with a
// snapshot descriptor.
//
// When is_group0 is false the table backs strongly consistent tablets: it
// additionally gets a "shard" partition-key column, the fixed-shard
// partitioner/sharder, and requires the feature flag to be enabled.
schema_ptr make_raft_snapshot_config_schema(sstring name, bool is_group0) {
    auto id = generate_legacy_id(db::system_keyspace::NAME, name);
    auto builder = schema_builder(db::system_keyspace::NAME, name, std::optional(id));
    if (!is_group0) {
        if (!strongly_consistent_tables_enabled) {
            on_internal_error(tablet_logger, "Can't create raft snapshot config table for strongly consistent tablets when the feature is disabled");
        }
        builder.with_column("shard", short_type, column_kind::partition_key);
    }
    builder
        .with_column("group_id", timeuuid_type, column_kind::partition_key)
        .with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT' or 'PREVIOUS'
        .with_column("server_id", uuid_type, column_kind::clustering_key)
        .with_column("can_vote", boolean_type)
        .with_hash_version();
    if (is_group0) {
        return builder
            .set_comment("RAFT configuration for the latest snapshot descriptor")
            .build();
    } else {
        // Strongly-consistent-tablets variant: pin partitioning to a fixed shard.
        return builder
            .set_comment("RAFT configuration for the snapshot descriptor for strongly consistent tablets")
            .with_partitioner(dht::fixed_shard_partitioner::classname)
            .with_sharder(dht::fixed_shard_sharder::instance())
            .build();
    }
}
std::vector<data_value> replicas_to_data_value(const tablet_replica_set& replicas) {
std::vector<data_value> result;
result.reserve(replicas.size());

Some files were not shown because too many files have changed in this diff Show More