Compare commits
151 Commits
copilot/do
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af2d1adcdc | ||
|
|
029712dc56 | ||
|
|
f177259316 | ||
|
|
6bba4f7ca1 | ||
|
|
47e8206482 | ||
|
|
6fb5ab78eb | ||
|
|
46b7170347 | ||
|
|
4c8dba15f1 | ||
|
|
4150c62f29 | ||
|
|
977bdd6260 | ||
|
|
fda68811e8 | ||
|
|
5feed00caa | ||
|
|
f12e4ea42b | ||
|
|
d8cf2c5f23 | ||
|
|
4fdc0a5316 | ||
|
|
d817e56e87 | ||
|
|
5d7a73cc5b | ||
|
|
01498a00d5 | ||
|
|
2be4d8074d | ||
|
|
f9d213547f | ||
|
|
c8c57850d9 | ||
|
|
c3aa4ed23c | ||
|
|
dd75687251 | ||
|
|
52940c4f31 | ||
|
|
ab3d3d8638 | ||
|
|
37bbbd3a27 | ||
|
|
6aef4d3541 | ||
|
|
4795f5840f | ||
|
|
3548b7ad38 | ||
|
|
b0cffb2e81 | ||
|
|
b90fe19a42 | ||
|
|
509f2af8db | ||
|
|
f1978d8a22 | ||
|
|
5a43695f6a | ||
|
|
1ae2ae50a6 | ||
|
|
1cbd0da519 | ||
|
|
33a16940be | ||
|
|
b32ef8ecd5 | ||
|
|
a66565cc42 | ||
|
|
73f1a65203 | ||
|
|
855c503c63 | ||
|
|
d25be9e389 | ||
|
|
7b30a3981b | ||
|
|
7564a56dc8 | ||
|
|
a2669e9983 | ||
|
|
5ae40caa6d | ||
|
|
bb1a798c2c | ||
|
|
c3f59e4fa1 | ||
|
|
aa85f5a9c3 | ||
|
|
8e32d97be6 | ||
|
|
9b2242c752 | ||
|
|
af07718fff | ||
|
|
5e7b966d37 | ||
|
|
25fc8ef14c | ||
|
|
35aab75256 | ||
|
|
2a3476094e | ||
|
|
156c29f962 | ||
|
|
5306e26b83 | ||
|
|
690b2c4142 | ||
|
|
6b3b174704 | ||
|
|
57f1e46204 | ||
|
|
d44fc00c4c | ||
|
|
c200d6ab4f | ||
|
|
9697b6013f | ||
|
|
212bd6ae1a | ||
|
|
221b78cb81 | ||
|
|
527c4141da | ||
|
|
a31cb18324 | ||
|
|
c331796d28 | ||
|
|
4c4673e8f9 | ||
|
|
c7d3f80863 | ||
|
|
85dcbfae9a | ||
|
|
5793e305b5 | ||
|
|
ffbd9a3218 | ||
|
|
c1b0ac141b | ||
|
|
ea17c26fd9 | ||
|
|
e7487c21e4 | ||
|
|
5998a859f7 | ||
|
|
c0e94828de | ||
|
|
038f89ede4 | ||
|
|
ec42fdfd01 | ||
|
|
446539f12f | ||
|
|
85bd6d0114 | ||
|
|
2fb981413a | ||
|
|
38f02b8d76 | ||
|
|
0eb5603ebd | ||
|
|
f156bcddab | ||
|
|
bb359b3b78 | ||
|
|
225b10b683 | ||
|
|
1d8903d9f7 | ||
|
|
06af4480ea | ||
|
|
6e83fb5029 | ||
|
|
afafb8a8fa | ||
|
|
3db74aaf5f | ||
|
|
f74fe22386 | ||
|
|
0e5ddec2a8 | ||
|
|
fd17dcbec8 | ||
|
|
9dc1deccf3 | ||
|
|
45628cf041 | ||
|
|
6a1edab2ac | ||
|
|
d765b5b309 | ||
|
|
b68656b59f | ||
|
|
3bef493a35 | ||
|
|
eab24ff3b0 | ||
|
|
bfff07eacb | ||
|
|
e8e00c874b | ||
|
|
e5218157de | ||
|
|
dc9a90d7cb | ||
|
|
371cdb3c81 | ||
|
|
3606934458 | ||
|
|
7fd083e329 | ||
|
|
b768753c0f | ||
|
|
b8ae9ede63 | ||
|
|
45477d9c6b | ||
|
|
ab6c222fc4 | ||
|
|
4f5310bc72 | ||
|
|
7c2c63ab43 | ||
|
|
074006749c | ||
|
|
d6e2d44759 | ||
|
|
5fd9fc3056 | ||
|
|
a785c0cf41 | ||
|
|
d10e622a3b | ||
|
|
6004e84f18 | ||
|
|
3c34598d88 | ||
|
|
04b001daa6 | ||
|
|
6364e35403 | ||
|
|
f3ee6a0bd1 | ||
|
|
83e20d920e | ||
|
|
041ab593c7 | ||
|
|
5c84a76b28 | ||
|
|
80b74d7df2 | ||
|
|
d95939d69a | ||
|
|
91126eb2fb | ||
|
|
647172d4b8 | ||
|
|
f2308b000f | ||
|
|
5db971c2f9 | ||
|
|
0b4f28ae21 | ||
|
|
af6371c11f | ||
|
|
6c21e5f80c | ||
|
|
d1ff8f1db3 | ||
|
|
97dc88d6b6 | ||
|
|
f841c0522d | ||
|
|
ffe32e8e4d | ||
|
|
16977d7aa0 | ||
|
|
654fe4b1ca | ||
|
|
cb0caea8bf | ||
|
|
e2c4b0a733 | ||
|
|
c1b3fec11a | ||
|
|
c5a44b0f88 | ||
|
|
83e64b516a | ||
|
|
727f1be11c |
@@ -9,6 +9,6 @@ jobs:
|
||||
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
|
||||
with:
|
||||
# Comma-separated list of Jira project keys
|
||||
jira_project_keys: "SCYLLADB,CUSTOMER,SMI"
|
||||
jira_project_keys: "SCYLLADB,CUSTOMER,SMI,RELENG"
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
24
.github/workflows/trigger-scylla-ci.yaml
vendored
24
.github/workflows/trigger-scylla-ci.yaml
vendored
@@ -14,14 +14,20 @@ jobs:
|
||||
steps:
|
||||
- name: Verify Org Membership
|
||||
id: verify_author
|
||||
env:
|
||||
EVENT_NAME: ${{ github.event_name }}
|
||||
PR_AUTHOR: ${{ github.event.pull_request.user.login }}
|
||||
PR_ASSOCIATION: ${{ github.event.pull_request.author_association }}
|
||||
COMMENT_AUTHOR: ${{ github.event.comment.user.login }}
|
||||
COMMENT_ASSOCIATION: ${{ github.event.comment.author_association }}
|
||||
shell: bash
|
||||
run: |
|
||||
if [[ "${{ github.event_name }}" == "pull_request_target" ]]; then
|
||||
AUTHOR="${{ github.event.pull_request.user.login }}"
|
||||
ASSOCIATION="${{ github.event.pull_request.author_association }}"
|
||||
if [[ "$EVENT_NAME" == "pull_request_target" ]]; then
|
||||
AUTHOR="$PR_AUTHOR"
|
||||
ASSOCIATION="$PR_ASSOCIATION"
|
||||
else
|
||||
AUTHOR="${{ github.event.comment.user.login }}"
|
||||
ASSOCIATION="${{ github.event.comment.author_association }}"
|
||||
AUTHOR="$COMMENT_AUTHOR"
|
||||
ASSOCIATION="$COMMENT_ASSOCIATION"
|
||||
fi
|
||||
if [[ "$ASSOCIATION" == "MEMBER" || "$ASSOCIATION" == "OWNER" ]]; then
|
||||
echo "member=true" >> $GITHUB_OUTPUT
|
||||
@@ -33,13 +39,11 @@ jobs:
|
||||
- name: Validate Comment Trigger
|
||||
if: github.event_name == 'issue_comment'
|
||||
id: verify_comment
|
||||
env:
|
||||
COMMENT_BODY: ${{ github.event.comment.body }}
|
||||
shell: bash
|
||||
run: |
|
||||
BODY=$(cat << 'EOF'
|
||||
${{ github.event.comment.body }}
|
||||
EOF
|
||||
)
|
||||
CLEAN_BODY=$(echo "$BODY" | grep -v '^[[:space:]]*>')
|
||||
CLEAN_BODY=$(echo "$COMMENT_BODY" | grep -v '^[[:space:]]*>')
|
||||
|
||||
if echo "$CLEAN_BODY" | grep -qi '@scylladbbot' && echo "$CLEAN_BODY" | grep -qi 'trigger-ci'; then
|
||||
echo "trigger=true" >> $GITHUB_OUTPUT
|
||||
|
||||
@@ -25,6 +25,7 @@ target_sources(scylla_auth
|
||||
service.cc
|
||||
standard_role_manager.cc
|
||||
transitional.cc
|
||||
maintenance_socket_authenticator.cc
|
||||
maintenance_socket_role_manager.cc)
|
||||
target_include_directories(scylla_auth
|
||||
PUBLIC
|
||||
|
||||
@@ -9,19 +9,9 @@
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
constexpr std::string_view allow_all_authenticator_name("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<
|
||||
authenticator,
|
||||
allow_all_authenticator,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
|
||||
}
|
||||
|
||||
@@ -9,18 +9,9 @@
|
||||
#include "auth/allow_all_authorizer.hh"
|
||||
|
||||
#include "auth/common.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
constexpr std::string_view allow_all_authorizer_name("org.apache.cassandra.auth.AllowAllAuthorizer");
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<
|
||||
authorizer,
|
||||
allow_all_authorizer,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthorizer");
|
||||
|
||||
}
|
||||
|
||||
@@ -13,14 +13,11 @@
|
||||
#include <boost/regex.hpp>
|
||||
#include <fmt/ranges.h>
|
||||
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "utils/to_string.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
static const auto CERT_AUTH_NAME = "com.scylladb.auth.CertificateAuthenticator";
|
||||
const std::string_view auth::certificate_authenticator_name(CERT_AUTH_NAME);
|
||||
|
||||
static logging::logger clogger("certificate_authenticator");
|
||||
|
||||
@@ -30,13 +27,6 @@ static const std::string cfg_query_attr = "query";
|
||||
static const std::string cfg_source_subject = "SUBJECT";
|
||||
static const std::string cfg_source_altname = "ALTNAME";
|
||||
|
||||
static const class_registrator<auth::authenticator
|
||||
, auth::certificate_authenticator
|
||||
, cql3::query_processor&
|
||||
, ::service::raft_group0_client&
|
||||
, ::service::migration_manager&
|
||||
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
|
||||
|
||||
enum class auth::certificate_authenticator::query_source {
|
||||
subject, altname
|
||||
};
|
||||
@@ -99,7 +89,7 @@ future<> auth::certificate_authenticator::stop() {
|
||||
}
|
||||
|
||||
std::string_view auth::certificate_authenticator::qualified_java_name() const {
|
||||
return certificate_authenticator_name;
|
||||
return "com.scylladb.auth.CertificateAuthenticator";
|
||||
}
|
||||
|
||||
bool auth::certificate_authenticator::require_authentication() const {
|
||||
|
||||
@@ -27,8 +27,6 @@ namespace auth {
|
||||
|
||||
class cache;
|
||||
|
||||
extern const std::string_view certificate_authenticator_name;
|
||||
|
||||
class certificate_authenticator : public authenticator {
|
||||
enum class query_source;
|
||||
std::vector<std::pair<query_source, boost::regex>> _queries;
|
||||
|
||||
@@ -26,7 +26,6 @@ extern "C" {
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
@@ -40,14 +39,6 @@ static constexpr std::string_view PERMISSIONS_NAME = "permissions";
|
||||
|
||||
static logging::logger alogger("default_authorizer");
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<
|
||||
authorizer,
|
||||
default_authorizer,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.CassandraAuthorizer");
|
||||
|
||||
default_authorizer::default_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
|
||||
: _qp(qp)
|
||||
, _migration_manager(mm) {
|
||||
|
||||
@@ -24,7 +24,6 @@
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "db/config.hh"
|
||||
#include "utils/exponential_backoff_retry.hh"
|
||||
|
||||
@@ -72,20 +71,10 @@ std::vector<sstring> get_attr_values(LDAP* ld, LDAPMessage* res, const char* att
|
||||
return values;
|
||||
}
|
||||
|
||||
const char* ldap_role_manager_full_name = "com.scylladb.auth.LDAPRoleManager";
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
namespace auth {
|
||||
|
||||
static const class_registrator<
|
||||
role_manager,
|
||||
ldap_role_manager,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> registration(ldap_role_manager_full_name);
|
||||
|
||||
ldap_role_manager::ldap_role_manager(
|
||||
std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
|
||||
uint32_t permissions_update_interval_in_ms,
|
||||
@@ -115,7 +104,7 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
|
||||
}
|
||||
|
||||
std::string_view ldap_role_manager::qualified_java_name() const noexcept {
|
||||
return ldap_role_manager_full_name;
|
||||
return "com.scylladb.auth.LDAPRoleManager";
|
||||
}
|
||||
|
||||
const resource_set& ldap_role_manager::protected_resources() const {
|
||||
|
||||
@@ -57,8 +57,7 @@ class ldap_role_manager : public role_manager {
|
||||
cache& cache ///< Passed to standard_role_manager.
|
||||
);
|
||||
|
||||
/// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
|
||||
/// class_registrator<role_manager>.
|
||||
/// Retrieves LDAP configuration entries from qp and invokes the other constructor.
|
||||
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);
|
||||
|
||||
/// Thrown when query-template parsing fails.
|
||||
|
||||
31
auth/maintenance_socket_authenticator.cc
Normal file
31
auth/maintenance_socket_authenticator.cc
Normal file
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#include "auth/maintenance_socket_authenticator.hh"
|
||||
|
||||
|
||||
namespace auth {
|
||||
|
||||
maintenance_socket_authenticator::~maintenance_socket_authenticator() {
|
||||
}
|
||||
|
||||
future<> maintenance_socket_authenticator::start() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> maintenance_socket_authenticator::ensure_superuser_is_created() const {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
bool maintenance_socket_authenticator::require_authentication() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace auth
|
||||
36
auth/maintenance_socket_authenticator.hh
Normal file
36
auth/maintenance_socket_authenticator.hh
Normal file
@@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/shared_future.hh>
|
||||
|
||||
#include "password_authenticator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
// maintenance_socket_authenticator is used for clients connecting to the
|
||||
// maintenance socket. It does not require authentication,
|
||||
// while still allowing the managing of roles and their credentials.
|
||||
class maintenance_socket_authenticator : public password_authenticator {
|
||||
public:
|
||||
using password_authenticator::password_authenticator;
|
||||
|
||||
virtual ~maintenance_socket_authenticator();
|
||||
|
||||
virtual future<> start() override;
|
||||
|
||||
virtual future<> ensure_superuser_is_created() const override;
|
||||
|
||||
bool require_authentication() const override;
|
||||
};
|
||||
|
||||
} // namespace auth
|
||||
|
||||
@@ -13,23 +13,48 @@
|
||||
#include <string_view>
|
||||
#include "auth/cache.hh"
|
||||
#include "cql3/description.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "utils/on_internal_error.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
constexpr std::string_view maintenance_socket_role_manager_name = "com.scylladb.auth.MaintenanceSocketRoleManager";
|
||||
static logging::logger log("maintenance_socket_role_manager");
|
||||
|
||||
static const class_registrator<
|
||||
role_manager,
|
||||
maintenance_socket_role_manager,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> registration(sstring{maintenance_socket_role_manager_name});
|
||||
future<> maintenance_socket_role_manager::ensure_role_operations_are_enabled() {
|
||||
if (_is_maintenance_mode) {
|
||||
on_internal_error(log, "enabling role operations not allowed in maintenance mode");
|
||||
}
|
||||
|
||||
if (_std_mgr.has_value()) {
|
||||
on_internal_error(log, "role operations are already enabled");
|
||||
}
|
||||
|
||||
_std_mgr.emplace(_qp, _group0_client, _migration_manager, _cache);
|
||||
return _std_mgr->start();
|
||||
}
|
||||
|
||||
void maintenance_socket_role_manager::set_maintenance_mode() {
|
||||
if (_std_mgr.has_value()) {
|
||||
on_internal_error(log, "cannot enter maintenance mode after role operations have been enabled");
|
||||
}
|
||||
_is_maintenance_mode = true;
|
||||
}
|
||||
|
||||
maintenance_socket_role_manager::maintenance_socket_role_manager(
|
||||
cql3::query_processor& qp,
|
||||
::service::raft_group0_client& rg0c,
|
||||
::service::migration_manager& mm,
|
||||
cache& c)
|
||||
: _qp(qp)
|
||||
, _group0_client(rg0c)
|
||||
, _migration_manager(mm)
|
||||
, _cache(c)
|
||||
, _std_mgr(std::nullopt)
|
||||
, _is_maintenance_mode(false) {
|
||||
}
|
||||
|
||||
std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {
|
||||
return maintenance_socket_role_manager_name;
|
||||
return "com.scylladb.auth.MaintenanceSocketRoleManager";
|
||||
}
|
||||
|
||||
const resource_set& maintenance_socket_role_manager::protected_resources() const {
|
||||
@@ -43,81 +68,161 @@ future<> maintenance_socket_role_manager::start() {
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::stop() {
|
||||
return make_ready_future<>();
|
||||
return _std_mgr ? _std_mgr->stop() : make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::ensure_superuser_is_created() {
|
||||
return make_ready_future<>();
|
||||
return _std_mgr ? _std_mgr->ensure_superuser_is_created() : make_ready_future<>();
|
||||
}
|
||||
|
||||
template<typename T = void>
|
||||
future<T> operation_not_supported_exception(std::string_view operation) {
|
||||
future<T> operation_not_available_in_maintenance_mode_exception(std::string_view operation) {
|
||||
return make_exception_future<T>(
|
||||
std::runtime_error(fmt::format("role manager: {} operation not supported through maintenance socket", operation)));
|
||||
std::runtime_error(fmt::format("role manager: {} operation not available through maintenance socket in maintenance mode", operation)));
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::create(std::string_view role_name, const role_config&, ::service::group0_batch&) {
|
||||
return operation_not_supported_exception("CREATE");
|
||||
template<typename T = void>
|
||||
future<T> manager_not_ready_exception(std::string_view operation) {
|
||||
return make_exception_future<T>(
|
||||
std::runtime_error(fmt::format("role manager: {} operation not available because manager not ready yet (role operations not enabled)", operation)));
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::validate_operation(std::string_view name) const {
|
||||
if (_is_maintenance_mode) {
|
||||
return operation_not_available_in_maintenance_mode_exception(name);
|
||||
}
|
||||
if (!_std_mgr) {
|
||||
return manager_not_ready_exception(name);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::create(std::string_view role_name, const role_config& c, ::service::group0_batch& mc) {
|
||||
auto f = validate_operation("CREATE");
|
||||
if (f.failed()) {
|
||||
return f;
|
||||
}
|
||||
return _std_mgr->create(role_name, c, mc);
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::drop(std::string_view role_name, ::service::group0_batch& mc) {
|
||||
return operation_not_supported_exception("DROP");
|
||||
auto f = validate_operation("DROP");
|
||||
if (f.failed()) {
|
||||
return f;
|
||||
}
|
||||
return _std_mgr->drop(role_name, mc);
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::alter(std::string_view role_name, const role_config_update&, ::service::group0_batch&) {
|
||||
return operation_not_supported_exception("ALTER");
|
||||
future<> maintenance_socket_role_manager::alter(std::string_view role_name, const role_config_update& u, ::service::group0_batch& mc) {
|
||||
auto f = validate_operation("ALTER");
|
||||
if (f.failed()) {
|
||||
return f;
|
||||
}
|
||||
return _std_mgr->alter(role_name, u, mc);
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::grant(std::string_view grantee_name, std::string_view role_name, ::service::group0_batch& mc) {
|
||||
return operation_not_supported_exception("GRANT");
|
||||
auto f = validate_operation("GRANT");
|
||||
if (f.failed()) {
|
||||
return f;
|
||||
}
|
||||
return _std_mgr->grant(grantee_name, role_name, mc);
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::revoke(std::string_view revokee_name, std::string_view role_name, ::service::group0_batch& mc) {
|
||||
return operation_not_supported_exception("REVOKE");
|
||||
auto f = validate_operation("REVOKE");
|
||||
if (f.failed()) {
|
||||
return f;
|
||||
}
|
||||
return _std_mgr->revoke(revokee_name, role_name, mc);
|
||||
}
|
||||
|
||||
future<role_set> maintenance_socket_role_manager::query_granted(std::string_view grantee_name, recursive_role_query) {
|
||||
return operation_not_supported_exception<role_set>("QUERY GRANTED");
|
||||
future<role_set> maintenance_socket_role_manager::query_granted(std::string_view grantee_name, recursive_role_query m) {
|
||||
auto f = validate_operation("QUERY GRANTED");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<role_set>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->query_granted(grantee_name, m);
|
||||
}
|
||||
|
||||
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state&) {
|
||||
return operation_not_supported_exception<role_to_directly_granted_map>("QUERY ALL DIRECTLY GRANTED");
|
||||
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state& qs) {
|
||||
auto f = validate_operation("QUERY ALL DIRECTLY GRANTED");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<role_to_directly_granted_map>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->query_all_directly_granted(qs);
|
||||
}
|
||||
|
||||
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state&) {
|
||||
return operation_not_supported_exception<role_set>("QUERY ALL");
|
||||
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state& qs) {
|
||||
auto f = validate_operation("QUERY ALL");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<role_set>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->query_all(qs);
|
||||
}
|
||||
|
||||
future<bool> maintenance_socket_role_manager::exists(std::string_view role_name) {
|
||||
return operation_not_supported_exception<bool>("EXISTS");
|
||||
auto f = validate_operation("EXISTS");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<bool>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->exists(role_name);
|
||||
}
|
||||
|
||||
future<bool> maintenance_socket_role_manager::is_superuser(std::string_view role_name) {
|
||||
return make_ready_future<bool>(true);
|
||||
auto f = validate_operation("IS SUPERUSER");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<bool>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->is_superuser(role_name);
|
||||
}
|
||||
|
||||
future<bool> maintenance_socket_role_manager::can_login(std::string_view role_name) {
|
||||
return make_ready_future<bool>(true);
|
||||
auto f = validate_operation("CAN LOGIN");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<bool>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->can_login(role_name);
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) {
|
||||
return operation_not_supported_exception<std::optional<sstring>>("GET ATTRIBUTE");
|
||||
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
|
||||
auto f = validate_operation("GET ATTRIBUTE");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<std::optional<sstring>>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->get_attribute(role_name, attribute_name, qs);
|
||||
}
|
||||
|
||||
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) {
|
||||
return operation_not_supported_exception<role_manager::attribute_vals>("QUERY ATTRIBUTE");
|
||||
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) {
|
||||
auto f = validate_operation("QUERY ATTRIBUTE FOR ALL");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<role_manager::attribute_vals>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->query_attribute_for_all(attribute_name, qs);
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) {
|
||||
return operation_not_supported_exception("SET ATTRIBUTE");
|
||||
auto f = validate_operation("SET ATTRIBUTE");
|
||||
if (f.failed()) {
|
||||
return f;
|
||||
}
|
||||
return _std_mgr->set_attribute(role_name, attribute_name, attribute_value, mc);
|
||||
}
|
||||
|
||||
future<> maintenance_socket_role_manager::remove_attribute(std::string_view role_name, std::string_view attribute_name, ::service::group0_batch& mc) {
|
||||
return operation_not_supported_exception("REMOVE ATTRIBUTE");
|
||||
auto f = validate_operation("REMOVE ATTRIBUTE");
|
||||
if (f.failed()) {
|
||||
return f;
|
||||
}
|
||||
return _std_mgr->remove_attribute(role_name, attribute_name, mc);
|
||||
}
|
||||
|
||||
future<std::vector<cql3::description>> maintenance_socket_role_manager::describe_role_grants() {
|
||||
return operation_not_supported_exception<std::vector<cql3::description>>("DESCRIBE SCHEMA WITH INTERNALS");
|
||||
auto f = validate_operation("DESCRIBE ROLE GRANTS");
|
||||
if (f.failed()) {
|
||||
return make_exception_future<std::vector<cql3::description>>(f.get_exception());
|
||||
}
|
||||
return _std_mgr->describe_role_grants();
|
||||
}
|
||||
|
||||
} // namespace auth
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/resource.hh"
|
||||
#include "auth/role_manager.hh"
|
||||
#include "auth/standard_role_manager.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
|
||||
namespace cql3 {
|
||||
@@ -24,13 +25,26 @@ class raft_group0_client;
|
||||
|
||||
namespace auth {
|
||||
|
||||
extern const std::string_view maintenance_socket_role_manager_name;
|
||||
|
||||
// This role manager is used by the maintenance socket. It has disabled all role management operations to not depend on
|
||||
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
|
||||
// This role manager is used by the maintenance socket. It has disabled all role management operations
|
||||
// in maintenance mode. In normal mode it delegates all operations to a standard_role_manager,
|
||||
// which is created on demand when the node joins the cluster.
|
||||
class maintenance_socket_role_manager final : public role_manager {
|
||||
cql3::query_processor& _qp;
|
||||
::service::raft_group0_client& _group0_client;
|
||||
::service::migration_manager& _migration_manager;
|
||||
cache& _cache;
|
||||
std::optional<standard_role_manager> _std_mgr;
|
||||
bool _is_maintenance_mode;
|
||||
|
||||
public:
|
||||
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}
|
||||
void set_maintenance_mode() override;
|
||||
|
||||
// Ensures role management operations are enabled.
|
||||
// It must be called once the node has joined the cluster.
|
||||
// In the meantime all role management operations will fail.
|
||||
future<> ensure_role_operations_are_enabled() override;
|
||||
|
||||
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
|
||||
virtual std::string_view qualified_java_name() const noexcept override;
|
||||
|
||||
@@ -42,21 +56,21 @@ public:
|
||||
|
||||
virtual future<> ensure_superuser_is_created() override;
|
||||
|
||||
virtual future<> create(std::string_view role_name, const role_config&, ::service::group0_batch&) override;
|
||||
virtual future<> create(std::string_view role_name, const role_config& c, ::service::group0_batch& mc) override;
|
||||
|
||||
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override;
|
||||
|
||||
virtual future<> alter(std::string_view role_name, const role_config_update&, ::service::group0_batch&) override;
|
||||
virtual future<> alter(std::string_view role_name, const role_config_update& u, ::service::group0_batch& mc) override;
|
||||
|
||||
virtual future<> grant(std::string_view grantee_name, std::string_view role_name, ::service::group0_batch& mc) override;
|
||||
|
||||
virtual future<> revoke(std::string_view revokee_name, std::string_view role_name, ::service::group0_batch& mc) override;
|
||||
|
||||
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
|
||||
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query m) override;
|
||||
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
|
||||
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state& qs) override;
|
||||
|
||||
virtual future<role_set> query_all(::service::query_state&) override;
|
||||
virtual future<role_set> query_all(::service::query_state& qs) override;
|
||||
|
||||
virtual future<bool> exists(std::string_view role_name) override;
|
||||
|
||||
@@ -64,15 +78,19 @@ public:
|
||||
|
||||
virtual future<bool> can_login(std::string_view role_name) override;
|
||||
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
|
||||
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) override;
|
||||
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
|
||||
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) override;
|
||||
|
||||
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;
|
||||
|
||||
virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name, ::service::group0_batch& mc) override;
|
||||
|
||||
virtual future<std::vector<cql3::description>> describe_role_grants() override;
|
||||
|
||||
private:
|
||||
future<> validate_operation(std::string_view name) const;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -26,7 +26,6 @@
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "db/config.hh"
|
||||
@@ -37,27 +36,18 @@ constexpr std::string_view password_authenticator_name("org.apache.cassandra.aut
|
||||
|
||||
// name of the hash column.
|
||||
static constexpr std::string_view SALTED_HASH = "salted_hash";
|
||||
static constexpr std::string_view DEFAULT_USER_NAME = meta::DEFAULT_SUPERUSER_NAME;
|
||||
static const sstring DEFAULT_USER_PASSWORD = sstring(meta::DEFAULT_SUPERUSER_NAME);
|
||||
|
||||
static logging::logger plogger("password_authenticator");
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<
|
||||
authenticator,
|
||||
password_authenticator,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
|
||||
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
|
||||
|
||||
static std::string_view get_config_value(std::string_view value, std::string_view def) {
|
||||
return value.empty() ? def : value;
|
||||
}
|
||||
std::string password_authenticator::default_superuser(const db::config& cfg) {
|
||||
return std::string(get_config_value(cfg.auth_superuser_name(), DEFAULT_USER_NAME));
|
||||
std::string password_authenticator::default_superuser(cql3::query_processor& qp) {
|
||||
if (legacy_mode(qp)) {
|
||||
return std::string(meta::DEFAULT_SUPERUSER_NAME);
|
||||
}
|
||||
|
||||
return qp.db().get_config().auth_superuser_name();
|
||||
}
|
||||
|
||||
password_authenticator::~password_authenticator() {
|
||||
@@ -69,7 +59,6 @@ password_authenticator::password_authenticator(cql3::query_processor& qp, ::serv
|
||||
, _migration_manager(mm)
|
||||
, _cache(cache)
|
||||
, _stopped(make_ready_future<>())
|
||||
, _superuser(default_superuser(qp.db().get_config()))
|
||||
{}
|
||||
|
||||
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
|
||||
@@ -123,11 +112,14 @@ future<> password_authenticator::migrate_legacy_metadata() const {
|
||||
}
|
||||
|
||||
future<> password_authenticator::legacy_create_default_if_missing() {
|
||||
if (_superuser.empty()) {
|
||||
on_internal_error(plogger, "Legacy auth default superuser name is empty");
|
||||
}
|
||||
const auto exists = co_await legacy::default_role_row_satisfies(_qp, &has_salted_hash, _superuser);
|
||||
if (exists) {
|
||||
co_return;
|
||||
}
|
||||
std::string salted_pwd(get_config_value(_qp.db().get_config().auth_superuser_salted_password(), ""));
|
||||
std::string salted_pwd(_qp.db().get_config().auth_superuser_salted_password());
|
||||
if (salted_pwd.empty()) {
|
||||
salted_pwd = passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt, _scheme);
|
||||
}
|
||||
@@ -147,6 +139,9 @@ future<> password_authenticator::legacy_create_default_if_missing() {
|
||||
|
||||
future<> password_authenticator::maybe_create_default_password() {
|
||||
auto needs_password = [this] () -> future<bool> {
|
||||
if (_superuser.empty()) {
|
||||
co_return false;
|
||||
}
|
||||
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE is_superuser = true ALLOW FILTERING", get_auth_ks_name(_qp), meta::roles_table::name);
|
||||
auto results = co_await _qp.execute_internal(query,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
@@ -178,9 +173,9 @@ future<> password_authenticator::maybe_create_default_password() {
|
||||
co_return;
|
||||
}
|
||||
// Set default superuser's password.
|
||||
std::string salted_pwd(get_config_value(_qp.db().get_config().auth_superuser_salted_password(), ""));
|
||||
std::string salted_pwd(_qp.db().get_config().auth_superuser_salted_password());
|
||||
if (salted_pwd.empty()) {
|
||||
salted_pwd = passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt, _scheme);
|
||||
co_return;
|
||||
}
|
||||
const auto update_query = update_row_query();
|
||||
co_await collect_mutations(_qp, batch, update_query, {salted_pwd, _superuser});
|
||||
@@ -210,6 +205,8 @@ future<> password_authenticator::maybe_create_default_password_with_retries() {
|
||||
|
||||
future<> password_authenticator::start() {
|
||||
return once_among_shards([this] {
|
||||
_superuser = default_superuser(_qp);
|
||||
|
||||
// Verify that at least one hashing scheme is supported.
|
||||
passwords::detail::verify_scheme(_scheme);
|
||||
plogger.info("Using password hashing scheme: {}", passwords::detail::prefix_for_scheme(_scheme));
|
||||
@@ -217,6 +214,9 @@ future<> password_authenticator::start() {
|
||||
_stopped = do_after_system_ready(_as, [this] {
|
||||
return async([this] {
|
||||
if (legacy_mode(_qp)) {
|
||||
if (_superuser.empty()) {
|
||||
on_internal_error(plogger, "Legacy auth default superuser name is empty");
|
||||
}
|
||||
if (!_superuser_created_promise.available()) {
|
||||
// Counterintuitively, we mark promise as ready before any startup work
|
||||
// because wait_for_schema_agreement() below will block indefinitely
|
||||
@@ -251,6 +251,9 @@ future<> password_authenticator::start() {
|
||||
});
|
||||
|
||||
if (legacy_mode(_qp)) {
|
||||
if (_superuser.empty()) {
|
||||
on_internal_error(plogger, "Legacy auth default superuser name is empty");
|
||||
}
|
||||
static const sstring create_roles_query = fmt::format(
|
||||
"CREATE TABLE {}.{} ("
|
||||
" {} text PRIMARY KEY,"
|
||||
@@ -280,7 +283,7 @@ future<> password_authenticator::stop() {
|
||||
db::consistency_level password_authenticator::consistency_for_user(std::string_view role_name) {
|
||||
// TODO: this is plain dung. Why treat hardcoded default special, but for example a user-created
|
||||
// super user uses plain LOCAL_ONE?
|
||||
if (role_name == DEFAULT_USER_NAME) {
|
||||
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
|
||||
return db::consistency_level::QUORUM;
|
||||
}
|
||||
return db::consistency_level::LOCAL_ONE;
|
||||
|
||||
@@ -51,7 +51,7 @@ class password_authenticator : public authenticator {
|
||||
|
||||
public:
|
||||
static db::consistency_level consistency_for_user(std::string_view role_name);
|
||||
static std::string default_superuser(const db::config&);
|
||||
static std::string default_superuser(cql3::query_processor& qp);
|
||||
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
|
||||
|
||||
@@ -112,6 +112,11 @@ public:
|
||||
|
||||
virtual future<> stop() = 0;
|
||||
|
||||
///
|
||||
/// Notify that the maintenance mode is starting.
|
||||
///
|
||||
virtual void set_maintenance_mode() {}
|
||||
|
||||
///
|
||||
/// Ensure that superuser role exists.
|
||||
///
|
||||
@@ -119,6 +124,11 @@ public:
|
||||
///
|
||||
virtual future<> ensure_superuser_is_created() = 0;
|
||||
|
||||
///
|
||||
/// Ensure role management operations are enabled. Some role managers may defer initialization.
|
||||
///
|
||||
virtual future<> ensure_role_operations_are_enabled() { return make_ready_future<>(); }
|
||||
|
||||
///
|
||||
/// \returns an exceptional future with \ref role_already_exists for a role that has previously been created.
|
||||
///
|
||||
|
||||
@@ -22,21 +22,11 @@
|
||||
#include "db/config.hh"
|
||||
#include "utils/log.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
static logging::logger mylog("saslauthd_authenticator");
|
||||
|
||||
// To ensure correct initialization order, we unfortunately need to use a string literal.
|
||||
static const class_registrator<
|
||||
authenticator,
|
||||
saslauthd_authenticator,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
|
||||
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
|
||||
: _socket_path(qp.db().get_config().saslauthd_socket_path())
|
||||
{}
|
||||
|
||||
146
auth/service.cc
146
auth/service.cc
@@ -16,6 +16,8 @@
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/shard_id.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
@@ -23,8 +25,17 @@
|
||||
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
#include "auth/allow_all_authorizer.hh"
|
||||
#include "auth/certificate_authenticator.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "auth/default_authorizer.hh"
|
||||
#include "auth/ldap_role_manager.hh"
|
||||
#include "auth/maintenance_socket_authenticator.hh"
|
||||
#include "auth/maintenance_socket_role_manager.hh"
|
||||
#include "auth/password_authenticator.hh"
|
||||
#include "auth/role_or_anonymous.hh"
|
||||
#include "auth/saslauthd_authenticator.hh"
|
||||
#include "auth/standard_role_manager.hh"
|
||||
#include "auth/transitional.hh"
|
||||
#include "cql3/functions/functions.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/description.hh"
|
||||
@@ -43,7 +54,6 @@
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "mutation/timestamp.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "data_dictionary/keyspace_metadata.hh"
|
||||
#include "service/storage_service.hh"
|
||||
@@ -176,8 +186,9 @@ service::service(
|
||||
cql3::query_processor& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
::service::migration_notifier& mn,
|
||||
::service::migration_manager& mm,
|
||||
const service_config& sc,
|
||||
authorizer_factory authorizer_factory,
|
||||
authenticator_factory authenticator_factory,
|
||||
role_manager_factory role_manager_factory,
|
||||
maintenance_socket_enabled used_by_maintenance_socket,
|
||||
cache& cache)
|
||||
: service(
|
||||
@@ -185,9 +196,9 @@ service::service(
|
||||
qp,
|
||||
g0,
|
||||
mn,
|
||||
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
|
||||
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
|
||||
authorizer_factory(),
|
||||
authenticator_factory(),
|
||||
role_manager_factory(),
|
||||
used_by_maintenance_socket) {
|
||||
}
|
||||
|
||||
@@ -307,6 +318,10 @@ future<permission_set> service::get_permissions(const role_or_anonymous& maybe_r
|
||||
return _cache.get_permissions(maybe_role, r);
|
||||
}
|
||||
|
||||
void service::set_maintenance_mode() {
|
||||
_role_manager->set_maintenance_mode();
|
||||
}
|
||||
|
||||
future<bool> service::has_superuser(std::string_view role_name, const role_set& roles) const {
|
||||
for (const auto& role : roles) {
|
||||
if (co_await _role_manager->is_superuser(role)) {
|
||||
@@ -342,6 +357,10 @@ static void validate_authentication_options_are_supported(
|
||||
}
|
||||
}
|
||||
|
||||
future<> service::ensure_role_operations_are_enabled() {
|
||||
return _role_manager->ensure_role_operations_are_enabled();
|
||||
}
|
||||
|
||||
future<> service::create_role(std::string_view name,
|
||||
const role_config& config,
|
||||
const authentication_options& options,
|
||||
@@ -659,6 +678,10 @@ future<std::vector<cql3::description>> service::describe_auth(bool with_hashed_p
|
||||
// Free functions.
|
||||
//
|
||||
|
||||
void set_maintenance_mode(service& ser) {
|
||||
ser.set_maintenance_mode();
|
||||
}
|
||||
|
||||
future<bool> has_superuser(const service& ser, const authenticated_user& u) {
|
||||
if (is_anonymous(u)) {
|
||||
return make_ready_future<bool>(false);
|
||||
@@ -667,6 +690,10 @@ future<bool> has_superuser(const service& ser, const authenticated_user& u) {
|
||||
return ser.has_superuser(*u.name);
|
||||
}
|
||||
|
||||
future<> ensure_role_operations_are_enabled(service& ser) {
|
||||
return ser.underlying_role_manager().ensure_role_operations_are_enabled();
|
||||
}
|
||||
|
||||
future<role_set> get_roles(const service& ser, const authenticated_user& u) {
|
||||
if (is_anonymous(u)) {
|
||||
return make_ready_future<role_set>();
|
||||
@@ -928,4 +955,111 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
|
||||
std::nullopt);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
std::string_view get_short_name(std::string_view name) {
|
||||
auto pos = name.find_last_of('.');
|
||||
if (pos == std::string_view::npos) {
|
||||
return name;
|
||||
}
|
||||
return name.substr(pos + 1);
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
authorizer_factory make_authorizer_factory(
|
||||
std::string_view name,
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm) {
|
||||
std::string_view short_name = get_short_name(name);
|
||||
|
||||
if (boost::iequals(short_name, "AllowAllAuthorizer")) {
|
||||
return [&qp, &g0, &mm] {
|
||||
return std::make_unique<allow_all_authorizer>(qp.local(), g0, mm.local());
|
||||
};
|
||||
} else if (boost::iequals(short_name, "CassandraAuthorizer")) {
|
||||
return [&qp, &g0, &mm] {
|
||||
return std::make_unique<default_authorizer>(qp.local(), g0, mm.local());
|
||||
};
|
||||
} else if (boost::iequals(short_name, "TransitionalAuthorizer")) {
|
||||
return [&qp, &g0, &mm] {
|
||||
return std::make_unique<transitional_authorizer>(qp.local(), g0, mm.local());
|
||||
};
|
||||
}
|
||||
throw std::invalid_argument(fmt::format("Unknown authorizer: {}", name));
|
||||
}
|
||||
|
||||
authenticator_factory make_authenticator_factory(
|
||||
std::string_view name,
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm,
|
||||
sharded<cache>& auth_cache) {
|
||||
std::string_view short_name = get_short_name(name);
|
||||
|
||||
if (boost::iequals(short_name, "AllowAllAuthenticator")) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<allow_all_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
} else if (boost::iequals(short_name, "PasswordAuthenticator")) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<password_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
} else if (boost::iequals(short_name, "CertificateAuthenticator")) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<certificate_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
} else if (boost::iequals(short_name, "SaslauthdAuthenticator")) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<saslauthd_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
} else if (boost::iequals(short_name, "TransitionalAuthenticator")) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<transitional_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
}
|
||||
throw std::invalid_argument(fmt::format("Unknown authenticator: {}", name));
|
||||
}
|
||||
|
||||
role_manager_factory make_role_manager_factory(
|
||||
std::string_view name,
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm,
|
||||
sharded<cache>& auth_cache) {
|
||||
std::string_view short_name = get_short_name(name);
|
||||
|
||||
if (boost::iequals(short_name, "CassandraRoleManager")) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<standard_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
} else if (boost::iequals(short_name, "LDAPRoleManager")) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<ldap_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
}
|
||||
throw std::invalid_argument(fmt::format("Unknown role manager: {}", name));
|
||||
}
|
||||
|
||||
authenticator_factory make_maintenance_socket_authenticator_factory(
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm,
|
||||
sharded<cache>& auth_cache) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<maintenance_socket_authenticator>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
}
|
||||
|
||||
role_manager_factory make_maintenance_socket_role_manager_factory(
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm,
|
||||
sharded<cache>& auth_cache) {
|
||||
return [&qp, &g0, &mm, &auth_cache] {
|
||||
return std::make_unique<maintenance_socket_role_manager>(qp.local(), g0, mm.local(), auth_cache.local());
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -44,11 +44,10 @@ namespace auth {
|
||||
|
||||
class role_or_anonymous;
|
||||
|
||||
struct service_config final {
|
||||
sstring authorizer_java_name;
|
||||
sstring authenticator_java_name;
|
||||
sstring role_manager_java_name;
|
||||
};
|
||||
/// Factory function types for creating auth module instances on each shard.
|
||||
using authorizer_factory = std::function<std::unique_ptr<authorizer>()>;
|
||||
using authenticator_factory = std::function<std::unique_ptr<authenticator>()>;
|
||||
using role_manager_factory = std::function<std::unique_ptr<role_manager>()>;
|
||||
|
||||
///
|
||||
/// Due to poor (in this author's opinion) decisions of Apache Cassandra, certain choices of one role-manager,
|
||||
@@ -108,15 +107,16 @@ public:
|
||||
|
||||
///
|
||||
/// This constructor is intended to be used when the class is sharded via \ref seastar::sharded. In that case, the
|
||||
/// arguments must be copyable, which is why we delay construction with instance-construction instructions instead
|
||||
/// arguments must be copyable, which is why we delay construction with instance-construction factories instead
|
||||
/// of the instances themselves.
|
||||
///
|
||||
service(
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_notifier&,
|
||||
::service::migration_manager&,
|
||||
const service_config&,
|
||||
authorizer_factory,
|
||||
authenticator_factory,
|
||||
role_manager_factory,
|
||||
maintenance_socket_enabled,
|
||||
cache&);
|
||||
|
||||
@@ -138,6 +138,11 @@ public:
|
||||
///
|
||||
future<permission_set> get_uncached_permissions(const role_or_anonymous&, const resource&) const;
|
||||
|
||||
///
|
||||
/// Notify the service that the node is entering maintenance mode.
|
||||
///
|
||||
void set_maintenance_mode();
|
||||
|
||||
///
|
||||
/// Query whether the named role has been granted a role that is a superuser.
|
||||
///
|
||||
@@ -147,6 +152,11 @@ public:
|
||||
///
|
||||
future<bool> has_superuser(std::string_view role_name) const;
|
||||
|
||||
///
|
||||
/// Ensure that the role operations are enabled. Some role managers defer initialization.
|
||||
///
|
||||
future<> ensure_role_operations_are_enabled();
|
||||
|
||||
///
|
||||
/// Create a role with optional authentication information.
|
||||
///
|
||||
@@ -208,8 +218,12 @@ private:
|
||||
future<std::vector<cql3::description>> describe_permissions() const;
|
||||
};
|
||||
|
||||
void set_maintenance_mode(service&);
|
||||
|
||||
future<bool> has_superuser(const service&, const authenticated_user&);
|
||||
|
||||
future<> ensure_role_operations_are_enabled(service&);
|
||||
|
||||
future<role_set> get_roles(const service&, const authenticated_user&);
|
||||
|
||||
future<permission_set> get_permissions(const service&, const authenticated_user&, const resource&);
|
||||
@@ -396,4 +410,52 @@ future<> commit_mutations(service& ser, ::service::group0_batch&& mc);
|
||||
// Migrates data from old keyspace to new one which supports linearizable writes via raft.
|
||||
future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_client& g0, start_operation_func_t start_operation_func, abort_source& as);
|
||||
|
||||
///
|
||||
/// Factory helper functions for creating auth module instances.
|
||||
/// These are intended for use with sharded<service>::start() where copyable arguments are required.
|
||||
/// The returned factories capture the sharded references and call .local() when invoked on each shard.
|
||||
///
|
||||
|
||||
/// Creates an authorizer factory for config-selectable authorizer types.
|
||||
/// @param name The authorizer class name (e.g., "CassandraAuthorizer", "AllowAllAuthorizer")
|
||||
authorizer_factory make_authorizer_factory(
|
||||
std::string_view name,
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm);
|
||||
|
||||
/// Creates an authenticator factory for config-selectable authenticator types.
|
||||
/// @param name The authenticator class name (e.g., "PasswordAuthenticator", "AllowAllAuthenticator")
|
||||
authenticator_factory make_authenticator_factory(
|
||||
std::string_view name,
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm,
|
||||
sharded<cache>& cache);
|
||||
|
||||
/// Creates a role_manager factory for config-selectable role manager types.
|
||||
/// @param name The role manager class name (e.g., "CassandraRoleManager")
|
||||
role_manager_factory make_role_manager_factory(
|
||||
std::string_view name,
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm,
|
||||
sharded<cache>& cache);
|
||||
|
||||
/// Creates a factory for the maintenance socket authenticator.
|
||||
/// This authenticator is not config-selectable and is only used for the maintenance socket.
|
||||
authenticator_factory make_maintenance_socket_authenticator_factory(
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm,
|
||||
sharded<cache>& cache);
|
||||
|
||||
/// Creates a factory for the maintenance socket role manager.
|
||||
/// This role manager is not config-selectable and is only used for the maintenance socket.
|
||||
role_manager_factory make_maintenance_socket_role_manager_factory(
|
||||
sharded<cql3::query_processor>& qp,
|
||||
::service::raft_group0_client& g0,
|
||||
sharded<::service::migration_manager>& mm,
|
||||
sharded<cache>& cache);
|
||||
|
||||
}
|
||||
|
||||
@@ -34,7 +34,6 @@
|
||||
#include <seastar/core/loop.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "password_authenticator.hh"
|
||||
#include "utils/managed_string.hh"
|
||||
@@ -44,14 +43,6 @@ namespace auth {
|
||||
|
||||
static logging::logger log("standard_role_manager");
|
||||
|
||||
static const class_registrator<
|
||||
role_manager,
|
||||
standard_role_manager,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
|
||||
|
||||
static db::consistency_level consistency_for_role(std::string_view role_name) noexcept {
|
||||
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
|
||||
return db::consistency_level::QUORUM;
|
||||
@@ -123,7 +114,6 @@ standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::servic
|
||||
, _migration_manager(mm)
|
||||
, _cache(cache)
|
||||
, _stopped(make_ready_future<>())
|
||||
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
|
||||
{}
|
||||
|
||||
std::string_view standard_role_manager::qualified_java_name() const noexcept {
|
||||
@@ -186,6 +176,9 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
|
||||
}
|
||||
|
||||
future<> standard_role_manager::legacy_create_default_role_if_missing() {
|
||||
if (_superuser.empty()) {
|
||||
on_internal_error(log, "Legacy auth default superuser name is empty");
|
||||
}
|
||||
try {
|
||||
const auto exists = co_await legacy::default_role_row_satisfies(_qp, &has_can_login, _superuser);
|
||||
if (exists) {
|
||||
@@ -209,6 +202,9 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
|
||||
}
|
||||
|
||||
future<> standard_role_manager::maybe_create_default_role() {
|
||||
if (_superuser.empty()) {
|
||||
co_return;
|
||||
}
|
||||
auto has_superuser = [this] () -> future<bool> {
|
||||
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE is_superuser = true ALLOW FILTERING", get_auth_ks_name(_qp), meta::roles_table::name);
|
||||
auto results = co_await _qp.execute_internal(query, db::consistency_level::LOCAL_ONE,
|
||||
@@ -300,6 +296,8 @@ future<> standard_role_manager::migrate_legacy_metadata() {
|
||||
|
||||
future<> standard_role_manager::start() {
|
||||
return once_among_shards([this] () -> future<> {
|
||||
_superuser = password_authenticator::default_superuser(_qp);
|
||||
|
||||
if (legacy_mode(_qp)) {
|
||||
co_await create_legacy_metadata_tables_if_missing();
|
||||
}
|
||||
|
||||
@@ -8,244 +8,200 @@
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#include "auth/transitional.hh"
|
||||
#include "auth/authenticated_user.hh"
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/default_authorizer.hh"
|
||||
#include "auth/password_authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/permission.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
|
||||
static const sstring PACKAGE_NAME("com.scylladb.auth.");
|
||||
|
||||
static const sstring& transitional_authenticator_name() {
|
||||
static const sstring name = PACKAGE_NAME + "TransitionalAuthenticator";
|
||||
return name;
|
||||
transitional_authenticator::transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
|
||||
}
|
||||
|
||||
static const sstring& transitional_authorizer_name() {
|
||||
static const sstring name = PACKAGE_NAME + "TransitionalAuthorizer";
|
||||
return name;
|
||||
transitional_authenticator::transitional_authenticator(std::unique_ptr<authenticator> a)
|
||||
: _authenticator(std::move(a)) {
|
||||
}
|
||||
|
||||
class transitional_authenticator : public authenticator {
|
||||
std::unique_ptr<authenticator> _authenticator;
|
||||
future<> transitional_authenticator::start() {
|
||||
return _authenticator->start();
|
||||
}
|
||||
|
||||
public:
|
||||
static const sstring PASSWORD_AUTHENTICATOR_NAME;
|
||||
future<> transitional_authenticator::stop() {
|
||||
return _authenticator->stop();
|
||||
}
|
||||
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
|
||||
std::string_view transitional_authenticator::qualified_java_name() const {
|
||||
return "com.scylladb.auth.TransitionalAuthenticator";
|
||||
}
|
||||
|
||||
bool transitional_authenticator::require_authentication() const {
|
||||
return true;
|
||||
}
|
||||
|
||||
authentication_option_set transitional_authenticator::supported_options() const {
|
||||
return _authenticator->supported_options();
|
||||
}
|
||||
|
||||
authentication_option_set transitional_authenticator::alterable_options() const {
|
||||
return _authenticator->alterable_options();
|
||||
}
|
||||
|
||||
future<authenticated_user> transitional_authenticator::authenticate(const credentials_map& credentials) const {
|
||||
auto i = credentials.find(authenticator::USERNAME_KEY);
|
||||
if ((i == credentials.end() || i->second.empty())
|
||||
&& (!credentials.contains(PASSWORD_KEY) || credentials.at(PASSWORD_KEY).empty())) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
transitional_authenticator(std::unique_ptr<authenticator> a)
|
||||
: _authenticator(std::move(a)) {
|
||||
}
|
||||
|
||||
virtual future<> start() override {
|
||||
return _authenticator->start();
|
||||
}
|
||||
|
||||
virtual future<> stop() override {
|
||||
return _authenticator->stop();
|
||||
}
|
||||
|
||||
virtual std::string_view qualified_java_name() const override {
|
||||
return transitional_authenticator_name();
|
||||
}
|
||||
|
||||
virtual bool require_authentication() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual authentication_option_set supported_options() const override {
|
||||
return _authenticator->supported_options();
|
||||
}
|
||||
|
||||
virtual authentication_option_set alterable_options() const override {
|
||||
return _authenticator->alterable_options();
|
||||
}
|
||||
|
||||
virtual future<authenticated_user> authenticate(const credentials_map& credentials) const override {
|
||||
auto i = credentials.find(authenticator::USERNAME_KEY);
|
||||
if ((i == credentials.end() || i->second.empty())
|
||||
&& (!credentials.contains(PASSWORD_KEY) || credentials.at(PASSWORD_KEY).empty())) {
|
||||
return make_ready_future().then([this, &credentials] {
|
||||
return _authenticator->authenticate(credentials);
|
||||
}).handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
return make_ready_future().then([this, &credentials] {
|
||||
return _authenticator->authenticate(credentials);
|
||||
}).handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
virtual future<> create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override {
|
||||
return _authenticator->create(role_name, options, mc);
|
||||
}
|
||||
|
||||
virtual future<> alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override {
|
||||
return _authenticator->alter(role_name, options, mc);
|
||||
}
|
||||
|
||||
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override {
|
||||
return _authenticator->drop(role_name, mc);
|
||||
}
|
||||
|
||||
virtual future<custom_options> query_custom_options(std::string_view role_name) const override {
|
||||
return _authenticator->query_custom_options(role_name);
|
||||
}
|
||||
|
||||
virtual bool uses_password_hashes() const override {
|
||||
return _authenticator->uses_password_hashes();
|
||||
}
|
||||
|
||||
virtual future<std::optional<sstring>> get_password_hash(std::string_view role_name) const override {
|
||||
return _authenticator->get_password_hash(role_name);
|
||||
}
|
||||
|
||||
virtual const resource_set& protected_resources() const override {
|
||||
return _authenticator->protected_resources();
|
||||
}
|
||||
|
||||
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override {
|
||||
class sasl_wrapper : public sasl_challenge {
|
||||
public:
|
||||
sasl_wrapper(::shared_ptr<sasl_challenge> sasl)
|
||||
: _sasl(std::move(sasl)) {
|
||||
}
|
||||
|
||||
virtual bytes evaluate_response(bytes_view client_response) override {
|
||||
try {
|
||||
return _sasl->evaluate_response(client_response);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
_complete = true;
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
virtual bool is_complete() const override {
|
||||
return _complete || _sasl->is_complete();
|
||||
}
|
||||
|
||||
virtual future<authenticated_user> get_authenticated_user() const override {
|
||||
return futurize_invoke([this] {
|
||||
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const sstring& get_username() const override {
|
||||
return _sasl->get_username();
|
||||
}
|
||||
|
||||
private:
|
||||
::shared_ptr<sasl_challenge> _sasl;
|
||||
|
||||
bool _complete = false;
|
||||
};
|
||||
return ::make_shared<sasl_wrapper>(_authenticator->new_sasl_challenge());
|
||||
}
|
||||
|
||||
virtual future<> ensure_superuser_is_created() const override {
|
||||
return _authenticator->ensure_superuser_is_created();
|
||||
}
|
||||
};
|
||||
|
||||
class transitional_authorizer : public authorizer {
|
||||
std::unique_ptr<authorizer> _authorizer;
|
||||
|
||||
public:
|
||||
transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
|
||||
: transitional_authorizer(std::make_unique<default_authorizer>(qp, g0, mm)) {
|
||||
}
|
||||
transitional_authorizer(std::unique_ptr<authorizer> a)
|
||||
: _authorizer(std::move(a)) {
|
||||
}
|
||||
|
||||
~transitional_authorizer() {
|
||||
}
|
||||
|
||||
virtual future<> start() override {
|
||||
return _authorizer->start();
|
||||
}
|
||||
|
||||
virtual future<> stop() override {
|
||||
return _authorizer->stop();
|
||||
}
|
||||
|
||||
virtual std::string_view qualified_java_name() const override {
|
||||
return transitional_authorizer_name();
|
||||
}
|
||||
|
||||
virtual future<permission_set> authorize(const role_or_anonymous&, const resource&) const override {
|
||||
static const permission_set transitional_permissions =
|
||||
permission_set::of<
|
||||
permission::CREATE,
|
||||
permission::ALTER,
|
||||
permission::DROP,
|
||||
permission::SELECT,
|
||||
permission::MODIFY>();
|
||||
|
||||
return make_ready_future<permission_set>(transitional_permissions);
|
||||
}
|
||||
|
||||
virtual future<> grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override {
|
||||
return _authorizer->grant(s, std::move(ps), r, mc);
|
||||
}
|
||||
|
||||
virtual future<> revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override {
|
||||
return _authorizer->revoke(s, std::move(ps), r, mc);
|
||||
}
|
||||
|
||||
virtual future<std::vector<permission_details>> list_all() const override {
|
||||
return _authorizer->list_all();
|
||||
}
|
||||
|
||||
virtual future<> revoke_all(std::string_view s, ::service::group0_batch& mc) override {
|
||||
return _authorizer->revoke_all(s, mc);
|
||||
}
|
||||
|
||||
virtual future<> revoke_all(const resource& r, ::service::group0_batch& mc) override {
|
||||
return _authorizer->revoke_all(r, mc);
|
||||
}
|
||||
|
||||
virtual const resource_set& protected_resources() const override {
|
||||
return _authorizer->protected_resources();
|
||||
}
|
||||
};
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
//
|
||||
// To ensure correct initialization order, we unfortunately need to use string literals.
|
||||
//
|
||||
future<> transitional_authenticator::create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) {
|
||||
return _authenticator->create(role_name, options, mc);
|
||||
}
|
||||
|
||||
static const class_registrator<
|
||||
auth::authenticator,
|
||||
auth::transitional_authenticator,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
|
||||
future<> transitional_authenticator::alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) {
|
||||
return _authenticator->alter(role_name, options, mc);
|
||||
}
|
||||
|
||||
static const class_registrator<
|
||||
auth::authorizer,
|
||||
auth::transitional_authorizer,
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&> transitional_authorizer_reg(auth::PACKAGE_NAME + "TransitionalAuthorizer");
|
||||
future<> transitional_authenticator::drop(std::string_view role_name, ::service::group0_batch& mc) {
|
||||
return _authenticator->drop(role_name, mc);
|
||||
}
|
||||
|
||||
future<custom_options> transitional_authenticator::query_custom_options(std::string_view role_name) const {
|
||||
return _authenticator->query_custom_options(role_name);
|
||||
}
|
||||
|
||||
bool transitional_authenticator::uses_password_hashes() const {
|
||||
return _authenticator->uses_password_hashes();
|
||||
}
|
||||
|
||||
future<std::optional<sstring>> transitional_authenticator::get_password_hash(std::string_view role_name) const {
|
||||
return _authenticator->get_password_hash(role_name);
|
||||
}
|
||||
|
||||
const resource_set& transitional_authenticator::protected_resources() const {
|
||||
return _authenticator->protected_resources();
|
||||
}
|
||||
|
||||
::shared_ptr<sasl_challenge> transitional_authenticator::new_sasl_challenge() const {
|
||||
class sasl_wrapper : public sasl_challenge {
|
||||
public:
|
||||
sasl_wrapper(::shared_ptr<sasl_challenge> sasl)
|
||||
: _sasl(std::move(sasl)) {
|
||||
}
|
||||
|
||||
virtual bytes evaluate_response(bytes_view client_response) override {
|
||||
try {
|
||||
return _sasl->evaluate_response(client_response);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
_complete = true;
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
virtual bool is_complete() const override {
|
||||
return _complete || _sasl->is_complete();
|
||||
}
|
||||
|
||||
virtual future<authenticated_user> get_authenticated_user() const override {
|
||||
return futurize_invoke([this] {
|
||||
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
const sstring& get_username() const override {
|
||||
return _sasl->get_username();
|
||||
}
|
||||
|
||||
private:
|
||||
::shared_ptr<sasl_challenge> _sasl;
|
||||
|
||||
bool _complete = false;
|
||||
};
|
||||
return ::make_shared<sasl_wrapper>(_authenticator->new_sasl_challenge());
|
||||
}
|
||||
|
||||
future<> transitional_authenticator::ensure_superuser_is_created() const {
|
||||
return _authenticator->ensure_superuser_is_created();
|
||||
}
|
||||
|
||||
transitional_authorizer::transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
|
||||
: transitional_authorizer(std::make_unique<default_authorizer>(qp, g0, mm)) {
|
||||
}
|
||||
|
||||
transitional_authorizer::transitional_authorizer(std::unique_ptr<authorizer> a)
|
||||
: _authorizer(std::move(a)) {
|
||||
}
|
||||
|
||||
transitional_authorizer::~transitional_authorizer() {
|
||||
}
|
||||
|
||||
future<> transitional_authorizer::start() {
|
||||
return _authorizer->start();
|
||||
}
|
||||
|
||||
future<> transitional_authorizer::stop() {
|
||||
return _authorizer->stop();
|
||||
}
|
||||
|
||||
std::string_view transitional_authorizer::qualified_java_name() const {
|
||||
return "com.scylladb.auth.TransitionalAuthorizer";
|
||||
}
|
||||
|
||||
future<permission_set> transitional_authorizer::authorize(const role_or_anonymous&, const resource&) const {
|
||||
static const permission_set transitional_permissions =
|
||||
permission_set::of<
|
||||
permission::CREATE,
|
||||
permission::ALTER,
|
||||
permission::DROP,
|
||||
permission::SELECT,
|
||||
permission::MODIFY>();
|
||||
|
||||
return make_ready_future<permission_set>(transitional_permissions);
|
||||
}
|
||||
|
||||
future<> transitional_authorizer::grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) {
|
||||
return _authorizer->grant(s, std::move(ps), r, mc);
|
||||
}
|
||||
|
||||
future<> transitional_authorizer::revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) {
|
||||
return _authorizer->revoke(s, std::move(ps), r, mc);
|
||||
}
|
||||
|
||||
future<std::vector<permission_details>> transitional_authorizer::list_all() const {
|
||||
return _authorizer->list_all();
|
||||
}
|
||||
|
||||
future<> transitional_authorizer::revoke_all(std::string_view s, ::service::group0_batch& mc) {
|
||||
return _authorizer->revoke_all(s, mc);
|
||||
}
|
||||
|
||||
future<> transitional_authorizer::revoke_all(const resource& r, ::service::group0_batch& mc) {
|
||||
return _authorizer->revoke_all(r, mc);
|
||||
}
|
||||
|
||||
const resource_set& transitional_authorizer::protected_resources() const {
|
||||
return _authorizer->protected_resources();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
81
auth/transitional.hh
Normal file
81
auth/transitional.hh
Normal file
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/authorizer.hh"
|
||||
#include "auth/cache.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
class raft_group0_client;
|
||||
class migration_manager;
|
||||
}
|
||||
|
||||
namespace auth {
|
||||
|
||||
///
|
||||
/// Transitional authenticator that allows anonymous access when credentials are not provided
|
||||
/// or authentication fails. Used for migration scenarios.
|
||||
///
|
||||
class transitional_authenticator : public authenticator {
|
||||
std::unique_ptr<authenticator> _authenticator;
|
||||
|
||||
public:
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache);
|
||||
transitional_authenticator(std::unique_ptr<authenticator> a);
|
||||
|
||||
virtual future<> start() override;
|
||||
virtual future<> stop() override;
|
||||
virtual std::string_view qualified_java_name() const override;
|
||||
virtual bool require_authentication() const override;
|
||||
virtual authentication_option_set supported_options() const override;
|
||||
virtual authentication_option_set alterable_options() const override;
|
||||
virtual future<authenticated_user> authenticate(const credentials_map& credentials) const override;
|
||||
virtual future<> create(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override;
|
||||
virtual future<> alter(std::string_view role_name, const authentication_options& options, ::service::group0_batch& mc) override;
|
||||
virtual future<> drop(std::string_view role_name, ::service::group0_batch& mc) override;
|
||||
virtual future<custom_options> query_custom_options(std::string_view role_name) const override;
|
||||
virtual bool uses_password_hashes() const override;
|
||||
virtual future<std::optional<sstring>> get_password_hash(std::string_view role_name) const override;
|
||||
virtual const resource_set& protected_resources() const override;
|
||||
virtual ::shared_ptr<sasl_challenge> new_sasl_challenge() const override;
|
||||
virtual future<> ensure_superuser_is_created() const override;
|
||||
};
|
||||
|
||||
///
|
||||
/// Transitional authorizer that grants a fixed set of permissions to all users.
|
||||
/// Used for migration scenarios.
|
||||
///
|
||||
class transitional_authorizer : public authorizer {
|
||||
std::unique_ptr<authorizer> _authorizer;
|
||||
|
||||
public:
|
||||
transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm);
|
||||
transitional_authorizer(std::unique_ptr<authorizer> a);
|
||||
~transitional_authorizer();
|
||||
|
||||
virtual future<> start() override;
|
||||
virtual future<> stop() override;
|
||||
virtual std::string_view qualified_java_name() const override;
|
||||
virtual future<permission_set> authorize(const role_or_anonymous&, const resource&) const override;
|
||||
virtual future<> grant(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override;
|
||||
virtual future<> revoke(std::string_view s, permission_set ps, const resource& r, ::service::group0_batch& mc) override;
|
||||
virtual future<std::vector<permission_details>> list_all() const override;
|
||||
virtual future<> revoke_all(std::string_view s, ::service::group0_batch& mc) override;
|
||||
virtual future<> revoke_all(const resource& r, ::service::group0_batch& mc) override;
|
||||
virtual const resource_set& protected_resources() const override;
|
||||
};
|
||||
|
||||
} // namespace auth
|
||||
@@ -618,7 +618,7 @@ static void set_default_properties_log_table(schema_builder& b, const schema& s,
|
||||
b.set_caching_options(caching_options::get_disabled_caching_options());
|
||||
|
||||
auto rs = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
|
||||
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, db.get_token_metadata(), false));
|
||||
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, false));
|
||||
b.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
|
||||
}
|
||||
|
||||
|
||||
@@ -598,8 +598,7 @@ protected:
|
||||
// Garbage collected sstables that were added to SSTable set and should be eventually removed from it.
|
||||
std::vector<sstables::shared_sstable> _used_garbage_collected_sstables;
|
||||
utils::observable<> _stop_request_observable;
|
||||
// optional tombstone_gc_state that is used when gc has to check only the compacting sstables to collect tombstones.
|
||||
std::optional<tombstone_gc_state> _tombstone_gc_state_with_commitlog_check_disabled;
|
||||
tombstone_gc_state _tombstone_gc_state;
|
||||
int64_t _output_repaired_at = 0;
|
||||
private:
|
||||
// Keeps track of monitors for input sstable.
|
||||
@@ -649,9 +648,12 @@ protected:
|
||||
, _owned_ranges(std::move(descriptor.owned_ranges))
|
||||
, _sharder(descriptor.sharder)
|
||||
, _owned_ranges_checker(_owned_ranges ? std::optional<dht::incremental_owned_ranges_checker>(*_owned_ranges) : std::nullopt)
|
||||
, _tombstone_gc_state_with_commitlog_check_disabled(descriptor.gc_check_only_compacting_sstables ? std::make_optional(_table_s.get_tombstone_gc_state().with_commitlog_check_disabled()) : std::nullopt)
|
||||
, _tombstone_gc_state(_table_s.get_tombstone_gc_state())
|
||||
, _progress_monitor(progress_monitor)
|
||||
{
|
||||
if (descriptor.gc_check_only_compacting_sstables) {
|
||||
_tombstone_gc_state = _tombstone_gc_state.with_commitlog_check_disabled();
|
||||
}
|
||||
std::unordered_set<sstables::run_id> ssts_run_ids;
|
||||
_contains_multi_fragment_runs = std::any_of(_sstables.begin(), _sstables.end(), [&ssts_run_ids] (sstables::shared_sstable& sst) {
|
||||
return !ssts_run_ids.insert(sst->run_identifier()).second;
|
||||
@@ -849,8 +851,8 @@ private:
|
||||
return _table_s.get_compaction_strategy().make_sstable_set(_table_s);
|
||||
}
|
||||
|
||||
const tombstone_gc_state& get_tombstone_gc_state() const {
|
||||
return _tombstone_gc_state_with_commitlog_check_disabled ? _tombstone_gc_state_with_commitlog_check_disabled.value() : _table_s.get_tombstone_gc_state();
|
||||
tombstone_gc_state get_tombstone_gc_state() const {
|
||||
return _tombstone_gc_state;
|
||||
}
|
||||
|
||||
future<> setup() {
|
||||
@@ -1050,7 +1052,7 @@ private:
|
||||
return can_never_purge;
|
||||
}
|
||||
return [this] (const dht::decorated_key& dk, is_shadowable is_shadowable) {
|
||||
return get_max_purgeable_timestamp(_table_s, *_selector, _compacting_for_max_purgeable_func, dk, _bloom_filter_checks, _compacting_max_timestamp, _tombstone_gc_state_with_commitlog_check_disabled.has_value(), is_shadowable);
|
||||
return get_max_purgeable_timestamp(_table_s, *_selector, _compacting_for_max_purgeable_func, dk, _bloom_filter_checks, _compacting_max_timestamp, !_tombstone_gc_state.is_commitlog_check_enabled(), is_shadowable);
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -54,7 +54,7 @@ public:
|
||||
virtual future<> on_compaction_completion(compaction_completion_desc desc, sstables::offstrategy offstrategy) = 0;
|
||||
virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0;
|
||||
virtual bool tombstone_gc_enabled() const noexcept = 0;
|
||||
virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept = 0;
|
||||
virtual tombstone_gc_state get_tombstone_gc_state() const noexcept = 0;
|
||||
virtual compaction_backlog_tracker& get_backlog_tracker() = 0;
|
||||
virtual const std::string get_group_id() const noexcept = 0;
|
||||
virtual seastar::condition_variable& get_staging_done_condition() noexcept = 0;
|
||||
|
||||
@@ -778,6 +778,7 @@ compaction_manager::get_incremental_repair_read_lock(compaction::compaction_grou
|
||||
cmlog.debug("Get get_incremental_repair_read_lock for {} started", reason);
|
||||
}
|
||||
compaction::compaction_state& cs = get_compaction_state(&t);
|
||||
auto gh = cs.gate.hold();
|
||||
auto ret = co_await cs.incremental_repair_lock.hold_read_lock();
|
||||
if (!reason.empty()) {
|
||||
cmlog.debug("Get get_incremental_repair_read_lock for {} done", reason);
|
||||
@@ -791,6 +792,7 @@ compaction_manager::get_incremental_repair_write_lock(compaction::compaction_gro
|
||||
cmlog.debug("Get get_incremental_repair_write_lock for {} started", reason);
|
||||
}
|
||||
compaction::compaction_state& cs = get_compaction_state(&t);
|
||||
auto gh = cs.gate.hold();
|
||||
auto ret = co_await cs.incremental_repair_lock.hold_write_lock();
|
||||
if (!reason.empty()) {
|
||||
cmlog.debug("Get get_incremental_repair_write_lock for {} done", reason);
|
||||
@@ -1040,7 +1042,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
|
||||
_compaction_controller.set_max_shares(max_shares);
|
||||
}))
|
||||
, _strategy_control(std::make_unique<strategy_control>(*this))
|
||||
, _tombstone_gc_state(_shared_tombstone_gc_state) {
|
||||
{
|
||||
tm.register_module(_task_manager_module->get_name(), _task_manager_module);
|
||||
register_metrics();
|
||||
// Bandwidth throttling is node-wide, updater is needed on single shard
|
||||
@@ -1064,7 +1066,7 @@ compaction_manager::compaction_manager(tasks::task_manager& tm)
|
||||
, _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer()))
|
||||
, _compaction_max_shares_observer(_cfg.max_shares.observe([] (const float& max_shares) {}))
|
||||
, _strategy_control(std::make_unique<strategy_control>(*this))
|
||||
, _tombstone_gc_state(_shared_tombstone_gc_state) {
|
||||
{
|
||||
tm.register_module(_task_manager_module->get_name(), _task_manager_module);
|
||||
// No metric registration because this constructor is supposed to be used only by the testing
|
||||
// infrastructure.
|
||||
@@ -2387,6 +2389,8 @@ future<> compaction_manager::remove(compaction_group_view& t, sstring reason) no
|
||||
if (!c_state.gate.is_closed()) {
|
||||
auto close_gate = c_state.gate.close();
|
||||
co_await stop_ongoing_compactions(reason, &t);
|
||||
// Wait for users of incremental repair lock (can be either repair itself or maintenance compactions).
|
||||
co_await c_state.incremental_repair_lock.write_lock();
|
||||
co_await std::move(close_gate);
|
||||
}
|
||||
|
||||
|
||||
@@ -167,10 +167,6 @@ private:
|
||||
std::unique_ptr<strategy_control> _strategy_control;
|
||||
|
||||
shared_tombstone_gc_state _shared_tombstone_gc_state;
|
||||
// TODO: tombstone_gc_state should now have value semantics, but the code
|
||||
// still uses it with reference semantics (inconsistently though).
|
||||
// Drop this member, once the code is converted into using value semantics.
|
||||
tombstone_gc_state _tombstone_gc_state;
|
||||
|
||||
utils::disk_space_monitor::subscription _out_of_space_subscription;
|
||||
private:
|
||||
@@ -456,10 +452,6 @@ public:
|
||||
|
||||
compaction::strategy_control& get_strategy_control() const noexcept;
|
||||
|
||||
const tombstone_gc_state& get_tombstone_gc_state() const noexcept {
|
||||
return _tombstone_gc_state;
|
||||
};
|
||||
|
||||
shared_tombstone_gc_state& get_shared_tombstone_gc_state() noexcept {
|
||||
return _shared_tombstone_gc_state;
|
||||
};
|
||||
|
||||
@@ -639,7 +639,7 @@ strict_is_not_null_in_views: true
|
||||
# * workdir: the node will open the maintenance socket on the path <scylla's workdir>/cql.m,
|
||||
# where <scylla's workdir> is a path defined by the workdir configuration option,
|
||||
# * <socket path>: the node will open the maintenance socket on the path <socket path>.
|
||||
maintenance_socket: ignore
|
||||
maintenance_socket: workdir
|
||||
|
||||
# If set to true, configuration parameters defined with LiveUpdate option can be updated in runtime with CQL
|
||||
# by updating system.config virtual table. If we don't want any configuration parameter to be changed in runtime
|
||||
@@ -648,10 +648,9 @@ maintenance_socket: ignore
|
||||
# e.g. for cloud users, for whom scylla's configuration should be changed only by support engineers.
|
||||
# live_updatable_config_params_changeable_via_cql: true
|
||||
|
||||
# ****************
|
||||
# * GUARDRAILS *
|
||||
# ****************
|
||||
|
||||
#
|
||||
# Guardrails options
|
||||
#
|
||||
# Guardrails to warn or fail when Replication Factor is smaller/greater than the threshold.
|
||||
# Please note that the value of 0 is always allowed,
|
||||
# which means that having no replication at all, i.e. RF = 0, is always valid.
|
||||
@@ -661,6 +660,27 @@ maintenance_socket: ignore
|
||||
# minimum_replication_factor_warn_threshold: 3
|
||||
# maximum_replication_factor_warn_threshold: -1
|
||||
# maximum_replication_factor_fail_threshold: -1
|
||||
#
|
||||
# Guardrails to warn about or disallow creating a keyspace with specific replication strategy.
|
||||
# Each of these 2 settings is a list storing replication strategies considered harmful.
|
||||
# The replication strategies to choose from are:
|
||||
# 1) SimpleStrategy,
|
||||
# 2) NetworkTopologyStrategy,
|
||||
# 3) LocalStrategy,
|
||||
# 4) EverywhereStrategy
|
||||
#
|
||||
# replication_strategy_warn_list:
|
||||
# - SimpleStrategy
|
||||
# replication_strategy_fail_list:
|
||||
#
|
||||
# Guardrail to enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE.
|
||||
# enable_create_table_with_compact_storage: false
|
||||
#
|
||||
# Guardrails to limit usage of selected consistency levels for writes.
|
||||
# Adding a warning to a CQL query response can significantly increase network
|
||||
# traffic and decrease overall throughput.
|
||||
# write_consistency_levels_warned: []
|
||||
# write_consistency_levels_disallowed: []
|
||||
|
||||
#
|
||||
# System information encryption settings
|
||||
@@ -838,21 +858,6 @@ maintenance_socket: ignore
|
||||
# key_namespace: <kmip key namespace> (optional)
|
||||
#
|
||||
|
||||
# Guardrails to warn about or disallow creating a keyspace with specific replication strategy.
|
||||
# Each of these 2 settings is a list storing replication strategies considered harmful.
|
||||
# The replication strategies to choose from are:
|
||||
# 1) SimpleStrategy,
|
||||
# 2) NetworkTopologyStrategy,
|
||||
# 3) LocalStrategy,
|
||||
# 4) EverywhereStrategy
|
||||
#
|
||||
# replication_strategy_warn_list:
|
||||
# - SimpleStrategy
|
||||
# replication_strategy_fail_list:
|
||||
|
||||
# Guardrail to enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE.
|
||||
# enable_create_table_with_compact_storage: false
|
||||
|
||||
# Control tablets for new keyspaces.
|
||||
# Can be set to: disabled|enabled|enforced
|
||||
#
|
||||
|
||||
@@ -1204,6 +1204,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'gms/application_state.cc',
|
||||
'gms/inet_address.cc',
|
||||
'dht/i_partitioner.cc',
|
||||
'dht/fixed_shard.cc',
|
||||
'dht/token.cc',
|
||||
'dht/murmur3_partitioner.cc',
|
||||
'dht/boot_strapper.cc',
|
||||
@@ -1275,6 +1276,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'auth/resource.cc',
|
||||
'auth/roles-metadata.cc',
|
||||
'auth/passwords.cc',
|
||||
'auth/maintenance_socket_authenticator.cc',
|
||||
'auth/password_authenticator.cc',
|
||||
'auth/permission.cc',
|
||||
'auth/service.cc',
|
||||
@@ -1340,6 +1342,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'service/strong_consistency/groups_manager.cc',
|
||||
'service/strong_consistency/coordinator.cc',
|
||||
'service/strong_consistency/state_machine.cc',
|
||||
'service/strong_consistency/raft_groups_storage.cc',
|
||||
'service/raft/group0_state_id_handler.cc',
|
||||
'service/raft/group0_state_machine.cc',
|
||||
'service/raft/group0_state_machine_merger.cc',
|
||||
|
||||
@@ -10,8 +10,9 @@
|
||||
#include "types/types.hh"
|
||||
#include "types/vector.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include <span>
|
||||
#include <bit>
|
||||
#include <span>
|
||||
#include <seastar/core/byteorder.hh>
|
||||
|
||||
namespace cql3 {
|
||||
namespace functions {
|
||||
@@ -30,14 +31,10 @@ std::vector<float> extract_float_vector(const bytes_opt& param, vector_dimension
|
||||
expected_size, dimension, param->size()));
|
||||
}
|
||||
|
||||
std::vector<float> result;
|
||||
result.reserve(dimension);
|
||||
|
||||
bytes_view view(*param);
|
||||
std::vector<float> result(dimension);
|
||||
const char* p = reinterpret_cast<const char*>(param->data());
|
||||
for (size_t i = 0; i < dimension; ++i) {
|
||||
// read_simple handles network byte order (big-endian) conversion
|
||||
uint32_t raw = read_simple<uint32_t>(view);
|
||||
result.push_back(std::bit_cast<float>(raw));
|
||||
result[i] = std::bit_cast<float>(consume_be<uint32_t>(p));
|
||||
}
|
||||
|
||||
return result;
|
||||
@@ -55,13 +52,14 @@ namespace {
|
||||
// You should only use this function if you need to preserve the original vectors and cannot normalize
|
||||
// them in advance.
|
||||
float compute_cosine_similarity(std::span<const float> v1, std::span<const float> v2) {
|
||||
double dot_product = 0.0;
|
||||
double squared_norm_a = 0.0;
|
||||
double squared_norm_b = 0.0;
|
||||
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
|
||||
float dot_product = 0.0;
|
||||
float squared_norm_a = 0.0;
|
||||
float squared_norm_b = 0.0;
|
||||
|
||||
for (size_t i = 0; i < v1.size(); ++i) {
|
||||
double a = v1[i];
|
||||
double b = v2[i];
|
||||
float a = v1[i];
|
||||
float b = v2[i];
|
||||
|
||||
dot_product += a * b;
|
||||
squared_norm_a += a * a;
|
||||
@@ -79,13 +77,14 @@ float compute_cosine_similarity(std::span<const float> v1, std::span<const float
|
||||
}
|
||||
|
||||
float compute_euclidean_similarity(std::span<const float> v1, std::span<const float> v2) {
|
||||
double sum = 0.0;
|
||||
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
|
||||
float sum = 0.0;
|
||||
|
||||
for (size_t i = 0; i < v1.size(); ++i) {
|
||||
double a = v1[i];
|
||||
double b = v2[i];
|
||||
float a = v1[i];
|
||||
float b = v2[i];
|
||||
|
||||
double diff = a - b;
|
||||
float diff = a - b;
|
||||
sum += diff * diff;
|
||||
}
|
||||
|
||||
@@ -98,11 +97,12 @@ float compute_euclidean_similarity(std::span<const float> v1, std::span<const fl
|
||||
// Assumes that both vectors are L2-normalized.
|
||||
// This similarity is intended as an optimized way to perform cosine similarity calculation.
|
||||
float compute_dot_product_similarity(std::span<const float> v1, std::span<const float> v2) {
|
||||
double dot_product = 0.0;
|
||||
#pragma clang fp contract(fast) reassociate(on) // Allow the compiler to optimize the loop.
|
||||
float dot_product = 0.0;
|
||||
|
||||
for (size_t i = 0; i < v1.size(); ++i) {
|
||||
double a = v1[i];
|
||||
double b = v2[i];
|
||||
float a = v1[i];
|
||||
float b = v2[i];
|
||||
dot_product += a * b;
|
||||
}
|
||||
|
||||
|
||||
@@ -91,7 +91,11 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
|
||||
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
|
||||
, _authorized_prepared_cache_validity_in_ms_observer(_db.get_config().permissions_validity_in_ms.observe(_auth_prepared_cache_cfg_cb))
|
||||
, _lang_manager(langm)
|
||||
, _write_consistency_levels_warned_observer(_db.get_config().write_consistency_levels_warned.observe([this](const auto& v) { _write_consistency_levels_warned = to_consistency_level_set(v); }))
|
||||
, _write_consistency_levels_disallowed_observer(_db.get_config().write_consistency_levels_disallowed.observe([this](const auto& v) { _write_consistency_levels_disallowed = to_consistency_level_set(v); }))
|
||||
{
|
||||
_write_consistency_levels_warned = to_consistency_level_set(_db.get_config().write_consistency_levels_warned());
|
||||
_write_consistency_levels_disallowed = to_consistency_level_set(_db.get_config().write_consistency_levels_disallowed());
|
||||
namespace sm = seastar::metrics;
|
||||
namespace stm = statements;
|
||||
using clevel = db::consistency_level;
|
||||
@@ -508,6 +512,32 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
|
||||
"i.e. attempts to set a forbidden replication strategy in a keyspace via CREATE/ALTER KEYSPACE.")).set_skip_when_empty(),
|
||||
});
|
||||
|
||||
std::vector<sm::metric_definition> cql_cl_group;
|
||||
for (auto cl = size_t(clevel::MIN_VALUE); cl <= size_t(clevel::MAX_VALUE); ++cl) {
|
||||
cql_cl_group.push_back(
|
||||
sm::make_counter(
|
||||
"writes_per_consistency_level",
|
||||
_cql_stats.writes_per_consistency_level[cl],
|
||||
sm::description("Counts the number of writes for each consistency level."),
|
||||
{cl_label(clevel(cl)), basic_level}).set_skip_when_empty());
|
||||
}
|
||||
_metrics.add_group("cql", cql_cl_group);
|
||||
|
||||
_metrics.add_group("cql", {
|
||||
sm::make_counter(
|
||||
"write_consistency_levels_disallowed_violations",
|
||||
_cql_stats.write_consistency_levels_disallowed_violations,
|
||||
sm::description("Counts the number of write_consistency_levels_disallowed guardrail violations, "
|
||||
"i.e. attempts to write with a forbidden consistency level."),
|
||||
{basic_level}),
|
||||
sm::make_counter(
|
||||
"write_consistency_levels_warned_violations",
|
||||
_cql_stats.write_consistency_levels_warned_violations,
|
||||
sm::description("Counts the number of write_consistency_levels_warned guardrail violations, "
|
||||
"i.e. attempts to write with a discouraged consistency level."),
|
||||
{basic_level}),
|
||||
});
|
||||
|
||||
_mnotifier.register_listener(_migration_subscriber.get());
|
||||
}
|
||||
|
||||
@@ -1233,6 +1263,14 @@ shared_ptr<cql_transport::messages::result_message> query_processor::bounce_to_s
|
||||
return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(shard, std::move(cached_fn_calls));
|
||||
}
|
||||
|
||||
query_processor::consistency_level_set query_processor::to_consistency_level_set(const query_processor::cl_option_list& levels) {
|
||||
query_processor::consistency_level_set result;
|
||||
for (const auto& opt : levels) {
|
||||
result.set(static_cast<db::consistency_level>(opt));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void query_processor::update_authorized_prepared_cache_config() {
|
||||
utils::loading_cache_config cfg;
|
||||
cfg.max_size = _mcfg.authorized_prepared_cache_size;
|
||||
|
||||
@@ -34,6 +34,9 @@
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "types/types.hh"
|
||||
#include "db/auth_version.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/config.hh"
|
||||
#include "utils/enum_option.hh"
|
||||
#include "service/storage_proxy_fwd.hh"
|
||||
|
||||
|
||||
@@ -142,6 +145,30 @@ private:
|
||||
std::unordered_map<sstring, std::unique_ptr<statements::prepared_statement>> _internal_statements;
|
||||
|
||||
lang::manager& _lang_manager;
|
||||
|
||||
using cl_option_list = std::vector<enum_option<db::consistency_level_restriction_t>>;
|
||||
|
||||
/// Efficient bitmask-based set of consistency levels.
|
||||
using consistency_level_set = enum_set<super_enum<db::consistency_level,
|
||||
db::consistency_level::ANY,
|
||||
db::consistency_level::ONE,
|
||||
db::consistency_level::TWO,
|
||||
db::consistency_level::THREE,
|
||||
db::consistency_level::QUORUM,
|
||||
db::consistency_level::ALL,
|
||||
db::consistency_level::LOCAL_QUORUM,
|
||||
db::consistency_level::EACH_QUORUM,
|
||||
db::consistency_level::SERIAL,
|
||||
db::consistency_level::LOCAL_SERIAL,
|
||||
db::consistency_level::LOCAL_ONE>>;
|
||||
|
||||
|
||||
consistency_level_set _write_consistency_levels_warned;
|
||||
consistency_level_set _write_consistency_levels_disallowed;
|
||||
utils::observer<cl_option_list> _write_consistency_levels_warned_observer;
|
||||
utils::observer<cl_option_list> _write_consistency_levels_disallowed_observer;
|
||||
|
||||
static consistency_level_set to_consistency_level_set(const cl_option_list& levels);
|
||||
public:
|
||||
static const sstring CQL_VERSION;
|
||||
|
||||
@@ -493,6 +520,21 @@ public:
|
||||
int32_t page_size = -1,
|
||||
service::node_local_only node_local_only = service::node_local_only::no) const;
|
||||
|
||||
enum class write_consistency_guardrail_state { NONE, WARN, FAIL };
|
||||
inline write_consistency_guardrail_state check_write_consistency_levels_guardrail(db::consistency_level cl) {
|
||||
_cql_stats.writes_per_consistency_level[size_t(cl)]++;
|
||||
|
||||
if (_write_consistency_levels_disallowed.contains(cl)) [[unlikely]] {
|
||||
_cql_stats.write_consistency_levels_disallowed_violations++;
|
||||
return write_consistency_guardrail_state::FAIL;
|
||||
}
|
||||
if (_write_consistency_levels_warned.contains(cl)) [[unlikely]] {
|
||||
_cql_stats.write_consistency_levels_warned_violations++;
|
||||
return write_consistency_guardrail_state::WARN;
|
||||
}
|
||||
return write_consistency_guardrail_state::NONE;
|
||||
}
|
||||
|
||||
private:
|
||||
// Keep the holder until you stop using the `remote` services.
|
||||
std::pair<std::reference_wrapper<remote>, gate::holder> remote();
|
||||
|
||||
@@ -212,11 +212,20 @@ public:
|
||||
}
|
||||
|
||||
virtual uint32_t add_column_for_post_processing(const column_definition& c) override {
|
||||
uint32_t index = selection::add_column_for_post_processing(c);
|
||||
auto it = std::find_if(_selectors.begin(), _selectors.end(), [&c](const expr::expression& e) {
|
||||
auto col = expr::as_if<expr::column_value>(&e);
|
||||
return col && col->col == &c;
|
||||
});
|
||||
if (it != _selectors.end()) {
|
||||
return std::distance(_selectors.begin(), it);
|
||||
}
|
||||
|
||||
add_column(c);
|
||||
get_result_metadata()->add_non_serialized_column(c.column_specification);
|
||||
_selectors.push_back(expr::column_value(&c));
|
||||
if (_inner_loop.empty()) {
|
||||
// Simple case: no aggregation
|
||||
return index;
|
||||
return _selectors.size() - 1;
|
||||
} else {
|
||||
// Complex case: aggregation, must pass through temporary
|
||||
auto first_func = cql3::functions::aggregate_fcts::make_first_function(c.type);
|
||||
@@ -470,10 +479,21 @@ std::vector<const column_definition*> selection::wildcard_columns(schema_ptr sch
|
||||
return simple_selection::make(schema, std::move(columns), false);
|
||||
}
|
||||
|
||||
uint32_t selection::add_column_for_post_processing(const column_definition& c) {
|
||||
selection::add_column_result selection::add_column(const column_definition& c) {
|
||||
auto index = index_of(c);
|
||||
if (index != -1) {
|
||||
return {index, false};
|
||||
}
|
||||
_columns.push_back(&c);
|
||||
_metadata->add_non_serialized_column(c.column_specification);
|
||||
return _columns.size() - 1;
|
||||
return {_columns.size() - 1, true};
|
||||
}
|
||||
|
||||
uint32_t selection::add_column_for_post_processing(const column_definition& c) {
|
||||
auto col = add_column(c);
|
||||
if (col.added) {
|
||||
_metadata->add_non_serialized_column(c.column_specification);
|
||||
}
|
||||
return col.index;
|
||||
}
|
||||
|
||||
::shared_ptr<selection> selection::from_selectors(data_dictionary::database db, schema_ptr schema, const sstring& ks, const std::vector<prepared_selector>& prepared_selectors) {
|
||||
|
||||
@@ -130,6 +130,14 @@ public:
|
||||
virtual std::vector<shared_ptr<functions::function>> used_functions() const { return {}; }
|
||||
|
||||
query::partition_slice::option_set get_query_options();
|
||||
protected:
|
||||
// Result of add_column: index in _columns and whether it was added now (or existed already).
|
||||
struct add_column_result {
|
||||
uint32_t index;
|
||||
bool added;
|
||||
};
|
||||
// Adds a column to the _columns if not already present, returns add_column_result.
|
||||
add_column_result add_column(const column_definition& c);
|
||||
private:
|
||||
static bool processes_selection(const std::vector<prepared_selector>& prepared_selectors);
|
||||
|
||||
|
||||
@@ -259,6 +259,15 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
|
||||
if (options.getSerialConsistency() == null)
|
||||
throw new InvalidRequestException("Invalid empty serial consistency level");
|
||||
#endif
|
||||
|
||||
const auto cl = options.get_consistency();
|
||||
const query_processor::write_consistency_guardrail_state guardrail_state = qp.check_write_consistency_levels_guardrail(cl);
|
||||
if (guardrail_state == query_processor::write_consistency_guardrail_state::FAIL) {
|
||||
return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(
|
||||
exceptions::invalid_request_exception(
|
||||
format("Consistency level {} is not allowed for write operations", cl)));
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < _statements.size(); ++i) {
|
||||
_statements[i].statement->restrictions().validate_primary_key(options.for_statement(i));
|
||||
}
|
||||
@@ -266,23 +275,31 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
|
||||
if (_has_conditions) {
|
||||
++_stats.cas_batches;
|
||||
_stats.statements_in_cas_batches += _statements.size();
|
||||
return execute_with_conditions(qp, options, query_state);
|
||||
return execute_with_conditions(qp, options, query_state).then([guardrail_state, cl] (auto result) {
|
||||
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
|
||||
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
|
||||
}
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
++_stats.batches;
|
||||
_stats.statements_in_batches += _statements.size();
|
||||
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
|
||||
return get_mutations(qp, options, timeout, local, now, query_state).then([this, &qp, &options, timeout, tr_state = query_state.get_trace_state(),
|
||||
return get_mutations(qp, options, timeout, local, now, query_state).then([this, &qp, cl, timeout, tr_state = query_state.get_trace_state(),
|
||||
permit = query_state.get_permit()] (utils::chunked_vector<mutation> ms) mutable {
|
||||
return execute_without_conditions(qp, std::move(ms), options.get_consistency(), timeout, std::move(tr_state), std::move(permit));
|
||||
}).then([] (coordinator_result<> res) {
|
||||
return execute_without_conditions(qp, std::move(ms), cl, timeout, std::move(tr_state), std::move(permit));
|
||||
}).then([guardrail_state, cl] (coordinator_result<> res) {
|
||||
if (!res) {
|
||||
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
|
||||
seastar::make_shared<cql_transport::messages::result_message::exception>(std::move(res).assume_error()));
|
||||
}
|
||||
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
|
||||
make_shared<cql_transport::messages::result_message::void_message>());
|
||||
auto result = make_shared<cql_transport::messages::result_message::void_message>();
|
||||
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
|
||||
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
|
||||
}
|
||||
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(result));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -659,8 +659,7 @@ future<std::vector<std::vector<managed_bytes_opt>>> schema_describe_statement::d
|
||||
auto& auth_service = *client_state.get_auth_service();
|
||||
|
||||
if (config.with_hashed_passwords) {
|
||||
const auto maybe_user = client_state.user();
|
||||
if (!maybe_user || !co_await auth::has_superuser(auth_service, *maybe_user)) {
|
||||
if (!co_await client_state.has_superuser()) {
|
||||
co_await coroutine::return_exception(exceptions::unauthorized_exception(
|
||||
"DESCRIBE SCHEMA WITH INTERNALS AND PASSWORDS can only be issued by a superuser"));
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ future<> cql3::statements::list_permissions_statement::check_access(query_proces
|
||||
const auto& as = *state.get_auth_service();
|
||||
const auto user = state.user();
|
||||
|
||||
return auth::has_superuser(as, *user).then([this, &as, user](bool has_super) {
|
||||
return state.has_superuser().then([this, &as, user](bool has_super) {
|
||||
if (has_super) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ cql3::statements::list_users_statement::execute(query_processor& qp, service::qu
|
||||
const auto& cs = state.get_client_state();
|
||||
const auto& as = *cs.get_auth_service();
|
||||
|
||||
return auth::has_superuser(as, *cs.user()).then([&cs, &as, make_results = std::move(make_results)](bool has_superuser) mutable {
|
||||
return cs.has_superuser().then([&cs, &as, make_results = std::move(make_results)](bool has_superuser) mutable {
|
||||
if (has_superuser) {
|
||||
return as.underlying_role_manager().query_all().then([&as, make_results = std::move(make_results)](std::unordered_set<sstring> roles) mutable {
|
||||
return make_results(as, std::move(roles));
|
||||
|
||||
@@ -268,10 +268,22 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
|
||||
|
||||
inc_cql_stats(qs.get_client_state().is_internal());
|
||||
|
||||
const auto cl = options.get_consistency();
|
||||
const query_processor::write_consistency_guardrail_state guardrail_state = qp.check_write_consistency_levels_guardrail(cl);
|
||||
if (guardrail_state == query_processor::write_consistency_guardrail_state::FAIL) {
|
||||
co_return coroutine::exception(
|
||||
std::make_exception_ptr(exceptions::invalid_request_exception(
|
||||
format("Consistency level {} is not allowed for write operations", cl))));
|
||||
}
|
||||
|
||||
_restrictions->validate_primary_key(options);
|
||||
|
||||
if (has_conditions()) {
|
||||
co_return co_await execute_with_condition(qp, qs, options);
|
||||
auto result = co_await execute_with_condition(qp, qs, options);
|
||||
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
|
||||
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
|
||||
}
|
||||
co_return result;
|
||||
}
|
||||
|
||||
json_cache_opt json_cache = maybe_prepare_json_cache(options);
|
||||
@@ -290,6 +302,9 @@ modification_statement::do_execute(query_processor& qp, service::query_state& qs
|
||||
}
|
||||
|
||||
auto result = seastar::make_shared<cql_transport::messages::result_message::void_message>();
|
||||
if (guardrail_state == query_processor::write_consistency_guardrail_state::WARN) {
|
||||
result->add_warning(format("Write with consistency level {} is warned by guardrail configuration", cl));
|
||||
}
|
||||
if (keys_size_one) {
|
||||
auto&& table = s->table();
|
||||
if (_may_use_token_aware_routing && table.uses_tablets() && qs.get_client_state().is_protocol_extension_set(cql_transport::cql_protocol_extension::TABLETS_ROUTING_V1)) {
|
||||
|
||||
@@ -94,7 +94,7 @@ future<> create_role_statement::check_access(query_processor& qp, const service:
|
||||
return;
|
||||
}
|
||||
|
||||
const bool has_superuser = auth::has_superuser(*state.get_auth_service(), *state.user()).get();
|
||||
const bool has_superuser = state.has_superuser().get();
|
||||
|
||||
if (_options.hashed_password && !has_superuser) {
|
||||
throw exceptions::unauthorized_exception("Only superusers can create a role with a hashed password.");
|
||||
@@ -213,7 +213,7 @@ future<> alter_role_statement::check_access(query_processor& qp, const service::
|
||||
auto& as = *state.get_auth_service();
|
||||
|
||||
const auto& user = *state.user();
|
||||
const bool user_is_superuser = auth::has_superuser(as, user).get();
|
||||
const bool user_is_superuser = state.has_superuser().get();
|
||||
|
||||
if (_options.is_superuser) {
|
||||
if (!user_is_superuser) {
|
||||
@@ -306,7 +306,7 @@ future<> drop_role_statement::check_access(query_processor& qp, const service::c
|
||||
|
||||
auto& as = *state.get_auth_service();
|
||||
|
||||
const bool user_is_superuser = auth::has_superuser(as, *state.user()).get();
|
||||
const bool user_is_superuser = state.has_superuser().get();
|
||||
|
||||
const bool role_has_superuser = [this, &as] {
|
||||
try {
|
||||
@@ -442,7 +442,7 @@ list_roles_statement::execute(query_processor& qp, service::query_state& state,
|
||||
const auto& cs = state.get_client_state();
|
||||
const auto& as = *cs.get_auth_service();
|
||||
|
||||
return auth::has_superuser(as, *cs.user()).then([this, &cs, &as, make_results = std::move(make_results)](bool super) mutable {
|
||||
return cs.has_superuser().then([this, &cs, &as, make_results = std::move(make_results)](bool super) mutable {
|
||||
auto& rm = as.underlying_role_manager();
|
||||
const auto& a = as.underlying_authenticator();
|
||||
const auto query_mode = _recursive ? auth::recursive_role_query::yes : auth::recursive_role_query::no;
|
||||
|
||||
@@ -2757,11 +2757,7 @@ select_statement::ordering_comparator_type select_statement::get_ordering_compar
|
||||
// even if we don't
|
||||
// ultimately ship them to the client (CASSANDRA-4911).
|
||||
for (auto&& [column_def, is_descending] : orderings) {
|
||||
auto index = selection.index_of(*column_def);
|
||||
if (index < 0) {
|
||||
index = selection.add_column_for_post_processing(*column_def);
|
||||
}
|
||||
|
||||
auto index = selection.add_column_for_post_processing(*column_def);
|
||||
sorters.emplace_back(index, column_def->type);
|
||||
}
|
||||
|
||||
@@ -2864,9 +2860,7 @@ void select_statement::ensure_filtering_columns_retrieval(data_dictionary::datab
|
||||
selection::selection& selection,
|
||||
const restrictions::statement_restrictions& restrictions) {
|
||||
for (auto&& cdef : restrictions.get_column_defs_for_filtering(db)) {
|
||||
if (!selection.has_column(*cdef)) {
|
||||
selection.add_column_for_post_processing(*cdef);
|
||||
}
|
||||
selection.add_column_for_post_processing(*cdef);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "cql3/statements/statement_type.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
@@ -87,6 +88,9 @@ struct cql_stats {
|
||||
|
||||
uint64_t replication_strategy_warn_list_violations = 0;
|
||||
uint64_t replication_strategy_fail_list_violations = 0;
|
||||
uint64_t writes_per_consistency_level[size_t(db::consistency_level::MAX_VALUE) + 1] = {};
|
||||
uint64_t write_consistency_levels_disallowed_violations = 0;
|
||||
uint64_t write_consistency_levels_warned_violations = 0;
|
||||
|
||||
private:
|
||||
uint64_t _unpaged_select_queries[(size_t)ks_selector::SIZE] = {0ul};
|
||||
|
||||
@@ -199,18 +199,9 @@ class cache_mutation_reader final : public mutation_reader::impl {
|
||||
return *_snp->schema();
|
||||
}
|
||||
|
||||
gc_clock::time_point get_read_time() {
|
||||
return _read_context.tombstone_gc_state() ? gc_clock::now() : gc_clock::time_point::min();
|
||||
}
|
||||
|
||||
gc_clock::time_point get_gc_before() {
|
||||
if (!_gc_before.has_value()) {
|
||||
auto gc_state = _read_context.tombstone_gc_state();
|
||||
if (gc_state) {
|
||||
_gc_before = gc_state->with_commitlog_check_disabled().get_gc_before_for_key(_schema, _dk, _read_time);
|
||||
} else {
|
||||
_gc_before = gc_clock::time_point::min();
|
||||
}
|
||||
_gc_before = _read_context.tombstone_gc_state().with_commitlog_check_disabled().get_gc_before_for_key(_schema, _dk, _read_time);
|
||||
}
|
||||
return *_gc_before;
|
||||
}
|
||||
@@ -242,7 +233,7 @@ public:
|
||||
, _read_context_holder()
|
||||
, _read_context(ctx) // ctx is owned by the caller, who's responsible for closing it.
|
||||
, _next_row(*_schema, *_snp, false, _read_context.is_reversed())
|
||||
, _read_time(get_read_time())
|
||||
, _read_time(gc_clock::now())
|
||||
{
|
||||
clogger.trace("csm {}: table={}.{}, dk={}, reversed={}, snap={}",
|
||||
fmt::ptr(this),
|
||||
@@ -801,7 +792,7 @@ void cache_mutation_reader::copy_from_cache_to_buffer() {
|
||||
if (_next_row_in_range) {
|
||||
bool remove_row = false;
|
||||
|
||||
if (_read_context.tombstone_gc_state() // do not compact rows when tombstone_gc_state is not set (used in some unit tests)
|
||||
if (_read_context.tombstone_gc_state().is_gc_enabled() // do not compact rows when set to no_gc() (used in some unit tests)
|
||||
&& !_next_row.dummy()
|
||||
&& _snp->at_latest_version()
|
||||
&& _snp->at_oldest_version()) {
|
||||
|
||||
60
db/config.cc
60
db/config.cc
@@ -266,6 +266,13 @@ const config_type& config_type_for<std::vector<enum_option<db::replication_strat
|
||||
return ct;
|
||||
}
|
||||
|
||||
template <>
|
||||
const config_type& config_type_for<std::vector<enum_option<db::consistency_level_restriction_t>>>() {
|
||||
static config_type ct(
|
||||
"consistency level list", printable_vector_to_json<enum_option<db::consistency_level_restriction_t>>);
|
||||
return ct;
|
||||
}
|
||||
|
||||
template <>
|
||||
const config_type& config_type_for<enum_option<db::tri_mode_restriction_t>>() {
|
||||
static config_type ct(
|
||||
@@ -415,6 +422,23 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
class convert<enum_option<db::consistency_level_restriction_t>> {
|
||||
public:
|
||||
static bool decode(const Node& node, enum_option<db::consistency_level_restriction_t>& rhs) {
|
||||
std::string name;
|
||||
if (!convert<std::string>::decode(node, name)) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
std::istringstream(name) >> rhs;
|
||||
} catch (boost::program_options::invalid_option_value&) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
class convert<enum_option<db::tri_mode_restriction_t>> {
|
||||
public:
|
||||
@@ -1066,7 +1090,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Enable or disable the native transport server. Uses the same address as the rpc_address, but the port is different from the rpc_port. See native_transport_port.")
|
||||
, native_transport_port(this, "native_transport_port", "cql_port", value_status::Used, 9042,
|
||||
"Port on which the CQL native transport listens for clients.")
|
||||
, maintenance_socket(this, "maintenance_socket", value_status::Used, "ignore",
|
||||
, maintenance_socket(this, "maintenance_socket", value_status::Used, "workdir",
|
||||
"The Unix Domain Socket the node uses for maintenance socket.\n"
|
||||
"The possible options are:\n"
|
||||
"\tignore the node will not open the maintenance socket.\n"
|
||||
@@ -1515,10 +1539,15 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Ignored if authentication tables already contain a super user password.")
|
||||
, auth_certificate_role_queries(this, "auth_certificate_role_queries", value_status::Used, { { { "source", "SUBJECT" }, {"query", "CN=([^,]+)" } } },
|
||||
"Regular expression used by CertificateAuthenticator to extract role name from an accepted transport authentication certificate subject info.")
|
||||
, enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.")
|
||||
, minimum_replication_factor_fail_threshold(this, "minimum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
|
||||
, minimum_replication_factor_warn_threshold(this, "minimum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, 3, "")
|
||||
, maximum_replication_factor_warn_threshold(this, "maximum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
|
||||
, maximum_replication_factor_fail_threshold(this, "maximum_replication_factor_fail_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
|
||||
, maximum_replication_factor_warn_threshold(this, "maximum_replication_factor_warn_threshold", liveness::LiveUpdate, value_status::Used, -1, "")
|
||||
, replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
|
||||
, replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
|
||||
, write_consistency_levels_disallowed(this, "write_consistency_levels_disallowed", liveness::LiveUpdate, value_status::Used, {}, "A list of consistency levels that are not allowed for write operations. Requests using these levels will fail.")
|
||||
, write_consistency_levels_warned(this, "write_consistency_levels_warned", liveness::LiveUpdate, value_status::Used, {}, "A list of consistency levels that will trigger a warning when used in write operations. Requests using these levels will contain a warning in the query response.")
|
||||
, tablets_initial_scale_factor(this, "tablets_initial_scale_factor", liveness::LiveUpdate, value_status::Used, 10,
|
||||
"Minimum average number of tablet replicas per shard per table. Suppressed by tablet options in table's schema: min_per_shard_tablet_count and min_tablet_count")
|
||||
, tablets_per_shard_goal(this, "tablets_per_shard_goal", liveness::LiveUpdate, value_status::Used, 100,
|
||||
@@ -1531,8 +1560,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Maximum number of tablets which may be leaving a shard at the same time. Effecting only on topology coordinator. Set to the same value on all nodes.")
|
||||
, tablet_streaming_write_concurrency_per_shard(this, "tablet_streaming_write_concurrency_per_shard", liveness::LiveUpdate, value_status::Used, 2,
|
||||
"Maximum number of tablets which may be pending on a shard at the same time. Effecting only on topology coordinator. Set to the same value on all nodes.")
|
||||
, replication_strategy_warn_list(this, "replication_strategy_warn_list", liveness::LiveUpdate, value_status::Used, {locator::replication_strategy_type::simple}, "Controls which replication strategies to warn about when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
|
||||
, replication_strategy_fail_list(this, "replication_strategy_fail_list", liveness::LiveUpdate, value_status::Used, {}, "Controls which replication strategies are disallowed to be used when creating/altering a keyspace. Doesn't affect the pre-existing keyspaces.")
|
||||
, service_levels_interval(this, "service_levels_interval_ms", liveness::LiveUpdate, value_status::Used, 10000, "Controls how often service levels module polls configuration table")
|
||||
|
||||
, audit(this, "audit", value_status::Used, "table",
|
||||
@@ -1570,7 +1597,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, disk_space_monitor_high_polling_interval_in_seconds(this, "disk_space_monitor_high_polling_interval_in_seconds", value_status::Used, 1, "Disk-space polling interval at or above polling threshold")
|
||||
, disk_space_monitor_polling_interval_threshold(this, "disk_space_monitor_polling_interval_threshold", value_status::Used, 0.9, "Disk-space polling threshold. Polling interval is increased when disk utilization is greater than or equal to this threshold")
|
||||
, critical_disk_utilization_level(this, "critical_disk_utilization_level", liveness::LiveUpdate, value_status::Used, 0.98, "Disk utilization level above which mechanisms preventing a node getting out of space are activated")
|
||||
, enable_create_table_with_compact_storage(this, "enable_create_table_with_compact_storage", liveness::LiveUpdate, value_status::Used, false, "Enable the deprecated feature of CREATE TABLE WITH COMPACT STORAGE. This feature will eventually be removed in a future version.")
|
||||
, rf_rack_valid_keyspaces(this, "rf_rack_valid_keyspaces", liveness::MustRestart, value_status::Used, false,
|
||||
"Enforce RF-rack-valid keyspaces. Additionally, if there are existing RF-rack-invalid "
|
||||
"keyspaces, attempting to start a node with this option ON will fail. "
|
||||
@@ -1843,6 +1869,30 @@ std::unordered_map<sstring, locator::replication_strategy_type> db::replication_
|
||||
{"EverywhereStrategy", locator::replication_strategy_type::everywhere_topology}};
|
||||
}
|
||||
|
||||
std::unordered_map<sstring, db::consistency_level> db::consistency_level_restriction_t::map() {
|
||||
using cl = db::consistency_level;
|
||||
std::unordered_map<sstring, cl> result = {
|
||||
{"ANY", cl::ANY},
|
||||
{"ONE", cl::ONE},
|
||||
{"TWO", cl::TWO},
|
||||
{"THREE", cl::THREE},
|
||||
{"QUORUM", cl::QUORUM},
|
||||
{"ALL", cl::ALL},
|
||||
{"LOCAL_QUORUM", cl::LOCAL_QUORUM},
|
||||
{"EACH_QUORUM", cl::EACH_QUORUM},
|
||||
{"SERIAL", cl::SERIAL},
|
||||
{"LOCAL_SERIAL", cl::LOCAL_SERIAL},
|
||||
{"LOCAL_ONE", cl::LOCAL_ONE},
|
||||
};
|
||||
|
||||
constexpr auto expected_size = static_cast<size_t>(cl::MAX_VALUE) - static_cast<size_t>(cl::MIN_VALUE) + 1;
|
||||
if (result.size() != expected_size) {
|
||||
on_internal_error_noexcept(dblog, format("consistency_level_option::map() has {} entries but expected {}", result.size(), expected_size));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
|
||||
std::vector<enum_option<db::experimental_features_t>> ret;
|
||||
for (const auto& f : db::experimental_features_t::map()) {
|
||||
|
||||
18
db/config.hh
18
db/config.hh
@@ -24,6 +24,7 @@
|
||||
#include "utils/error_injection.hh"
|
||||
#include "message/dict_trainer.hh"
|
||||
#include "message/advanced_rpc_compressor.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/tri_mode_restriction.hh"
|
||||
#include "sstables/compressor.hh"
|
||||
|
||||
@@ -126,6 +127,10 @@ struct replication_strategy_restriction_t {
|
||||
static std::unordered_map<sstring, locator::replication_strategy_type> map(); // for enum_option<>
|
||||
};
|
||||
|
||||
struct consistency_level_restriction_t {
|
||||
static std::unordered_map<sstring, db::consistency_level> map(); // for enum_option<>
|
||||
};
|
||||
|
||||
constexpr unsigned default_murmur3_partitioner_ignore_msb_bits = 12;
|
||||
|
||||
struct tablets_mode_t {
|
||||
@@ -534,10 +539,16 @@ public:
|
||||
|
||||
named_value<std::vector<std::unordered_map<sstring, sstring>>> auth_certificate_role_queries;
|
||||
|
||||
// guardrails options
|
||||
named_value<bool> enable_create_table_with_compact_storage;
|
||||
named_value<int> minimum_replication_factor_fail_threshold;
|
||||
named_value<int> minimum_replication_factor_warn_threshold;
|
||||
named_value<int> maximum_replication_factor_warn_threshold;
|
||||
named_value<int> maximum_replication_factor_fail_threshold;
|
||||
named_value<int> maximum_replication_factor_warn_threshold;
|
||||
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_fail_list;
|
||||
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_warn_list;
|
||||
named_value<std::vector<enum_option<consistency_level_restriction_t>>> write_consistency_levels_disallowed;
|
||||
named_value<std::vector<enum_option<consistency_level_restriction_t>>> write_consistency_levels_warned;
|
||||
|
||||
named_value<double> tablets_initial_scale_factor;
|
||||
named_value<unsigned> tablets_per_shard_goal;
|
||||
@@ -545,9 +556,6 @@ public:
|
||||
named_value<unsigned> tablet_streaming_read_concurrency_per_shard;
|
||||
named_value<unsigned> tablet_streaming_write_concurrency_per_shard;
|
||||
|
||||
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_warn_list;
|
||||
named_value<std::vector<enum_option<replication_strategy_restriction_t>>> replication_strategy_fail_list;
|
||||
|
||||
named_value<uint32_t> service_levels_interval;
|
||||
|
||||
named_value<sstring> audit;
|
||||
@@ -598,8 +606,6 @@ public:
|
||||
named_value<float> disk_space_monitor_polling_interval_threshold;
|
||||
named_value<float> critical_disk_utilization_level;
|
||||
|
||||
named_value<bool> enable_create_table_with_compact_storage;
|
||||
|
||||
named_value<bool> rf_rack_valid_keyspaces;
|
||||
named_value<bool> enforce_rack_list;
|
||||
|
||||
|
||||
@@ -154,7 +154,10 @@ hint_sender::~hint_sender() {
|
||||
|
||||
|
||||
future<> hint_sender::stop(drain should_drain) noexcept {
|
||||
return seastar::async([this, should_drain] {
|
||||
seastar::thread_attributes attr;
|
||||
|
||||
attr.sched_group = _hints_cpu_sched_group;
|
||||
return seastar::async(std::move(attr), [this, should_drain] {
|
||||
set_stopping();
|
||||
_stop_as.request_abort();
|
||||
_stopped.get();
|
||||
|
||||
@@ -125,7 +125,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
bool _range_query;
|
||||
const tombstone_gc_state* _tombstone_gc_state;
|
||||
tombstone_gc_state _tombstone_gc_state;
|
||||
max_purgeable_fn _get_max_purgeable;
|
||||
// When reader enters a partition, it must be set up for reading that
|
||||
// partition from the underlying mutation source (_underlying) in one of two ways:
|
||||
@@ -149,7 +149,7 @@ public:
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const tombstone_gc_state* gc_state,
|
||||
tombstone_gc_state gc_state,
|
||||
max_purgeable_fn get_max_purgeable,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
@@ -161,7 +161,7 @@ public:
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _range_query(!query::is_single_partition(range))
|
||||
, _tombstone_gc_state(gc_state)
|
||||
, _tombstone_gc_state(std::move(gc_state))
|
||||
, _get_max_purgeable(std::move(get_max_purgeable))
|
||||
, _underlying(_cache, *this)
|
||||
{
|
||||
@@ -197,7 +197,7 @@ public:
|
||||
bool partition_exists() const { return _partition_exists; }
|
||||
void on_underlying_created() { ++_underlying_created; }
|
||||
bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
|
||||
const tombstone_gc_state* tombstone_gc_state() const { return _tombstone_gc_state; }
|
||||
const tombstone_gc_state& tombstone_gc_state() const { return _tombstone_gc_state; }
|
||||
max_purgeable get_max_purgeable(const dht::decorated_key& dk, is_shadowable is) const { return _get_max_purgeable(dk, is); }
|
||||
public:
|
||||
future<> ensure_underlying() {
|
||||
|
||||
@@ -775,7 +775,7 @@ row_cache::make_reader_opt(schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const tombstone_gc_state* gc_state,
|
||||
tombstone_gc_state gc_state,
|
||||
max_purgeable_fn get_max_purgeable,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
|
||||
@@ -373,7 +373,7 @@ public:
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no,
|
||||
const tombstone_gc_state* gc_state = nullptr,
|
||||
tombstone_gc_state gc_state = tombstone_gc_state::no_gc(),
|
||||
max_purgeable_fn get_max_purgeable = can_never_purge) {
|
||||
if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(get_max_purgeable), std::move(trace_state), fwd, fwd_mr)) {
|
||||
return std::move(*reader_opt);
|
||||
@@ -386,7 +386,7 @@ public:
|
||||
reader_permit permit,
|
||||
const dht::partition_range&,
|
||||
const query::partition_slice&,
|
||||
const tombstone_gc_state*,
|
||||
tombstone_gc_state,
|
||||
max_purgeable_fn get_max_purgeable,
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
@@ -395,7 +395,7 @@ public:
|
||||
mutation_reader make_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range = query::full_partition_range,
|
||||
const tombstone_gc_state* gc_state = nullptr,
|
||||
tombstone_gc_state gc_state = tombstone_gc_state::no_gc(),
|
||||
max_purgeable_fn get_max_purgeable = can_never_purge) {
|
||||
auto& full_slice = s->full_slice();
|
||||
return make_reader(std::move(s), std::move(permit), range, full_slice, nullptr,
|
||||
|
||||
@@ -1139,14 +1139,17 @@ future<> schema_applier::finalize_tables_and_views() {
|
||||
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
|
||||
for (auto& dropped_view : diff.tables_and_views.local().views.dropped) {
|
||||
auto s = dropped_view.get();
|
||||
co_await _ss.local().on_cleanup_for_drop_table(s->id());
|
||||
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
|
||||
}
|
||||
for (auto& dropped_table : diff.tables_and_views.local().tables.dropped) {
|
||||
auto s = dropped_table.get();
|
||||
co_await _ss.local().on_cleanup_for_drop_table(s->id());
|
||||
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
|
||||
}
|
||||
for (auto& dropped_cdc : diff.tables_and_views.local().cdc.dropped) {
|
||||
auto s = dropped_cdc.get();
|
||||
co_await _ss.local().on_cleanup_for_drop_table(s->id());
|
||||
co_await replica::database::cleanup_drop_table_on_all_shards(sharded_db, _sys_ks, true, diff.table_shards[s->id()]);
|
||||
}
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ namespace {
|
||||
schema_builder::register_schema_initializer([](schema_builder& builder) {
|
||||
if (builder.ks_name() == schema_tables::NAME) {
|
||||
// all schema tables are group0 tables
|
||||
builder.set_is_group0_table(true);
|
||||
builder.set_is_group0_table();
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -2840,20 +2840,15 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
|
||||
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
|
||||
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);
|
||||
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
||||
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks.query_processor().execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
db::consistency_level::LOCAL_ONE,
|
||||
{table_id.uuid(), version.uuid()},
|
||||
cql3::query_processor::cache_internal::no
|
||||
);
|
||||
if (results->empty()) {
|
||||
// If we don't have a stored column_mapping for an obsolete schema version
|
||||
// then it means it's way too old and been cleaned up already.
|
||||
// Fail the whole learn stage in this case.
|
||||
co_await coroutine::return_exception(std::runtime_error(
|
||||
format("Failed to look up column mapping for schema version {}",
|
||||
version)));
|
||||
co_return std::nullopt;
|
||||
}
|
||||
std::vector<column_definition> static_columns, regular_columns;
|
||||
for (const auto& row : *results) {
|
||||
@@ -2881,6 +2876,18 @@ future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_i
|
||||
co_return std::move(cm);
|
||||
}
|
||||
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version) {
|
||||
auto cm_opt = co_await schema_tables::get_column_mapping_if_exists(sys_ks, table_id, version);
|
||||
if (!cm_opt) {
|
||||
// If we don't have a stored column_mapping for an obsolete schema version
|
||||
// then it means it's way too old and been cleaned up already.
|
||||
co_await coroutine::return_exception(std::runtime_error(
|
||||
format("Failed to look up column mapping for schema version {}",
|
||||
version)));
|
||||
}
|
||||
co_return std::move(*cm_opt);
|
||||
}
|
||||
|
||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version) {
|
||||
shared_ptr<cql3::untyped_result_set> results = co_await sys_ks._qp.execute_internal(
|
||||
GET_COLUMN_MAPPING_QUERY,
|
||||
|
||||
@@ -320,6 +320,8 @@ std::optional<std::map<K, V>> get_map(const query::result_set_row& row, const ss
|
||||
future<> store_column_mapping(sharded<service::storage_proxy>& proxy, schema_ptr s, bool with_ttl);
|
||||
/// Query column mapping for a given version of the table locally.
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
/// Returns the same result as `get_column_mapping()` wrapped in optional and returns nullopt if the mapping doesn't exist.
|
||||
future<std::optional<column_mapping>> get_column_mapping_if_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
/// Check that column mapping exists for a given version of the table
|
||||
future<bool> column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
|
||||
/// Delete matching column mapping entries from the `system.scylla_table_schema_history` table
|
||||
|
||||
@@ -87,31 +87,15 @@ namespace {
|
||||
static const std::unordered_set<sstring> tables = {
|
||||
schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY,
|
||||
system_keyspace::BROADCAST_KV_STORE,
|
||||
system_keyspace::CDC_GENERATIONS_V3,
|
||||
system_keyspace::RAFT,
|
||||
system_keyspace::RAFT_SNAPSHOTS,
|
||||
system_keyspace::RAFT_SNAPSHOT_CONFIG,
|
||||
system_keyspace::GROUP0_HISTORY,
|
||||
system_keyspace::DISCOVERY,
|
||||
system_keyspace::TABLETS,
|
||||
system_keyspace::TOPOLOGY,
|
||||
system_keyspace::TOPOLOGY_REQUESTS,
|
||||
system_keyspace::LOCAL,
|
||||
system_keyspace::PEERS,
|
||||
system_keyspace::SCYLLA_LOCAL,
|
||||
system_keyspace::COMMITLOG_CLEANUPS,
|
||||
system_keyspace::SERVICE_LEVELS_V2,
|
||||
system_keyspace::VIEW_BUILD_STATUS_V2,
|
||||
system_keyspace::CDC_STREAMS_STATE,
|
||||
system_keyspace::CDC_STREAMS_HISTORY,
|
||||
system_keyspace::ROLES,
|
||||
system_keyspace::ROLE_MEMBERS,
|
||||
system_keyspace::ROLE_ATTRIBUTES,
|
||||
system_keyspace::ROLE_PERMISSIONS,
|
||||
system_keyspace::CDC_LOCAL,
|
||||
system_keyspace::DICTS,
|
||||
system_keyspace::VIEW_BUILDING_TASKS,
|
||||
system_keyspace::CLIENT_ROUTES,
|
||||
};
|
||||
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
|
||||
builder.enable_schema_commitlog();
|
||||
@@ -143,7 +127,7 @@ namespace {
|
||||
system_keyspace::REPAIR_TASKS,
|
||||
};
|
||||
if (builder.ks_name() == system_keyspace::NAME && tables.contains(builder.cf_name())) {
|
||||
builder.set_is_group0_table(true);
|
||||
builder.set_is_group0_table();
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -416,26 +400,7 @@ schema_ptr system_keyspace::cdc_streams_history() {
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::raft() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, RAFT);
|
||||
return schema_builder(NAME, RAFT, std::optional(id))
|
||||
.with_column("group_id", timeuuid_type, column_kind::partition_key)
|
||||
// raft log part
|
||||
.with_column("index", long_type, column_kind::clustering_key)
|
||||
.with_column("term", long_type)
|
||||
.with_column("data", bytes_type) // decltype(raft::log_entry::data) - serialized variant
|
||||
// persisted term and vote
|
||||
.with_column("vote_term", long_type, column_kind::static_column)
|
||||
.with_column("vote", uuid_type, column_kind::static_column)
|
||||
// id of the most recent persisted snapshot
|
||||
.with_column("snapshot_id", uuid_type, column_kind::static_column)
|
||||
.with_column("commit_idx", long_type, column_kind::static_column)
|
||||
|
||||
.set_comment("Persisted RAFT log, votes and snapshot info")
|
||||
.with_hash_version()
|
||||
.set_caching_options(caching_options::get_disabled_caching_options())
|
||||
.build();
|
||||
}();
|
||||
static thread_local auto schema = replica::make_raft_schema(db::system_keyspace::RAFT, true);
|
||||
return schema;
|
||||
}
|
||||
|
||||
@@ -443,35 +408,32 @@ schema_ptr system_keyspace::raft() {
|
||||
// on user-provided state machine and could be stored anywhere else in any other form.
|
||||
// This should be seen as a snapshot descriptor, instead.
|
||||
schema_ptr system_keyspace::raft_snapshots() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, RAFT_SNAPSHOTS);
|
||||
return schema_builder(NAME, RAFT_SNAPSHOTS, std::optional(id))
|
||||
.with_column("group_id", timeuuid_type, column_kind::partition_key)
|
||||
.with_column("snapshot_id", uuid_type)
|
||||
// Index and term of last entry in the snapshot
|
||||
.with_column("idx", long_type)
|
||||
.with_column("term", long_type)
|
||||
|
||||
.set_comment("Persisted RAFT snapshot descriptors info")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}();
|
||||
static thread_local auto schema = replica::make_raft_snapshots_schema(db::system_keyspace::RAFT_SNAPSHOTS, true);
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::raft_snapshot_config() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, RAFT_SNAPSHOT_CONFIG);
|
||||
return schema_builder(system_keyspace::NAME, RAFT_SNAPSHOT_CONFIG, std::optional(id))
|
||||
.with_column("group_id", timeuuid_type, column_kind::partition_key)
|
||||
.with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS'
|
||||
.with_column("server_id", uuid_type, column_kind::clustering_key)
|
||||
.with_column("can_vote", boolean_type)
|
||||
static thread_local auto schema = replica::make_raft_snapshot_config_schema(db::system_keyspace::RAFT_SNAPSHOT_CONFIG, true);
|
||||
return schema;
|
||||
}
|
||||
|
||||
.set_comment("RAFT configuration for the latest snapshot descriptor")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}();
|
||||
// Raft tables for strongly consistent tablets.
|
||||
// These tables have partition keys of the form (shard, group_id), allowing the data
|
||||
// to be co-located with the tablet replica that owns the raft group.
|
||||
// The raft_groups_partitioner creates tokens that map to the specified shard.
|
||||
|
||||
schema_ptr system_keyspace::raft_groups() {
|
||||
static thread_local auto schema = replica::make_raft_schema(db::system_keyspace::RAFT_GROUPS, false);
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::raft_groups_snapshots() {
|
||||
static thread_local auto schema = replica::make_raft_snapshots_schema(db::system_keyspace::RAFT_GROUPS_SNAPSHOTS, false);
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::raft_groups_snapshot_config() {
|
||||
static thread_local auto schema = replica::make_raft_snapshot_config_schema(db::system_keyspace::RAFT_GROUPS_SNAPSHOT_CONFIG, false);
|
||||
return schema;
|
||||
}
|
||||
|
||||
@@ -2316,21 +2278,29 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
r.insert(r.end(), {sstables_registry()});
|
||||
}
|
||||
|
||||
if (cfg.check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES)) {
|
||||
r.insert(r.end(), {raft_groups(), raft_groups_snapshots(), raft_groups_snapshot_config()});
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
static bool maybe_write_in_user_memory(schema_ptr s) {
|
||||
static bool maybe_write_in_user_memory(schema_ptr s, replica::database& db) {
|
||||
bool strongly_consistent = db.get_config().check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES);
|
||||
return (s.get() == system_keyspace::batchlog().get())
|
||||
|| (s.get() == system_keyspace::batchlog_v2().get())
|
||||
|| (s.get() == system_keyspace::paxos().get())
|
||||
|| s == system_keyspace::scylla_views_builds_in_progress();
|
||||
|| s == system_keyspace::scylla_views_builds_in_progress()
|
||||
|| (strongly_consistent && s == system_keyspace::raft_groups())
|
||||
|| (strongly_consistent && s == system_keyspace::raft_groups_snapshots())
|
||||
|| (strongly_consistent && s == system_keyspace::raft_groups_snapshot_config());
|
||||
}
|
||||
|
||||
future<> system_keyspace::make(
|
||||
locator::effective_replication_map_factory& erm_factory,
|
||||
replica::database& db) {
|
||||
for (auto&& table : system_keyspace::all_tables(db.get_config())) {
|
||||
co_await db.create_local_system_table(table, maybe_write_in_user_memory(table), erm_factory);
|
||||
co_await db.create_local_system_table(table, maybe_write_in_user_memory(table, db), erm_factory);
|
||||
co_await db.find_column_family(table).init_storage();
|
||||
}
|
||||
|
||||
|
||||
@@ -191,6 +191,9 @@ public:
|
||||
static constexpr auto RAFT = "raft";
|
||||
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
|
||||
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
|
||||
static constexpr auto RAFT_GROUPS = "raft_groups";
|
||||
static constexpr auto RAFT_GROUPS_SNAPSHOTS = "raft_groups_snapshots";
|
||||
static constexpr auto RAFT_GROUPS_SNAPSHOT_CONFIG = "raft_groups_snapshot_config";
|
||||
static constexpr auto REPAIR_HISTORY = "repair_history";
|
||||
static constexpr auto REPAIR_TASKS = "repair_tasks";
|
||||
static constexpr auto GROUP0_HISTORY = "group0_history";
|
||||
@@ -244,6 +247,9 @@ public:
|
||||
static schema_ptr scylla_local();
|
||||
static schema_ptr raft();
|
||||
static schema_ptr raft_snapshots();
|
||||
static schema_ptr raft_groups();
|
||||
static schema_ptr raft_groups_snapshots();
|
||||
static schema_ptr raft_groups_snapshot_config();
|
||||
static schema_ptr repair_history();
|
||||
static schema_ptr repair_tasks();
|
||||
static schema_ptr group0_history();
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "db/tablet_options.hh"
|
||||
#include <seastar/core/bitops.hh>
|
||||
#include "utils/log.hh"
|
||||
|
||||
extern logging::logger dblog;
|
||||
@@ -23,6 +24,11 @@ tablet_options::tablet_options(const map_type& map) {
|
||||
min_tablet_count.emplace(value);
|
||||
}
|
||||
break;
|
||||
case tablet_option_type::max_tablet_count:
|
||||
if (auto value = std::atol(value_str.c_str())) {
|
||||
max_tablet_count.emplace(value);
|
||||
}
|
||||
break;
|
||||
case tablet_option_type::min_per_shard_tablet_count:
|
||||
if (auto value = std::atof(value_str.c_str())) {
|
||||
min_per_shard_tablet_count.emplace(value);
|
||||
@@ -40,6 +46,7 @@ tablet_options::tablet_options(const map_type& map) {
|
||||
sstring tablet_options::to_string(tablet_option_type hint) {
|
||||
switch (hint) {
|
||||
case tablet_option_type::min_tablet_count: return "min_tablet_count";
|
||||
case tablet_option_type::max_tablet_count: return "max_tablet_count";
|
||||
case tablet_option_type::min_per_shard_tablet_count: return "min_per_shard_tablet_count";
|
||||
case tablet_option_type::expected_data_size_in_gb: return "expected_data_size_in_gb";
|
||||
}
|
||||
@@ -48,6 +55,8 @@ sstring tablet_options::to_string(tablet_option_type hint) {
|
||||
tablet_option_type tablet_options::from_string(sstring hint_desc) {
|
||||
if (hint_desc == "min_tablet_count") {
|
||||
return tablet_option_type::min_tablet_count;
|
||||
} else if (hint_desc == "max_tablet_count") {
|
||||
return tablet_option_type::max_tablet_count;
|
||||
} else if (hint_desc == "min_per_shard_tablet_count") {
|
||||
return tablet_option_type::min_per_shard_tablet_count;
|
||||
} else if (hint_desc == "expected_data_size_in_gb") {
|
||||
@@ -62,6 +71,9 @@ std::map<sstring, sstring> tablet_options::to_map() const {
|
||||
if (min_tablet_count) {
|
||||
res[to_string(tablet_option_type::min_tablet_count)] = fmt::to_string(*min_tablet_count);
|
||||
}
|
||||
if (max_tablet_count) {
|
||||
res[to_string(tablet_option_type::max_tablet_count)] = fmt::to_string(*max_tablet_count);
|
||||
}
|
||||
if (min_per_shard_tablet_count) {
|
||||
res[to_string(tablet_option_type::min_per_shard_tablet_count)] = fmt::to_string(*min_per_shard_tablet_count);
|
||||
}
|
||||
@@ -72,11 +84,23 @@ std::map<sstring, sstring> tablet_options::to_map() const {
|
||||
}
|
||||
|
||||
void tablet_options::validate(const map_type& map) {
|
||||
std::optional<ssize_t> min_tablets;
|
||||
std::optional<ssize_t> max_tablets;
|
||||
|
||||
for (auto& [key, value_str] : map) {
|
||||
switch (tablet_options::from_string(key)) {
|
||||
case tablet_option_type::min_tablet_count:
|
||||
if (auto value = std::atol(value_str.c_str()); value < 0) {
|
||||
throw exceptions::configuration_exception(format("Invalid value '{}' for min_tablet_count", value));
|
||||
} else {
|
||||
min_tablets = value;
|
||||
}
|
||||
break;
|
||||
case tablet_option_type::max_tablet_count:
|
||||
if (auto value = std::atol(value_str.c_str()); value <= 0) {
|
||||
throw exceptions::configuration_exception(format("Invalid value '{}' for max_tablet_count", value));
|
||||
} else {
|
||||
max_tablets = value;
|
||||
}
|
||||
break;
|
||||
case tablet_option_type::min_per_shard_tablet_count:
|
||||
@@ -91,6 +115,20 @@ void tablet_options::validate(const map_type& map) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (min_tablets && max_tablets) {
|
||||
auto effective_min = 1u << log2ceil(static_cast<size_t>(*min_tablets));
|
||||
auto effective_max = 1u << log2floor(static_cast<size_t>(*max_tablets));
|
||||
|
||||
if (effective_min > effective_max) {
|
||||
throw exceptions::configuration_exception(
|
||||
format("Invalid tablet count range: min_tablet_count={} (effective: {}) and max_tablet_count={} (effective: {}) "
|
||||
"result in conflicting constraints after rounding to powers of 2. "
|
||||
"Since tablet counts must be powers of 2, min_tablet_count rounds up and max_tablet_count rounds down"
|
||||
"Please adjust the values so that the smallest power of 2 greater than min_tablet_count is <= the largest power of 2 <= max_tablet_count.",
|
||||
*min_tablets, effective_min, *max_tablets, effective_max));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace db
|
||||
|
||||
@@ -18,6 +18,7 @@ namespace db {
|
||||
// Per-table tablet options
|
||||
enum class tablet_option_type {
|
||||
min_tablet_count,
|
||||
max_tablet_count,
|
||||
min_per_shard_tablet_count,
|
||||
expected_data_size_in_gb,
|
||||
};
|
||||
@@ -26,6 +27,7 @@ struct tablet_options {
|
||||
using map_type = std::map<sstring, sstring>;
|
||||
|
||||
std::optional<ssize_t> min_tablet_count;
|
||||
std::optional<ssize_t> max_tablet_count;
|
||||
std::optional<double> min_per_shard_tablet_count;
|
||||
std::optional<ssize_t> expected_data_size_in_gb;
|
||||
|
||||
@@ -33,7 +35,7 @@ struct tablet_options {
|
||||
explicit tablet_options(const map_type& map);
|
||||
|
||||
operator bool() const noexcept {
|
||||
return min_tablet_count || min_per_shard_tablet_count || expected_data_size_in_gb;
|
||||
return min_tablet_count || max_tablet_count || min_per_shard_tablet_count || expected_data_size_in_gb;
|
||||
}
|
||||
|
||||
map_type to_map() const;
|
||||
|
||||
@@ -932,8 +932,7 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update,
|
||||
const row& existing_row = existing.cells();
|
||||
const row& updated_row = update.cells();
|
||||
|
||||
const bool base_has_nonexpiring_marker = update.marker().is_live() && !update.marker().is_expiring();
|
||||
return std::ranges::all_of(_base->regular_columns(), [this, &updated_row, &existing_row, base_has_nonexpiring_marker] (const column_definition& cdef) {
|
||||
return std::ranges::all_of(_base->regular_columns(), [this, &updated_row, &existing_row] (const column_definition& cdef) {
|
||||
const auto view_it = _view->columns_by_name().find(cdef.name());
|
||||
const bool column_is_selected = view_it != _view->columns_by_name().end();
|
||||
|
||||
@@ -941,7 +940,7 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update,
|
||||
// as part of its PK, there are NO virtual columns corresponding to the unselected columns in the view.
|
||||
// Because of that, we don't generate view updates when the value in an unselected column is created
|
||||
// or changes.
|
||||
if (!column_is_selected && _base_info.has_base_non_pk_columns_in_view_pk) {
|
||||
if (!column_is_selected) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -950,40 +949,20 @@ bool view_updates::can_skip_view_updates(const clustering_or_static_row& update,
|
||||
return false;
|
||||
}
|
||||
|
||||
// We cannot skip if the value was created or deleted, unless we have a non-expiring marker
|
||||
// We cannot skip if the value was created or deleted
|
||||
const auto* existing_cell = existing_row.find_cell(cdef.id);
|
||||
const auto* updated_cell = updated_row.find_cell(cdef.id);
|
||||
if (existing_cell == nullptr || updated_cell == nullptr) {
|
||||
return existing_cell == updated_cell || (!column_is_selected && base_has_nonexpiring_marker);
|
||||
return existing_cell == updated_cell;
|
||||
}
|
||||
atomic_cell_view existing_cell_view = existing_cell->as_atomic_cell(cdef);
|
||||
atomic_cell_view updated_cell_view = updated_cell->as_atomic_cell(cdef);
|
||||
|
||||
// We cannot skip when a selected column is changed
|
||||
if (column_is_selected) {
|
||||
if (view_it->second->is_view_virtual()) {
|
||||
return atomic_cells_liveness_equal(existing_cell_view, updated_cell_view);
|
||||
}
|
||||
return compare_atomic_cell_for_merge(existing_cell_view, updated_cell_view) == 0;
|
||||
if (view_it->second->is_view_virtual()) {
|
||||
return atomic_cells_liveness_equal(existing_cell_view, updated_cell_view);
|
||||
}
|
||||
|
||||
// With non-expiring row marker, liveness checks below are not relevant
|
||||
if (base_has_nonexpiring_marker) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (existing_cell_view.is_live() != updated_cell_view.is_live()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// We cannot skip if the change updates TTL
|
||||
const bool existing_has_ttl = existing_cell_view.is_live_and_has_ttl();
|
||||
const bool updated_has_ttl = updated_cell_view.is_live_and_has_ttl();
|
||||
if (existing_has_ttl || updated_has_ttl) {
|
||||
return existing_has_ttl == updated_has_ttl && existing_cell_view.expiry() == updated_cell_view.expiry();
|
||||
}
|
||||
|
||||
return true;
|
||||
return compare_atomic_cell_for_merge(existing_cell_view, updated_cell_view) == 0;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1460,7 +1439,7 @@ void view_update_builder::generate_update(clustering_row&& update, std::optional
|
||||
}
|
||||
|
||||
auto dk = dht::decorate_key(*_schema, _key);
|
||||
const auto& gc_state = _base.get_compaction_manager().get_tombstone_gc_state();
|
||||
const auto gc_state = _base.get_tombstone_gc_state();
|
||||
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
|
||||
|
||||
// We allow existing to be disengaged, which we treat the same as an empty row.
|
||||
@@ -1489,7 +1468,7 @@ void view_update_builder::generate_update(static_row&& update, const tombstone&
|
||||
}
|
||||
|
||||
auto dk = dht::decorate_key(*_schema, _key);
|
||||
const auto& gc_state = _base.get_compaction_manager().get_tombstone_gc_state();
|
||||
const auto gc_state = _base.get_tombstone_gc_state();
|
||||
auto gc_before = gc_state.get_gc_before_for_key(_schema, dk, _now);
|
||||
|
||||
// We allow existing to be disengaged, which we treat the same as an empty row.
|
||||
@@ -3321,7 +3300,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
step.pslice,
|
||||
batch_size,
|
||||
query::max_partitions,
|
||||
tombstone_gc_state(nullptr));
|
||||
tombstone_gc_state::no_gc());
|
||||
auto consumer = compact_for_query<view_builder::consumer>(compaction_state, view_builder::consumer{*this, _vug.shared_from_this(), step, now});
|
||||
auto built = step.reader.consume_in_thread(std::move(consumer));
|
||||
if (auto ds = std::move(*compaction_state).detach_state()) {
|
||||
|
||||
@@ -610,7 +610,7 @@ future<> view_building_worker::do_build_range(table_id base_id, std::vector<tabl
|
||||
slice,
|
||||
query::max_rows,
|
||||
query::max_partitions,
|
||||
base_cf->get_compaction_manager().get_tombstone_gc_state());
|
||||
base_cf->get_tombstone_gc_state());
|
||||
auto consumer = compact_for_query<view_building_worker::consumer>(compaction_state, view_building_worker::consumer(
|
||||
_db,
|
||||
views_ids,
|
||||
|
||||
@@ -834,7 +834,10 @@ class clients_table : public streaming_virtual_table {
|
||||
auto& clients = cd_map[dip.ip];
|
||||
|
||||
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
|
||||
return a->port < b->port || a->client_type_str() < b->client_type_str();
|
||||
if (a->port != b->port) {
|
||||
return a->port < b->port;
|
||||
}
|
||||
return a->client_type_str() < b->client_type_str();
|
||||
});
|
||||
|
||||
for (const auto& cd : clients) {
|
||||
|
||||
@@ -4,6 +4,7 @@ add_library(scylla_dht STATIC)
|
||||
target_sources(scylla_dht
|
||||
PRIVATE
|
||||
boot_strapper.cc
|
||||
fixed_shard.cc
|
||||
i_partitioner.cc
|
||||
murmur3_partitioner.cc
|
||||
range_streamer.cc
|
||||
|
||||
156
dht/fixed_shard.cc
Normal file
156
dht/fixed_shard.cc
Normal file
@@ -0,0 +1,156 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
|
||||
#include "dht/fixed_shard.hh"
|
||||
#include "dht/token.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "sstables/key.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
#include "keys/keys.hh"
|
||||
#include "keys/compound_compat.hh"
|
||||
#include "utils/murmur_hash.hh"
|
||||
#include "utils/log.hh"
|
||||
|
||||
namespace dht {
|
||||
|
||||
static logging::logger fslog("fixed_shard");
|
||||
|
||||
const sstring fixed_shard_partitioner::classname = "com.scylladb.dht.FixedShardPartitioner";
|
||||
|
||||
const sstring fixed_shard_partitioner::name() const {
|
||||
return classname;
|
||||
}
|
||||
|
||||
dht::token fixed_shard_partitioner::token_for_shard(uint16_t shard, uint64_t hash_bits) {
|
||||
int64_t token_value = (static_cast<int64_t>(shard) << shard_shift) | static_cast<int64_t>(hash_bits & hash_mask);
|
||||
return dht::token(token_value);
|
||||
}
|
||||
|
||||
unsigned fixed_shard_partitioner::shard_of(dht::token token) {
|
||||
uint64_t token_bits = static_cast<uint64_t>(token.raw());
|
||||
return static_cast<unsigned>(token_bits >> shard_shift);
|
||||
}
|
||||
|
||||
// Called with the bytes of the first partition key component, representing the shard.
|
||||
static uint16_t compute_shard(managed_bytes_view mb) {
|
||||
if (mb.size() != sizeof(uint16_t)) {
|
||||
on_internal_error(fslog, format("Invalid shard value size: expected {}, got {}", sizeof(uint16_t), mb.size()));
|
||||
}
|
||||
|
||||
// No need to linearize, 2 bytes are represented as a single fragment
|
||||
auto shard_bytes = mb.current_fragment();
|
||||
uint16_t shard_value = net::ntoh(read_unaligned<uint16_t>(shard_bytes.begin()));
|
||||
|
||||
if (shard_value > fixed_shard_partitioner::max_shard) {
|
||||
on_internal_error(fslog, format("Shard value {} exceeds maximum allowed shard {}", shard_value, fixed_shard_partitioner::max_shard));
|
||||
}
|
||||
|
||||
return shard_value;
|
||||
}
|
||||
|
||||
dht::token fixed_shard_partitioner::get_token(const schema& s, partition_key_view key) const {
|
||||
uint16_t shard_value = compute_shard(*key.begin());
|
||||
std::array<uint64_t, 2> hash;
|
||||
auto&& legacy = key.legacy_form(s);
|
||||
utils::murmur_hash::hash3_x64_128(legacy.begin(), legacy.size(), 0, hash);
|
||||
auto token = fixed_shard_partitioner::token_for_shard(shard_value, hash[0]);
|
||||
fslog.trace("get_token: shard={}, token={}", shard_value, token);
|
||||
return token;
|
||||
}
|
||||
|
||||
dht::token fixed_shard_partitioner::get_token(const sstables::key_view& key) const {
|
||||
return key.with_linearized([&](bytes_view v) {
|
||||
auto comp = composite_view(v, true);
|
||||
uint16_t shard_value = compute_shard(comp.begin()->first);
|
||||
std::array<uint64_t, 2> hash;
|
||||
utils::murmur_hash::hash3_x64_128(v, 0, hash);
|
||||
auto token = fixed_shard_partitioner::token_for_shard(shard_value, hash[0]);
|
||||
fslog.trace("get_token: shard={}, token={}", shard_value, token);
|
||||
return token;
|
||||
});
|
||||
}
|
||||
|
||||
using registry = class_registrator<dht::i_partitioner, fixed_shard_partitioner>;
|
||||
static registry registrator(fixed_shard_partitioner::classname);
|
||||
static registry registrator_short_name("FixedShardPartitioner");
|
||||
|
||||
fixed_shard_sharder& fixed_shard_sharder::instance() {
|
||||
static thread_local fixed_shard_sharder sharder;
|
||||
return sharder;
|
||||
}
|
||||
|
||||
fixed_shard_sharder::fixed_shard_sharder()
|
||||
: static_sharder(smp::count, 0)
|
||||
{
|
||||
}
|
||||
|
||||
unsigned fixed_shard_sharder::shard_of(const dht::token& t) const {
|
||||
if (t.is_minimum()) {
|
||||
return dht::token::shard_of_minimum_token();
|
||||
}
|
||||
if (t.is_maximum()) {
|
||||
return shard_count() - 1;
|
||||
}
|
||||
auto shard = fixed_shard_partitioner::shard_of(t);
|
||||
fslog.trace("shard_of({}) = {}", t, std::min(shard, shard_count() - 1));
|
||||
return std::min(shard, shard_count() - 1);
|
||||
}
|
||||
|
||||
std::optional<unsigned> fixed_shard_sharder::try_get_shard_for_reads(const dht::token& t) const {
|
||||
return shard_of(t);
|
||||
}
|
||||
|
||||
dht::shard_replica_set fixed_shard_sharder::shard_for_writes(const dht::token& t, std::optional<dht::write_replica_set_selector>) const {
|
||||
// We don't support migrations of the data in raft tables for strongly consistent tables.
|
||||
// When migrating a strongly consistent tablet, we'll need to move its metadata
|
||||
// explicitly to the new shard along with its raft group data.
|
||||
auto shard = try_get_shard_for_reads(t);
|
||||
if (!shard) {
|
||||
return {};
|
||||
}
|
||||
return { *shard };
|
||||
}
|
||||
|
||||
dht::token fixed_shard_sharder::token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans) const {
|
||||
return token_for_next_shard_for_reads(t, shard, spans);
|
||||
}
|
||||
|
||||
/// Returns the first token owned by \p shard that is strictly after \p t, or
/// the maximum token when no such token exists.
dht::token fixed_shard_sharder::token_for_next_shard_for_reads(const dht::token& t, shard_id shard, unsigned spans) const {
    // With the fixed_shard_partitioner each shard owns exactly one token
    // range, so asking to span more than one range (spans > 1) always
    // overflows; likewise an out-of-range shard or the maximum token.
    if (spans > 1 || shard >= shard_count() || t.is_maximum()) {
        return dht::maximum_token();
    }

    // The shard's range begins at the token whose high bits encode the shard.
    const int64_t range_start = static_cast<int64_t>(shard) << fixed_shard_partitioner::shard_shift;
    const int64_t current = t.is_minimum() ? 0 : t.raw();
    // If t precedes the range, the range start is the answer; otherwise the
    // shard has no token after t.
    return current < range_start ? dht::token(range_start) : dht::maximum_token();
}
|
||||
|
||||
/// Returns the shard after the one owning \p t, together with the first token
/// of that shard, or std::nullopt when t already belongs to the last shard or
/// the successor shard owns no token after t.
std::optional<dht::shard_and_token> fixed_shard_sharder::next_shard(const dht::token& t) const {
    const auto current = try_get_shard_for_reads(t);
    if (!current) {
        return std::nullopt;
    }
    const auto successor = *current + 1;
    if (successor >= shard_count()) {
        // Already on the last shard.
        return std::nullopt;
    }
    const auto first_token = token_for_next_shard_for_reads(t, successor);
    if (first_token.is_maximum()) {
        return std::nullopt;
    }
    return dht::shard_and_token{successor, first_token};
}
|
||||
|
||||
// Reads variant: identical to the generic next_shard() for this sharder, so
// delegate directly.
std::optional<dht::shard_and_token> fixed_shard_sharder::next_shard_for_reads(const dht::token& t) const {
    return next_shard(t);
}
|
||||
|
||||
} // namespace dht
|
||||
93
dht/fixed_shard.hh
Normal file
93
dht/fixed_shard.hh
Normal file
@@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "dht/token-sharding.hh"
|
||||
|
||||
class schema;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class key_view;
|
||||
|
||||
}
|
||||
|
||||
namespace dht {
|
||||
|
||||
/// A partitioner mainly for Raft metadata tables for strongly consistent tables
|
||||
/// (raft_groups, raft_groups_snapshots, raft_groups_snapshot_config).
|
||||
///
|
||||
/// These tables have partition keys with the shard as the first column.
|
||||
/// The partitioner creates tokens that will be assigned to the shard specified
|
||||
/// in the partition key's first column.
|
||||
///
|
||||
/// Token encoding:
|
||||
/// [shard: 16 bits][hash: 48 bits]
|
||||
///
|
||||
/// To skip converting between signed and unsigned tokens (biasing), the top bit
|
||||
/// is always 0, so we can effectively use only 15 bits for the shard.
|
||||
/// This correlates with the limit enforced by the Raft tables' schema which uses
|
||||
/// a signed int (smallint) for the shard column, where allowing only positive
|
||||
/// shards allows up to 32767 ((1 << 15) - 1) shards.
|
||||
///
|
||||
/// The lower 48 bits is a hash of the entire partition key.
|
||||
///
|
||||
/// This encoding is shard-count independent - the shard can be extracted by simple
|
||||
/// bit shifting regardless of how many shards exist in the cluster.
|
||||
struct fixed_shard_partitioner final : public dht::i_partitioner {
    // Number of token bits reserved for the shard id (the token's high bits).
    static constexpr unsigned shard_bits = 16;
    // Left shift that places a shard id into (or extracts it from) the
    // token's high bits.
    static constexpr unsigned shard_shift = 64 - shard_bits;
    // Highest encodable shard. The top bit of the token is kept 0 (see the
    // class comment), so only 15 of the 16 shard bits are usable: 32767.
    static constexpr uint16_t max_shard = std::numeric_limits<int16_t>::max();
    // Mask selecting the low 48 hash bits of a token.
    static constexpr uint64_t hash_mask = (uint64_t(1) << shard_shift) - 1;
    // Partitioner class name used for registry lookup (defined in the .cc).
    static const sstring classname;

    fixed_shard_partitioner() = default;
    virtual const sstring name() const override;
    virtual dht::token get_token(const schema& s, partition_key_view key) const override;
    virtual dht::token get_token(const sstables::key_view& key) const override;

    // Builds a token with `shard` in the high bits and `hash_bits` in the
    // low 48 bits.
    static dht::token token_for_shard(uint16_t shard, uint64_t hash_bits);
    // Extracts the shard encoded in a token's high bits.
    static unsigned shard_of(dht::token token);
};
|
||||
|
||||
/// A sharder for Raft metadata tables for strongly consistent tables (raft_groups,
|
||||
/// raft_groups_snapshots, raft_groups_snapshot_config).
|
||||
///
|
||||
/// These tables store raft group state for all tablets of strongly consistent tables.
|
||||
/// The sharder allows specifying the shard where the metadata should be located by
|
||||
/// including the shard id in the partition key.
|
||||
///
|
||||
/// The shard is encoded in the token by fixed_shard_partitioner. The sharder extracts
|
||||
/// the shard by decoding the token bits used for shard encoding.
|
||||
///
|
||||
/// We inherit from static_sharder because that's what we use for system tables.
|
||||
class fixed_shard_sharder : public dht::static_sharder {
public:
    /// Singleton instance for the raft tablet sharder.
    static fixed_shard_sharder& instance();

    fixed_shard_sharder();
    virtual ~fixed_shard_sharder() = default;

    /// Returns the shard for a token by extracting it from the token's high bits.
    /// This overrides static_sharder::shard_of to use the bit-based encoding.
    virtual unsigned shard_of(const dht::token& t) const override;

    /// Read-side shard lookup; based on shard_of().
    virtual std::optional<unsigned> try_get_shard_for_reads(const dht::token& t) const override;
    /// Write-side shard lookup; returns at most one shard (no migration
    /// support via double writes).
    virtual dht::shard_replica_set shard_for_writes(const dht::token& t, std::optional<dht::write_replica_set_selector> sel) const override;
    /// First token of `shard` strictly after `t`, or the maximum token.
    virtual dht::token token_for_next_shard(const dht::token& t, shard_id shard, unsigned spans = 1) const override;
    virtual dht::token token_for_next_shard_for_reads(const dht::token& t, shard_id shard, unsigned spans = 1) const override;
    /// Successor shard of the one owning `t`, with its first token, if any.
    virtual std::optional<dht::shard_and_token> next_shard(const dht::token& t) const override;
    virtual std::optional<dht::shard_and_token> next_shard_for_reads(const dht::token& t) const override;
};
|
||||
|
||||
} // namespace dht
|
||||
17
dist/common/kernel_conf/post_install.sh
vendored
17
dist/common/kernel_conf/post_install.sh
vendored
@@ -31,6 +31,23 @@ EOS
|
||||
sysctl -p /etc/sysctl.d/99-scylla-perfevent.conf
|
||||
fi
|
||||
|
||||
# Tune tcp_mem to max out at 3% of total system memory.
|
||||
# Seastar defaults to allocating 93% of physical memory. The kernel's default
|
||||
# allocation for TCP is ~9%. This adds up to 102%. Reduce the TCP allocation
|
||||
# to 3% to avoid OOM.
|
||||
PAGE_SIZE=$(getconf PAGE_SIZE)
|
||||
TOTAL_MEM_KB=$(sed -n 's/^MemTotal:[[:space:]]*\([0-9]*\).*/\1/p' /proc/meminfo)
|
||||
TOTAL_MEM_BYTES=$((TOTAL_MEM_KB * 1024))
|
||||
TCP_MEM_MAX=$((TOTAL_MEM_BYTES * 3 / 100))
|
||||
TCP_MEM_MAX_PAGES=$((TCP_MEM_MAX / PAGE_SIZE))
|
||||
TCP_MEM_MID_PAGES=$((TCP_MEM_MAX * 2 / 3 / PAGE_SIZE))
|
||||
TCP_MEM_MIN_PAGES=$((TCP_MEM_MAX / 2 / PAGE_SIZE))
|
||||
cat << EOS > /etc/sysctl.d/99-scylla-tcp.conf
|
||||
# Scylla: limit TCP memory to 3% of total system memory
|
||||
net.ipv4.tcp_mem = $TCP_MEM_MIN_PAGES $TCP_MEM_MID_PAGES $TCP_MEM_MAX_PAGES
|
||||
EOS
|
||||
sysctl -p /etc/sysctl.d/99-scylla-tcp.conf || :
|
||||
|
||||
if [ ! -d /run/systemd/system ]; then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
2
dist/debian/control.template
vendored
2
dist/debian/control.template
vendored
@@ -39,7 +39,7 @@ Description: debugging symbols for %{product}-server
|
||||
|
||||
Package: %{product}-kernel-conf
|
||||
Architecture: any
|
||||
Depends: procps
|
||||
Depends: procps, sed
|
||||
Replaces: scylla-enterprise-kernel-conf (<< 2025.1.0~)
|
||||
Breaks: scylla-enterprise-kernel-conf (<< 2025.1.0~)
|
||||
Description: Scylla kernel tuning configuration
|
||||
|
||||
1
dist/debian/debian/scylla-kernel-conf.postrm
vendored
1
dist/debian/debian/scylla-kernel-conf.postrm
vendored
@@ -6,6 +6,7 @@ case "$1" in
|
||||
purge|remove)
|
||||
if [ "$1" = "purge" ]; then
|
||||
rm -f /etc/sysctl.d/99-scylla-perfevent.conf
|
||||
rm -f /etc/sysctl.d/99-scylla-tcp.conf
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
3
dist/redhat/scylla.spec
vendored
3
dist/redhat/scylla.spec
vendored
@@ -186,7 +186,7 @@ This package contains the main scylla configuration file.
|
||||
%package kernel-conf
|
||||
Group: Applications/Databases
|
||||
Summary: Scylla configuration package for the Linux kernel
|
||||
Requires: kmod
|
||||
Requires: kmod sed
|
||||
# tuned overwrites our sysctl settings
|
||||
Obsoletes: tuned >= 2.11.0
|
||||
Provides: scylla-enterprise-kernel-conf = %{version}-%{release}
|
||||
@@ -220,6 +220,7 @@ fi
|
||||
%{_unitdir}/scylla-tune-sched.service
|
||||
/opt/scylladb/kernel_conf/*
|
||||
%ghost /etc/sysctl.d/99-scylla-perfevent.conf
|
||||
%ghost /etc/sysctl.d/99-scylla-tcp.conf
|
||||
|
||||
|
||||
%package node-exporter
|
||||
|
||||
@@ -316,6 +316,17 @@ experimental:
|
||||
example, a single PutItem is represented by a REMOVE + MODIFY event,
|
||||
instead of just a single MODIFY or INSERT.
|
||||
<https://github.com/scylladb/scylla/issues/6930>
|
||||
* Alternator Streams cannot always distinguish between INSERT and MODIFY
|
||||
events - the distinction depends on whether the item existed before the
|
||||
change. Alternator Streams may also produce spurious REMOVE or MODIFY
|
||||
events when a non-existent item is deleted or when an item is set to the
|
||||
same value it already had.
|
||||
This incompatibility can be resolved by setting the configuration option
|
||||
``alternator_streams_increased_compatibility=true``, but this comes with
|
||||
a performance penalty because Alternator needs to read the old value of
|
||||
the item during data-modifying operations on tables with Alternator
|
||||
Streams enabled. By default (``alternator_streams_increased_compatibility=false``),
|
||||
this incompatibility remains.
|
||||
<https://github.com/scylladb/scylla/issues/6918>
|
||||
* In GetRecords responses, Alternator sets `eventSource` to
|
||||
`scylladb:alternator`, rather than `aws:dynamodb`, and doesn't set the
|
||||
|
||||
@@ -106,6 +106,8 @@ The computed number of tablets a table will have is based on several parameters
|
||||
tablets on average. See :ref:`Per-table tablet options <cql-per-table-tablet-options>` for details.
|
||||
* Table-level option ``'min_tablet_count'``. This option sets the minimal number of tablets for the given table.
|
||||
See :ref:`Per-table tablet options <cql-per-table-tablet-options>` for details.
|
||||
* Table-level option ``'max_tablet_count'``. This option sets the maximum number of tablets for the given table
|
||||
See :ref:`Per-table tablet options <cql-per-table-tablet-options>` for details.
|
||||
* Config option ``'tablets_initial_scale_factor'``. This option sets the minimal number of tablets per shard
|
||||
per table globally. This option can be overridden by the table-level option: ``'min_per_shard_tablet_count'``.
|
||||
``'tablets_initial_scale_factor'`` is ignored if either the keyspace option ``'initial'`` or table-level
|
||||
@@ -119,6 +121,18 @@ half the target tablet size, it will be merged (the number of tablets will be ha
|
||||
Each of these factors is taken into consideration, and the one producing the largest number of tablets wins, and
|
||||
will be used as the number of tablets for the given table.
|
||||
|
||||
.. note::
|
||||
|
||||
When both ``'min_tablet_count'`` and ``'max_tablet_count'`` are set together, ScyllaDB validates the
|
||||
combination by computing **effective** bounds:
|
||||
|
||||
* The **effective minimum** is the smallest power of 2 that is greater than or equal to ``min_tablet_count``.
|
||||
* The **effective maximum** is the largest power of 2 that is less than or equal to ``max_tablet_count``.
|
||||
|
||||
ScyllaDB validates that the effective minimum does not exceed the effective maximum. If it does,
|
||||
the ``CREATE TABLE`` statement will be rejected with an error. To avoid ambiguity, it is recommended
|
||||
to use power-of-2 values for both options.
|
||||
|
||||
As the last step, in order to avoid having too many tablets per shard, which could potentially lead to overload
|
||||
and performance degradation, ScyllaDB will run the following algorithm to respect the ``tablets_per_shard_goal``
|
||||
config option:
|
||||
|
||||
@@ -1075,9 +1075,9 @@ The following tombstone gc modes are available:
|
||||
* - Mode
|
||||
- Description
|
||||
* - ``timeout``
|
||||
- Tombstone GC is performed after the wait time specified with ``gc_grace_seconds`` (default).
|
||||
- Tombstone GC is performed after the wait time specified with ``gc_grace_seconds``.
|
||||
* - ``repair``
|
||||
- Tombstone GC is performed after repair is run.
|
||||
- Tombstone GC is performed after repair is run (default).
|
||||
* - ``disabled``
|
||||
- Tombstone GC is never performed. This mode may be useful when loading data to the database, to avoid tombstone GC when part of the data is not yet available.
|
||||
* - ``immediate``
|
||||
@@ -1085,6 +1085,9 @@ The following tombstone gc modes are available:
|
||||
|
||||
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
|
||||
|
||||
The default tombstone-gc mode is ``repair`` for all tables that use ``NetworkTopologyStrategy`` or ``SimpleStrategy``, except for :term:`Colocated Tables <Colocated Table>`.
|
||||
Tables which have a single replica (RF=1) don't need repair (and cannot be repaired either). For such tables, tombstone-gc mode ``repair`` acts the same as ``immediate`` mode would: all tombstones are immediately collectible.
|
||||
|
||||
.. _cql-per-table-tablet-options:
|
||||
|
||||
Per-table tablet options
|
||||
@@ -1128,6 +1131,13 @@ if its data size, or performance requirements are known in advance.
|
||||
function of the tablet count, the replication factor in the datacenter, and the number
|
||||
of nodes and shards in the datacenter. It is recommended to use higher-level options
|
||||
such as ``expected_data_size_in_gb`` or ``min_per_shard_tablet_count`` instead.
|
||||
``max_tablet_count`` 0 Sets the maximum number of tablets for the table. When set, the tablet count
|
||||
will not exceed this value, even if the table grows large enough to normally
|
||||
trigger tablet splits. This option is mainly intended for use during restore
|
||||
operations, to ensure that each SSTable fits entirely within a single tablet.
|
||||
This enables efficient file-based streaming during restore. Setting both
|
||||
``min_tablet_count`` and ``max_tablet_count`` to the same value fixes the
|
||||
tablet count for the table.
|
||||
=============================== =============== ===================================================================================
|
||||
|
||||
When allocating tablets for a new table, ScyllaDB uses the maximum of the ``initial`` tablets configured for the keyspace
|
||||
|
||||
@@ -147,7 +147,7 @@ CREATE TABLE ks.t_scylla_cdc_log (
|
||||
AND comment = 'CDC log for ks.t'
|
||||
AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '60', 'compaction_window_unit': 'MINUTES', 'expired_sstable_check_frequency_seconds': '1800'}
|
||||
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
|
||||
AND tablets = {'expected_data_size_in_gb': '250', 'min_per_shard_tablet_count': '0.8', 'min_tablet_count': '1'}
|
||||
AND tablets = {'expected_data_size_in_gb': '250', 'min_per_shard_tablet_count': '0.8', 'min_tablet_count': '1', 'max_tablet_count': '8'}
|
||||
AND crc_check_chance = 1
|
||||
AND default_time_to_live = 0
|
||||
AND gc_grace_seconds = 0
|
||||
|
||||
69
docs/dev/strong_consistency.md
Normal file
69
docs/dev/strong_consistency.md
Normal file
@@ -0,0 +1,69 @@
|
||||
# Introduction
|
||||
|
||||
This document describes the implementation details and design choices for the
|
||||
strongly-consistent tables feature.
|
||||
|
||||
The feature is heavily based on the existing implementation of Raft in scylla, which
|
||||
is described in [docs/dev/raft-in-scylla.md](raft-in-scylla.md).
|
||||
|
||||
# Raft metadata persistence
|
||||
|
||||
## Group0 persistence context
|
||||
|
||||
The Raft groups for strongly consistent tables differ from Raft group0 particularly
|
||||
in the extent to which their Raft group members can be located. For group0, all
|
||||
group members (Raft servers) are on shard 0. For groups for strongly consistent tablets,
|
||||
the group members may be located on any shard. In the future, they will even be able
|
||||
to move alongside their corresponding tablets.
|
||||
|
||||
That's why, when adding the Raft metadata persistence layer for strongly consistent tables,
|
||||
we can't reuse the existing approach for group 0. Group0's persistence stores all Raft state
|
||||
on shard 0. This approach can't be used for strongly consistent tables, because raft groups
|
||||
for strongly consistent tables can occupy many different shards and their metadata may be
|
||||
updated often. Storing all data on a single shard would at the same time make this shard
|
||||
a bottleneck and it would require performing cross-shard operations for most strongly
|
||||
consistent writes, which would also diminish their performance on its own.
|
||||
|
||||
Instead, we want to store the metadata for a Raft group on the same shard where this group's
|
||||
server is located, avoiding any cross-shard operations and evenly distributing the work
|
||||
related to writing metadata to all shards.
|
||||
|
||||
## Strongly consistent table persistence
|
||||
|
||||
We introduce a separate set of Raft system tables for strongly consistent tablets:
|
||||
|
||||
- `system.raft_groups`
|
||||
- `system.raft_groups_snapshots`
|
||||
- `system.raft_groups_snapshot_config`
|
||||
|
||||
These tables mirror the logical contents of the existing `system.raft`, `system.raft_snapshots`,
|
||||
`system.raft_snapshot_config` tables, but their partition key is a composite `(shard, group_id)`
|
||||
rather than just `group_id`.
|
||||
|
||||
To make “(shard, group_id) belongs to shard X” true at the storage layer, we use:
|
||||
|
||||
- a dedicated partitioner (`service::strong_consistency::raft_groups_partitioner`)
|
||||
which encodes the shard into the token, and
|
||||
- a dedicated sharder (`service::strong_consistency::raft_groups_sharder`) which extracts
|
||||
that shard from the token.
|
||||
|
||||
As a result, reads and writes for a given group’s persistence are routed to the same shard
|
||||
where the Raft server instance runs.
|
||||
|
||||
## Token encoding
|
||||
|
||||
The partitioner encodes the destination shard in the token’s high bits:
|
||||
|
||||
- token layout: `[shard: 16 bits][group_id_hash: 48 bits]`
|
||||
- the shard value is constrained to fit the `smallint` column used in the schema.
|
||||
it also needs to be non-negative, so it's effectively limited to range `[0, 32767]`
|
||||
- the lower 48 bits are derived by hashing the `group_id` (timeuuid)
|
||||
|
||||
The key property is that shard extraction is a pure bit operation and does not depend on
|
||||
the cluster’s shard count.
|
||||
|
||||
## No direct migration support
|
||||
|
||||
`raft_groups_sharder::shard_for_writes()` returns up to one shard - it does not support
|
||||
migrations using double writes. Instead, for a given Raft group, when a tablet is migrated,
|
||||
the Raft metadata needs to be erased from the former location and added in the new location.
|
||||
File diff suppressed because it is too large
Load Diff
@@ -15,39 +15,35 @@ Ensure your platform is supported by the ScyllaDB version you want to install.
|
||||
See `OS Support <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
for information about supported Linux distributions and versions.
|
||||
|
||||
Note that if you're on CentOS 7, only root offline installation is supported.
|
||||
|
||||
Download and Install
|
||||
-----------------------
|
||||
|
||||
#. Download the latest tar.gz file for ScyllaDB version (x86 or ARM) from ``https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-<version>/``.
|
||||
|
||||
Example for version 6.1: https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-6.1/
|
||||
**Example** for version 2025.1:
|
||||
|
||||
- Go to https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-2025.1/
|
||||
- Download the ``scylla-unified`` file for the patch version you want to
|
||||
install. For example, to install 2025.1.9 (x86), download
|
||||
``scylla-unified-2025.1.9-0.20251010.6c539463bbda.x86_64.tar.gz``.
|
||||
|
||||
#. Uncompress the downloaded package.
|
||||
|
||||
The following example shows the package for ScyllaDB 6.1.1 (x86):
|
||||
**Example** for version 2025.1.9 (x86) (downloaded in the previous step):
|
||||
|
||||
.. code:: console
|
||||
.. code::
|
||||
|
||||
tar xvfz scylla-unified-6.1.1-0.20240814.8d90b817660a.x86_64.tar.gz
|
||||
tar xvfz scylla-unified-2025.1.9-0.20251010.6c539463bbda.x86_64.tar.gz
|
||||
|
||||
#. Install OpenJDK 8 or 11.
|
||||
|
||||
The following example shows Java installation on a CentOS-like system:
|
||||
|
||||
.. code:: console
|
||||
|
||||
sudo yum install -y java-11-openjdk-headless
|
||||
|
||||
For root offline installation on Debian-like systems, two additional packages, ``xfsprogs``
|
||||
and ``mdadm``, should be installed to be used in RAID setup.
|
||||
#. (Root offline installation only) For root offline installation on Debian-like
|
||||
systems, two additional packages, ``xfsprogs`` and ``mdadm``, should be
|
||||
installed to be used in RAID setup.
|
||||
|
||||
#. Install ScyllaDB as a user with non-root privileges:
|
||||
|
||||
.. code:: console
|
||||
|
||||
./install.sh --nonroot --python3 ~/scylladb/python3/bin/python3
|
||||
./install.sh --nonroot
|
||||
|
||||
Configure and Run ScyllaDB
|
||||
----------------------------
|
||||
@@ -77,19 +73,14 @@ Run nodetool:
|
||||
|
||||
.. code:: console
|
||||
|
||||
~/scylladb/share/cassandra/bin/nodetool status
|
||||
~/scylladb/bin/nodetool status
|
||||
|
||||
Run cqlsh:
|
||||
|
||||
.. code:: console
|
||||
|
||||
~/scylladb/share/cassandra/bin/cqlsh
|
||||
~/scylladb/bin/cqlsh
|
||||
|
||||
Run cassandra-stress:
|
||||
|
||||
.. code:: console
|
||||
|
||||
~/scylladb/share/cassandra/bin/cassandra-stress write
|
||||
|
||||
.. note::
|
||||
|
||||
@@ -120,7 +111,7 @@ Nonroot install
|
||||
|
||||
./install.sh --upgrade --nonroot
|
||||
|
||||
.. note:: The installation script does not upgrade scylla-jmx and scylla-tools. You will have to upgrade them separately.
|
||||
.. note:: The installation script does not upgrade scylla-tools. You will have to upgrade them separately.
|
||||
|
||||
Uninstall
|
||||
===========
|
||||
@@ -150,4 +141,4 @@ Next Steps
|
||||
* Manage your clusters with `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_
|
||||
* Monitor your cluster and data with `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_
|
||||
* Get familiar with ScyllaDB’s :doc:`command line reference guide </operating-scylla/nodetool>`.
|
||||
* Learn about ScyllaDB at `ScyllaDB University <https://university.scylladb.com/>`_
|
||||
* Learn about ScyllaDB at `ScyllaDB University <https://university.scylladb.com/>`_
|
||||
@@ -19,21 +19,15 @@ Procedure
|
||||
authenticator: PasswordAuthenticator
|
||||
|
||||
|
||||
#. Restart ScyllaDB.
|
||||
#. Restart ScyllaDB.
|
||||
|
||||
.. include:: /rst_include/scylla-commands-restart-index.rst
|
||||
|
||||
#. Start cqlsh with the default superuser username and password.
|
||||
#. Start cqlsh over the maintenance socket and create a new superuser. See :ref:`Setting Up a Superuser Using the Maintenance Socket <create-superuser-using-maintenance-socket>` for instructions.
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
cqlsh -u cassandra -p cassandra
|
||||
|
||||
.. note::
|
||||
|
||||
Before proceeding to the next step, we recommend creating a custom superuser
|
||||
to improve security.
|
||||
See :doc:`Creating a Custom Superuser </operating-scylla/security/create-superuser/>` for instructions.
|
||||
cqlsh <maintenance_socket_path>
|
||||
|
||||
#. If you want to create users and roles, continue to :doc:`Enable Authorization </operating-scylla/security/enable-authorization>`.
|
||||
|
||||
|
||||
@@ -1,111 +1,100 @@
|
||||
================================
|
||||
Creating a Custom Superuser
|
||||
Creating a Superuser
|
||||
================================
|
||||
|
||||
The default ScyllaDB superuser role is ``cassandra`` with password ``cassandra``.
|
||||
Users with the ``cassandra`` role have full access to the database and can run
|
||||
There is no default superuser role in ScyllaDB.
|
||||
Users with a superuser role have full access to the database and can run
|
||||
any CQL command on the database resources.
|
||||
|
||||
To improve security, we recommend creating a custom superuser. You should:
|
||||
There are two ways you can create a superuser in ScyllaDB:
|
||||
|
||||
#. Use the default ``cassandra`` superuser to log in.
|
||||
#. Create a custom superuser.
|
||||
#. Log in as the custom superuser.
|
||||
#. Remove the ``cassandra`` role.
|
||||
- :ref:`Using the ScyllaDB Maintenance Socket to create a superuser role <create-superuser-using-maintenance-socket>`
|
||||
- :ref:`Using an existing superuser account to create a new superuser role <create-superuser-using-existing-superuser>`
|
||||
|
||||
In the above procedure, you only need to use the ``cassandra`` superuser once, during
|
||||
the initial RBAC set up.
|
||||
To completely eliminate the need to use ``cassandra``, you can :ref:`configure the initial
|
||||
custom superuser in the scylla.yaml configuration file <create-superuser-in-config-file>`.
|
||||
When setting up a new cluster, use the maintenance socket approach to create the first superuser.
|
||||
|
||||
.. _create-superuser-procedure:
|
||||
|
||||
Procedure
|
||||
-----------
|
||||
.. _create-superuser-using-maintenance-socket:
|
||||
|
||||
#. Start cqlsh with the default superuser settings:
|
||||
|
||||
.. code::
|
||||
|
||||
cqlsh -u cassandra -p cassandra
|
||||
|
||||
#. Create a new superuser:
|
||||
|
||||
.. code::
|
||||
|
||||
CREATE ROLE <custom_superuser name> WITH SUPERUSER = true AND LOGIN = true and PASSWORD = '<custom_superuser_password>';
|
||||
|
||||
For example:
|
||||
|
||||
.. code::
|
||||
:class: hide-copy-button
|
||||
|
||||
CREATE ROLE dba WITH SUPERUSER = true AND LOGIN = true and PASSWORD = '39fksah!';
|
||||
|
||||
.. warning::
|
||||
|
||||
You must set a PASSWORD when creating a role with LOGIN privileges.
|
||||
Otherwise, you will not be able to log in to the database using that role.
|
||||
|
||||
#. Exit cqlsh:
|
||||
|
||||
.. code::
|
||||
|
||||
EXIT;
|
||||
|
||||
#. Log in as the new superuser:
|
||||
|
||||
.. code::
|
||||
|
||||
cqlsh -u <custom_superuser name> -p <custom_superuser_password>
|
||||
|
||||
For example:
|
||||
|
||||
.. code::
|
||||
:class: hide-copy-button
|
||||
|
||||
cqlsh -u dba -p 39fksah!
|
||||
|
||||
#. Show all the roles to verify that the new superuser was created:
|
||||
|
||||
.. code::
|
||||
|
||||
LIST ROLES;
|
||||
|
||||
#. Remove the cassandra superuser:
|
||||
|
||||
.. code::
|
||||
|
||||
DROP ROLE cassandra;
|
||||
|
||||
#. Show all the roles to verify that the cassandra role was deleted:
|
||||
|
||||
.. code::
|
||||
|
||||
LIST ROLES;
|
||||
|
||||
.. _create-superuser-in-config-file:
|
||||
|
||||
Setting Custom Superuser Credentials in scylla.yaml
|
||||
Setting Up a Superuser Using the Maintenance Socket
|
||||
------------------------------------------------------
|
||||
|
||||
Operating ScyllaDB using the default superuser ``cassandra`` with password ``cassandra``
|
||||
is insecure and impacts performance. For this reason, the default should be used only once -
|
||||
to create a custom superuser role, following the CQL :ref:`procedure <create-superuser-procedure>` above.
|
||||
If no superuser account exists in the cluster, which is the case for new clusters, you can create a superuser using the ScyllaDB Maintenance Socket.
|
||||
In order to do that, the node must have the maintenance socket enabled.
|
||||
See :doc:`Admin Tools: Maintenance Socket </operating-scylla/admin-tools/maintenance-socket/>`.
|
||||
|
||||
To avoid executing with the default credentials for the period before you can make
|
||||
the CQL modifications, you can configure the custom superuser name and password
|
||||
in the ``scylla.yaml`` configuration file:
|
||||
To create a superuser using the maintenance socket, you should:
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
auth_superuser_name: <superuser name>
|
||||
auth_superuser_salted_password: <superuser salted password as processed by mkpassword or similar - cleartext is not allowed>
|
||||
1. Connect to the node using ``cqlsh`` over the maintenance socket.
|
||||
|
||||
.. caution::
|
||||
.. code-block:: shell
|
||||
|
||||
The superuser credentials in the ``scylla.yaml`` file will be ignored:
|
||||
cqlsh <maintenance_socket_path>
|
||||
|
||||
* If any superuser other than ``cassandra`` is already defined in the cluster.
|
||||
* After you create a custom superuser with the CQL :ref:`procedure <create-superuser-procedure>`.
|
||||
Replace ``<maintenance_socket_path>`` with the socket path configured in ``scylla.yaml``.
|
||||
|
||||
2. Create new superuser role using ``CREATE ROLE`` command.
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE ROLE <new_superuser> WITH SUPERUSER = true AND LOGIN = true and PASSWORD = '<new_superuser_password>';
|
||||
|
||||
3. Verify that you can log in to your node using ``cqlsh`` command with the new password.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh -u <new_superuser>
|
||||
Password:
|
||||
|
||||
.. note::
|
||||
|
||||
Enter the value of `<new_superuser_password>` password when prompted. The input is not displayed.
|
||||
|
||||
4. Show all the roles to verify that the new superuser was created:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
LIST ROLES;
|
||||
|
||||
|
||||
.. _create-superuser-using-existing-superuser:
|
||||
|
||||
Setting Up a Superuser Using an Existing Superuser Account
|
||||
-------------------------------------------------------------
|
||||
|
||||
To create a superuser using an existing superuser account, you should:
|
||||
|
||||
1. Log in to cqlsh using an existing superuser account.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh -u <existing_superuser>
|
||||
Password:
|
||||
|
||||
.. note::
|
||||
|
||||
Enter the value of `<existing_superuser_password>` password when prompted. The input is not displayed.
|
||||
|
||||
2. Create a new superuser.
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE ROLE <new_superuser> WITH SUPERUSER = true AND LOGIN = true and PASSWORD = '<new_superuser_password>';
|
||||
|
||||
3. Verify that you can log in to your node using ``cqlsh`` command with the new password.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh -u <new_superuser>
|
||||
Password:
|
||||
|
||||
.. note::
|
||||
|
||||
Enter the value of `<new_superuser_password>` password when prompted. The input is not displayed.
|
||||
|
||||
4. Show all the roles to verify that the new superuser was created:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
LIST ROLES;
|
||||
|
||||
|
||||
@@ -28,7 +28,6 @@ The following assumes that Authentication has already been enabled via the proce
|
||||
2. Set your credentials as the `superuser`_
|
||||
3. Login to cqlsh as the superuser and set `roles`_ and privileges for your users
|
||||
4. Confirm users can `access`_ the client with their new credentials.
|
||||
5. `Remove`_ Cassandra default user / passwords
|
||||
|
||||
.. _authorizer:
|
||||
|
||||
@@ -51,19 +50,11 @@ It is highly recommended to perform this action on a node that is not processing
|
||||
|
||||
.. _superuser:
|
||||
|
||||
Set a Superuser
|
||||
Create a Superuser
|
||||
.........................
|
||||
|
||||
The default ScyllaDB superuser role is ``cassandra`` with password ``cassandra``. Using the default
|
||||
superuser is unsafe and may significantly impact performance.
|
||||
|
||||
If you haven't created a custom superuser while enabling authentication, you should create a custom superuser
|
||||
before creating additional roles.
|
||||
See :doc:`Creating a Custom Superuser </operating-scylla/security/create-superuser/>` for instructions.
|
||||
|
||||
.. note::
|
||||
|
||||
We recommend creating a custom superuser to improve security.
|
||||
There is no default superuser in ScyllaDB. You should create a superuser before creating additional roles.
|
||||
See :doc:`Creating a Superuser </operating-scylla/security/create-superuser/>` for instructions.
|
||||
|
||||
.. _roles:
|
||||
|
||||
@@ -80,7 +71,12 @@ Validate you have the credentials for the superuser for your system for yourself
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
cqlsh -u dba -p 39fksah!
|
||||
cqlsh -u dba
|
||||
Password:
|
||||
|
||||
.. note::
|
||||
|
||||
Enter the password when prompted. The input is not displayed.
|
||||
|
||||
2. Configure the appropriate access privileges for clients using :ref:`GRANT PERMISSION <grant-permission-statement>` statements. For additional examples, consult the :doc:`RBAC example </operating-scylla/security/rbac-usecase>`.
|
||||
|
||||
@@ -110,30 +106,8 @@ The following should be noted:
|
||||
|
||||
* When initiating a connection, clients will need to use the user name and password that you assign
|
||||
|
||||
* Confirm all clients can connect before removing the Cassandra default password and user.
|
||||
|
||||
2. To remove permission from any role or user, see :ref:`REVOKE PERMISSION <revoke-permission-statement>`.
|
||||
|
||||
|
||||
.. _remove:
|
||||
|
||||
Remove Cassandra Default Password and User
|
||||
..........................................
|
||||
|
||||
To prevent others from entering with the old superuser password, you can and should delete it.
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
DROP ROLE [ IF EXISTS ] 'old-username';
|
||||
|
||||
For example
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
DROP ROLE [ IF EXISTS ] 'cassandra';
|
||||
|
||||
|
||||
|
||||
Additional References
|
||||
---------------------
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ Security
|
||||
|
||||
* :doc:`Enable Authentication </operating-scylla/security/authentication/>`
|
||||
* :doc:`Enable and Disable Authentication Without Downtime </operating-scylla/security/runtime-authentication/>`
|
||||
* :doc:`Creating a Custom Superuser </operating-scylla/security/create-superuser/>`
|
||||
* :doc:`Creating a Superuser </operating-scylla/security/create-superuser/>`
|
||||
* :doc:`Generate a cqlshrc File <gen-cqlsh-file>`
|
||||
* :doc:`Enable Authorization</operating-scylla/security/enable-authorization/>`
|
||||
* :doc:`Role Based Access Control (RBAC) </operating-scylla/security/rbac-usecase/>`
|
||||
|
||||
@@ -25,20 +25,19 @@ Procedure
|
||||
|
||||
.. include:: /rst_include/scylla-commands-restart-index.rst
|
||||
|
||||
#. Login with the default superuser credentials and create an authenticated user with strong password.
|
||||
#. Login over the maintenance socket and create an authenticated user with strong password.
|
||||
|
||||
For example:
|
||||
See :ref:`Setting Up a Superuser Using the Maintenance Socket <create-superuser-using-maintenance-socket>` for instructions.
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
cqlsh -ucassandra -pcassandra
|
||||
cqlsh /path/to/maintenance/socket/cql.m
|
||||
|
||||
cassandra@cqlsh> CREATE ROLE scylla WITH PASSWORD = '123456' AND LOGIN = true AND SUPERUSER = true;
|
||||
cassandra@cqlsh> LIST ROLES;
|
||||
|
||||
name |super
|
||||
----------+-------
|
||||
cassandra |True
|
||||
scylla |True
|
||||
|
||||
Optionally, assign the role to your user. For example:
|
||||
@@ -47,20 +46,6 @@ Procedure
|
||||
|
||||
cassandra@cqlsh> GRANT scylla TO myuser
|
||||
|
||||
#. Login with the new user created and drop the superuser cassandra.
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
cqlsh -u scylla -p 123456
|
||||
|
||||
scylla@cqlsh> DROP ROLE cassandra;
|
||||
|
||||
scylla@cqlsh> LIST ROLES;
|
||||
|
||||
name |super
|
||||
----------+-------
|
||||
scylla |True
|
||||
|
||||
#. Update the ``authenticator`` parameter in ``scylla.yaml`` for all the nodes in the cluster: Change ``authenticator: com.scylladb.auth.TransitionalAuthenticator`` to ``authenticator: PasswordAuthenticator``.
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
@@ -1,37 +1,59 @@
|
||||
Reset Authenticator Password
|
||||
============================
|
||||
|
||||
This procedure describes what to do when a user loses his password and can not reset it with a superuser role.
|
||||
The procedure requires cluster downtime and as a result, all auth data is deleted.
|
||||
This procedure describes what to do when a user loses their password and can not reset it with a superuser role.
|
||||
If a node has maintenance socket available, there is no node or cluster downtime required.
|
||||
If a node does not have maintenance socket available, node has to be restarted with maintenance socket enabled, but no cluster downtime is required.
|
||||
|
||||
|
||||
Prerequisites
|
||||
.............
|
||||
|
||||
Enable maintenance socket on the node. If already done, skip to the ``Procedure`` section.
|
||||
To check if the maintenance socket is enabled, see :doc:`Admin Tools: Maintenance Socket </operating-scylla/admin-tools/maintenance-socket/>` for details.
|
||||
|
||||
1. Stop the ScyllaDB node.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
sudo systemctl stop scylla-server
|
||||
|
||||
2. Edit ``/etc/scylla/scylla.yaml`` file and configure the maintenance socket.
|
||||
See :doc:`Admin Tools: Maintenance Socket </operating-scylla/admin-tools/maintenance-socket/>` for details.
|
||||
|
||||
3. Start the ScyllaDB node.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
|
||||
Procedure
|
||||
.........
|
||||
|
||||
| 1. Stop ScyllaDB nodes (**Stop all the nodes in the cluster**).
|
||||
1. Connect to the node using ``cqlsh`` over the maintenance socket.
|
||||
|
||||
.. code-block:: shell
|
||||
.. code-block:: shell
|
||||
|
||||
sudo systemctl stop scylla-server
|
||||
cqlsh <maintenance_socket_path>
|
||||
|
||||
| 2. Remove system tables starting with ``role`` prefix from ``/var/lib/scylla/data/system`` directory.
|
||||
Replace ``<maintenance_socket_path>`` with the socket path configured in ``scylla.yaml``.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
rm -rf /var/lib/scylla/data/system/role*
|
||||
|
||||
| 3. Start ScyllaDB nodes.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
sudo systemctl start scylla-server
|
||||
|
||||
| 4. Verify that you can log in to your node using ``cqlsh`` command.
|
||||
| The access is only possible using ScyllaDB superuser.
|
||||
2. Reset the password for the user using ``ALTER ROLE`` command.
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
cqlsh -u cassandra -p cassandra
|
||||
|
||||
| 5. Recreate the users
|
||||
ALTER ROLE username WITH PASSWORD '<new_password>';
|
||||
|
||||
3. Verify that you can log in to your node using ``cqlsh`` command with the new password.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
cqlsh -u username
|
||||
Password:
|
||||
|
||||
.. note::
|
||||
|
||||
Enter the value of `<new_password>` password when prompted. The input is not displayed.
|
||||
|
||||
.. include:: /troubleshooting/_common/ts-return.rst
|
||||
|
||||
@@ -154,6 +154,7 @@ fedora_packages=(
|
||||
|
||||
podman
|
||||
buildah
|
||||
slirp4netns
|
||||
|
||||
# for cassandra-stress
|
||||
java-openjdk-headless
|
||||
|
||||
@@ -327,9 +327,6 @@ future<tablet_map> network_topology_strategy::reallocate_tablets(schema_ptr s, t
|
||||
auto tinfo = tablets.get_tablet_info(tb);
|
||||
tinfo.replicas = co_await reallocate_tablets(s, tm, load, tablets, tb);
|
||||
if (tablets.has_raft_info()) {
|
||||
for (auto& r: tinfo.replicas) {
|
||||
r.shard = 0;
|
||||
}
|
||||
if (!tablets.get_tablet_raft_info(tb).group_id) {
|
||||
tablets.set_tablet_raft_info(tb, tablet_raft_info {
|
||||
.group_id = raft::group_id{utils::UUID_gen::get_time_UUID()}
|
||||
|
||||
47
main.cc
47
main.cc
@@ -19,6 +19,7 @@
|
||||
#include "gms/inet_address.hh"
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
#include "auth/allow_all_authorizer.hh"
|
||||
#include "auth/maintenance_socket_authenticator.hh"
|
||||
#include "auth/maintenance_socket_role_manager.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/signal.hh>
|
||||
@@ -1830,7 +1831,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
checkpoint(stop_signal, "initializing strongly consistent groups manager");
|
||||
sharded<service::strong_consistency::groups_manager> groups_manager;
|
||||
groups_manager.start(std::ref(messaging), std::ref(raft_gr), std::ref(qp),
|
||||
std::ref(db), std::ref(feature_service)).get();
|
||||
std::ref(db), std::ref(mm), std::ref(sys_ks), std::ref(feature_service)).get();
|
||||
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [&] {
|
||||
groups_manager.stop().get();
|
||||
});
|
||||
@@ -2093,17 +2094,15 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
if (cfg->maintenance_socket() != "ignore") {
|
||||
checkpoint(stop_signal, "starting maintenance auth service");
|
||||
auth::service_config maintenance_auth_config;
|
||||
maintenance_auth_config.authorizer_java_name = sstring{auth::allow_all_authorizer_name};
|
||||
maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
|
||||
maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};
|
||||
|
||||
maintenance_auth_service.start(std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache)).get();
|
||||
maintenance_auth_service.start(std::ref(qp), std::ref(group0_client), std::ref(mm_notifier),
|
||||
auth::make_authorizer_factory(auth::allow_all_authorizer_name, qp, group0_client, mm),
|
||||
auth::make_maintenance_socket_authenticator_factory(qp, group0_client, mm, auth_cache),
|
||||
auth::make_maintenance_socket_role_manager_factory(qp, group0_client, mm, auth_cache),
|
||||
maintenance_socket_enabled::yes, std::ref(auth_cache)).get();
|
||||
|
||||
cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group);
|
||||
|
||||
start_auth_service(maintenance_auth_service, stop_maintenance_auth_service, "maintenance auth service");
|
||||
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
|
||||
}
|
||||
|
||||
checkpoint(stop_signal, "starting REST API");
|
||||
@@ -2133,6 +2132,14 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
if (cfg->maintenance_mode()) {
|
||||
checkpoint(stop_signal, "entering maintenance mode");
|
||||
|
||||
// Notify maintenance auth service that maintenance mode is starting
|
||||
maintenance_auth_service.invoke_on_all([](auth::service& svc) {
|
||||
auth::set_maintenance_mode(svc);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
|
||||
|
||||
ss.local().start_maintenance_mode().get();
|
||||
|
||||
seastar::set_abort_on_ebadf(cfg->abort_on_ebadf());
|
||||
@@ -2260,6 +2267,15 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}).get();
|
||||
stop_signal.ready(false);
|
||||
|
||||
if (cfg->maintenance_socket() != "ignore") {
|
||||
// Enable role operations now that node joined the cluster
|
||||
maintenance_auth_service.invoke_on_all([](auth::service& svc) {
|
||||
return auth::ensure_role_operations_are_enabled(svc);
|
||||
}).get();
|
||||
|
||||
start_cql(*cql_maintenance_server_ctl, stop_maintenance_cql, "maintenance native server");
|
||||
}
|
||||
|
||||
// At this point, `locator::topology` should be stable, i.e. we should have complete information
|
||||
// about the layout of the cluster (= list of nodes along with the racks/DCs).
|
||||
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
|
||||
@@ -2348,10 +2364,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
vb.init_virtual_table();
|
||||
}).get();
|
||||
|
||||
const qualified_name qualified_authorizer_name(auth::meta::AUTH_PACKAGE_NAME, cfg->authorizer());
|
||||
const qualified_name qualified_authenticator_name(auth::meta::AUTH_PACKAGE_NAME, cfg->authenticator());
|
||||
const qualified_name qualified_role_manager_name(auth::meta::AUTH_PACKAGE_NAME, cfg->role_manager());
|
||||
|
||||
// Reproducer of scylladb/scylladb#24792.
|
||||
auto i24792_reproducer = defer([] {
|
||||
if (utils::get_local_injector().enter("reload_service_level_cache_after_auth_service_is_stopped")) {
|
||||
@@ -2360,12 +2372,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
});
|
||||
|
||||
checkpoint(stop_signal, "starting auth service");
|
||||
auth::service_config auth_config;
|
||||
auth_config.authorizer_java_name = qualified_authorizer_name;
|
||||
auth_config.authenticator_java_name = qualified_authenticator_name;
|
||||
auth_config.role_manager_java_name = qualified_role_manager_name;
|
||||
|
||||
auth_service.start(std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache)).get();
|
||||
auth_service.start(std::ref(qp), std::ref(group0_client), std::ref(mm_notifier),
|
||||
auth::make_authorizer_factory(cfg->authorizer(), qp, group0_client, mm),
|
||||
auth::make_authenticator_factory(cfg->authenticator(), qp, group0_client, mm, auth_cache),
|
||||
auth::make_role_manager_factory(cfg->role_manager(), qp, group0_client, mm, auth_cache),
|
||||
maintenance_socket_enabled::no, std::ref(auth_cache)).get();
|
||||
|
||||
std::any stop_auth_service;
|
||||
// Has to be called after node joined the cluster (join_cluster())
|
||||
|
||||
@@ -180,7 +180,7 @@ mutation mutation::sliced(const query::clustering_row_ranges& ranges) const {
|
||||
|
||||
mutation mutation::compacted() const {
|
||||
auto m = *this;
|
||||
m.partition().compact_for_compaction(*schema(), always_gc, m.decorated_key(), gc_clock::time_point::min(), tombstone_gc_state(nullptr));
|
||||
m.partition().compact_for_compaction(*schema(), always_gc, m.decorated_key(), gc_clock::time_point::min(), tombstone_gc_state::no_gc());
|
||||
return m;
|
||||
}
|
||||
|
||||
|
||||
@@ -1344,7 +1344,7 @@ mutation_partition::compact_for_query(
|
||||
bool drop_tombstones_unconditionally = false;
|
||||
// Replicas should only send non-purgeable tombstones already,
|
||||
// so we can expect to not have to actually purge any tombstones here.
|
||||
return do_compact(s, dk, query_time, row_ranges, always_return_static_content, row_limit, always_gc, drop_tombstones_unconditionally, tombstone_gc_state(nullptr));
|
||||
return do_compact(s, dk, query_time, row_ranges, always_return_static_content, row_limit, always_gc, drop_tombstones_unconditionally, tombstone_gc_state::no_gc());
|
||||
}
|
||||
|
||||
void mutation_partition::compact_for_compaction(const schema& s,
|
||||
@@ -1368,7 +1368,7 @@ void mutation_partition::compact_for_compaction_drop_tombstones_unconditionally(
|
||||
};
|
||||
bool drop_tombstones_unconditionally = true;
|
||||
auto compaction_time = gc_clock::time_point::max();
|
||||
do_compact(s, dk, compaction_time, all_rows, true, query::partition_max_rows, always_gc, drop_tombstones_unconditionally, tombstone_gc_state(nullptr));
|
||||
do_compact(s, dk, compaction_time, all_rows, true, query::partition_max_rows, always_gc, drop_tombstones_unconditionally, tombstone_gc_state::gc_all());
|
||||
}
|
||||
|
||||
// Returns true if the mutation_partition represents no writes.
|
||||
@@ -2157,7 +2157,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
|
||||
// This result was already built with a limit, don't apply another one.
|
||||
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::max_tombstones);
|
||||
auto consumer = compact_for_query<query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
|
||||
max_partitions, tombstone_gc_state(nullptr), query_result_builder(*s, builder));
|
||||
max_partitions, tombstone_gc_state::no_gc(), query_result_builder(*s, builder));
|
||||
auto compaction_state = consumer.get_state();
|
||||
frozen_mutation_consumer_adaptor adaptor(s, consumer);
|
||||
for (const partition& p : r.partitions()) {
|
||||
@@ -2176,7 +2176,7 @@ query::result
|
||||
query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) {
|
||||
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::max_tombstones);
|
||||
auto consumer = compact_for_query<query_result_builder>(*m.schema(), now, slice, row_limit,
|
||||
query::max_partitions, tombstone_gc_state(nullptr), query_result_builder(*m.schema(), builder));
|
||||
query::max_partitions, tombstone_gc_state::no_gc(), query_result_builder(*m.schema(), builder));
|
||||
auto compaction_state = consumer.get_state();
|
||||
std::move(m).consume(consumer, consume_in_reverse::no);
|
||||
return builder.build(compaction_state->current_full_position());
|
||||
@@ -2405,7 +2405,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
|
||||
auto r_a_r = std::make_unique<range_and_reader>(s, source, std::move(permit), dk, slice, std::move(trace_ptr));
|
||||
auto cwqrb = counter_write_query_result_builder(*s);
|
||||
auto cfq = compact_for_query<counter_write_query_result_builder>(
|
||||
*s, gc_clock::now(), slice, query::max_rows, query::max_partitions, tombstone_gc_state(nullptr), std::move(cwqrb));
|
||||
*s, gc_clock::now(), slice, query::max_rows, query::max_partitions, tombstone_gc_state::no_gc(), std::move(cwqrb));
|
||||
auto f = r_a_r->reader.consume(std::move(cfq));
|
||||
return f.finally([r_a_r = std::move(r_a_r)] {
|
||||
return r_a_r->reader.close();
|
||||
|
||||
14
raft/fsm.cc
14
raft/fsm.cc
@@ -8,6 +8,7 @@
|
||||
#include "fsm.hh"
|
||||
#include <random>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include "raft/raft.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
@@ -205,6 +206,11 @@ void fsm::become_follower(server_id leader) {
|
||||
}
|
||||
|
||||
void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
|
||||
if (utils::get_local_injector().enter("avoid_being_raft_leader")) {
|
||||
become_follower(server_id{});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!std::holds_alternative<candidate>(_state)) {
|
||||
_output.state_changed = true;
|
||||
}
|
||||
@@ -1108,6 +1114,14 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
|
||||
return std::make_pair(id, _commit_idx);
|
||||
}
|
||||
|
||||
void fsm::maybe_update_commit_idx_for_read(index_t read_idx) {
|
||||
// read_idx from the leader might not be replicated to the local node yet.
|
||||
const bool in_local_log = read_idx <= _log.last_idx();
|
||||
if (in_local_log && log_term_for(read_idx) == get_current_term()) {
|
||||
advance_commit_idx(read_idx);
|
||||
}
|
||||
}
|
||||
|
||||
void fsm::stop() {
|
||||
if (is_leader()) {
|
||||
// Become follower to stop accepting requests
|
||||
|
||||
@@ -480,6 +480,15 @@ public:
|
||||
|
||||
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
|
||||
|
||||
// Update the commit index to the read index (a read barrier result from the leader) if the local entry with the
|
||||
// read index belongs to the current term.
|
||||
//
|
||||
// Satisfying the condition above guarantees that the local log matches the current leader's log up to the read
|
||||
// index (the Log Matching Property), so the current leader won't drop the local entry with the read index.
|
||||
// Moreover, this entry has been committed by the leader, so future leaders also won't drop it (the Leader
|
||||
// Completeness Property). Hence, updating the commit index is safe.
|
||||
void maybe_update_commit_idx_for_read(index_t read_idx);
|
||||
|
||||
size_t in_memory_log_size() const {
|
||||
return _log.in_memory_size();
|
||||
}
|
||||
|
||||
@@ -167,7 +167,7 @@ public:
|
||||
// and non matching term is in second
|
||||
std::pair<bool, term_t> match_term(index_t idx, term_t term) const;
|
||||
// Return term number of the entry matching the index. If the
|
||||
// entry is not in the log and does not match snapshot index,
|
||||
// entry is in the truncated part of the log and does not match snapshot index,
|
||||
// return an empty optional.
|
||||
// Used to validate the log matching rule.
|
||||
std::optional<term_t> term_for(index_t idx) const;
|
||||
|
||||
@@ -505,7 +505,7 @@ using add_entry_reply = std::variant<entry_id, transient_error, commit_status_un
|
||||
// std::monostate {} if the leader cannot execute the barrier because
|
||||
// it did not commit any entries yet
|
||||
// raft::not_a_leader if the node is not a leader
|
||||
// index_t index that is safe to read without breaking linearizability
|
||||
// index_t index that is safe to read once applied without breaking linearizability
|
||||
using read_barrier_reply = std::variant<std::monostate, index_t, raft::not_a_leader>;
|
||||
|
||||
using rpc_message = std::variant<append_request,
|
||||
|
||||
@@ -436,6 +436,8 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
|
||||
}
|
||||
|
||||
future<> server_impl::wait_for_leader(seastar::abort_source* as) {
|
||||
check_not_aborted();
|
||||
|
||||
if (_fsm->current_leader()) {
|
||||
co_return;
|
||||
}
|
||||
@@ -454,6 +456,8 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) {
|
||||
}
|
||||
|
||||
future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
|
||||
check_not_aborted();
|
||||
|
||||
if (!_state_change_promise) {
|
||||
_state_change_promise.emplace();
|
||||
}
|
||||
@@ -748,6 +752,8 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_
|
||||
}
|
||||
_stats.add_command++;
|
||||
|
||||
check_not_aborted();
|
||||
|
||||
logger.trace("[{}] an entry is submitted", id());
|
||||
if (!_config.enable_forwarding) {
|
||||
if (const auto leader = _fsm->current_leader(); leader != _id) {
|
||||
@@ -858,6 +864,8 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
|
||||
}
|
||||
|
||||
future<> server_impl::modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) {
|
||||
check_not_aborted();
|
||||
|
||||
utils::get_local_injector().inject("raft/throw_commit_status_unknown_in_modify_config", [] {
|
||||
throw raft::commit_status_unknown();
|
||||
});
|
||||
@@ -1553,6 +1561,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
read_idx = std::get<index_t>(res);
|
||||
_fsm->maybe_update_commit_idx_for_read(read_idx);
|
||||
co_return stop_iteration::yes;
|
||||
});
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@ public:
|
||||
// committed locally means simply that the commit index is beyond this entry's index.
|
||||
//
|
||||
// The caller may pass a pointer to an abort_source to make the operation abortable.
|
||||
// It it passes nullptr, the operation is unabortable.
|
||||
// If it passes nullptr, the operation is unabortable.
|
||||
//
|
||||
// Successful `add_entry` with `wait_type::committed` does not guarantee that `state_machine::apply` will be called
|
||||
// locally for this entry. Between the commit and the application we may receive a snapshot containing this entry,
|
||||
@@ -125,7 +125,7 @@ public:
|
||||
// returned even in case of a successful config change.
|
||||
//
|
||||
// The caller may pass a pointer to an abort_source to make the operation abortable.
|
||||
// It it passes nullptr, the operation is unabortable.
|
||||
// If it passes nullptr, the operation is unabortable.
|
||||
//
|
||||
// Exceptions:
|
||||
// raft::conf_change_in_progress
|
||||
@@ -206,7 +206,7 @@ public:
|
||||
// future has resolved successfully.
|
||||
//
|
||||
// The caller may pass a pointer to an abort_source to make the operation abortable.
|
||||
// It it passes nullptr, the operation is unabortable.
|
||||
// If it passes nullptr, the operation is unabortable.
|
||||
//
|
||||
// Exceptions:
|
||||
// raft::request_aborted
|
||||
@@ -251,9 +251,11 @@ public:
|
||||
// the call as before, but term should be different.
|
||||
//
|
||||
// The caller may pass a pointer to an abort_source to make the function abortable.
|
||||
// It it passes nullptr, the function is unabortable.
|
||||
// If it passes nullptr, the function is unabortable.
|
||||
//
|
||||
// Exceptions:
|
||||
// raft::stopped_error
|
||||
// Thrown if abort() was called on the server instance.
|
||||
// raft::request_aborted
|
||||
// Thrown if abort is requested before the operation finishes.
|
||||
virtual future<> wait_for_state_change(seastar::abort_source* as) = 0;
|
||||
@@ -265,9 +267,11 @@ public:
|
||||
// `raft::server_id`.
|
||||
//
|
||||
// The caller may pass a pointer to an abort_source to make the function abortable.
|
||||
// It it passes nullptr, the function is unabortable.
|
||||
// If it passes nullptr, the function is unabortable.
|
||||
//
|
||||
// Exceptions:
|
||||
// raft::stopped_error
|
||||
// Thrown if abort() was called on the server instance.
|
||||
// raft::request_aborted
|
||||
// Thrown if abort is requested before the operation finishes.
|
||||
virtual future<> wait_for_leader(seastar::abort_source* as) = 0;
|
||||
|
||||
@@ -1211,6 +1211,7 @@ private:
|
||||
}
|
||||
|
||||
co_await utils::get_local_injector().inject("incremental_repair_prepare_wait", utils::wait_for_message(60s));
|
||||
rlogger.debug("Disabling compaction for range={} for incremental repair", _range);
|
||||
auto reenablers_and_holders = co_await table.get_compaction_reenablers_and_lock_holders_for_repair(_db.local(), _frozen_topology_guard, _range);
|
||||
for (auto& lock_holder : reenablers_and_holders.lock_holders) {
|
||||
_rs._repair_compaction_locks[gid].push_back(std::move(lock_holder));
|
||||
@@ -1240,6 +1241,8 @@ private:
|
||||
// compaction.
|
||||
reenablers_and_holders.cres.clear();
|
||||
rlogger.info("Re-enabled compaction for range={} for incremental repair", _range);
|
||||
|
||||
co_await utils::get_local_injector().inject("wait_after_prepare_sstables_for_incremental_repair", utils::wait_for_message(5min));
|
||||
}
|
||||
|
||||
// Read rows from sstable until the size of rows exceeds _max_row_buf_size - current_size
|
||||
@@ -3953,3 +3956,19 @@ future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_ta
|
||||
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
|
||||
co_return progress;
|
||||
}
|
||||
|
||||
void repair_service::on_cleanup_for_drop_table(const table_id& id) {
|
||||
// Prevent repair lock from being leaked in repair_service when table is dropped midway.
|
||||
// The RPC verb that removes the lock on success path will not be called by coordinator after table was dropped.
|
||||
// We also cannot move the lock from repair_service to repair_meta, since the lock must outlive the latter.
|
||||
// Since tablet metadata has been erased at this point, we can simply erase all instances for the dropped table.
|
||||
rlogger.debug("Cleaning up state for dropped table {}", id);
|
||||
for (auto it = _repair_compaction_locks.begin(); it != _repair_compaction_locks.end();) {
|
||||
auto& [global_tid, _] = *it;
|
||||
if (global_tid.table == id) {
|
||||
it = _repair_compaction_locks.erase(it);
|
||||
} else {
|
||||
it++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -318,6 +318,8 @@ public:
|
||||
|
||||
future<uint32_t> get_next_repair_meta_id();
|
||||
|
||||
void on_cleanup_for_drop_table(const table_id& id);
|
||||
|
||||
friend class repair::user_requested_repair_task_impl;
|
||||
friend class repair::data_sync_repair_task_impl;
|
||||
friend class repair::tablet_repair_task_impl;
|
||||
|
||||
@@ -448,6 +448,7 @@ public:
|
||||
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
|
||||
virtual future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) = 0;
|
||||
virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;
|
||||
virtual future<> wait_for_background_tablet_resize_work() = 0;
|
||||
|
||||
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;
|
||||
};
|
||||
|
||||
@@ -2415,9 +2415,7 @@ future<> database::apply_hint(schema_ptr s, const frozen_mutation& m, tracing::t
|
||||
if (!s->is_synced()) {
|
||||
on_internal_error(dblog, format("attempted to apply hint using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version()));
|
||||
}
|
||||
return with_scheduling_group(_dbcfg.streaming_scheduling_group, [this, s = std::move(s), &m, tr_state = std::move(tr_state), timeout] () mutable {
|
||||
return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{});
|
||||
});
|
||||
return _apply_stage(this, std::move(s), seastar::cref(m), std::move(tr_state), timeout, db::commitlog::force_sync::no, std::monostate{});
|
||||
}
|
||||
|
||||
keyspace::config
|
||||
|
||||
@@ -971,6 +971,8 @@ public:
|
||||
[[gnu::always_inline]] bool uses_tablets() const;
|
||||
int64_t calculate_tablet_count() const;
|
||||
private:
|
||||
void update_tombstone_gc_rf_one();
|
||||
|
||||
future<> clear_inactive_reads_for_tablet(database& db, storage_group& sg);
|
||||
future<> stop_compaction_groups(storage_group& sg);
|
||||
future<> flush_compaction_groups(storage_group& sg);
|
||||
@@ -1356,6 +1358,8 @@ public:
|
||||
// If leave_unsealead is set, all the destination sstables will be left unsealed.
|
||||
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed);
|
||||
|
||||
tombstone_gc_state get_tombstone_gc_state() const;
|
||||
|
||||
friend class compaction_group;
|
||||
friend class compaction::compaction_task_impl;
|
||||
|
||||
@@ -1366,8 +1370,6 @@ public:
|
||||
future<compaction_reenablers_and_lock_holders> get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
|
||||
const service::frozen_topology_guard& guard, dht::token_range range);
|
||||
future<uint64_t> estimated_partitions_in_range(dht::token_range tr) const;
|
||||
private:
|
||||
future<std::vector<compaction::compaction_group_view*>> get_compaction_group_views_for_repair(dht::token_range range);
|
||||
};
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&);
|
||||
|
||||
@@ -778,7 +778,7 @@ future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> do_query
|
||||
auto erm = table.get_effective_replication_map();
|
||||
auto ctx = seastar::make_shared<read_context>(db, s, erm, cmd, ranges, trace_state, timeout);
|
||||
|
||||
tombstone_gc_state gc_state = tombstone_gc_enabled ? table.get_compaction_manager().get_tombstone_gc_state() : tombstone_gc_state::no_gc();
|
||||
tombstone_gc_state gc_state = tombstone_gc_enabled ? table.get_tombstone_gc_state() : tombstone_gc_state::no_gc();
|
||||
|
||||
// Use nested coroutine so each step can fail without exceptions using try_future.
|
||||
auto f = co_await coroutine::as_future(std::invoke([&] -> future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> {
|
||||
|
||||
@@ -609,7 +609,7 @@ future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
|
||||
auto accounter = co_await db.local().get_result_memory_limiter().new_data_read(permit.max_result_size(), short_read_allowed);
|
||||
query_state qs(output_schema, cmd, opts, prs, std::move(accounter));
|
||||
|
||||
auto compaction_state = make_lw_shared<compact_for_query_state>(*output_schema, qs.cmd.timestamp, qs.cmd.slice, qs.remaining_rows(), qs.remaining_partitions(), tombstone_gc_state(nullptr));
|
||||
auto compaction_state = make_lw_shared<compact_for_query_state>(*output_schema, qs.cmd.timestamp, qs.cmd.slice, qs.remaining_rows(), qs.remaining_partitions(), tombstone_gc_state::no_gc());
|
||||
auto partition_key_generator = make_partition_key_generator(db, underlying_schema, prs, ts, timeout);
|
||||
|
||||
auto dk_opt = co_await partition_key_generator();
|
||||
|
||||
@@ -257,7 +257,7 @@ table::make_mutation_reader(schema_ptr s,
|
||||
|
||||
const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
|
||||
if (cache_enabled() && !bypass_cache) {
|
||||
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(),
|
||||
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, get_tombstone_gc_state(),
|
||||
get_max_purgeable_fn_for_cache_underlying_reader(), std::move(trace_state), fwd, fwd_mr)) {
|
||||
readers.emplace_back(std::move(*reader_opt));
|
||||
}
|
||||
@@ -286,7 +286,7 @@ sstables::shared_sstable table::make_streaming_staging_sstable() {
|
||||
return newtab;
|
||||
}
|
||||
|
||||
static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, const compaction::compaction_manager& cm,
|
||||
static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, tombstone_gc_state gc_state,
|
||||
gc_clock::time_point compaction_time, bool compaction_enabled, bool compaction_can_gc) {
|
||||
utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_enabled", fmt::to_string(compaction_enabled));
|
||||
utils::get_local_injector().set_parameter("maybe_compact_for_streaming", "compaction_can_gc", fmt::to_string(compaction_can_gc));
|
||||
@@ -297,7 +297,7 @@ static mutation_reader maybe_compact_for_streaming(mutation_reader underlying, c
|
||||
std::move(underlying),
|
||||
compaction_time,
|
||||
compaction_can_gc ? can_always_purge : can_never_purge,
|
||||
cm.get_tombstone_gc_state(),
|
||||
gc_state,
|
||||
streamed_mutation::forwarding::no);
|
||||
}
|
||||
|
||||
@@ -320,7 +320,7 @@ table::make_streaming_reader(schema_ptr s, reader_permit permit,
|
||||
|
||||
return maybe_compact_for_streaming(
|
||||
make_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, nullptr, mutation_reader::forwarding::no),
|
||||
get_compaction_manager(),
|
||||
get_tombstone_gc_state(),
|
||||
compaction_time,
|
||||
_config.enable_compacting_data_for_streaming_and_repair(),
|
||||
_config.enable_tombstone_gc_for_streaming_and_repair());
|
||||
@@ -339,7 +339,7 @@ mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit pe
|
||||
std::move(trace_state), fwd, fwd_mr, sstables::default_sstable_predicate(), sstables::integrity_check::yes));
|
||||
return maybe_compact_for_streaming(
|
||||
make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr),
|
||||
get_compaction_manager(),
|
||||
get_tombstone_gc_state(),
|
||||
compaction_time,
|
||||
_config.enable_compacting_data_for_streaming_and_repair(),
|
||||
_config.enable_tombstone_gc_for_streaming_and_repair());
|
||||
@@ -354,7 +354,7 @@ mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit pe
|
||||
return maybe_compact_for_streaming(
|
||||
sstables->make_range_sstable_reader(std::move(schema), std::move(permit), range, slice,
|
||||
std::move(trace_state), fwd, fwd_mr, sstables::default_read_monitor_generator(), sstables::integrity_check::yes),
|
||||
get_compaction_manager(),
|
||||
get_tombstone_gc_state(),
|
||||
compaction_time,
|
||||
_config.enable_compacting_data_for_streaming_and_repair(),
|
||||
_config.enable_tombstone_gc_for_streaming_and_repair());
|
||||
@@ -750,6 +750,7 @@ public:
|
||||
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
|
||||
}
|
||||
dht::token_range get_token_range_after_split(const dht::token&) const noexcept override { return dht::token_range(); }
|
||||
future<> wait_for_background_tablet_resize_work() override { return make_ready_future<>(); }
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
||||
return get_compaction_group().make_sstable_set();
|
||||
@@ -768,6 +769,13 @@ class tablet_storage_group_manager final : public storage_group_manager {
|
||||
locator::resize_decision::seq_number_t _split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min();
|
||||
future<> _merge_completion_fiber;
|
||||
condition_variable _merge_completion_event;
|
||||
// Ensures that processes such as incremental repair will wait for pending work from
|
||||
// merge fiber before proceeding. This guarantees stability on the compaction groups.
|
||||
// NOTE: it's important that we don't await on the barrier with any compaction group
|
||||
// gate held, since merge fiber will stop groups that in turn await on gate,
|
||||
// potentially causing an ABBA deadlock.
|
||||
utils::phased_barrier _merge_fiber_barrier;
|
||||
std::optional<utils::phased_barrier::operation> _pending_merge_fiber_work;
|
||||
// Holds compaction reenabler which disables compaction temporarily during tablet merge
|
||||
std::vector<compaction::compaction_reenabler> _compaction_reenablers_for_merging;
|
||||
private:
|
||||
@@ -856,6 +864,7 @@ public:
|
||||
, _my_host_id(erm.get_token_metadata().get_my_id())
|
||||
, _tablet_map(&erm.get_token_metadata().tablets().get_tablet_map(schema()->id()))
|
||||
, _merge_completion_fiber(merge_completion_fiber())
|
||||
, _merge_fiber_barrier(format("[table {}.{}] merge_fiber_barrier", _t.schema()->ks_name(), _t.schema()->cf_name()))
|
||||
{
|
||||
storage_group_map ret;
|
||||
|
||||
@@ -908,6 +917,10 @@ public:
|
||||
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
|
||||
return tablet_map().get_token_range_after_split(token);
|
||||
}
|
||||
future<> wait_for_background_tablet_resize_work() override {
|
||||
co_await _merge_fiber_barrier.advance_and_await();
|
||||
co_return;
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_sstable_set() const override {
|
||||
// FIXME: avoid recreation of compound_set for groups which had no change. usually, only one group will be changed at a time.
|
||||
@@ -2117,33 +2130,31 @@ compaction_group::update_repaired_at_for_merge() {
|
||||
});
|
||||
}
|
||||
|
||||
future<std::vector<compaction::compaction_group_view*>> table::get_compaction_group_views_for_repair(dht::token_range range) {
|
||||
std::vector<compaction::compaction_group_view*> ret;
|
||||
auto sgs = storage_groups_for_token_range(range);
|
||||
for (auto& sg : sgs) {
|
||||
co_await coroutine::maybe_yield();
|
||||
sg->for_each_compaction_group([&ret] (const compaction_group_ptr& cg) {
|
||||
ret.push_back(&cg->view_for_unrepaired_data());
|
||||
});
|
||||
}
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
future<compaction_reenablers_and_lock_holders> table::get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db,
|
||||
const service::frozen_topology_guard& guard, dht::token_range range) {
|
||||
auto ret = compaction_reenablers_and_lock_holders();
|
||||
auto views = co_await get_compaction_group_views_for_repair(range);
|
||||
for (auto view : views) {
|
||||
auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(*view);
|
||||
// Waits for background tablet resize work like merge that might destroy compaction groups,
|
||||
// providing stability. Essentially, serializes tablet merge completion handling with
|
||||
// the start of incremental repair, from the replica side.
|
||||
co_await _sg_manager->wait_for_background_tablet_resize_work();
|
||||
|
||||
for (auto sg : storage_groups_for_token_range(range)) {
|
||||
// FIXME: indentation
|
||||
auto cgs = sg->compaction_groups_immediate();
|
||||
for (auto& cg : cgs) {
|
||||
auto gate_holder = cg->async_gate().hold();
|
||||
auto& view = cg->view_for_unrepaired_data();
|
||||
auto cre = co_await db.get_compaction_manager().await_and_disable_compaction(view);
|
||||
tlogger.info("Disabled compaction for range={} session_id={} for incremental repair", range, guard);
|
||||
ret.cres.push_back(std::make_unique<compaction::compaction_reenabler>(std::move(cre)));
|
||||
|
||||
// This lock prevents the unrepaired compaction started by major compaction to run in parallel with repair.
|
||||
// The unrepaired compaction started by minor compaction does not need to take the lock since it ignores
|
||||
// sstables being repaired, so it can run in parallel with repair.
|
||||
auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(*view, "row_level_repair");
|
||||
auto lock_holder = co_await db.get_compaction_manager().get_incremental_repair_write_lock(view, "row_level_repair");
|
||||
tlogger.info("Got unrepaired compaction and repair lock for range={} session_id={} for incremental repair", range, guard);
|
||||
ret.lock_holders.push_back(std::move(lock_holder));
|
||||
}
|
||||
}
|
||||
co_return ret;
|
||||
}
|
||||
@@ -2747,8 +2758,8 @@ public:
|
||||
bool tombstone_gc_enabled() const noexcept override {
|
||||
return _t.tombstone_gc_enabled() && _cg.tombstone_gc_enabled();
|
||||
}
|
||||
const tombstone_gc_state& get_tombstone_gc_state() const noexcept override {
|
||||
return _t.get_compaction_manager().get_tombstone_gc_state();
|
||||
tombstone_gc_state get_tombstone_gc_state() const noexcept override {
|
||||
return _t.get_tombstone_gc_state();
|
||||
}
|
||||
compaction::compaction_backlog_tracker& get_backlog_tracker() override {
|
||||
return _cg.get_backlog_tracker();
|
||||
@@ -2906,6 +2917,8 @@ table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_optio
|
||||
|
||||
recalculate_tablet_count_stats();
|
||||
set_metrics();
|
||||
|
||||
update_tombstone_gc_rf_one();
|
||||
}
|
||||
|
||||
void table::on_flush_timer() {
|
||||
@@ -3015,7 +3028,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
|
||||
|
||||
while (!_t.async_gate().is_closed()) {
|
||||
try {
|
||||
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(60s));
|
||||
co_await utils::get_local_injector().inject("merge_completion_fiber", utils::wait_for_message(5min));
|
||||
auto ks_name = schema()->ks_name();
|
||||
auto cf_name = schema()->cf_name();
|
||||
// Enable compaction after merge is done.
|
||||
@@ -3049,6 +3062,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
|
||||
utils::get_local_injector().inject("replica_merge_completion_wait", [] () {
|
||||
tlogger.info("Merge completion fiber finished, about to sleep");
|
||||
});
|
||||
_pending_merge_fiber_work.reset();
|
||||
co_await _merge_completion_event.wait();
|
||||
tlogger.debug("Merge completion fiber woke up for {}.{}", schema()->ks_name(), schema()->cf_name());
|
||||
}
|
||||
@@ -3107,6 +3121,7 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator:
|
||||
new_storage_groups[new_tid] = std::move(new_sg);
|
||||
}
|
||||
_storage_groups = std::move(new_storage_groups);
|
||||
_pending_merge_fiber_work = _merge_fiber_barrier.start();
|
||||
_merge_completion_event.signal();
|
||||
}
|
||||
|
||||
@@ -3123,6 +3138,9 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
|
||||
} else if (new_tablet_count < old_tablet_count) {
|
||||
tlogger.info0("Detected tablet merge for table {}.{}, decreasing from {} to {} tablets",
|
||||
schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count);
|
||||
if (utils::get_local_injector().is_enabled("tablet_force_tablet_count_decrease_once")) {
|
||||
utils::get_local_injector().disable("tablet_force_tablet_count_decrease");
|
||||
}
|
||||
handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map);
|
||||
}
|
||||
|
||||
@@ -3216,6 +3234,17 @@ void table::update_effective_replication_map(locator::effective_replication_map_
|
||||
}
|
||||
|
||||
recalculate_tablet_count_stats();
|
||||
|
||||
update_tombstone_gc_rf_one();
|
||||
}
|
||||
|
||||
void table::update_tombstone_gc_rf_one() {
|
||||
auto& st = _compaction_manager.get_shared_tombstone_gc_state();
|
||||
if (_erm && _erm->get_replication_factor() == 1) {
|
||||
st.set_table_rf_one(_schema->id());
|
||||
} else {
|
||||
st.set_table_rf_n(_schema->id());
|
||||
}
|
||||
}
|
||||
|
||||
void table::recalculate_tablet_count_stats() {
|
||||
@@ -3277,7 +3306,7 @@ table::sstables_as_snapshot_source() {
|
||||
std::move(reader),
|
||||
gc_clock::now(),
|
||||
get_max_purgeable_fn_for_cache_underlying_reader(),
|
||||
_compaction_manager.get_tombstone_gc_state().with_commitlog_check_disabled(),
|
||||
get_tombstone_gc_state().with_commitlog_check_disabled(),
|
||||
fwd);
|
||||
}, [this, sst_set] {
|
||||
return make_partition_presence_checker(sst_set);
|
||||
@@ -3287,9 +3316,10 @@ table::sstables_as_snapshot_source() {
|
||||
|
||||
// define in .cc, since sstable is forward-declared in .hh
|
||||
table::~table() {
|
||||
auto& st = _compaction_manager.get_shared_tombstone_gc_state();
|
||||
st.remove_table_from_rf_registry(_schema->id());
|
||||
}
|
||||
|
||||
|
||||
logalloc::occupancy_stats table::occupancy() const {
|
||||
logalloc::occupancy_stats res;
|
||||
for_each_compaction_group([&] (const compaction_group& cg) {
|
||||
@@ -4469,7 +4499,7 @@ table::query(schema_ptr query_schema,
|
||||
|
||||
if (!querier_opt) {
|
||||
querier_base::querier_config conf(_config.tombstone_warn_threshold);
|
||||
querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, get_compaction_manager().get_tombstone_gc_state(), conf);
|
||||
querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, get_tombstone_gc_state(), conf);
|
||||
}
|
||||
auto& q = *querier_opt;
|
||||
|
||||
@@ -4521,7 +4551,7 @@ table::mutation_query(schema_ptr query_schema,
|
||||
querier_opt = std::move(*saved_querier);
|
||||
}
|
||||
if (!querier_opt) {
|
||||
auto tombstone_gc_state = tombstone_gc_enabled ? get_compaction_manager().get_tombstone_gc_state() : tombstone_gc_state::no_gc();
|
||||
auto tombstone_gc_state = tombstone_gc_enabled ? get_tombstone_gc_state() : tombstone_gc_state::no_gc();
|
||||
querier_base::querier_config conf(_config.tombstone_warn_threshold);
|
||||
querier_opt = querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, tombstone_gc_state, conf);
|
||||
}
|
||||
@@ -5097,4 +5127,8 @@ future<uint64_t> table::estimated_partitions_in_range(dht::token_range tr) const
|
||||
co_return partition_count;
|
||||
}
|
||||
|
||||
tombstone_gc_state table::get_tombstone_gc_state() const {
|
||||
return tombstone_gc_state(_compaction_manager.get_shared_tombstone_gc_state());
|
||||
}
|
||||
|
||||
} // namespace replica
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "dht/token.hh"
|
||||
#include "mutation/async_utils.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
#include "dht/fixed_shard.hh"
|
||||
|
||||
namespace replica {
|
||||
|
||||
@@ -97,6 +98,103 @@ schema_ptr make_tablets_schema() {
|
||||
.build();
|
||||
}
|
||||
|
||||
schema_ptr make_raft_schema(sstring name, bool is_group0) {
|
||||
auto id = generate_legacy_id(db::system_keyspace::NAME, name);
|
||||
auto builder = schema_builder(db::system_keyspace::NAME, name, std::optional(id));
|
||||
if (!is_group0) {
|
||||
if (!strongly_consistent_tables_enabled) {
|
||||
on_internal_error(tablet_logger, "Can't create raft table for strongly consistent tablets when the feature is disabled");
|
||||
}
|
||||
builder.with_column("shard", short_type, column_kind::partition_key);
|
||||
}
|
||||
builder
|
||||
.with_column("group_id", timeuuid_type, column_kind::partition_key)
|
||||
// raft log part
|
||||
.with_column("index", long_type, column_kind::clustering_key)
|
||||
.with_column("term", long_type)
|
||||
.with_column("data", bytes_type) // decltype(raft::log_entry::data) - serialized variant
|
||||
// persisted term and vote
|
||||
.with_column("vote_term", long_type, column_kind::static_column)
|
||||
.with_column("vote", uuid_type, column_kind::static_column)
|
||||
// id of the most recent persisted snapshot
|
||||
.with_column("snapshot_id", uuid_type, column_kind::static_column)
|
||||
.with_column("commit_idx", long_type, column_kind::static_column)
|
||||
|
||||
.with_hash_version()
|
||||
.set_caching_options(caching_options::get_disabled_caching_options());
|
||||
|
||||
if (is_group0) {
|
||||
return builder
|
||||
.set_comment("Persisted RAFT log, votes and snapshot info")
|
||||
.build();
|
||||
} else {
|
||||
return builder
|
||||
.set_comment("Persisted RAFT log, votes and snapshot info for strongly consistent tablets")
|
||||
.with_partitioner(dht::fixed_shard_partitioner::classname)
|
||||
.with_sharder(dht::fixed_shard_sharder::instance())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
schema_ptr make_raft_snapshots_schema(sstring name, bool is_group0) {
|
||||
auto id = generate_legacy_id(db::system_keyspace::NAME, name);
|
||||
auto builder = schema_builder(db::system_keyspace::NAME, name, std::optional(id));
|
||||
if (!is_group0) {
|
||||
if (!strongly_consistent_tables_enabled) {
|
||||
on_internal_error(tablet_logger, "Can't create raft snapshots table for strongly consistent tablets when the feature is disabled");
|
||||
}
|
||||
builder.with_column("shard", short_type, column_kind::partition_key);
|
||||
}
|
||||
builder
|
||||
.with_column("group_id", timeuuid_type, column_kind::partition_key)
|
||||
.with_column("snapshot_id", uuid_type)
|
||||
// Index and term of last entry in the snapshot
|
||||
.with_column("idx", long_type)
|
||||
.with_column("term", long_type)
|
||||
|
||||
.with_hash_version();
|
||||
if (is_group0) {
|
||||
return builder
|
||||
.set_comment("Persisted RAFT snapshots for strongly consistent tablets")
|
||||
.build();
|
||||
} else {
|
||||
return builder
|
||||
.set_comment("Persisted RAFT snapshot descriptors info for strongly consistent tablets")
|
||||
.with_partitioner(dht::fixed_shard_partitioner::classname)
|
||||
.with_sharder(dht::fixed_shard_sharder::instance())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
schema_ptr make_raft_snapshot_config_schema(sstring name, bool is_group0) {
|
||||
auto id = generate_legacy_id(db::system_keyspace::NAME, name);
|
||||
auto builder = schema_builder(db::system_keyspace::NAME, name, std::optional(id));
|
||||
if (!is_group0) {
|
||||
if (!strongly_consistent_tables_enabled) {
|
||||
on_internal_error(tablet_logger, "Can't create raft snapshot config table for strongly consistent tablets when the feature is disabled");
|
||||
}
|
||||
builder.with_column("shard", short_type, column_kind::partition_key);
|
||||
}
|
||||
builder
|
||||
.with_column("group_id", timeuuid_type, column_kind::partition_key)
|
||||
.with_column("disposition", ascii_type, column_kind::clustering_key) // can be 'CURRENT` or `PREVIOUS'
|
||||
.with_column("server_id", uuid_type, column_kind::clustering_key)
|
||||
.with_column("can_vote", boolean_type)
|
||||
|
||||
.with_hash_version();
|
||||
if (is_group0) {
|
||||
return builder
|
||||
.set_comment("RAFT configuration for the latest snapshot descriptor")
|
||||
.build();
|
||||
} else {
|
||||
return builder
|
||||
.set_comment("RAFT configuration for the snapshot descriptor for strongly consistent tablets")
|
||||
.with_partitioner(dht::fixed_shard_partitioner::classname)
|
||||
.with_sharder(dht::fixed_shard_sharder::instance())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<data_value> replicas_to_data_value(const tablet_replica_set& replicas) {
|
||||
std::vector<data_value> result;
|
||||
result.reserve(replicas.size());
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user