Compare commits

..

3 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
3f31d8a5b8 Refactor to use table::add_new_sstable() and add test
- Add table::add_new_sstable() method that wraps error handling
- Update streaming/consumer.cc to use new method
- Update streaming/stream_blob.cc to use new method
- Add test_add_new_sstable_cleanup_on_failure test to verify cleanup

Addresses feedback from @tgrabiec to extract common logic into a reusable method.

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-03 17:49:22 +00:00
copilot-swe-agent[bot]
f9d6e36f02 Add cleanup of sstables on add_sstable_and_update_cache failure
Fix issue where streaming consumer leaves sealed sstables on disk when
add_sstable_and_update_cache() fails. This prevents data resurrection
and tablet split issues.

Changes:
- streaming/consumer.cc: Wrap add_sstable_and_update_cache with try-catch
  and call sst->unlink() on failure before rethrowing
- streaming/stream_blob.cc: Add same cleanup logic in load_sstable_for_tablet

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-03 17:14:59 +00:00
copilot-swe-agent[bot]
f40dd06156 Initial plan 2025-12-03 17:09:17 +00:00
142 changed files with 850 additions and 2556 deletions

View File

@@ -1,182 +0,0 @@
# SCYLLA_ASSERT to scylla_assert() Conversion Summary
## Objective
Replace crash-inducing `SCYLLA_ASSERT` with exception-throwing `scylla_assert()` to prevent cluster-wide crashes and maintain availability.
## What Was Done
### 1. Infrastructure Implementation ✓
Created new `scylla_assert()` macro in `utils/assert.hh`:
- Based on `on_internal_error()` for exception-based error handling
- Supports optional custom error messages via variadic arguments
- Uses `seastar::format()` for string formatting
- Compatible with C++23 standard (uses `__VA_OPT__`)
**Key difference from SCYLLA_ASSERT:**
```cpp
// Old: Crashes the process immediately
SCYLLA_ASSERT(condition);
// New: Throws exception (or aborts based on config)
scylla_assert(condition);
scylla_assert(condition, "custom error message: {}", value);
```
### 2. Comprehensive Analysis ✓
Analyzed entire codebase to identify safe vs unsafe conversion locations:
**Statistics:**
- Total SCYLLA_ASSERT usages: ~1307 (including tests)
- Non-test usages: ~886
- **Unsafe to convert**: 223 usages (25%)
- In noexcept functions: 187 usages across 50 files
- In destructors: 36 usages across 25 files
- **Safe to convert**: ~668 usages (75%)
- **Converted in this PR**: 112 usages (16.8% of safe conversions)
### 3. Documentation ✓
Created comprehensive documentation:
1. **Conversion Guide** (`docs/dev/scylla_assert_conversion.md`)
- Explains safe vs unsafe contexts
- Provides conversion strategy
- Lists all completed conversions
- Includes testing guidance
2. **Unsafe Locations Report** (`docs/dev/unsafe_scylla_assert_locations.md`)
- Detailed listing of 223 unsafe locations
- Organized by file with line numbers
- Separated into noexcept and destructor categories
### 4. Sample Conversions ✓
Converted 112 safe SCYLLA_ASSERT usages across 32 files as demonstration:
| File | Conversions | Context |
|------|------------|---------|
| db/large_data_handler.{cc,hh} | 5 | Future-returning functions |
| db/schema_applier.cc | 1 | Coroutine function |
| db/system_distributed_keyspace.cc | 1 | Regular function |
| db/commitlog/commitlog_replayer.cc | 1 | Coroutine function |
| db/view/row_locking.cc | 2 | Regular function |
| db/size_estimates_virtual_reader.cc | 1 | Lambda in coroutine |
| db/corrupt_data_handler.cc | 2 | Lambdas in future-returning function |
| raft/tracker.cc | 2 | Unreachable code (switch defaults) |
| service/topology_coordinator.cc | 11 | Coroutine functions (topology operations) |
| service/storage_service.cc | 28 | Critical node lifecycle operations |
| sstables/* (22 files) | 58 | SSTable operations (read/write/compress/index) |
All conversions were in **safe contexts** (non-noexcept, non-destructor functions). 3 assertions in storage_service.cc remain as SCYLLA_ASSERT (in noexcept functions).
## Why These Cannot Be Converted
### Unsafe Context #1: noexcept Functions (187 usages)
**Problem**: Throwing from noexcept causes `std::terminate()`, same as crash.
**Example** (from `locator/production_snitch_base.hh`):
```cpp
virtual bool prefer_local() const noexcept override {
SCYLLA_ASSERT(_backreference != nullptr); // Cannot convert!
return _backreference->prefer_local();
}
```
**Solution for these**: Keep as SCYLLA_ASSERT or use `on_fatal_internal_error()`.
### Unsafe Context #2: Destructors (36 usages)
**Problem**: Destructors are implicitly noexcept, throwing causes `std::terminate()`.
**Example** (from `utils/file_lock.cc`):
```cpp
~file_lock() noexcept {
if (_fd.get() != -1) {
SCYLLA_ASSERT(_fd.get() != -1); // Cannot convert!
auto r = ::flock(_fd.get(), LOCK_UN);
SCYLLA_ASSERT(r == 0); // Cannot convert!
}
}
```
**Solution for these**: Keep as SCYLLA_ASSERT.
## Benefits of scylla_assert()
1. **Prevents Cluster-Wide Crashes**
- Exception can be caught and handled gracefully
- Failed node doesn't bring down entire cluster
2. **Maintains Availability**
- Service can continue with degraded functionality
- Better than complete crash
3. **Better Error Reporting**
- Includes backtrace via `on_internal_error()`
- Supports custom error messages
- Configurable abort-on-error for testing
4. **Backward Compatible**
- SCYLLA_ASSERT still exists for unsafe contexts
- Can be gradually adopted
## Testing
- Created manual test in `test/manual/test_scylla_assert.cc`
- Verifies passing and failing assertions
- Tests custom error messages
- Code review passed with improvements made
## Next Steps (Future Work)
1. **Gradual Conversion**
- Convert remaining ~653 safe SCYLLA_ASSERT usages incrementally
- Prioritize high-impact code paths first
2. **Review noexcept Functions**
- Evaluate if some can be made non-noexcept
- Consider using `on_fatal_internal_error()` where appropriate
3. **Integration Testing**
- Run full test suite with conversions
- Monitor for any unexpected behavior
- Validate exception propagation
4. **Automated Analysis Tool**
- Create tool to identify safe conversion candidates
- Generate conversion patches automatically
- Track conversion progress
## Files Modified in This PR
### Core Implementation
- `utils/assert.hh` - Added scylla_assert() macro
### Conversions
- `db/large_data_handler.cc`
- `db/large_data_handler.hh`
- `db/schema_applier.cc`
- `db/system_distributed_keyspace.cc`
- `db/commitlog/commitlog_replayer.cc`
- `db/view/row_locking.cc`
- `db/size_estimates_virtual_reader.cc`
- `db/corrupt_data_handler.cc`
- `raft/tracker.cc`
- `service/topology_coordinator.cc`
- `service/storage_service.cc`
- `sstables/` (22 files across trie/, mx/, and core sstables)
### Documentation
- `docs/dev/scylla_assert_conversion.md`
- `docs/dev/unsafe_scylla_assert_locations.md`
- `test/manual/test_scylla_assert.cc`
## Conclusion
This PR establishes the infrastructure and methodology for replacing SCYLLA_ASSERT with scylla_assert() to improve cluster availability. The sample conversions demonstrate the approach, while comprehensive documentation enables future work.
**Key Achievement**: Provided a safe path forward for converting 75% (~668) of SCYLLA_ASSERT usages to exception-based assertions, while clearly documenting the 25% (~223) that must remain as crash-inducing assertions due to language constraints. Converted 112 usages as demonstration (16.8% of safe conversions), prioritizing critical files like storage_service.cc (node lifecycle) and all sstables files (data persistence), with ~556 remaining.

View File

@@ -9,7 +9,6 @@ target_sources(scylla_auth
allow_all_authorizer.cc
authenticated_user.cc
authenticator.cc
cache.cc
certificate_authenticator.cc
common.cc
default_authorizer.cc

View File

@@ -23,7 +23,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -12,7 +12,6 @@
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "utils/alien_worker.hh"
@@ -30,7 +29,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&) {
}
virtual future<> start() override {

View File

@@ -1,180 +0,0 @@
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "auth/cache.hh"
#include "auth/common.hh"
#include "auth/roles-metadata.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
namespace auth {
logging::logger logger("auth-cache");
cache::cache(cql3::query_processor& qp) noexcept
: _current_version(0)
, _qp(qp) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
auto it = _roles.find(role);
if (it == _roles.end()) {
return {};
}
return it->second;
}
future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& role) const {
auto rec = make_lw_shared<role_record>();
rec->version = _current_version;
auto fetch = [this, &role](const sstring& q) {
return _qp.execute_internal(q, db::consistency_level::LOCAL_ONE,
internal_distributed_query_state(), {role},
cql3::query_processor::cache_internal::yes);
};
// roles
{
static const sstring q = format("SELECT * FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, meta::roles_table::name);
auto rs = co_await fetch(q);
if (!rs->empty()) {
auto& r = rs->one();
rec->is_superuser = r.get_or<bool>("is_superuser", false);
rec->can_login = r.get_or<bool>("can_login", false);
rec->salted_hash = r.get_or<sstring>("salted_hash", "");
if (r.has("member_of")) {
auto mo = r.get_set<sstring>("member_of");
rec->member_of.insert(
std::make_move_iterator(mo.begin()),
std::make_move_iterator(mo.end()));
}
} else {
// role got deleted
co_return nullptr;
}
}
// members
{
static const sstring q = format("SELECT role, member FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_MEMBERS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->members.insert(r.get_as<sstring>("member"));
co_await coroutine::maybe_yield();
}
}
// attributes
{
static const sstring q = format("SELECT role, name, value FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_ATTRIBUTES_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->attributes[r.get_as<sstring>("name")] =
r.get_as<sstring>("value");
co_await coroutine::maybe_yield();
}
}
// permissions
{
static const sstring q = format("SELECT role, resource, permissions FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, PERMISSIONS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
auto resource = r.get_as<sstring>("resource");
auto perms_strings = r.get_set<sstring>("permissions");
std::unordered_set<sstring> perms_set(perms_strings.begin(), perms_strings.end());
auto pset = permissions::from_strings(perms_set);
rec->permissions[std::move(resource)] = std::move(pset);
co_await coroutine::maybe_yield();
}
}
co_return rec;
}
future<> cache::prune_all() noexcept {
for (auto it = _roles.begin(); it != _roles.end(); ) {
if (it->second->version != _current_version) {
_roles.erase(it++);
co_await coroutine::maybe_yield();
} else {
++it;
}
}
co_return;
}
future<> cache::load_all() {
if (legacy_mode(_qp)) {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
++_current_version;
logger.info("Loading all roles");
const uint32_t page_size = 128;
auto loader = [this](const cql3::untyped_result_set::row& r) -> future<stop_iteration> {
const auto name = r.get_as<sstring>("role");
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
}
co_return stop_iteration::no;
};
co_await _qp.query_internal(format("SELECT * FROM {}.{}",
db::system_keyspace::NAME, meta::roles_table::name),
db::consistency_level::LOCAL_ONE, {}, page_size, loader);
co_await prune_all();
for (const auto& [name, role] : _roles) {
co_await distribute_role(name, role);
}
co_await container().invoke_on_others([this](cache& c) -> future<> {
c._current_version = _current_version;
co_await c.prune_all();
});
}
future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
}
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
} else {
_roles.erase(name);
}
co_await distribute_role(name, role);
}
}
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
auto role_ptr = role.get();
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
if (!role_ptr) {
c._roles.erase(name);
return;
}
auto role_copy = make_lw_shared<role_record>(*role_ptr);
c._roles[name] = std::move(role_copy);
});
}
bool cache::includes_table(const table_id& id) noexcept {
return id == db::system_keyspace::roles()->id()
|| id == db::system_keyspace::role_members()->id()
|| id == db::system_keyspace::role_attributes()->id()
|| id == db::system_keyspace::role_permissions()->id();
}
} // namespace auth

View File

@@ -1,61 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <unordered_set>
#include <unordered_map>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <absl/container/flat_hash_map.h>
#include "auth/permission.hh"
#include "auth/common.hh"
namespace cql3 { class query_processor; }
namespace auth {
class cache : public peering_sharded_service<cache> {
public:
using role_name_t = sstring;
using version_tag_t = char;
struct role_record {
bool can_login = false;
bool is_superuser = false;
std::unordered_set<role_name_t> member_of;
std::unordered_set<role_name_t> members;
sstring salted_hash;
std::unordered_map<sstring, sstring> attributes;
std::unordered_map<sstring, permission_set> permissions;
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
static bool includes_table(const table_id&) noexcept;
private:
using roles_map = absl::flat_hash_map<role_name_t, lw_shared_ptr<role_record>>;
roles_map _roles;
version_tag_t _current_version;
cql3::query_processor& _qp;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;
future<> distribute_role(const role_name_t& name, const lw_shared_ptr<role_record> role);
};
} // namespace auth

View File

@@ -48,10 +48,6 @@ extern constinit const std::string_view AUTH_PACKAGE_NAME;
} // namespace meta
constexpr std::string_view PERMISSIONS_CF = "role_permissions";
constexpr std::string_view ROLE_MEMBERS_CF = "role_members";
constexpr std::string_view ROLE_ATTRIBUTES_CF = "role_attributes";
// This is a helper to check whether auth-v2 is on.
bool legacy_mode(cql3::query_processor& qp);

View File

@@ -37,6 +37,7 @@ std::string_view default_authorizer::qualified_java_name() const {
static constexpr std::string_view ROLE_NAME = "role";
static constexpr std::string_view RESOURCE_NAME = "resource";
static constexpr std::string_view PERMISSIONS_NAME = "permissions";
static constexpr std::string_view PERMISSIONS_CF = "role_permissions";
static logging::logger alogger("default_authorizer");

View File

@@ -83,18 +83,17 @@ static const class_registrator<
ldap_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(ldap_role_manager_full_name);
::service::migration_manager&> registration(ldap_role_manager_full_name);
ldap_role_manager::ldap_role_manager(
std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
: _std_mgr(qp, rg0c, mm, cache), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
: _std_mgr(qp, rg0c, mm), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
, _bind_password(bind_password)
, _connection_factory(bind(std::mem_fn(&ldap_role_manager::reconnect), std::ref(*this))) {
}
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
: ldap_role_manager(
qp.db().get_config().ldap_url_template(),
qp.db().get_config().ldap_attr_role(),
@@ -102,8 +101,7 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
qp.db().get_config().ldap_bind_passwd(),
qp,
rg0c,
mm,
cache) {
mm) {
}
std::string_view ldap_role_manager::qualified_java_name() const noexcept {

View File

@@ -14,7 +14,6 @@
#include "ent/ldap/ldap_connection.hh"
#include "standard_role_manager.hh"
#include "auth/cache.hh"
namespace auth {
@@ -44,13 +43,12 @@ class ldap_role_manager : public role_manager {
std::string_view bind_password, ///< LDAP bind credentials.
cql3::query_processor& qp, ///< Passed to standard_role_manager.
::service::raft_group0_client& rg0c, ///< Passed to standard_role_manager.
::service::migration_manager& mm, ///< Passed to standard_role_manager.
cache& cache ///< Passed to standard_role_manager.
::service::migration_manager& mm ///< Passed to standard_role_manager.
);
/// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
/// class_registrator<role_manager>.
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm);
/// Thrown when query-template parsing fails.
struct url_error : public std::runtime_error {

View File

@@ -11,7 +11,6 @@
#include <seastar/core/future.hh>
#include <stdexcept>
#include <string_view>
#include "auth/cache.hh"
#include "cql3/description.hh"
#include "utils/class_registrator.hh"
@@ -24,8 +23,7 @@ static const class_registrator<
maintenance_socket_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(sstring{maintenance_socket_role_manager_name});
::service::migration_manager&> registration(sstring{maintenance_socket_role_manager_name});
std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {

View File

@@ -8,7 +8,6 @@
#pragma once
#include "auth/cache.hh"
#include "auth/resource.hh"
#include "auth/role_manager.hh"
#include <seastar/core/future.hh>
@@ -30,7 +29,7 @@ extern const std::string_view maintenance_socket_role_manager_name;
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
class maintenance_socket_role_manager final : public role_manager {
public:
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {}
virtual std::string_view qualified_java_name() const noexcept override;

View File

@@ -49,7 +49,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
@@ -64,11 +63,10 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
password_authenticator::~password_authenticator() {
}
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
, _hashing_worker(hashing_worker)
@@ -317,20 +315,11 @@ future<authenticated_user> password_authenticator::authenticate(
const sstring password = credentials.at(PASSWORD_KEY);
try {
std::optional<sstring> salted_hash;
if (legacy_mode(_qp)) {
salted_hash = co_await get_password_hash(username);
if (!salted_hash) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
} else {
auto role = _cache.get(username);
if (!role || role->salted_hash.empty()) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
salted_hash = role->salted_hash;
const std::optional<sstring> salted_hash = co_await get_password_hash(username);
if (!salted_hash) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash = std::move(salted_hash)]{
return passwords::check(password, *salted_hash);
});
if (!password_match) {

View File

@@ -16,7 +16,6 @@
#include "db/consistency_level_type.hh"
#include "auth/authenticator.hh"
#include "auth/passwords.hh"
#include "auth/cache.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
@@ -42,7 +41,6 @@ class password_authenticator : public authenticator {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
future<> _stopped;
abort_source _as;
std::string _superuser; // default superuser name from the config (may or may not be present in roles table)
@@ -55,7 +53,7 @@ public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
~password_authenticator();

View File

@@ -35,10 +35,9 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -11,7 +11,6 @@
#pragma once
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
@@ -30,7 +29,7 @@ namespace auth {
class saslauthd_authenticator : public authenticator {
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
public:
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
future<> start() override;

View File

@@ -17,7 +17,6 @@
#include <chrono>
#include <seastar/core/future-util.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
@@ -158,7 +157,6 @@ static future<> validate_role_exists(const service& ser, std::string_view role_n
service::service(
utils::loading_cache_config c,
cache& cache,
cql3::query_processor& qp,
::service::raft_group0_client& g0,
::service::migration_notifier& mn,
@@ -168,7 +166,6 @@ service::service(
maintenance_socket_enabled used_by_maintenance_socket)
: _loading_cache_config(std::move(c))
, _permissions_cache(nullptr)
, _cache(cache)
, _qp(qp)
, _group0_client(g0)
, _mnotifier(mn)
@@ -191,17 +188,15 @@ service::service(
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache,
utils::alien_worker& hashing_worker)
: service(
std::move(c),
cache,
qp,
g0,
mn,
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm),
used_by_maintenance_socket) {
}
@@ -237,9 +232,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
auto auth_version = co_await sys_ks.get_auth_version();
// version is set in query processor to be easily available in various places we call auth::legacy_mode check.
_qp.auth_version = auth_version;
if (this_shard_id() == 0) {
co_await _cache.load_all();
}
if (!_used_by_maintenance_socket) {
// this legacy keyspace is only used by cqlsh
// it's needed when executing `list roles` or `list users`

View File

@@ -21,7 +21,6 @@
#include "auth/authorizer.hh"
#include "auth/permission.hh"
#include "auth/permissions_cache.hh"
#include "auth/cache.hh"
#include "auth/role_manager.hh"
#include "auth/common.hh"
#include "cql3/description.hh"
@@ -78,7 +77,6 @@ public:
class service final : public seastar::peering_sharded_service<service> {
utils::loading_cache_config _loading_cache_config;
std::unique_ptr<permissions_cache> _permissions_cache;
cache& _cache;
cql3::query_processor& _qp;
@@ -109,7 +107,6 @@ class service final : public seastar::peering_sharded_service<service> {
public:
service(
utils::loading_cache_config,
cache& cache,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_notifier&,
@@ -131,7 +128,6 @@ public:
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&,
utils::alien_worker&);
future<> start(::service::migration_manager&, db::system_keyspace&);

View File

@@ -41,6 +41,21 @@
namespace auth {
namespace meta {
namespace role_members_table {
constexpr std::string_view name{"role_members" , 12};
}
namespace role_attributes_table {
constexpr std::string_view name{"role_attributes", 15};
}
}
static logging::logger log("standard_role_manager");
@@ -49,8 +64,7 @@ static const class_registrator<
standard_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
::service::migration_manager&> registration("org.apache.cassandra.auth.CassandraRoleManager");
struct record final {
sstring name;
@@ -107,11 +121,10 @@ static bool has_can_login(const cql3::untyped_result_set_row& row) {
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob_unfragmented("can_login")).is_null());
}
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
{}
@@ -123,7 +136,7 @@ std::string_view standard_role_manager::qualified_java_name() const noexcept {
const resource_set& standard_role_manager::protected_resources() const {
static const resource_set resources({
make_data_resource(meta::legacy::AUTH_KS, meta::roles_table::name),
make_data_resource(meta::legacy::AUTH_KS, ROLE_MEMBERS_CF)});
make_data_resource(meta::legacy::AUTH_KS, meta::role_members_table::name)});
return resources;
}
@@ -147,7 +160,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
" PRIMARY KEY (role, member)"
")",
meta::legacy::AUTH_KS,
ROLE_MEMBERS_CF);
meta::role_members_table::name);
static const sstring create_role_attributes_query = seastar::format(
"CREATE TABLE {}.{} ("
" role text,"
@@ -156,7 +169,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
" PRIMARY KEY(role, name)"
")",
meta::legacy::AUTH_KS,
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
return when_all_succeed(
create_legacy_metadata_table_if_missing(
meta::roles_table::name,
@@ -164,12 +177,12 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
create_roles_query,
_migration_manager),
create_legacy_metadata_table_if_missing(
ROLE_MEMBERS_CF,
meta::role_members_table::name,
_qp,
create_role_members_query,
_migration_manager),
create_legacy_metadata_table_if_missing(
ROLE_ATTRIBUTES_CF,
meta::role_attributes_table::name,
_qp,
create_role_attributes_query,
_migration_manager)).discard_result();
@@ -416,7 +429,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
const auto revoke_from_members = [this, role_name, &mc] () -> future<> {
const sstring query = seastar::format("SELECT member FROM {}.{} WHERE role = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
const auto members = co_await _qp.execute_internal(
query,
consistency_for_role(role_name),
@@ -448,7 +461,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
const auto remove_attributes_of = [this, role_name, &mc] () -> future<> {
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name)},
cql3::query_processor::cache_internal::yes).discard_result();
@@ -504,7 +517,7 @@ standard_role_manager::legacy_modify_membership(
case membership_change::add: {
const sstring insert_query = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
co_return co_await _qp.execute_internal(
insert_query,
consistency_for_role(role_name),
@@ -516,7 +529,7 @@ standard_role_manager::legacy_modify_membership(
case membership_change::remove: {
const sstring delete_query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
co_return co_await _qp.execute_internal(
delete_query,
consistency_for_role(role_name),
@@ -554,12 +567,12 @@ standard_role_manager::modify_membership(
case membership_change::add:
modify_role_members = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
break;
case membership_change::remove:
modify_role_members = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
break;
default:
on_internal_error(log, format("unknown membership_change value: {}", int(ch)));
@@ -653,7 +666,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
const sstring query = seastar::format("SELECT * FROM {}.{}",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
const auto results = co_await _qp.execute_internal(
query,
@@ -718,21 +731,15 @@ future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
}
future<bool> standard_role_manager::can_login(std::string_view role_name) {
if (legacy_mode(_qp)) {
const auto r = co_await require_record(_qp, role_name);
co_return r.can_login;
}
auto role = _cache.get(sstring(role_name));
if (!role) {
throw nonexistant_role(role_name);
}
co_return role->can_login;
return require_record(_qp, role_name).then([](record r) {
return r.can_login;
});
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
if (!result_set->empty()) {
const cql3::untyped_result_set_row &row = result_set->one();
@@ -763,7 +770,7 @@ future<> standard_role_manager::set_attribute(std::string_view role_name, std::s
}
const sstring query = seastar::format("INSERT INTO {}.{} (role, name, value) VALUES (?, ?, ?)",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, cql3::query_processor::cache_internal::yes).discard_result();
} else {
@@ -778,7 +785,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
}
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).discard_result();
} else {

View File

@@ -10,7 +10,6 @@
#include "auth/common.hh"
#include "auth/role_manager.hh"
#include "auth/cache.hh"
#include <string_view>
@@ -37,14 +36,13 @@ class standard_role_manager final : public role_manager {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
future<> _stopped;
abort_source _as;
std::string _superuser;
shared_promise<> _superuser_created_promise;
public:
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
virtual std::string_view qualified_java_name() const noexcept override;

View File

@@ -13,7 +13,6 @@
#include "auth/authorizer.hh"
#include "auth/default_authorizer.hh"
#include "auth/password_authenticator.hh"
#include "auth/cache.hh"
#include "auth/permission.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
@@ -38,8 +37,8 @@ class transitional_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, hashing_worker)) {
}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
@@ -241,7 +240,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&,
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<

View File

@@ -445,7 +445,6 @@ ldap_tests = set([
scylla_tests = set([
'test/boost/combined_tests',
'test/boost/UUID_test',
'test/boost/url_parse_test',
'test/boost/advanced_rpc_compressor_test',
'test/boost/allocation_strategy_test',
'test/boost/alternator_unit_test',
@@ -1196,7 +1195,6 @@ scylla_core = (['message/messaging_service.cc',
'auth/allow_all_authorizer.cc',
'auth/authenticated_user.cc',
'auth/authenticator.cc',
'auth/cache.cc',
'auth/common.cc',
'auth/default_authorizer.cc',
'auth/resource.cc',
@@ -1648,7 +1646,6 @@ deps['test/boost/bytes_ostream_test'] = [
]
deps['test/boost/input_stream_test'] = ['test/boost/input_stream_test.cc']
deps['test/boost/UUID_test'] = ['clocks-impl.cc', 'utils/UUID_gen.cc', 'test/boost/UUID_test.cc', 'utils/uuid.cc', 'utils/dynamic_bitset.cc', 'utils/hashers.cc', 'utils/on_internal_error.cc']
deps['test/boost/url_parse_test'] = ['utils/http.cc', 'test/boost/url_parse_test.cc', ]
deps['test/boost/murmur_hash_test'] = ['bytes.cc', 'utils/murmur_hash.cc', 'test/boost/murmur_hash_test.cc']
deps['test/boost/allocation_strategy_test'] = ['test/boost/allocation_strategy_test.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc', 'utils/labels.cc']
deps['test/boost/log_heap_test'] = ['test/boost/log_heap_test.cc']

View File

@@ -575,15 +575,6 @@ usingTimeoutServiceLevelClauseObjective[std::unique_ptr<cql3::attributes::raw>&
| serviceLevel sl_name=serviceLevelOrRoleName { attrs->service_level = std::move(sl_name); }
;
usingTimeoutConcurrencyClause[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_USING usingTimeoutConcurrencyClauseObjective[attrs] ( K_AND usingTimeoutConcurrencyClauseObjective[attrs] )*
;
usingTimeoutConcurrencyClauseObjective[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_TIMEOUT to=term { attrs->timeout = std::move(to); }
| K_CONCURRENCY c=term { attrs->concurrency = std::move(c); }
;
/**
* UPDATE <CF>
* USING TIMESTAMP <long>
@@ -675,7 +666,7 @@ pruneMaterializedViewStatement returns [std::unique_ptr<raw::select_statement> e
auto attrs = std::make_unique<cql3::attributes::raw>();
expression wclause = conjunction{};
}
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingTimeoutConcurrencyClause[attrs] )?
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingClause[attrs] )?
{
auto params = make_lw_shared<raw::select_statement::parameters>(std::move(orderings), is_distinct, allow_filtering, statement_subtype, bypass_cache);
return std::make_unique<raw::select_statement>(std::move(cf), std::move(params),
@@ -2379,7 +2370,6 @@ K_LIKE: L I K E;
K_TIMEOUT: T I M E O U T;
K_PRUNE: P R U N E;
K_CONCURRENCY: C O N C U R R E N C Y;
K_EXECUTE: E X E C U T E;

View File

@@ -20,21 +20,19 @@
namespace cql3 {
std::unique_ptr<attributes> attributes::none() {
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}, {}}};
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}}};
}
attributes::attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency)
std::optional<sstring> service_level)
: _timestamp_unset_guard(timestamp)
, _timestamp{std::move(timestamp)}
, _time_to_live_unset_guard(time_to_live)
, _time_to_live{std::move(time_to_live)}
, _timeout{std::move(timeout)}
, _service_level(std::move(service_level))
, _concurrency{std::move(concurrency)}
{ }
bool attributes::is_timestamp_set() const {
@@ -53,10 +51,6 @@ bool attributes::is_service_level_set() const {
return bool(_service_level);
}
bool attributes::is_concurrency_set() const {
return bool(_concurrency);
}
int64_t attributes::get_timestamp(int64_t now, const query_options& options) {
if (!_timestamp.has_value() || _timestamp_unset_guard.is_unset(options)) {
return now;
@@ -129,27 +123,6 @@ qos::service_level_options attributes::get_service_level(qos::service_level_cont
return sl_controller.get_service_level(sl_name).slo;
}
std::optional<int32_t> attributes::get_concurrency(const query_options& options) const {
if (!_concurrency.has_value()) {
return std::nullopt;
}
cql3::raw_value concurrency_raw = expr::evaluate(*_concurrency, options);
if (concurrency_raw.is_null()) {
throw exceptions::invalid_request_exception("Invalid null value of concurrency");
}
int32_t concurrency;
try {
concurrency = concurrency_raw.view().validate_and_deserialize<int32_t>(*int32_type);
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception("Invalid concurrency value");
}
if (concurrency <= 0) {
throw exceptions::invalid_request_exception("Concurrency must be a positive integer");
}
return concurrency;
}
void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timestamp.has_value()) {
expr::fill_prepare_context(*_timestamp, ctx);
@@ -160,13 +133,10 @@ void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timeout.has_value()) {
expr::fill_prepare_context(*_timeout, ctx);
}
if (_concurrency.has_value()) {
expr::fill_prepare_context(*_concurrency, ctx);
}
}
std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const {
std::optional<expr::expression> ts, ttl, to, conc;
std::optional<expr::expression> ts, ttl, to;
if (timestamp.has_value()) {
ts = prepare_expression(*timestamp, db, ks_name, nullptr, timestamp_receiver(ks_name, cf_name));
@@ -183,12 +153,7 @@ std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database d
verify_no_aggregate_functions(*timeout, "USING clause");
}
if (concurrency.has_value()) {
conc = prepare_expression(*concurrency, db, ks_name, nullptr, concurrency_receiver(ks_name, cf_name));
verify_no_aggregate_functions(*concurrency, "USING clause");
}
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level), std::move(conc)}};
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level)}};
}
lw_shared_ptr<column_specification> attributes::raw::timestamp_receiver(const sstring& ks_name, const sstring& cf_name) const {
@@ -203,8 +168,4 @@ lw_shared_ptr<column_specification> attributes::raw::timeout_receiver(const sstr
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[timeout]", true), duration_type);
}
lw_shared_ptr<column_specification> attributes::raw::concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const {
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[concurrency]", true), data_type_for<int32_t>());
}
}

View File

@@ -36,15 +36,13 @@ private:
std::optional<cql3::expr::expression> _time_to_live;
std::optional<cql3::expr::expression> _timeout;
std::optional<sstring> _service_level;
std::optional<cql3::expr::expression> _concurrency;
public:
static std::unique_ptr<attributes> none();
private:
attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency);
std::optional<sstring> service_level);
public:
bool is_timestamp_set() const;
@@ -54,8 +52,6 @@ public:
bool is_service_level_set() const;
bool is_concurrency_set() const;
int64_t get_timestamp(int64_t now, const query_options& options);
std::optional<int32_t> get_time_to_live(const query_options& options);
@@ -64,8 +60,6 @@ public:
qos::service_level_options get_service_level(qos::service_level_controller& sl_controller) const;
std::optional<int32_t> get_concurrency(const query_options& options) const;
void fill_prepare_context(prepare_context& ctx);
class raw final {
@@ -74,7 +68,6 @@ public:
std::optional<cql3::expr::expression> time_to_live;
std::optional<cql3::expr::expression> timeout;
std::optional<sstring> service_level;
std::optional<cql3::expr::expression> concurrency;
std::unique_ptr<attributes> prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const;
private:
@@ -83,8 +76,6 @@ public:
lw_shared_ptr<column_specification> time_to_live_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> timeout_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const;
};
};

View File

@@ -279,15 +279,11 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
}
// Regular secondary indexes require rf-rack-validity.
// Custom indexes need to validate this property themselves, if they need it.
if (!_properties || !_properties->custom_class) {
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
validate_for_local_index(*schema);

View File

@@ -21,7 +21,7 @@ namespace cql3 {
namespace statements {
static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges, std::vector<query::clustering_range> clustering_bounds, view_ptr view,
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration, size_t concurrency) {
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration) {
auto key_columns = std::ranges::to<std::vector<const column_definition*>>(
view->all_columns()
| std::views::filter([] (const column_definition& cdef) { return cdef.is_primary_key(); })
@@ -35,7 +35,7 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges,
tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges);
auto p = service::pager::query_pagers::ghost_row_deleting_pager(schema_ptr(view), selection, state,
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration, concurrency);
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration);
int32_t page_size = std::max(options.get_page_size(), 1000);
auto now = gc_clock::now();
@@ -62,8 +62,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> prune_materialized
auto timeout_duration = get_timeout(state.get_client_state(), options);
dht::partition_range_vector key_ranges = _restrictions->get_partition_key_ranges(options);
std::vector<query::clustering_range> clustering_bounds = _restrictions->get_clustering_bounds(options);
size_t concurrency = _attrs->is_concurrency_set() ? _attrs->get_concurrency(options).value() : 1;
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration, concurrency).then([] {
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(::make_shared<cql_transport::messages::result_message::void_message>());
});
}

View File

@@ -165,7 +165,7 @@ future<> db::commitlog_replayer::impl::init() {
future<db::commitlog_replayer::impl::stats>
db::commitlog_replayer::impl::recover(const commitlog::descriptor& d, const commitlog::replay_state& rpstate) const {
scylla_assert(_column_mappings.local_is_initialized());
SCYLLA_ASSERT(_column_mappings.local_is_initialized());
replay_position rp{d};
auto gp = min_pos(rp.shard_id());

View File

@@ -1172,17 +1172,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"* default_weight: (Default: 1 **) How many requests are handled during each turn of the RoundRobin.\n"
"* weights: (Default: Keyspace: 1) Takes a list of keyspaces. It sets how many requests are handled during each turn of the RoundRobin, based on the request_scheduler_id.")
/**
* @Group Vector search settings
* @GroupDescription Settings for configuring and tuning vector search functionality.
*/
, vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in `vector_store_primary_uri` and `vector_store_secondary_uri`. The available options are:\n"
"* truststore: (Default: <not set, use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
/**
* @Group Security properties
* @GroupDescription Server and client security settings.
*/
@@ -1470,6 +1459,13 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
, vector_store_primary_uri(
this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "", "A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in vector_store_primary_uri and vector_store_secondary_uri. The available options are:\n"
"* truststore: (Default: <not set. use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")

View File

@@ -344,9 +344,6 @@ public:
named_value<sstring> request_scheduler;
named_value<sstring> request_scheduler_id;
named_value<string_map> request_scheduler_options;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<string_map> vector_store_encryption_options;
named_value<sstring> authenticator;
named_value<sstring> internode_authenticator;
named_value<sstring> authorizer;
@@ -474,6 +471,10 @@ public:
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<string_map> vector_store_encryption_options;
named_value<bool> abort_on_ebadf;
named_value<bool> sanitizer_report_backtrace;

View File

@@ -10,7 +10,6 @@
#include "reader_concurrency_semaphore.hh"
#include "replica/database.hh"
#include "utils/UUID_gen.hh"
#include "utils/assert.hh"
static logging::logger corrupt_data_logger("corrupt_data");
@@ -76,14 +75,14 @@ future<corrupt_data_handler::entry_id> system_table_corrupt_data_handler::do_rec
auto set_cell_raw = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, managed_bytes cell_value) {
auto cdef = corrupt_data_schema->get_column_definition(cell_name);
scylla_assert(cdef);
SCYLLA_ASSERT(cdef);
entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value, _entry_ttl));
};
auto set_cell = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, data_value cell_value) {
auto cdef = corrupt_data_schema->get_column_definition(cell_name);
scylla_assert(cdef);
SCYLLA_ASSERT(cdef);
entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value.serialize_nonnull(), _entry_ttl));
};

View File

@@ -39,7 +39,7 @@ large_data_handler::large_data_handler(uint64_t partition_threshold_bytes, uint6
}
future<large_data_handler::partition_above_threshold> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows) {
scylla_assert(running());
SCYLLA_ASSERT(running());
partition_above_threshold above_threshold{partition_size > _partition_threshold_bytes, rows > _rows_count_threshold};
static_assert(std::is_same_v<decltype(above_threshold.size), bool>);
_stats.partitions_bigger_than_threshold += above_threshold.size; // increment if true
@@ -83,7 +83,7 @@ sstring large_data_handler::sst_filename(const sstables::sstable& sst) {
}
future<> large_data_handler::maybe_delete_large_data_entries(sstables::shared_sstable sst) {
scylla_assert(running());
SCYLLA_ASSERT(running());
auto schema = sst->get_schema();
auto filename = sst_filename(*sst);
using ldt = sstables::large_data_type;
@@ -247,7 +247,7 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable
future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const {
auto sys_ks = _sys_ks.get_permit();
scylla_assert(sys_ks);
SCYLLA_ASSERT(sys_ks);
const sstring req =
seastar::format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?",
large_table_name);

View File

@@ -80,7 +80,7 @@ public:
future<bool> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, uint64_t row_size) {
scylla_assert(running());
SCYLLA_ASSERT(running());
if (row_size > _row_threshold_bytes) [[unlikely]] {
return with_sem([&sst, &partition_key, clustering_key, row_size, this] {
return record_large_rows(sst, partition_key, clustering_key, row_size);
@@ -100,7 +100,7 @@ public:
future<bool> maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) {
scylla_assert(running());
SCYLLA_ASSERT(running());
if (cell_size > _cell_threshold_bytes || collection_elements > _collection_elements_count_threshold) [[unlikely]] {
return with_sem([&sst, &partition_key, clustering_key, &cdef, cell_size, collection_elements, this] {
return record_large_cells(sst, partition_key, clustering_key, cdef, cell_size, collection_elements);

View File

@@ -1121,7 +1121,7 @@ future<> schema_applier::commit() {
// Run func first on shard 0
// to allow "seeding" of the effective_replication_map
// with a new e_r_m instance.
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
commit_on_shard(sharded_db.local());
co_await sharded_db.invoke_on_others([this] (replica::database& db) {
commit_on_shard(db);

View File

@@ -187,7 +187,7 @@ static future<std::vector<token_range>> get_local_ranges(replica::database& db,
auto ranges = db.get_token_metadata().get_primary_ranges_for(std::move(tokens));
std::vector<token_range> local_ranges;
auto to_bytes = [](const std::optional<dht::token_range::bound>& b) {
scylla_assert(b);
SCYLLA_ASSERT(b);
return utf8_type->decompose(b->value().to_sstring());
};
// We merge the ranges to be compatible with how Cassandra shows it's size estimates table.

View File

@@ -231,7 +231,7 @@ static schema_ptr get_current_service_levels(data_dictionary::database db) {
}
static schema_ptr get_updated_service_levels(data_dictionary::database db, bool workload_prioritization_enabled) {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
auto schema = get_current_service_levels(db);
schema_builder b(schema);
for (const auto& col : new_service_levels_columns(workload_prioritization_enabled)) {

View File

@@ -9,8 +9,6 @@
#include "query/query-result-reader.hh"
#include "replica/database_fwd.hh"
#include "db/timeout_clock.hh"
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
namespace service {
class storage_proxy;
@@ -27,14 +25,8 @@ class delete_ghost_rows_visitor {
replica::table& _view_table;
schema_ptr _base_schema;
std::optional<partition_key> _view_pk;
db::timeout_semaphore _concurrency_semaphore;
seastar::gate _gate;
std::exception_ptr& _ex;
public:
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex);
delete_ghost_rows_visitor(delete_ghost_rows_visitor&&) = default;
~delete_ghost_rows_visitor() noexcept;
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration);
void add_value(const column_definition& def, query::result_row_view::iterator_type& i) {
}
@@ -53,9 +45,6 @@ public:
uint32_t accept_partition_end(const query::result_row_view& static_row) {
return 0;
}
private:
future<> do_accept_new_row(partition_key pk, clustering_key ck);
};
} //namespace db::view

View File

@@ -153,14 +153,14 @@ row_locker::unlock(const dht::decorated_key* pk, bool partition_exclusive,
mylog.error("column_family::local_base_lock_holder::~local_base_lock_holder() can't find lock for partition", *pk);
return;
}
scylla_assert(&pli->first == pk);
SCYLLA_ASSERT(&pli->first == pk);
if (cpk) {
auto rli = pli->second._row_locks.find(*cpk);
if (rli == pli->second._row_locks.end()) {
mylog.error("column_family::local_base_lock_holder::~local_base_lock_holder() can't find lock for row", *cpk);
return;
}
scylla_assert(&rli->first == cpk);
SCYLLA_ASSERT(&rli->first == cpk);
mylog.debug("releasing {} lock for row {} in partition {}", (row_exclusive ? "exclusive" : "shared"), *cpk, *pk);
auto& lock = rli->second;
if (row_exclusive) {

View File

@@ -3597,7 +3597,7 @@ view_updating_consumer::view_updating_consumer(view_update_generator& gen, schem
})
{ }
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex)
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration)
: _proxy(proxy)
, _state(state)
, _timeout_duration(timeout_duration)
@@ -3605,20 +3605,8 @@ delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& pro
, _view_table(_proxy.get_db().local().find_column_family(view))
, _base_schema(_proxy.get_db().local().find_schema(_view->view_info()->base_id()))
, _view_pk()
, _concurrency_semaphore(concurrency)
, _ex(ex)
{}
delete_ghost_rows_visitor::~delete_ghost_rows_visitor() noexcept {
try {
_gate.close().get();
} catch (...) {
// Closing the gate should never throw, but if it does anyway, capture the exception.
_ex = std::current_exception();
}
}
void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, uint32_t row_count) {
SCYLLA_ASSERT(thread::running_in_thread());
_view_pk = key;
@@ -3626,18 +3614,7 @@ void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, u
// Assumes running in seastar::thread
void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row) {
auto units = get_units(_concurrency_semaphore, 1).get();
(void)seastar::try_with_gate(_gate, [this, pk = _view_pk.value(), units = std::move(units), ck] () mutable {
return do_accept_new_row(std::move(pk), std::move(ck)).then_wrapped([this, units = std::move(units)] (future<>&& f) mutable {
if (f.failed()) {
_ex = f.get_exception();
}
});
});
}
future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clustering_key ck) {
auto view_exploded_pk = pk.explode();
auto view_exploded_pk = _view_pk->explode();
auto view_exploded_ck = ck.explode();
std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
@@ -3672,17 +3649,17 @@ future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clusteri
_proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
auto timeout = db::timeout_clock::now() + _timeout_duration;
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
auto base_qr = co_await _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts);
auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get();
query::result& result = *base_qr.query_result;
auto delete_ghost_row = [&]() -> future<> {
mutation m(_view, pk);
auto delete_ghost_row = [&]() {
mutation m(_view, *_view_pk);
auto& row = m.partition().clustered_row(*_view, ck);
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
timeout = db::timeout_clock::now() + _timeout_duration;
return _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no);
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
};
if (result.row_count().value_or(0) == 0) {
co_await delete_ghost_row();
delete_ghost_row();
} else if (!view_key_cols_not_in_base_key.empty()) {
if (result.row_count().value_or(0) != 1) {
on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name()));
@@ -3692,7 +3669,7 @@ future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clusteri
for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) {
const data_value* base_val = base_row.get_data_value(col_def->name_as_text());
if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) {
co_await delete_ghost_row();
delete_ghost_row();
break;
}
}

View File

@@ -2,6 +2,7 @@ etc/default/scylla-server
etc/default/scylla-housekeeping
etc/scylla.d/*.conf
etc/bash_completion.d/nodetool-completion
opt/scylladb/share/p11-kit/modules/*
opt/scylladb/share/doc/scylla/*
opt/scylladb/share/doc/scylla/licenses/
usr/lib/systemd/system/*.timer

View File

@@ -122,6 +122,7 @@ ln -sfT /etc/scylla /var/lib/scylla/conf
%config(noreplace) %{_sysconfdir}/sysconfig/scylla-housekeeping
%attr(0755,root,root) %dir %{_sysconfdir}/scylla.d
%config(noreplace) %{_sysconfdir}/scylla.d/*.conf
/opt/scylladb/share/p11-kit/modules/*
/opt/scylladb/share/doc/scylla/*
%{_unitdir}/scylla-fstrim.service
%{_unitdir}/scylla-housekeeping-daily.service

View File

@@ -1,18 +1,6 @@
### a dictionary of redirections
#old path: new path
# Move the diver information to another project
/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html
/stable/using-scylla/drivers/dynamo-drivers/index.html: https://docs.scylladb.com/stable/drivers/dynamo-drivers.html
/stable/using-scylla/drivers/cql-drivers/index.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-python-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-java-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-go-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-gocqlx-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-cpp-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-rust-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
# Redirect 2025.1 upgrade guides that are not on master but were indexed by Google (404 reported)
/master/upgrade/upgrade-guides/upgrade-guide-from-2024.x-to-2025.1/upgrade-guide-from-2024.x-to-2025.1.html: https://docs.scylladb.com/manual/stable/upgrade/index.html

View File

@@ -106,15 +106,6 @@ which is recommended in order to make the operation less heavyweight
and allow for running multiple parallel pruning statements for non-overlapping
token ranges.
By default, the PRUNE MATERIALIZED VIEW statement is relatively slow, only
performing one base read or write at a time. This can be changed with the
USING CONCURRENCY clause. If the clause is used, the concurrency of reads
and writes from the base table will be allowed to increase up to the specified
value. For example, to run the PRUNE with 100 parallel reads/writes, you can use:
```cql
PRUNE MATERIALIZED VIEW my_view WHERE v = 19 USING CONCURRENCY 100;
```
## Synchronous materialized views
Usually, when a table with materialized views is updated, the update to the

View File

@@ -1,198 +0,0 @@
# SCYLLA_ASSERT to scylla_assert() Conversion Guide
## Overview
This document tracks the conversion of `SCYLLA_ASSERT` to the new `scylla_assert()` macro based on `on_internal_error()`. The new macro throws exceptions instead of crashing the process, preventing cluster-wide crashes and loss of availability.
## Status Summary
- **Total SCYLLA_ASSERT usages**: ~1307 (including tests)
- **Non-test usages**: ~886
- **Unsafe conversions (noexcept)**: ~187
- **Unsafe conversions (destructors)**: ~36
- **Safe conversions possible**: ~668
- **Converted so far**: 112
## Safe vs Unsafe Contexts
### Safe to Convert ✓
- Regular functions (non-noexcept)
- Coroutine functions (returning `future<T>`)
- Member functions without noexcept specifier
- Functions where exception propagation is acceptable
### Unsafe to Convert ✗
1. **noexcept functions** - throwing exceptions from noexcept causes `std::terminate()`
2. **Destructors** - destructors are implicitly noexcept
3. **noexcept lambdas and callbacks**
4. **Code with explicit exception-safety requirements** that cannot handle exceptions
## Files with Unsafe Conversions
### Files with SCYLLA_ASSERT in noexcept contexts (examples)
1. **reader_concurrency_semaphore.cc**
- Lines with noexcept functions containing SCYLLA_ASSERT
- Must remain as SCYLLA_ASSERT
2. **db/large_data_handler.cc**
- Line 86: `maybe_delete_large_data_entries()` - marked noexcept but contains SCYLLA_ASSERT
- Analysis shows this is actually safe (not truly noexcept)
3. **db/row_cache.cc**
- Multiple SCYLLA_ASSERT usages in noexcept member functions
4. **db/schema_tables.cc**
- SCYLLA_ASSERT in noexcept contexts
5. **raft/server.cc**
- Multiple noexcept functions with SCYLLA_ASSERT
### Files with SCYLLA_ASSERT in destructors
1. **reader_concurrency_semaphore.cc**
- Line 1116: SCYLLA_ASSERT in destructor
2. **api/column_family.cc**
- Line 102: SCYLLA_ASSERT in destructor
3. **utils/logalloc.cc**
- Line 1991: SCYLLA_ASSERT in destructor
4. **utils/file_lock.cc**
- Lines 34, 36: SCYLLA_ASSERT in destructor
5. **utils/disk_space_monitor.cc**
- Line 66: SCYLLA_ASSERT in destructor
## Conversion Strategy
### Phase 1: Infrastructure (Completed)
- Created `scylla_assert()` macro in `utils/assert.hh`
- Uses `on_internal_error()` for exception-based error handling
- Supports optional message parameters
### Phase 2: Safe Conversions
Convert SCYLLA_ASSERT to scylla_assert in contexts where:
- Function is not noexcept
- Not in a destructor
- Exception propagation is safe
### Phase 3: Document Remaining Uses
For contexts that cannot be converted:
- Add comments explaining why SCYLLA_ASSERT must remain
- Consider alternative approaches (e.g., using `on_fatal_internal_error()` in noexcept)
## Converted Files
### Completed Conversions
1. **db/large_data_handler.cc** (3 conversions)
- Line 42: `maybe_record_large_partitions()`
- Line 86: `maybe_delete_large_data_entries()`
- Line 250: `delete_large_data_entries()`
2. **db/large_data_handler.hh** (2 conversions)
- Line 83: `maybe_record_large_rows()`
- Line 103: `maybe_record_large_cells()`
3. **db/schema_applier.cc** (1 conversion)
- Line 1124: `commit()` coroutine
4. **db/system_distributed_keyspace.cc** (1 conversion)
- Line 234: `get_updated_service_levels()`
5. **db/commitlog/commitlog_replayer.cc** (1 conversion)
- Line 168: `recover()` coroutine
6. **db/view/row_locking.cc** (2 conversions)
- Line 156: `unlock()` - partition lock check
- Line 163: `unlock()` - row lock check
7. **db/size_estimates_virtual_reader.cc** (1 conversion)
- Line 190: Lambda in `get_local_ranges()`
8. **db/corrupt_data_handler.cc** (2 conversions)
- Line 78: `set_cell_raw` lambda
- Line 85: `set_cell` lambda
9. **raft/tracker.cc** (2 conversions)
- Line 49: Switch default case with descriptive error
- Line 90: Switch default case with descriptive error
10. **service/topology_coordinator.cc** (11 conversions)
- Line 363: Node lookup assertion in `retake_node()`
- Line 2313: Bootstrapping state ring check
- Line 2362: Replacing state ring check
- Line 2365: Normal nodes lookup assertion
- Line 2366: Node ring and state validation
- Line 3025: Join request ring check
- Line 3036: Leave request ring check
- Line 3049: Remove request ring check
- Line 3061: Replace request ring check
- Line 3166: Transition nodes empty check
- Line 4016: Barrier validation in `stop()`
11. **service/storage_service.cc** (28 conversions, 3 unsafe kept as SCYLLA_ASSERT)
- Lines 603, 691, 857, 901, 969: Core service operations
- Lines 1523, 1575, 1844, 2086, 2170, 2195: Bootstrap and join operations
- Lines 2319, 2352, 2354: Replacement operations
- Lines 3003, 3028, 3228: Cluster join and drain operations
- Lines 3995, 4047, 4353: Decommission and removenode operations
- Lines 4473, 5787, 5834, 5958: CDC and topology change operations
- Lines 6490, 6491: Tablet streaming operations
- Line 7512: Join node response handler
- **Unsafe (kept as SCYLLA_ASSERT)**: Lines 3398, 5760, 5775 (noexcept functions)
12. **sstables/** (58 conversions across 22 files)
- **sstables/trie/bti_node_reader.cc** (6): Node reading operations
- **sstables/mx/writer.cc** (6): MX format writing
- **sstables/sstable_set.cc** (5): SSTable set management
- **sstables/compressor.cc** (5): Compression/decompression
- **sstables/trie/trie_writer.hh** (4): Trie writing
- **sstables/downsampling.hh** (4): Downsampling operations
- **sstables/storage.{cc,hh}** (6): Storage operations
- **sstables/sstables_manager.{cc,hh}** (6): SSTable lifecycle management
- **sstables/trie/writer_node.{hh,impl.hh}** (4): Trie node writing
- **sstables/trie/bti_key_translation.cc** (2): Key translation
- **sstables/sstable_directory.cc** (2): Directory management
- **sstables/trie/trie_writer.cc** (1): Trie writer implementation
- **sstables/trie/trie_traversal.hh** (1): Trie traversal
- **sstables/sstables.cc** (1): Core SSTable operations
- **sstables/partition_index_cache.hh** (1): Index caching
- **sstables/generation_type.hh** (1): Generation management
- **sstables/compress.{cc,hh}** (2): Compression utilities
- **sstables/exceptions.hh** (1): Comment update
## Testing
### Manual Testing
Created `test/manual/test_scylla_assert.cc` to verify:
- Passing assertions succeed
- Failing assertions throw exceptions
- Custom messages are properly formatted
### Integration Testing
- Run existing test suite with converted assertions
- Verify no regressions in error handling
- Confirm exception propagation works correctly
## Future Work
1. **Automated Analysis Tool**
- Create tool to identify safe vs unsafe conversion contexts
- Generate reports of remaining conversions
2. **Gradual Conversion**
- Convert additional safe usages incrementally
- Monitor for any unexpected issues
3. **noexcept Review**
- Review functions marked noexcept that contain SCYLLA_ASSERT
- Consider if they should use `on_fatal_internal_error()` instead
## References
- `utils/assert.hh` - Implementation of both SCYLLA_ASSERT and scylla_assert
- `utils/on_internal_error.hh` - Exception-based error handling infrastructure
- GitHub Issue: [Link to original issue tracking this work]

View File

@@ -1,614 +0,0 @@
# Unsafe SCYLLA_ASSERT Locations
This document lists specific locations where SCYLLA_ASSERT cannot be safely converted to scylla_assert().
## Summary
- Files with noexcept SCYLLA_ASSERT: 50
- Files with destructor SCYLLA_ASSERT: 25
- Total unsafe SCYLLA_ASSERT in noexcept: 187
- Total unsafe SCYLLA_ASSERT in destructors: 36
## SCYLLA_ASSERT in noexcept Functions
### auth/cache.cc
- Line 118: `SCYLLA_ASSERT(this_shard_id() == 0);`
Total: 1 usages
### db/cache_mutation_reader.hh
- Line 309: `SCYLLA_ASSERT(sr->is_static_row());`
Total: 1 usages
### db/commitlog/commitlog.cc
- Line 531: `SCYLLA_ASSERT(!*this);`
- Line 544: `SCYLLA_ASSERT(!*this);`
- Line 662: `SCYLLA_ASSERT(_iter != _end);`
- Line 1462: `SCYLLA_ASSERT(i->second >= count);`
Total: 4 usages
### db/hints/manager.hh
- Line 167: `SCYLLA_ASSERT(_ep_managers.empty());`
Total: 1 usages
### db/partition_snapshot_row_cursor.hh
- Line 384: `SCYLLA_ASSERT(_latest_it);`
Total: 1 usages
### db/row_cache.cc
- Line 1365: `SCYLLA_ASSERT(it->is_last_dummy());`
Total: 1 usages
### db/schema_tables.cc
- Line 774: `SCYLLA_ASSERT(this_shard_id() == 0);`
Total: 1 usages
### db/view/view.cc
- Line 3623: `SCYLLA_ASSERT(thread::running_in_thread());`
Total: 1 usages
### gms/gossiper.cc
- Line 876: `SCYLLA_ASSERT(ptr->pid == _permit_id);`
Total: 1 usages
### locator/production_snitch_base.hh
- Line 77: `SCYLLA_ASSERT(_backreference != nullptr);`
- Line 82: `SCYLLA_ASSERT(_backreference != nullptr);`
- Line 87: `SCYLLA_ASSERT(_backreference != nullptr);`
Total: 3 usages
### locator/topology.cc
- Line 135: `SCYLLA_ASSERT(_shard == this_shard_id());`
Total: 1 usages
### mutation/counters.hh
- Line 314: `SCYLLA_ASSERT(_cell.is_live());`
- Line 315: `SCYLLA_ASSERT(!_cell.is_counter_update());`
Total: 2 usages
### mutation/mutation_partition_v2.hh
- Line 271: `SCYLLA_ASSERT(s.version() == _schema_version);`
Total: 1 usages
### mutation/partition_version.cc
- Line 364: `SCYLLA_ASSERT(!_snapshot->is_locked());`
- Line 701: `SCYLLA_ASSERT(!rows.empty());`
- Line 703: `SCYLLA_ASSERT(last_dummy.is_last_dummy());`
- Line 746: `SCYLLA_ASSERT(!_snapshot->is_locked());`
- Line 770: `SCYLLA_ASSERT(at_latest_version());`
- Line 777: `SCYLLA_ASSERT(at_latest_version());`
Total: 6 usages
### mutation/partition_version.hh
- Line 211: `SCYLLA_ASSERT(_schema);`
- Line 217: `SCYLLA_ASSERT(_schema);`
- Line 254: `SCYLLA_ASSERT(!_version->_backref);`
- Line 282: `SCYLLA_ASSERT(_version);`
- Line 286: `SCYLLA_ASSERT(_version);`
- Line 290: `SCYLLA_ASSERT(_version);`
- Line 294: `SCYLLA_ASSERT(_version);`
Total: 7 usages
### mutation/partition_version_list.hh
- Line 36: `SCYLLA_ASSERT(!_head->is_referenced_from_entry());`
- Line 42: `SCYLLA_ASSERT(!_tail->is_referenced_from_entry());`
- Line 70: `SCYLLA_ASSERT(!_head->is_referenced_from_entry());`
Total: 3 usages
### mutation/range_tombstone_list.cc
- Line 412: `SCYLLA_ASSERT (it != rt_list.end());`
- Line 422: `SCYLLA_ASSERT (it != rt_list.end());`
Total: 2 usages
### raft/server.cc
- Line 1720: `SCYLLA_ASSERT(_non_joint_conf_commit_promise);`
Total: 1 usages
### reader_concurrency_semaphore.cc
- Line 109: `SCYLLA_ASSERT(_permit == o._permit);`
- Line 432: `SCYLLA_ASSERT(_need_cpu_branches);`
- Line 455: `SCYLLA_ASSERT(_awaits_branches);`
- Line 1257: `SCYLLA_ASSERT(!_stopped);`
- Line 1585: `SCYLLA_ASSERT(_stats.need_cpu_permits);`
- Line 1587: `SCYLLA_ASSERT(_stats.need_cpu_permits >= _stats.awaits_permits);`
- Line 1593: `SCYLLA_ASSERT(_stats.need_cpu_permits >= _stats.awaits_permits);`
- Line 1598: `SCYLLA_ASSERT(_stats.awaits_permits);`
Total: 8 usages
### readers/multishard.cc
- Line 296: `SCYLLA_ASSERT(!_irh);`
Total: 1 usages
### repair/repair.cc
- Line 1073: `SCYLLA_ASSERT(table_names().size() == table_ids.size());`
Total: 1 usages
### replica/database.cc
- Line 3299: `SCYLLA_ASSERT(!_cf_lock.try_write_lock()); // lock should be acquired before the`
- Line 3304: `SCYLLA_ASSERT(!_cf_lock.try_write_lock()); // lock should be acquired before the`
Total: 2 usages
### replica/database.hh
- Line 1971: `SCYLLA_ASSERT(_user_sstables_manager);`
- Line 1976: `SCYLLA_ASSERT(_system_sstables_manager);`
Total: 2 usages
### replica/dirty_memory_manager.cc
- Line 67: `SCYLLA_ASSERT(!child->_heap_handle);`
Total: 1 usages
### replica/dirty_memory_manager.hh
- Line 261: `SCYLLA_ASSERT(_shutdown_requested);`
Total: 1 usages
### replica/memtable.cc
- Line 563: `SCYLLA_ASSERT(_mt._flushed_memory <= static_cast<int64_t>(_mt.occupancy().total_`
- Line 860: `SCYLLA_ASSERT(!reclaiming_enabled());`
Total: 2 usages
### replica/table.cc
- Line 2829: `SCYLLA_ASSERT(!trange.start()->is_inclusive() && trange.end()->is_inclusive());`
Total: 1 usages
### schema/schema.hh
- Line 1022: `SCYLLA_ASSERT(_schema->is_view());`
Total: 1 usages
### schema/schema_registry.cc
- Line 257: `SCYLLA_ASSERT(_state >= state::LOADED);`
- Line 262: `SCYLLA_ASSERT(_state >= state::LOADED);`
- Line 329: `SCYLLA_ASSERT(o._cpu_of_origin == current);`
Total: 3 usages
### service/direct_failure_detector/failure_detector.cc
- Line 628: `SCYLLA_ASSERT(alive != endpoint_liveness.marked_alive);`
Total: 1 usages
### service/storage_service.cc
- Line 3398: `SCYLLA_ASSERT(this_shard_id() == 0);`
- Line 5760: `SCYLLA_ASSERT(this_shard_id() == 0);`
- Line 5775: `SCYLLA_ASSERT(this_shard_id() == 0);`
- Line 5787: `SCYLLA_ASSERT(this_shard_id() == 0);`
Total: 4 usages
### sstables/generation_type.hh
- Line 132: `SCYLLA_ASSERT(bool(gen));`
Total: 1 usages
### sstables/partition_index_cache.hh
- Line 62: `SCYLLA_ASSERT(!ready());`
Total: 1 usages
### sstables/sstables_manager.hh
- Line 244: `SCYLLA_ASSERT(_sstables_registry && "sstables_registry is not plugged");`
Total: 1 usages
### sstables/storage.hh
- Line 86: `SCYLLA_ASSERT(false && "Changing directory not implemented");`
- Line 89: `SCYLLA_ASSERT(false && "Direct links creation not implemented");`
- Line 92: `SCYLLA_ASSERT(false && "Direct move not implemented");`
Total: 3 usages
### sstables_loader.cc
- Line 735: `SCYLLA_ASSERT(p);`
Total: 1 usages
### tasks/task_manager.cc
- Line 56: `SCYLLA_ASSERT(inserted);`
- Line 76: `SCYLLA_ASSERT(child->get_status().progress_units == progress_units);`
- Line 454: `SCYLLA_ASSERT(this_shard_id() == 0);`
Total: 3 usages
### tools/schema_loader.cc
- Line 281: `SCYLLA_ASSERT(p);`
Total: 1 usages
### utils/UUID.hh
- Line 59: `SCYLLA_ASSERT(is_timestamp());`
Total: 1 usages
### utils/bptree.hh
- Line 289: `SCYLLA_ASSERT(n.is_leftmost());`
- Line 301: `SCYLLA_ASSERT(n.is_rightmost());`
- Line 343: `SCYLLA_ASSERT(leaf->is_leaf());`
- Line 434: `SCYLLA_ASSERT(d->attached());`
- Line 453: `SCYLLA_ASSERT(n._num_keys > 0);`
- Line 505: `SCYLLA_ASSERT(n->is_leftmost());`
- Line 511: `SCYLLA_ASSERT(n->is_rightmost());`
- Line 517: `SCYLLA_ASSERT(n->is_root());`
- Line 557: `SCYLLA_ASSERT(!is_end());`
- Line 566: `SCYLLA_ASSERT(!is_end());`
- Line 613: `SCYLLA_ASSERT(n->_num_keys > 0);`
- Line 833: `SCYLLA_ASSERT(_left->_num_keys > 0);`
- Line 926: `SCYLLA_ASSERT(rl == rb);`
- Line 927: `SCYLLA_ASSERT(rl <= nr);`
- Line 1037: `SCYLLA_ASSERT(is_leaf());`
- Line 1042: `SCYLLA_ASSERT(is_leaf());`
- Line 1047: `SCYLLA_ASSERT(is_leaf());`
- Line 1052: `SCYLLA_ASSERT(is_leaf());`
- Line 1062: `SCYLLA_ASSERT(t->_right == this);`
- Line 1083: `SCYLLA_ASSERT(t->_left == this);`
- Line 1091: `SCYLLA_ASSERT(t->_right == this);`
- Line 1103: `SCYLLA_ASSERT(false);`
- Line 1153: `SCYLLA_ASSERT(i <= _num_keys);`
- Line 1212: `SCYLLA_ASSERT(off <= _num_keys);`
- Line 1236: `SCYLLA_ASSERT(from._num_keys > 0);`
- Line 1389: `SCYLLA_ASSERT(!is_root());`
- Line 1450: `SCYLLA_ASSERT(_num_keys == NodeSize);`
- Line 1563: `SCYLLA_ASSERT(_num_keys < NodeSize);`
- Line 1577: `SCYLLA_ASSERT(i != 0 || left_kid_sorted(k, less));`
- Line 1647: `SCYLLA_ASSERT(nodes.empty());`
- Line 1684: `SCYLLA_ASSERT(_num_keys > 0);`
- Line 1686: `SCYLLA_ASSERT(p._kids[i].n == this);`
- Line 1788: `SCYLLA_ASSERT(_num_keys == 0);`
- Line 1789: `SCYLLA_ASSERT(is_root() || !is_leaf() || (get_prev() == this && get_next() == th`
- Line 1821: `SCYLLA_ASSERT(_parent->_kids[i].n == &other);`
- Line 1841: `SCYLLA_ASSERT(i <= _num_keys);`
- Line 1856: `SCYLLA_ASSERT(!_nodes.empty());`
- Line 1938: `SCYLLA_ASSERT(!attached());`
- Line 1943: `SCYLLA_ASSERT(attached());`
Total: 39 usages
### utils/cached_file.hh
- Line 104: `SCYLLA_ASSERT(!_use_count);`
Total: 1 usages
### utils/compact-radix-tree.hh
- Line 1026: `SCYLLA_ASSERT(check_capacity(head, ni));`
- Line 1027: `SCYLLA_ASSERT(!_data.has(ni));`
- Line 1083: `SCYLLA_ASSERT(next_cap > head._capacity);`
- Line 1149: `SCYLLA_ASSERT(capacity != 0);`
- Line 1239: `SCYLLA_ASSERT(i < Size);`
- Line 1240: `SCYLLA_ASSERT(_idx[i] == unused_node_index);`
- Line 1470: `SCYLLA_ASSERT(kid != nullptr);`
- Line 1541: `SCYLLA_ASSERT(ret.first != nullptr);`
- Line 1555: `SCYLLA_ASSERT(leaf_depth >= depth);`
- Line 1614: `SCYLLA_ASSERT(n->check_prefix(key, depth));`
- Line 1850: `SCYLLA_ASSERT(_root.is(nil_root));`
Total: 11 usages
### utils/cross-shard-barrier.hh
- Line 134: `SCYLLA_ASSERT(w.has_value());`
Total: 1 usages
### utils/double-decker.hh
- Line 200: `SCYLLA_ASSERT(!hint.match);`
- Line 366: `SCYLLA_ASSERT(nb == end._bucket);`
Total: 2 usages
### utils/intrusive-array.hh
- Line 217: `SCYLLA_ASSERT(!is_single_element());`
- Line 218: `SCYLLA_ASSERT(pos < max_len);`
- Line 225: `SCYLLA_ASSERT(pos > 0);`
- Line 238: `SCYLLA_ASSERT(train_len < max_len);`
- Line 329: `SCYLLA_ASSERT(idx < max_len); // may the force be with us...`
Total: 5 usages
### utils/intrusive_btree.hh
- Line 148: `SCYLLA_ASSERT(to.num_keys == 0);`
- Line 157: `SCYLLA_ASSERT(!attached());`
- Line 227: `SCYLLA_ASSERT(n->is_inline());`
- Line 232: `SCYLLA_ASSERT(n->is_inline());`
- Line 288: `SCYLLA_ASSERT(n.is_root());`
- Line 294: `SCYLLA_ASSERT(n.is_leftmost());`
- Line 302: `SCYLLA_ASSERT(n.is_rightmost());`
- Line 368: `SCYLLA_ASSERT(_root->is_leaf());`
- Line 371: `SCYLLA_ASSERT(_inline.empty());`
- Line 601: `SCYLLA_ASSERT(n->is_leaf());`
- Line 673: `SCYLLA_ASSERT(!is_end());`
- Line 674: `SCYLLA_ASSERT(h->attached());`
- Line 677: `SCYLLA_ASSERT(_idx < cur.n->_base.num_keys);`
- Line 679: `SCYLLA_ASSERT(_hook->attached());`
- Line 690: `SCYLLA_ASSERT(!is_end());`
- Line 764: `SCYLLA_ASSERT(n->num_keys > 0);`
- Line 994: `SCYLLA_ASSERT(!_it.is_end());`
- Line 1178: `SCYLLA_ASSERT(is_leaf());`
- Line 1183: `SCYLLA_ASSERT(is_root());`
- Line 1261: `SCYLLA_ASSERT(!is_root());`
- Line 1268: `SCYLLA_ASSERT(p->_base.num_keys > 0 && p->_kids[0] == this);`
- Line 1275: `SCYLLA_ASSERT(p->_base.num_keys > 0 && p->_kids[p->_base.num_keys] == this);`
- Line 1286: `SCYLLA_ASSERT(false);`
- Line 1291: `SCYLLA_ASSERT(!nb->is_inline());`
- Line 1296: `SCYLLA_ASSERT(!nb->is_inline());`
- Line 1338: `SCYLLA_ASSERT(_base.num_keys == 0);`
- Line 1373: `SCYLLA_ASSERT(!(is_leftmost() || is_rightmost()));`
- Line 1378: `SCYLLA_ASSERT(p->_kids[i] != this);`
- Line 1396: `SCYLLA_ASSERT(!is_leaf());`
- Line 1537: `SCYLLA_ASSERT(src != _base.num_keys); // need more keys for the next leaf`
- Line 1995: `SCYLLA_ASSERT(_parent.n->_base.num_keys > 0);`
- Line 2135: `SCYLLA_ASSERT(is_leaf());`
- Line 2144: `SCYLLA_ASSERT(_base.num_keys != 0);`
- Line 2160: `SCYLLA_ASSERT(_base.num_keys != 0);`
- Line 2172: `SCYLLA_ASSERT(!empty());`
- Line 2198: `SCYLLA_ASSERT(leaf == ret->is_leaf());`
Total: 36 usages
### utils/loading_shared_values.hh
- Line 203: `SCYLLA_ASSERT(!_set.size());`
Total: 1 usages
### utils/logalloc.cc
- Line 544: `SCYLLA_ASSERT(!_background_reclaimer);`
- Line 926: `SCYLLA_ASSERT(idx < _segments.size());`
- Line 933: `SCYLLA_ASSERT(idx < _segments.size());`
- Line 957: `SCYLLA_ASSERT(i != _segments.end());`
- Line 1323: `SCYLLA_ASSERT(_lsa_owned_segments_bitmap.test(idx_from_segment(seg)));`
- Line 1366: `SCYLLA_ASSERT(desc._region);`
- Line 1885: `SCYLLA_ASSERT(desc._buf_pointers.empty());`
- Line 1911: `SCYLLA_ASSERT(&desc == old_ptr->_desc);`
- Line 2105: `SCYLLA_ASSERT(seg);`
- Line 2116: `SCYLLA_ASSERT(seg);`
- Line 2341: `SCYLLA_ASSERT(pool.current_emergency_reserve_goal() >= n_segments);`
Total: 11 usages
### utils/logalloc.hh
- Line 307: `SCYLLA_ASSERT(this_shard_id() == _cpu);`
Total: 1 usages
### utils/reusable_buffer.hh
- Line 60: `SCYLLA_ASSERT(_refcount == 0);`
Total: 1 usages
## SCYLLA_ASSERT in Destructors
### api/column_family.cc
- Line 102: `SCYLLA_ASSERT(this_shard_id() == 0);`
Total: 1 usages
### cdc/generation.cc
- Line 846: `SCYLLA_ASSERT(_stopped);`
Total: 1 usages
### cdc/log.cc
- Line 173: `SCYLLA_ASSERT(_stopped);`
Total: 1 usages
### compaction/compaction_manager.cc
- Line 1074: `SCYLLA_ASSERT(_state == state::none || _state == state::stopped);`
Total: 1 usages
### db/hints/internal/hint_endpoint_manager.cc
- Line 188: `SCYLLA_ASSERT(stopped());`
Total: 1 usages
### mutation/partition_version.cc
- Line 347: `SCYLLA_ASSERT(!_snapshot->is_locked());`
Total: 1 usages
### reader_concurrency_semaphore.cc
- Line 1116: `SCYLLA_ASSERT(!_stats.waiters);`
- Line 1125: `SCYLLA_ASSERT(_inactive_reads.empty() && !_close_readers_gate.get_count() && !_p`
Total: 2 usages
### repair/row_level.cc
- Line 3647: `SCYLLA_ASSERT(_state == state::none || _state == state::stopped);`
Total: 1 usages
### replica/cell_locking.hh
- Line 371: `SCYLLA_ASSERT(_partitions.empty());`
Total: 1 usages
### replica/distributed_loader.cc
- Line 305: `SCYLLA_ASSERT(_sstable_directories.empty());`
Total: 1 usages
### schema/schema_registry.cc
- Line 45: `SCYLLA_ASSERT(!_schema);`
Total: 1 usages
### service/direct_failure_detector/failure_detector.cc
- Line 378: `SCYLLA_ASSERT(_ping_fiber.available());`
- Line 379: `SCYLLA_ASSERT(_notify_fiber.available());`
- Line 701: `SCYLLA_ASSERT(_shard_workers.empty());`
- Line 702: `SCYLLA_ASSERT(_destroy_subscriptions.available());`
- Line 703: `SCYLLA_ASSERT(_update_endpoint_fiber.available());`
- Line 707: `SCYLLA_ASSERT(!_impl);`
Total: 6 usages
### service/load_broadcaster.hh
- Line 37: `SCYLLA_ASSERT(_stopped);`
Total: 1 usages
### service/paxos/paxos_state.cc
- Line 323: `SCYLLA_ASSERT(_stopped);`
Total: 1 usages
### service/storage_proxy.cc
- Line 281: `SCYLLA_ASSERT(_stopped);`
- Line 3207: `SCYLLA_ASSERT(!_remote);`
Total: 2 usages
### service/tablet_allocator.cc
- Line 3288: `SCYLLA_ASSERT(_stopped);`
Total: 1 usages
### sstables/compressor.cc
- Line 1271: `SCYLLA_ASSERT(thread::running_in_thread());`
Total: 1 usages
### sstables/sstables_manager.cc
- Line 58: `SCYLLA_ASSERT(_closing);`
- Line 59: `SCYLLA_ASSERT(_active.empty());`
- Line 60: `SCYLLA_ASSERT(_undergoing_close.empty());`
Total: 3 usages
### sstables/sstables_manager.hh
- Line 188: `SCYLLA_ASSERT(_storage != nullptr);`
Total: 1 usages
### utils/cached_file.hh
- Line 477: `SCYLLA_ASSERT(_cache.empty());`
Total: 1 usages
### utils/disk_space_monitor.cc
- Line 66: `SCYLLA_ASSERT(_poller_fut.available());`
Total: 1 usages
### utils/file_lock.cc
- Line 34: `SCYLLA_ASSERT(_fd.get() != -1);`
- Line 36: `SCYLLA_ASSERT(r == 0);`
Total: 2 usages
### utils/logalloc.cc
- Line 1991: `SCYLLA_ASSERT(desc.is_empty());`
- Line 1996: `SCYLLA_ASSERT(segment_pool().descriptor(_active).is_empty());`
Total: 2 usages
### utils/lru.hh
- Line 41: `SCYLLA_ASSERT(!_lru_link.is_linked());`
Total: 1 usages
### utils/replicator.hh
- Line 221: `SCYLLA_ASSERT(_stopped);`
Total: 1 usages

View File

@@ -37,7 +37,7 @@ Getting Started
:id: "getting-started"
:class: my-panel
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
* :doc:`ScyllaDB Drivers</using-scylla/drivers/index>`
* `Get Started Lesson on ScyllaDB University <https://university.scylladb.com/courses/scylla-essentials-overview/lessons/quick-wins-install-and-run-scylla/>`_
* :doc:`CQL Reference </cql/index>`
* :doc:`cqlsh - the CQL shell </cql/cqlsh/>`

View File

@@ -35,7 +35,7 @@ Documentation Highlights
* :doc:`Cluster Management Procedures </operating-scylla/procedures/cluster-management/index>`
* :doc:`Upgrade ScyllaDB </upgrade/index>`
* :doc:`CQL Reference </cql/index>`
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`
* :doc:`Features </features/index>`
ScyllaDB Support

View File

@@ -172,7 +172,7 @@ For example:
* `ScyllaDB Java Driver <https://github.com/scylladb/java-driver/tree/3.7.1-scylla/manual/compression>`_
* `Go Driver <https://godoc.org/github.com/gocql/gocql#Compressor>`_
Refer to `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ for more drivers.
Refer to the :doc:`Drivers Page </using-scylla/drivers/index>` for more drivers.
.. _internode-compression:

View File

@@ -206,7 +206,7 @@ This is 19% of the latency compared to no batching.
Driver Guidelines
-----------------
Use the `ScyllaDB drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ that are available for Java, Python, Go, and C/C++.
Use the :doc:`ScyllaDB drivers </using-scylla/drivers/index>` that are available for Java, Python, Go, and C/C++.
They provide much better performance than third-party drivers because they are shard aware &emdash; they can route requests to the right CPU core (shard).
When the driver starts, it gets the topology of the cluster and therefore it knows exactly which CPU core should get a request.
Our latest shard-aware drivers also improve the efficiency of our Change Data Capture (CDC) feature.

View File

@@ -121,7 +121,7 @@ Driver Compression
This refers to compressing traffic between the client and ScyllaDB.
Verify your client driver is using compressed traffic when connected to ScyllaDB.
As compression is driver settings dependent, please check your client driver manual. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
As compression is driver settings dependent, please check your client driver manual or :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`.
Connectivity
@@ -130,7 +130,7 @@ Connectivity
Drivers Settings
================
* Use shard aware drivers wherever possible. `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ (not third-party drivers) are shard aware.
* Use shard aware drivers wherever possible. :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` (not third-party drivers) are shard aware.
* Configure connection pool - open more connections (>3 per shard) and/Or more clients. See `this blog <https://www.scylladb.com/2019/11/20/maximizing-performance-via-concurrency-while-minimizing-timeouts-in-distributed-databases/>`_.
Management

View File

@@ -25,8 +25,8 @@ Actions
If your cluster is having timeouts during overload, check first if you are not making the overload situation worse through retries, and pay attention to the following:
* Make sure the client retries only after the server has already timed out. Depending on the application this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver, check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ about parameters and defaults. For the server-side timeout, the ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver, check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ about parameters and defaults.
* Make sure the client retries only after the server has already timed out. Depending on the application this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver, check your :doc:`driver documentation </using-scylla/drivers/index>` about parameters and defaults. For the server-side timeout, the ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver, check your :doc:`driver documentation </using-scylla/drivers/index>` about parameters and defaults.
* Make sure the server neither runs speculative retry nor runs it based on percentiles (as those can fluctuate aggressively). Server-side speculative retries are a per-table setting that can be changed with the ALTER TABLE command. See the :ref:`documentation <speculative-retry-options>` for details.

View File

@@ -9,19 +9,9 @@ To ensure a successful upgrade, follow
the :doc:`documented upgrade procedures <upgrade-guides/index>` tested by
ScyllaDB. This means that:
* You should follow the upgrade policy:
* Starting with version **2025.4**, upgrades can skip minor versions as long
as they remain within the same major version (for example, upgrading directly
from 2025.1 → 2025.4 is supported).
* For versions **prior to 2025.4**, upgrades must be performed consecutively—
each successive X.Y version must be installed in order, **without skipping
any major or minor version** (for example, upgrading directly from 2025.1 → 2025.3
is not supported).
* You cannot skip major versions. Upgrades must move from one major version to
the next using the documented major-version upgrade path.
* You should upgrade to a supported version of ScyllaDB.
See `ScyllaDB Version Support <https://docs.scylladb.com/stable/versioning/version-support.html>`_.
* You should perform the upgrades consecutively - to each successive X.Y
version, **without skipping any major or minor version**, unless there is
a documented upgrade procedure to bypass a version.
* Before you upgrade to the next version, the whole cluster (each node) must
be upgraded to the previous version.
* You cannot perform an upgrade by replacing the nodes in the cluster with new

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

View File

@@ -0,0 +1,141 @@
=====================
ScyllaDB CQL Drivers
=====================
.. toctree::
:titlesonly:
:hidden:
scylla-python-driver
scylla-java-driver
scylla-go-driver
scylla-gocqlx-driver
scylla-cpp-driver
scylla-rust-driver
ScyllaDB Drivers
-----------------
The following ScyllaDB drivers are available:
* :doc:`Python Driver</using-scylla/drivers/cql-drivers/scylla-python-driver>`
* :doc:`Java Driver </using-scylla/drivers/cql-drivers/scylla-java-driver>`
* :doc:`Go Driver </using-scylla/drivers/cql-drivers/scylla-go-driver>`
* :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
* :doc:`C++ Driver </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
* `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
* :doc:`Rust Driver </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
* `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
We recommend using ScyllaDB drivers. All ScyllaDB drivers are shard-aware and provide additional
benefits over third-party drivers.
ScyllaDB supports the CQL binary protocol version 3, so any Apache Cassandra/CQL driver that implements
the same version works with ScyllaDB.
CDC Integration with ScyllaDB Drivers
-------------------------------------------
The following table specifies which ScyllaDB drivers include a library for
:doc:`CDC </features/cdc/cdc-intro>`.
.. list-table::
:widths: 40 60
:header-rows: 1
* - ScyllaDB Driver
- CDC Connector
* - :doc:`Python </using-scylla/drivers/cql-drivers/scylla-python-driver>`
- |x|
* - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
- |v|
* - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
- |v|
* - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
- |x|
* - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
- |x|
* - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
- |x|
* - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
- |v|
* - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
- |x|
Support for Tablets
-------------------------
The following table specifies which ScyllaDB drivers support
:doc:`tablets </architecture/tablets>` and since which version.
.. list-table::
:widths: 30 35 35
:header-rows: 1
* - ScyllaDB Driver
- Support for Tablets
- Since Version
* - :doc:`Python</using-scylla/drivers/cql-drivers/scylla-python-driver>`
- |v|
- 3.26.5
* - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
- |v|
- 4.18.0 (Java Driver 4.x)
3.11.5.2 (Java Driver 3.x)
* - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
- |v|
- 1.13.0
* - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
- |x|
- N/A
* - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
- |x|
- N/A
* - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
- |v|
- All versions
* - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
- |v|
- 0.13.0
* - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
- |v|
- All versions
Driver Support Policy
-------------------------------
We support the **two most recent minor releases** of our drivers.
* We test and validate the latest two minor versions.
* We typically patch only the latest minor release.
We recommend staying up to date with the latest supported versions to receive
updates and fixes.
At a minimum, upgrade your driver when upgrading to a new ScyllaDB version
to ensure compatibility between the driver and the database.
Third-party Drivers
----------------------
You can find the third-party driver documentation on the GitHub pages for each driver:
* `DataStax Java Driver <https://github.com/datastax/java-driver/>`_
* `DataStax Python Driver <https://github.com/datastax/python-driver/>`_
* `DataStax C# Driver <https://github.com/datastax/csharp-driver/>`_
* `DataStax Ruby Driver <https://github.com/datastax/ruby-driver/>`_
* `DataStax Node.js Driver <https://github.com/datastax/nodejs-driver/>`_
* `DataStax C++ Driver <https://github.com/datastax/cpp-driver/>`_
* `DataStax PHP Driver (Supported versions: 7.1) <https://github.com/datastax/php-driver>`_
* `He4rt PHP Driver (Supported versions: 8.1 and 8.2) <https://github.com/he4rt/scylladb-php-driver/>`_
* `Scala Phantom Project <https://github.com/outworkers/phantom>`_
* `Xandra Elixir Driver <https://github.com/lexhide/xandra>`_
* `Exandra Elixir Driver <https://github.com/vinniefranco/exandra>`_
Learn about ScyllaDB Drivers on ScyllaDB University
----------------------------------------------------
The free `Using ScyllaDB Drivers course <https://university.scylladb.com/courses/using-scylla-drivers/>`_
on ScyllaDB University covers the use of drivers in multiple languages to interact with a ScyllaDB
cluster. The languages covered include Java, CPP, Rust, Golang, Python, Node.JS, Scala, and others.

View File

@@ -0,0 +1,16 @@
===================
ScyllaDB C++ Driver
===================
The ScyllaDB C++ driver is a modern, feature-rich and **shard-aware** C/C++ client library for ScyllaDB using exclusively Cassandras binary protocol and Cassandra Query Language v3.
This driver is forked from Datastax cpp-driver.
Read the `documentation <https://cpp-driver.docs.scylladb.com>`_ to get started or visit the Github project `ScyllaDB C++ driver <https://github.com/scylladb/cpp-driver>`_.
More Information
----------------
* `C++ Driver Documentation <https://cpp-driver.docs.scylladb.com>`_
* `C/C++ Driver course at ScyllaDB University <https://university.scylladb.com/courses/using-scylla-drivers/lessons/cpp-driver-part-1/>`_
* `Blog: A Shard-Aware ScyllaDB C/C++ Driver <https://www.scylladb.com/2021/03/18/a-shard-aware-scylla-c-c-driver/>`_

View File

@@ -0,0 +1,28 @@
==================
ScyllaDB Go Driver
==================
The `ScyllaDB Go driver <https://github.com/scylladb/gocql>`_ is shard aware and contains extensions for a tokenAwareHostPolicy supported by ScyllaDB 2.3 and onwards.
It is is a fork of the `GoCQL Driver <https://github.com/gocql/gocql>`_ but has been enhanced with capabilities that take advantage of ScyllaDB's unique architecture.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
The protocol extension spec is `available here <https://github.com/scylladb/scylla/blob/master/docs/dev/protocol-extensions.md>`_.
The ScyllaDB Go Driver is a drop-in replacement for gocql.
As such, no code changes are needed to use this driver.
All you need to do is rebuild using the ``replace`` directive in your ``mod`` file.
**To download and install the driver**, visit the `Github project <https://github.com/scylladb/gocql>`_.
Using CDC with Go
-----------------
When writing applications, you can now use our `Go Library <https://github.com/scylladb/scylla-cdc-go>`_ to simplify writing applications that read from ScyllaDB CDC.
More information
----------------
* `ScyllaDB Gocql Driver project page on GitHub <https://github.com/scylladb/gocql>`_ - contains the source code as well as a readme and documentation files.
* `ScyllaDB University: Golang and ScyllaDB <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-1/>`_
A three-part lesson with in-depth examples from executing a few basic CQL statements with a ScyllaDB cluster using the Gocql driver, to the different data types that you can use in your database tables and how to store these binary files in ScyllaDB with a simple Go application.

View File

@@ -0,0 +1,16 @@
=========================
ScyllaDB Gocql Extension
=========================
The ScyllaDB Gocqlx is an extension to gocql that provides usability features.
With gocqlx, you can bind the query parameters from maps and structs, use named query parameters (``:identifier``), and scan the query results into structs and slices.
The driver includes a fluent and flexible CQL query builder and a database migrations module.
More information
----------------
* `ScyllaDB Gocqlx Driver project page on GitHub <https://github.com/scylladb/gocqlx>`_ - contains the source code as well as a readme and documentation files.
* `ScyllaDB University: Golang and ScyllaDB Part 3 GoCQLX <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-3-gocqlx/>`_ - part three of the Golang three-part course which focuses on how to create a sample Go application that executes a few basic CQL statements with a ScyllaDB cluster using the GoCQLX package

View File

@@ -0,0 +1,31 @@
=====================
ScyllaDB Java Driver
=====================
ScyllaDB Java Driver is forked from `DataStax Java Driver <https://github.com/datastax/java-driver>`_ with enhanced capabilities, taking advantage of ScyllaDB's unique architecture.
The ScyllaDB Java driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
Use the ScyllaDB Java driver for better compatibility and support for ScyllaDB with Java-based applications.
Read the `documentation <https://java-driver.docs.scylladb.com/>`_ to get started or visit the `Github project <https://github.com/scylladb/java-driver>`_.
The driver architecture is based on layers. At the bottom lies the driver core.
This core handles everything related to the connections to a ScyllaDB cluster (for example, connection pool, discovering new nodes, etc.) and exposes a simple, relatively low-level API on top of which higher-level layers can be built.
The ScyllaDB Java Driver is a drop-in replacement for the DataStax Java Driver.
As such, no code changes are needed to use this driver.
Using CDC with Java
-------------------
When writing applications, you can now use our `Java Library <https://github.com/scylladb/scylla-cdc-java>`_ to simplify writing applications that read from ScyllaDB CDC.
More information
----------------
* `ScyllaDB Java Driver Docs <https://java-driver.docs.scylladb.com/>`_
* `ScyllaDB Java Driver project page on GitHub <https://github.com/scylladb/java-driver/>`_ - Source Code
* `ScyllaDB University: Coding with Java <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-java-part-1/>`_ - a three-part lesson with in-depth examples from executing a few basic CQL statements with a ScyllaDB cluster using the Java driver, to the different data types that you can use in your database tables and how to store these binary files in ScyllaDB with a simple Java application.

View File

@@ -0,0 +1,20 @@
======================
ScyllaDB Python Driver
======================
The ScyllaDB Python driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
Read the `documentation <https://python-driver.docs.scylladb.com/>`_ to get started or visit the Github project `ScyllaDB Python driver <https://github.com/scylladb/python-driver/>`_.
As the ScyllaDB Python Driver is a drop-in replacement for DataStax Python Driver, no code changes are needed to use the driver.
Use the ScyllaDB Python driver for better compatibility and support for ScyllaDB with Python-based applications.
More information
----------------
* `ScyllaDB Python Driver Documentation <https://python-driver.docs.scylladb.com/>`_
* `ScyllaDB Python Driver on GitHub <https://github.com/scylladb/python-driver/>`_
* `ScyllaDB University: Coding with Python <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-python/>`_

View File

@@ -0,0 +1,24 @@
=====================
ScyllaDB Rust Driver
=====================
The ScyllaDB Rust driver is a client-side, shard-aware driver written in pure Rust with a fully async API using Tokio.
Optimized for ScyllaDB, the driver is also compatible with Apache Cassandra®.
.. image:: ./images/monster-rust.png
:width: 150pt
**To download and install the driver**, visit the `Github project <https://github.com/scylladb/scylla-rust-driver>`_.
Read the `Documentation <https://rust-driver.docs.scylladb.com>`_.
Using CDC with Rust
----------------------
When writing applications, you can use ScyllaDB's `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_
to simplify writing applications that read from ScyllaDB's CDC.
Use `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_ to read
:doc:`ScyllaDB's CDC </features/cdc/index>` update streams.

View File

@@ -0,0 +1,9 @@
========================
AWS DynamoDB Drivers
========================
ScyllaDB AWS DynamoDB Compatible API can be used with any AWS DynamoDB Driver.
For a list of AWS AWS DynamoDB drivers see `here <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStarted.html>`_

View File

@@ -0,0 +1,21 @@
================
ScyllaDB Drivers
================
.. toctree::
:titlesonly:
:hidden:
ScyllaDB CQL Drivers <cql-drivers/index>
ScyllaDB DynamoDB Drivers <dynamo-drivers/index>
You can use ScyllaDB with:
* :doc:`Apache Cassandra CQL Compatible Drivers <cql-drivers/index>`
* :doc:`Amazon DynamoDB Compatible API Drivers <dynamo-drivers/index>`
Additional drivers coming soon!
If you are looking for a ScyllaDB Integration Solution or a Connector, refer to :doc:`ScyllaDB Integrations </using-scylla/integrations/index>`.

View File

@@ -9,7 +9,7 @@ ScyllaDB for Developers
Tutorials and Example Projects <https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html>
Learn to Use ScyllaDB <https://docs.scylladb.com/stable/get-started/learn-resources/index.html>
ScyllaDB Alternator <alternator/index>
ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>
ScyllaDB Drivers <drivers/index>
.. panel-box::
@@ -26,7 +26,7 @@ ScyllaDB for Developers
:id: "getting-started"
:class: my-panel
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ - ScyllaDB and third-party drivers for CQL and DynamoDB
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` - ScyllaDB and third-party drivers for CQL and DynamoDB
* :doc:`ScyllaDB Alternator </using-scylla/alternator/index>` - The Open Source DynamoDB-compatible API
* :doc:`CQL Reference </cql/index>` - Reference for the Apache Cassandra Query Language (CQL) and its ScyllaDB extensions

View File

@@ -28,7 +28,7 @@ ScyllaDB Integrations and Connectors
:class: my-panel
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`).
Any application which uses a CQL driver will work with ScyllaDB.
The list below contains links to integration projects using ScyllaDB with third-party projects.

View File

@@ -2,7 +2,7 @@
Integrate ScyllaDB with Databricks
==================================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
Resource list
-------------

View File

@@ -3,7 +3,7 @@ Integrate ScyllaDB with Elasticsearch
=====================================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
The list below contains integration projects using ScyllaDB with Elasticsearch. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.

View File

@@ -13,11 +13,11 @@ The Jaeger Query service offers a web-based UI and API for users to explore, vis
Jaeger also supports integration with other observability tools like Prometheus and Grafana,
making it a popular choice for monitoring modern distributed applications.
Jaeger Server `can also be run <https://www.jaegertracing.io/docs/2.11/storage/cassandra/#compatible-backends>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
Jaeger Server `can also be run <https://github.com/jaegertracing/jaeger/tree/main/plugin/storage/scylladb>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
As a drop-in replacement for Cassandra, ScyllaDB implements the same protocol and provides a high-performance,
low-latency alternative. This compatibility allows Jaeger users to easily switch to ScyllaDB without making significant changes to their setup.
Using ScyllaDB as the storage backend for Jaeger Server can offer additional benefits,
such as improved performance, scalability, and resource efficiency.
This makes Jaeger even more effective for monitoring and troubleshooting distributed applications,
especially in high-traffic, demanding environments where a high-performance storage solution is critical.
especially in high-traffic, demanding environments where a high-performance storage solution is critical.

View File

@@ -3,7 +3,7 @@ Integrate ScyllaDB with Spark
=============================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
The list below contains integration projects using ScyllaDB with Spark. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.

View File

@@ -15,7 +15,6 @@
#include "db/config.hh"
#include "utils/log.hh"
#include "utils/hash.hh"
#include "utils/http.hh"
#include "utils/rjson.hh"
#include "utils/base64.hh"
#include "utils/loading_cache.hh"
@@ -268,6 +267,7 @@ std::tuple<std::string, std::string> azure_host::impl::parse_key(std::string_vie
std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std::string_view vault) {
static const boost::regex vault_name_re(R"([a-zA-Z0-9-]+)");
static const boost::regex vault_endpoint_re(R"((https?)://([^/:]+)(?::(\d+))?)");
boost::smatch match;
std::string tmp{vault};
@@ -277,12 +277,16 @@ std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std
return {"https", fmt::format(AKV_HOST_TEMPLATE, vault), 443};
}
try {
auto info = utils::http::parse_simple_url(tmp);
return {info.scheme, info.host, info.port};
} catch (...) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault)));
if (boost::regex_match(tmp, match, vault_endpoint_re)) {
std::string scheme = match[1];
std::string host = match[2];
std::string port_str = match[3];
unsigned port = (port_str.empty()) ? (scheme == "https" ? 443 : 80) : std::stoi(port_str);
return {scheme, host, port};
}
throw std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault));
}
future<shared_ptr<tls::certificate_credentials>> azure_host::impl::make_creds() {

View File

@@ -816,7 +816,6 @@ public:
future<data_sink> wrap_sink(const sstables::sstable& sst, sstables::component_type type, data_sink sink) override {
switch (type) {
case sstables::component_type::Scylla:
case sstables::component_type::TemporaryScylla:
case sstables::component_type::TemporaryTOC:
case sstables::component_type::TOC:
co_return sink;
@@ -845,7 +844,6 @@ public:
sstables::component_type type,
data_source src) override {
switch (type) {
case sstables::component_type::TemporaryScylla:
case sstables::component_type::Scylla:
case sstables::component_type::TemporaryTOC:
case sstables::component_type::TOC:

View File

@@ -36,7 +36,6 @@
#include "encryption_exceptions.hh"
#include "symmetric_key.hh"
#include "utils.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/UUID.hh"
@@ -164,8 +163,6 @@ private:
shared_ptr<seastar::tls::certificate_credentials> _creds;
std::unordered_map<bytes, shared_ptr<symmetric_key>> _cache;
bool _initialized = false;
abort_source _as;
};
template<typename T, typename C>
@@ -254,50 +251,24 @@ future<rjson::value> encryption::gcp_host::impl::gcp_auth_post_with_retry(std::s
auto& creds = i->second;
static constexpr auto max_retries = 10;
exponential_backoff_retry exr(10ms, 10000ms);
bool do_backoff = false;
bool did_auth_retry = false;
for (int retry = 0; ; ++retry) {
if (std::exchange(do_backoff, false)) {
co_await exr.retry(_as);
}
bool refreshing = true;
int retries = 0;
for (;;) {
try {
co_await creds.refresh(KMS_SCOPE, _certs);
refreshing = false;
} catch (...) {
std::throw_with_nested(permission_error("Error refreshing credentials"));
}
try {
auto res = co_await send_request(uri, _certs, body, httpd::operation_type::POST, key_values({
{ utils::gcp::AUTHORIZATION, utils::gcp::format_bearer(creds.token) },
}), &_as);
}));
co_return res;
} catch (httpd::unexpected_status_error& e) {
gcp_log.debug("{}: Got unexpected response: {}", uri, e.status());
switch (e.status()) {
default:
if (http::reply::classify_status(e.status()) != http::reply::status_class::server_error) {
break;
}
[[fallthrough]];
case httpclient::reply_status::request_timeout:
if (retry < max_retries) {
// service unavailable etc -> backoff + retry
do_backoff = true;
did_auth_retry = false; // reset this, since we might cause expiration due to backoff (not really, but...)
continue;
}
break;
}
if (refreshing) {
std::throw_with_nested(permission_error("Error refreshing credentials"));
}
if (e.status() == http::reply::status_type::unauthorized && retry < max_retries && !did_auth_retry) {
// refresh access token and retry. no backoff
did_auth_retry = true;
if (e.status() == http::reply::status_type::unauthorized && retries++ < 3) {
// refresh access token and retry.
continue;
}
if (e.status() == http::reply::status_type::unauthorized) {
@@ -351,7 +322,6 @@ future<> encryption::gcp_host::impl::init() {
}
future<> encryption::gcp_host::impl::stop() {
_as.request_abort();
co_await _attr_cache.stop();
co_await _id_cache.stop();
}

View File

@@ -38,7 +38,6 @@
#include "utils/loading_cache.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/http.hh"
#include "marshal_exception.hh"
#include "db/config.hh"
@@ -323,26 +322,17 @@ future<> kmip_host::impl::connection::connect() {
f = f.then([this, cred] {
return cred->set_x509_trust_file(_options.truststore, seastar::tls::x509_crt_format::PEM);
});
} else {
f = f.then([cred] {
return cred->set_system_trust();
});
}
return f.then([this, cred] {
// TODO, find if we should do hostname verification
// TODO: connect all failovers already?
// Use the URL parser to handle ipv6 etc proper.
// Turn host arg into a URL.
auto info = utils::http::parse_simple_url("kmip://" + _host);
auto name = info.host;
auto port = info.port != 80 ? info.port : kmip_port;
auto i = _host.find_last_of(':');
auto name = _host.substr(0, i);
auto port = i != sstring::npos ? std::stoul(_host.substr(i + 1)) : kmip_port;
return seastar::net::dns::resolve_name(name).then([this, cred, port, name](seastar::net::inet_address addr) {
kmip_log.debug("Try connect {}:{}", addr, port);
// TODO: should we verify non-numeric hosts here? (opts.server_name)
// Adding this might break existing users with half-baked certs.
return seastar::tls::connect(cred, seastar::socket_address{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
return seastar::net::dns::resolve_name(name).then([this, cred, port](seastar::net::inet_address addr) {
return seastar::tls::connect(cred, seastar::ipv4_addr{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
kmip_log.debug("Successfully connected {}", _host);
// #998 Set keepalive to try avoiding connection going stale in between commands.
s.set_keepalive_parameters(net::tcp_keepalive_params{60s, 60s, 10});

View File

@@ -35,7 +35,6 @@
#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/http.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/rjson.hh"
@@ -152,10 +151,15 @@ public:
{
// check if we have an explicit endpoint set.
if (!_options.endpoint.empty()) {
auto info = utils::http::parse_simple_url(_options.endpoint);
_options.https = info.is_https();
_options.host = info.host;
_options.port = info.port;
static std::regex simple_url(R"foo((https?):\/\/(?:([\w\.]+)|\[([\w:]+)\]):?(\d+)?\/?)foo");
std::transform(_options.endpoint.begin(), _options.endpoint.end(), _options.endpoint.begin(), ::tolower);
std::smatch m;
if (!std::regex_match(_options.endpoint, m, simple_url)) {
throw std::invalid_argument(fmt::format("Could not parse URL: {}", _options.endpoint));
}
_options.https = m[1].str() == "https";
_options.host = m[2].length() > 0 ? m[2].str() : m[3].str();
_options.port = m[4].length() > 0 ? std::stoi(m[4].str()) : 0;
}
if (_options.endpoint.empty() && _options.host.empty() && _options.aws_region.empty() && !_options.aws_use_ec2_region) {
throw std::invalid_argument("No AWS region or endpoint specified");

View File

@@ -55,7 +55,6 @@ debian_base_packages=(
librapidxml-dev
libcrypto++-dev
libxxhash-dev
zlib1g-dev
slapd
ldap-utils
libcpp-jwt-dev
@@ -118,7 +117,6 @@ fedora_packages=(
makeself
libzstd-static libzstd-devel
lz4-static lz4-devel
zlib-ng-compat-devel
rpm-build
devscripts
debhelper

View File

@@ -157,7 +157,6 @@ adjust_bin() {
export GNUTLS_SYSTEM_PRIORITY_FILE="\${GNUTLS_SYSTEM_PRIORITY_FILE-$prefix/libreloc/gnutls.config}"
export LD_LIBRARY_PATH="$prefix/libreloc"
export UBSAN_OPTIONS="${UBSAN_OPTIONS:+$UBSAN_OPTIONS:}suppressions=$prefix/libexec/ubsan-suppressions.supp"
${p11_trust_paths:+export SCYLLA_P11_TRUST_PATHS="$p11_trust_paths"}
exec -a "\$0" "$prefix/libexec/$bin" "\$@"
EOF
chmod 755 "$root/$prefix/bin/$bin"
@@ -331,6 +330,7 @@ if ! $nonroot; then
rsysconfdir=$(realpath -m "$root/$sysconfdir")
rusr=$(realpath -m "$root/usr")
rsystemd=$(realpath -m "$rusr/lib/systemd/system")
rshare="$rprefix/share"
rdoc="$rprefix/share/doc"
rdata=$(realpath -m "$root/var/lib/scylla")
rhkdata=$(realpath -m "$root/var/lib/scylla-housekeeping")
@@ -338,6 +338,7 @@ else
retc="$rprefix/etc"
rsysconfdir="$rprefix/$sysconfdir"
rsystemd="$HOME/.config/systemd/user"
rshare="$rprefix/share"
rdoc="$rprefix/share/doc"
rdata="$rprefix"
fi
@@ -521,6 +522,16 @@ PRODUCT="$product"
EOS
chmod 644 "$rprefix"/scripts/scylla_product.py
install -d -m755 "$rshare"/p11-kit/modules
cat << EOS > "$rshare"/p11-kit/modules/p11-kit-trust.module
module: $prefix/libreloc/pkcs11/p11-kit-trust.so
priority: 1
trust-policy: yes
x-trust-lookup: pkcs11:library-description=PKCS%2311%20Kit%20Trust%20Module
disable-in: p11-kit-proxy
x-init-reserved: paths=$p11_trust_paths
EOS
if ! $nonroot && ! $without_systemd; then
install -d -m755 "$retc"/systemd/system/scylla-server.service.d
install -m644 dist/common/systemd/scylla-server.service.d/dependencies.conf -Dt "$retc"/systemd/system/scylla-server.service.d

56
main.cc
View File

@@ -10,8 +10,6 @@
#include <functional>
#include <fmt/ranges.h>
#include <gnutls/pkcs11.h>
#include <seastar/util/closeable.hh>
#include <seastar/core/abort_source.hh>
#include "db/view/view_building_worker.hh"
@@ -120,11 +118,15 @@
#include "message/dictionary_service.hh"
#include "sstable_dict_autotrainer.hh"
#include "utils/disk_space_monitor.hh"
#include "auth/cache.hh"
#include "utils/labels.hh"
#include "tools/utils.hh"
#define P11_KIT_FUTURE_UNSTABLE_API
extern "C" {
#include <p11-kit/p11-kit.h>
}
namespace fs = std::filesystem;
#include <seastar/core/metrics_api.hh>
#include <seastar/core/relabel_config.hh>
@@ -706,6 +708,14 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
print_starting_message(ac, av, parsed_opts);
}
// We have to override p11-kit config path before p11-kit initialization.
// And the initialization will invoke on seastar initialization, so it has to
// be before app.run()
auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe"));
auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
auto p11_modules_str = p11_modules.string<char>();
::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
sharded<locator::shared_token_metadata> token_metadata;
sharded<locator::effective_replication_map_factory> erm_factory;
sharded<service::migration_notifier> mm_notifier;
@@ -717,7 +727,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
service::load_meter load_meter;
sharded<service::storage_proxy> proxy;
sharded<auth::cache> auth_cache;
sharded<service::storage_service> ss;
sharded<service::migration_manager> mm;
sharded<tasks::task_manager> task_manager;
@@ -780,7 +789,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
return seastar::async([&app, cfg, ext, &disk_space_monitor_shard0, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service, &gossiper, &snitch,
&token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
&repair, &sst_loader, &auth_cache, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
&repair, &sst_loader, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
&hashing_worker, &vector_store_client] {
try {
if (opts.contains("relabel-config-file") && !opts["relabel-config-file"].as<sstring>().empty()) {
@@ -1793,12 +1802,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
api::unset_server_stream_manager(ctx).get();
});
checkpoint(stop_signal, "starting auth cache");
auth_cache.start(std::ref(qp)).get();
auto stop_auth_cache = defer_verbose_shutdown("auth cache", [&] {
auth_cache.stop().get();
});
checkpoint(stop_signal, "initializing storage service");
debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
@@ -1807,7 +1810,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
std::ref(messaging), std::ref(repair),
std::ref(stream_manager), std::ref(lifecycle_notifier), std::ref(bm), std::ref(snitch),
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(view_builder), std::ref(view_building_worker), std::ref(qp), std::ref(sl_controller),
std::ref(auth_cache),
std::ref(tsm), std::ref(vbsm), std::ref(task_manager), std::ref(gossip_address_map),
compression_dict_updated_callback,
only_on_shard0(&*disk_space_monitor_shard0)
@@ -1823,6 +1825,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
ss.stop().get();
});
api::set_server_storage_service(ctx, ss, group0_client).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});
checkpoint(stop_signal, "initializing query processor remote part");
// TODO: do this together with proxy.start_remote(...)
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(mapreduce_service),
@@ -2063,7 +2070,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache), std::ref(hashing_worker)).get();
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(hashing_worker)).get();
cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group);
@@ -2177,11 +2184,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// This will also disable migration manager schema pulls if needed.
group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get();
api::set_server_storage_service(ctx, ss, group0_client).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});
with_scheduling_group(maintenance_scheduling_group, [&] {
return messaging.invoke_on_all([&] (auto& ms) {
return ms.start_listen(token_metadata.local(), [&gossiper] (gms::inet_address ip) {
@@ -2339,7 +2341,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auth_config.authenticator_java_name = qualified_authenticator_name;
auth_config.role_manager_java_name = qualified_role_manager_name;
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache), std::ref(hashing_worker)).get();
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(hashing_worker)).get();
std::any stop_auth_service;
// Has to be called after node joined the cluster (join_cluster())
@@ -2685,15 +2687,13 @@ int main(int ac, char** av) {
// #3583 - need to potentially ensure this for tools as well, since at least
// sstable* might need crypto libraries.
auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe")); // could just be argv[0] I guess...
auto p11_trust_paths_from_env = std::getenv("SCYLLA_P11_TRUST_PATHS");
auto trust_module_path = scylla_path.parent_path().parent_path().append("libreloc/pkcs11/p11-kit-trust.so");
if (fs::exists(trust_module_path) && p11_trust_paths_from_env) {
gnutls_pkcs11_init(GNUTLS_PKCS11_FLAG_MANUAL, nullptr);
auto trust_config = fmt::format("p11-kit:paths={} trusted=yes", p11_trust_paths_from_env);
auto ret = gnutls_pkcs11_add_provider(trust_module_path.string().c_str(), trust_config.c_str());
if (ret != GNUTLS_E_SUCCESS) {
startlog.warn("Could not initialize p11-kit trust module: {}\n", gnutls_strerror(ret));
}
auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
// Note: must be in scope for application lifetime. p11_kit_override_system_files does _not_
// copy input strings.
auto p11_modules_str = p11_modules.string<char>();
// #3392 only do this if we are actually packaged and the path exists.
if (fs::exists(p11_modules)) {
::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
}
return main_func(ac, av);

View File

@@ -46,7 +46,7 @@ bool follower_progress::is_stray_reject(const append_reply::rejected& rejected)
// any reject during snapshot transfer is stray one
return true;
default:
scylla_assert(false, "invalid follower_progress state: {}", static_cast<int>(state));
SCYLLA_ASSERT(false);
}
return false;
}
@@ -87,7 +87,7 @@ bool follower_progress::can_send_to() {
// before starting to sync the log.
return false;
}
scylla_assert(false, "invalid follower_progress state in can_send_to: {}", static_cast<int>(state));
SCYLLA_ASSERT(false);
return false;
}

View File

@@ -2329,7 +2329,11 @@ future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_m
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now()- start);
rlogger.info("repair[{}]: Finished tablet repair for table={}.{} range={} duration={} replicas={} global_tablet_id={} flush_time={}",
id.uuid(), keyspace_name, table_name, range, duration, replicas, gid, flush_time);
co_return flush_time;
if (!flush_time.has_value()) {
throw std::runtime_error(format("Batchlog reply failed for table={}.{} range={} replicas={} global_tablet_id={}",
id.uuid(), keyspace_name, table_name, range, replicas, gid));
}
co_return flush_time.value();
}
tasks::is_user_task repair::tablet_repair_task_impl::is_user_task() const noexcept {
@@ -2406,11 +2410,9 @@ future<> repair::tablet_repair_task_impl::run() {
});
auto parent_shard = this_shard_id();
auto flush_time = _flush_time;
auto res = rs.container().map_reduce0([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_info = sched_info, ranges_parallelism = _ranges_parallelism, parent_shard, topo_guard = _topo_guard, skip_flush = _skip_flush] (repair_service& rs) -> future<std::pair<gc_clock::time_point, bool>> {
std::vector<std::optional<gc_clock::time_point>> flush_times(smp::count, gc_clock::time_point{});
rs.container().invoke_on_all([&idx, &flush_times, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_info = sched_info, ranges_parallelism = _ranges_parallelism, parent_shard, topo_guard = _topo_guard, skip_flush = _skip_flush] (repair_service& rs) -> future<> {
std::exception_ptr error;
gc_clock::time_point shard_flush_time;
bool flush_failed = false;
for (auto& m : metas) {
if (m.master_shard_id != this_shard_id()) {
continue;
@@ -2464,24 +2466,27 @@ future<> repair::tablet_repair_task_impl::run() {
error = std::move(ep);
}
}
auto time = task->get_flush_time();
shard_flush_time = shard_flush_time == gc_clock::time_point() ? time : std::min(shard_flush_time, time);
flush_failed = flush_failed || (needs_flush_before_repair && !hints_batchlog_flushed);
auto current = flush_times[this_shard_id()];
if ((needs_flush_before_repair &&!hints_batchlog_flushed) || !current.has_value()) {
flush_times[this_shard_id()] = std::nullopt;
} else {
auto time = task->get_flush_time();
flush_times[this_shard_id()] = current == gc_clock::time_point() ? time : std::min(current.value(), time);
}
}
if (error) {
co_await coroutine::return_exception_ptr(std::move(error));
}
co_return std::make_pair(shard_flush_time, flush_failed);
}, std::make_pair<gc_clock::time_point, bool>(std::move(flush_time), false), [] (const auto& p1, const auto& p2) {
auto& [time1, failed1] = p1;
auto& [time2, failed2] = p2;
auto flush_time = time1 == gc_clock::time_point() ? time2 :
(time2 == gc_clock::time_point() ? time1 : std::min(time1, time2));
auto failed = failed1 || failed2;
return std::make_pair(flush_time, failed);
}).get();
_flush_time = res.first;
_should_flush_and_flush_failed = res.second;
for (auto& time : flush_times) {
if (!time.has_value()) {
_flush_time = std::nullopt;
break;
}
if (time != gc_clock::time_point()) {
_flush_time = _flush_time == gc_clock::time_point() ? time : std::min(_flush_time.value(), time.value());
}
}
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start_time);
rlogger.info("repair[{}]: Finished user-requested repair for tablet keyspace={} tables={} repair_id={} tablets_repaired={} duration={}",
id.uuid(), _keyspace, _tables, id.id, _metas.size(), duration);

View File

@@ -2529,7 +2529,7 @@ future<repair_update_system_table_response> repair_service::repair_update_system
}
}
if (req.range.end()) {
if (!req.range.end()->is_inclusive() && req.range.end()->value() != dht::maximum_token()) {
if (!req.range.end()->is_inclusive()) {
is_valid_range = false;
}
}

View File

@@ -112,8 +112,7 @@ private:
optimized_optional<abort_source::subscription> _abort_subscription;
std::optional<int> _ranges_parallelism;
size_t _metas_size = 0;
gc_clock::time_point _flush_time = gc_clock::time_point();
bool _should_flush_and_flush_failed = false;
std::optional<gc_clock::time_point> _flush_time = gc_clock::time_point();
service::frozen_topology_guard _topo_guard;
bool _skip_flush;
public:
@@ -135,12 +134,7 @@ public:
return tasks::is_abortable(!_abort_subscription);
}
gc_clock::time_point get_flush_time() const {
if (_should_flush_and_flush_failed) {
throw std::runtime_error(fmt::format("Flush is needed for repair {} with parent {}, but failed", id(), _parent_id));
}
return _flush_time;
}
std::optional<gc_clock::time_point> get_flush_time() const { return _flush_time; }
tasks::is_user_task is_user_task() const noexcept override;
virtual future<> release_resources() noexcept override;

View File

@@ -3704,7 +3704,7 @@ future<utils::chunked_vector<temporary_buffer<char>>> database::sample_data_file
}), std::ref(state));
// [1, 2, 3, 0] --> [0, 1, 3, 6]
std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), uint64_t(0), std::plus());
std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), 0, std::plus());
// We can't generate random non-negative integers smaller than 0,
// so let's just deal with the `total_chunks == 0` case with an early return.

View File

@@ -606,6 +606,10 @@ public:
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
// Adds a newly created sstable to the table. If adding fails, the sstable is deleted from disk.
// This is intended for use during streaming to prevent orphaned sstables.
future<> add_new_sstable(sstables::shared_sstable sst,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
sstables::shared_sstable make_sstable();

View File

@@ -1389,6 +1389,19 @@ table::add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offs
co_await do_add_sstable_and_update_cache(std::move(sst), offstrategy, do_trigger_compaction);
}
future<>
table::add_new_sstable(sstables::shared_sstable sst, sstables::offstrategy offstrategy) {
try {
co_await add_sstable_and_update_cache(sst, offstrategy);
} catch (...) {
// If attaching the sstable fails, delete it to prevent data resurrection
// and issues with tablet splits. The sstable is already sealed on disk
// and must be removed before the topology guard expires.
co_await sst->unlink();
throw;
}
}
future<>
table::add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts) {
constexpr bool do_not_trigger_compaction = false;

View File

@@ -301,7 +301,6 @@ protected:
class ghost_row_deleting_query_pager : public service::pager::query_pager {
service::storage_proxy& _proxy;
db::timeout_clock::duration _timeout_duration;
size_t _concurrency;
public:
ghost_row_deleting_query_pager(schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
service::query_state& state,
@@ -310,12 +309,10 @@ public:
dht::partition_range_vector ranges,
cql3::cql_stats& stats,
service::storage_proxy& proxy,
db::timeout_clock::duration timeout_duration,
size_t concurrency)
db::timeout_clock::duration timeout_duration)
: query_pager(proxy, s, selection, state, options, std::move(cmd), std::move(ranges), std::nullopt)
, _proxy(proxy)
, _timeout_duration(timeout_duration)
, _concurrency(concurrency)
{}
virtual ~ghost_row_deleting_query_pager() {}
@@ -325,12 +322,8 @@ public:
_query_read_repair_decision = qr.read_repair_decision;
qr.query_result->ensure_counts();
return seastar::async([this, query_result = std::move(qr.query_result), page_size, now] () mutable -> result<> {
std::exception_ptr ex;
handle_result(db::view::delete_ghost_rows_visitor{_proxy, _state, view_ptr(_query_schema), _timeout_duration, _concurrency, ex},
handle_result(db::view::delete_ghost_rows_visitor{_proxy, _state, view_ptr(_query_schema), _timeout_duration},
std::move(query_result), page_size, now);
if (ex) {
std::rethrow_exception(ex);
}
return bo::success();
});
}));
@@ -510,8 +503,7 @@ std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager
dht::partition_range_vector ranges,
cql3::cql_stats& stats,
storage_proxy& proxy,
db::timeout_clock::duration duration,
size_t concurrency) {
db::timeout_clock::duration duration) {
return ::make_shared<ghost_row_deleting_query_pager>(std::move(s), std::move(selection), state,
options, std::move(cmd), std::move(ranges), stats, proxy, duration, concurrency);
options, std::move(cmd), std::move(ranges), stats, proxy, duration);
}

View File

@@ -47,8 +47,7 @@ public:
dht::partition_range_vector,
cql3::cql_stats& stats,
storage_proxy& proxy,
db::timeout_clock::duration timeout_duration,
size_t concurrency);
db::timeout_clock::duration timeout_duration);
};
}

View File

@@ -6,7 +6,6 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "service/raft/group0_state_machine.hh"
#include "auth/cache.hh"
#include "db/schema_tables.hh"
#include "mutation/atomic_cell.hh"
#include "cql3/selection/selection.hh"
@@ -175,7 +174,6 @@ future<> group0_state_machine::reload_modules(modules_to_reload modules) {
bool update_service_levels_effective_cache = false;
bool make_view_building_state_transition = false;
std::unordered_set<table_id> update_cdc_streams;
std::unordered_set<auth::cache::role_name_t> update_auth_cache_roles;
for (const auto& m : modules.entries) {
if (m.table == db::system_keyspace::service_levels_v2()->id()) {
@@ -199,12 +197,6 @@ future<> group0_state_machine::reload_modules(modules_to_reload modules) {
const auto elements = m.pk.explode(*db::system_keyspace::cdc_streams_history());
auto cdc_log_table_id = table_id(value_cast<utils::UUID>(uuid_type->deserialize_value(elements.front())));
update_cdc_streams.insert(cdc_log_table_id);
} else if (auth::cache::includes_table(m.table)) {
auto schema = _ss.get_database().find_schema(m.table);
const auto elements = m.pk.explode(*schema);
auto role = value_cast<sstring>(schema->partition_key_type()->
types().front()->deserialize(elements.front()));
update_auth_cache_roles.insert(std::move(role));
}
}
@@ -217,9 +209,6 @@ future<> group0_state_machine::reload_modules(modules_to_reload modules) {
if (update_cdc_streams.size()) {
co_await _ss.load_cdc_streams(std::move(update_cdc_streams));
}
if (update_auth_cache_roles.size()) {
co_await _ss.auth_cache().load_roles(std::move(update_auth_cache_roles));
}
}
future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merger) {
@@ -386,7 +375,6 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
if (_feature_service.cdc_with_tablets) {
co_await _ss.load_cdc_streams();
}
co_await _ss.auth_cache().load_all();
_ss._topology_state_machine.event.broadcast();
_ss._view_building_state_machine.event.broadcast();
}
@@ -455,8 +443,6 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
co_await mutate_locally(std::move(raft_snp->mutations), _sp);
}
co_await _ss.auth_cache().load_all();
co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
} catch (const abort_requested_exception&) {
throw raft::request_aborted(fmt::format(

View File

@@ -18,7 +18,6 @@
#include "compaction/task_manager_module.hh"
#include "gc_clock.hh"
#include "raft/raft.hh"
#include "auth/cache.hh"
#include <ranges>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sleep.hh>
@@ -204,7 +203,6 @@ storage_service::storage_service(abort_source& abort_source,
sharded<db::view::view_building_worker>& view_building_worker,
cql3::query_processor& qp,
sharded<qos::service_level_controller>& sl_controller,
auth::cache& auth_cache,
topology_state_machine& topology_state_machine,
db::view::view_building_state_machine& view_building_state_machine,
tasks::task_manager& tm,
@@ -223,7 +221,6 @@ storage_service::storage_service(abort_source& abort_source,
, _stream_manager(stream_manager)
, _snitch(snitch)
, _sl_controller(sl_controller)
, _auth_cache(auth_cache)
, _group0(nullptr)
, _async_gate("storage_service")
, _node_ops_abort_thread(node_ops_abort_thread())
@@ -277,10 +274,6 @@ node_ops::task_manager_module& storage_service::get_node_ops_module() noexcept {
return *_node_ops_module;
}
auth::cache& storage_service::auth_cache() noexcept {
return _auth_cache;
}
enum class node_external_status {
UNKNOWN = 0,
STARTING = 1,
@@ -600,7 +593,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip));
break;
case node_state::replacing: {
scylla_assert(_topology_state_machine._topology.req_param.contains(id));
SCYLLA_ASSERT(_topology_state_machine._topology.req_param.contains(id));
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
auto existing_ip = _address_map.find(locator::host_id{replaced_id.uuid()});
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
@@ -688,7 +681,7 @@ future<> storage_service::notify_nodes_after_sync(nodes_to_notify_after_sync&& n
future<> storage_service::topology_state_load(state_change_hint hint) {
#ifdef SEASTAR_DEBUG
static bool running = false;
scylla_assert(!running); // The function is not re-entrant
SCYLLA_ASSERT(!running); // The function is not re-entrant
auto d = defer([] {
running = false;
});
@@ -715,14 +708,11 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
co_return;
}
if (_qp.auth_version < db::system_keyspace::auth_version_t::v2) {
co_await _qp.container().invoke_on_all([] (cql3::query_processor& qp) {
// auth-v2 gets enabled when consistent topology changes are enabled
// (see topology::upgrade_state_type::done above) as we use the same migration procedure
co_await _qp.container().invoke_on_all([] (cql3::query_processor& qp) {
qp.auth_version = db::system_keyspace::auth_version_t::v2;
});
co_await auth_cache().load_all();
}
qp.auth_version = db::system_keyspace::auth_version_t::v2;
});
co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) {
sl_controller.upgrade_to_v2(_qp, _group0->client());
@@ -854,7 +844,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
}
future<> storage_service::topology_transition(state_change_hint hint) {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
co_await topology_state_load(std::move(hint)); // reload new state
_topology_state_machine.event.broadcast();
@@ -898,7 +888,7 @@ future<> storage_service::view_building_state_load() {
}
future<> storage_service::view_building_transition() {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
co_await view_building_state_load();
_view_building_state_machine.event.broadcast();
@@ -966,7 +956,7 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
}
future<> storage_service::update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache, qos::query_context ctx) {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
if (_sl_controller.local().is_v2()) {
// Skip cache update unless the topology upgrade is done
co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx);
@@ -1520,7 +1510,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
}
future<> storage_service::start_upgrade_to_raft_topology() {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) {
co_return;
@@ -1572,7 +1562,7 @@ future<> storage_service::start_upgrade_to_raft_topology() {
}
topology::upgrade_state_type storage_service::get_topology_upgrade_state() const {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
return _topology_state_machine._topology.upgrade_state;
}
@@ -1841,7 +1831,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
slogger.info("Nodes {} are alive", get_sync_nodes());
}
scylla_assert(_group0);
SCYLLA_ASSERT(_group0);
join_node_request_params join_params {
.host_id = _group0->load_my_id(),
@@ -2083,7 +2073,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
if (!_sys_ks.local().bootstrap_complete()) {
// If we're not bootstrapping then we shouldn't have chosen a CDC streams timestamp yet.
scylla_assert(should_bootstrap() || !cdc_gen_id);
SCYLLA_ASSERT(should_bootstrap() || !cdc_gen_id);
// Don't try rewriting CDC stream description tables.
// See cdc.md design notes, `Streams description table V1 and rewriting` section, for explanation.
@@ -2167,7 +2157,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
throw std::runtime_error(err);
}
scylla_assert(_group0);
SCYLLA_ASSERT(_group0);
co_await _group0->finish_setup_after_join(*this, _qp, _migration_manager.local(), false);
co_await _cdc_gens.local().after_join(std::move(cdc_gen_id));
@@ -2192,7 +2182,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
}
future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded<service::storage_proxy>& proxy) {
scylla_assert(_group0);
SCYLLA_ASSERT(_group0);
while (true) {
_group0_as.check();
@@ -2316,7 +2306,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
// We don't do any other generation switches (unless we crash before complecting bootstrap).
scylla_assert(!cdc_gen_id);
SCYLLA_ASSERT(!cdc_gen_id);
cdc_gen_id = _cdc_gens.local().legacy_make_new_generation(bootstrap_tokens, !is_first_node()).get();
@@ -2349,9 +2339,9 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
slogger.debug("Removing replaced endpoint {} from system.peers", replace_addr);
_sys_ks.local().remove_endpoint(replace_addr).get();
scylla_assert(replaced_host_id);
SCYLLA_ASSERT(replaced_host_id);
auto raft_id = raft::server_id{replaced_host_id.uuid()};
scylla_assert(_group0);
SCYLLA_ASSERT(_group0);
bool raft_available = _group0->wait_for_raft().get();
if (raft_available) {
slogger.info("Replace: removing {}/{} from group 0...", replace_addr, raft_id);
@@ -3000,7 +2990,7 @@ future<> storage_service::stop_transport() {
}
future<> storage_service::drain_on_shutdown() {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
return (_operation_mode == mode::DRAINING || _operation_mode == mode::DRAINED) ?
_drain_finished.get_future() : do_drain();
}
@@ -3025,7 +3015,7 @@ bool storage_service::is_topology_coordinator_enabled() const {
future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
start_hint_manager start_hm, gms::generation_type new_generation) {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
if (_sys_ks.local().was_decommissioned()) {
auto msg = sstring("This node was decommissioned and will not rejoin the ring unless "
@@ -3225,7 +3215,7 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
}
future<token_metadata_change> storage_service::prepare_token_metadata_change(mutable_token_metadata_ptr tmptr, const schema_getter& schema_getter) {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
std::exception_ptr ex;
token_metadata_change change;
@@ -3992,7 +3982,7 @@ future<> storage_service::decommission() {
slogger.info("DECOMMISSIONING: starts");
ctl.req.leaving_nodes = std::list<gms::inet_address>{endpoint};
scylla_assert(ss._group0);
SCYLLA_ASSERT(ss._group0);
bool raft_available = ss._group0->wait_for_raft().get();
try {
@@ -4044,7 +4034,7 @@ future<> storage_service::decommission() {
if (raft_available && left_token_ring) {
slogger.info("decommission[{}]: leaving Raft group 0", uuid);
scylla_assert(ss._group0);
SCYLLA_ASSERT(ss._group0);
ss._group0->leave_group0().get();
slogger.info("decommission[{}]: left Raft group 0", uuid);
}
@@ -4350,7 +4340,7 @@ future<> storage_service::removenode(locator::host_id host_id, locator::host_id_
auto stop_ctl = deferred_stop(ctl);
auto uuid = ctl.uuid();
const auto& tmptr = ctl.tmptr;
scylla_assert(ss._group0);
SCYLLA_ASSERT(ss._group0);
auto raft_id = raft::server_id{host_id.uuid()};
bool raft_available = ss._group0->wait_for_raft().get();
bool is_group0_member = raft_available && ss._group0->is_member(raft_id, false);
@@ -4470,7 +4460,7 @@ future<> storage_service::removenode(locator::host_id host_id, locator::host_id_
}
future<> storage_service::check_and_repair_cdc_streams() {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
if (!_cdc_gens.local_is_initialized()) {
return make_exception_future<>(std::runtime_error("CDC generation service not initialized yet"));
@@ -5784,7 +5774,7 @@ future<> storage_service::mutate_token_metadata(std::function<future<> (mutable_
}
future<> storage_service::update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason) {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
try {
locator::dc_rack_fn get_dc_rack_by_host_id([this, &tm = *tmptr] (locator::host_id host_id) -> std::optional<locator::endpoint_dc_rack> {
@@ -5831,7 +5821,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
}
future<locator::mutable_token_metadata_ptr> storage_service::prepare_tablet_metadata(const locator::tablet_metadata_change_hint& hint, mutable_token_metadata_ptr pending_token_metadata) {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
if (hint) {
co_await replica::update_tablet_metadata(_db.local(), _qp, pending_token_metadata->tablets(), hint);
} else {
@@ -5955,7 +5945,7 @@ void storage_service::start_tablet_split_monitor() {
}
future<> storage_service::snitch_reconfigured() {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
auto& snitch = _snitch.local();
co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> {
// re-read local rack and DC info
@@ -6487,8 +6477,8 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
co_await utils::get_local_injector().inject("block_tablet_streaming", [this, &tablet] (auto& handler) -> future<> {
const auto keyspace = handler.get("keyspace");
const auto table = handler.get("table");
scylla_assert(keyspace);
scylla_assert(table);
SCYLLA_ASSERT(keyspace);
SCYLLA_ASSERT(table);
auto s = _db.local().find_column_family(tablet.table).schema();
bool should_block = s->ks_name() == *keyspace && s->cf_name() == *table;
while (should_block && !handler.poll_for_message() && !_async_gate.is_closed()) {
@@ -7509,7 +7499,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
}
future<join_node_response_result> storage_service::join_node_response_handler(join_node_response_params params) {
scylla_assert(this_shard_id() == 0);
SCYLLA_ASSERT(this_shard_id() == 0);
// Usually this handler will only run once, but there are some cases where we might get more than one RPC,
// possibly happening at the same time, e.g.:

View File

@@ -114,8 +114,6 @@ namespace replica {
class tablet_mutation_builder;
}
namespace auth { class cache; }
namespace utils {
class disk_space_monitor;
}
@@ -201,7 +199,6 @@ private:
sharded<streaming::stream_manager>& _stream_manager;
sharded<locator::snitch_ptr>& _snitch;
sharded<qos::service_level_controller>& _sl_controller;
auth::cache& _auth_cache;
// Engaged on shard 0 before `join_cluster`.
service::raft_group0* _group0;
@@ -268,7 +265,6 @@ public:
sharded<db::view::view_building_worker>& view_building_worker,
cql3::query_processor& qp,
sharded<qos::service_level_controller>& sl_controller,
auth::cache& auth_cache,
topology_state_machine& topology_state_machine,
db::view::view_building_state_machine& view_building_state_machine,
tasks::task_manager& tm,
@@ -1002,8 +998,6 @@ public:
// update_both_cache_levels::no - update only effective service levels cache
future<> update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache = qos::update_both_cache_levels::yes, qos::query_context ctx = qos::query_context::unspecified);
auth::cache& auth_cache() noexcept;
// Should be called whenever new compression dictionaries are published to system.dicts.
// This is an arbitrary callback passed through the constructor,
// but its intended usage is to set up the RPC connections to use the new dictionaries.

View File

@@ -360,7 +360,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto& topo = _topo_sm._topology;
auto it = topo.find(id);
scylla_assert(it);
SCYLLA_ASSERT(it);
std::optional<topology_request> req;
auto rit = topo.requests.find(id);
@@ -1643,27 +1643,25 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
break;
case locator::tablet_transition_stage::cleanup_target:
if (do_barrier()) {
if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
if (!trinfo.pending_replica) {
rtlogger.info("Tablet cleanup of {} skipped because no replicas pending", gid);
return make_ready_future<>();
}
locator::tablet_replica dst = *trinfo.pending_replica;
if (is_excluded(raft::server_id(dst.host.uuid()))) {
rtlogger.info("Tablet cleanup of {} on {} skipped because node is excluded and doesn't need to revert migration", gid, dst);
return make_ready_future<>();
}
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
});
});
})) {
transition_to(locator::tablet_transition_stage::revert_migration);
if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
if (!trinfo.pending_replica) {
rtlogger.info("Tablet cleanup of {} skipped because no replicas pending", gid);
return make_ready_future<>();
}
locator::tablet_replica dst = *trinfo.pending_replica;
if (is_excluded(raft::server_id(dst.host.uuid()))) {
rtlogger.info("Tablet cleanup of {} on {} skipped because node is excluded and doesn't need to revert migration", gid, dst);
return make_ready_future<>();
}
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
});
});
})) {
transition_to(locator::tablet_transition_stage::revert_migration);
}
break;
case locator::tablet_transition_stage::revert_migration:
@@ -2310,7 +2308,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
switch (node.rs->state) {
case node_state::bootstrapping: {
scylla_assert(!node.rs->ring);
SCYLLA_ASSERT(!node.rs->ring);
auto num_tokens = std::get<join_param>(node.req_param.value()).num_tokens;
auto tokens_string = std::get<join_param>(node.req_param.value()).tokens_string;
@@ -2359,11 +2357,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
break;
case node_state::replacing: {
scylla_assert(!node.rs->ring);
SCYLLA_ASSERT(!node.rs->ring);
auto replaced_id = std::get<replace_param>(node.req_param.value()).replaced_id;
auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
scylla_assert(it != _topo_sm._topology.normal_nodes.end());
scylla_assert(it->second.ring && it->second.state == node_state::normal);
SCYLLA_ASSERT(it != _topo_sm._topology.normal_nodes.end());
SCYLLA_ASSERT(it->second.ring && it->second.state == node_state::normal);
topology_mutation_builder builder(node.guard.write_timestamp());
@@ -3022,7 +3020,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtbuilder.set("start_time", db_clock::now());
switch (node.request.value()) {
case topology_request::join: {
scylla_assert(!node.rs->ring);
SCYLLA_ASSERT(!node.rs->ring);
// Write chosen tokens through raft.
builder.set_transition_state(topology::transition_state::join_group0)
.with_node(node.id)
@@ -3033,7 +3031,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
break;
}
case topology_request::leave:
scylla_assert(node.rs->ring);
SCYLLA_ASSERT(node.rs->ring);
// start decommission and put tokens of decommissioning nodes into write_both_read_old state
// meaning that reads will go to the replica being decommissioned
// but writes will go to new owner as well
@@ -3046,7 +3044,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
"start decommission");
break;
case topology_request::remove: {
scylla_assert(node.rs->ring);
SCYLLA_ASSERT(node.rs->ring);
builder.set_transition_state(topology::transition_state::tablet_draining)
.set_version(_topo_sm._topology.version + 1)
@@ -3058,7 +3056,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
break;
}
case topology_request::replace: {
scylla_assert(!node.rs->ring);
SCYLLA_ASSERT(!node.rs->ring);
builder.set_transition_state(topology::transition_state::join_group0)
.with_node(node.id)
@@ -3163,7 +3161,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto id = node.id;
scylla_assert(!_topo_sm._topology.transition_nodes.empty());
SCYLLA_ASSERT(!_topo_sm._topology.transition_nodes.empty());
release_node(std::move(node));
@@ -4013,7 +4011,7 @@ future<> topology_coordinator::stop() {
// but let's check all of them because we never reset these holders
// once they are added as barriers
for (auto& [stage, barrier]: tablet_state.barriers) {
scylla_assert(barrier.has_value());
SCYLLA_ASSERT(barrier.has_value());
co_await stop_background_action(barrier, gid, [stage] { return format("at stage {}", tablet_transition_stage_to_string(stage)); });
}

View File

@@ -27,7 +27,6 @@ enum class component_type {
TemporaryTOC,
TemporaryStatistics,
Scylla,
TemporaryScylla,
Rows,
Partitions,
TemporaryHashes,
@@ -77,8 +76,6 @@ struct fmt::formatter<sstables::component_type> : fmt::formatter<string_view> {
return formatter<string_view>::format("TemporaryStatistics", ctx);
case Scylla:
return formatter<string_view>::format("Scylla", ctx);
case TemporaryScylla:
return formatter<string_view>::format("TemporaryScylla", ctx);
case Partitions:
return formatter<string_view>::format("Partitions", ctx);
case Rows:

View File

@@ -251,7 +251,7 @@ void compression::discard_hidden_options() {
}
compressor& compression::get_compressor() const {
scylla_assert(_compressor);
SCYLLA_ASSERT(_compressor);
return *_compressor.get();
}

View File

@@ -170,7 +170,7 @@ struct compression {
const_iterator(const const_iterator& other) = default;
const_iterator& operator=(const const_iterator& other) {
scylla_assert(&_offsets == &other._offsets);
SCYLLA_ASSERT(&_offsets == &other._offsets);
_index = other._index;
return *this;
}

View File

@@ -24,7 +24,6 @@
#include "sstables/sstable_compressor_factory.hh"
#include "compressor.hh"
#include "exceptions/exceptions.hh"
#include "utils/assert.hh"
#include "utils/config_file_impl.hh"
#include "utils/class_registrator.hh"
#include "gms/feature_service.hh"
@@ -296,7 +295,7 @@ size_t zstd_processor::uncompress(const char* input, size_t input_len, char* out
if (_ddict) {
return ZSTD_decompress_usingDDict(dctx, output, output_len, input, input_len, _ddict->dict());
} else {
scylla_assert(!_cdict && "Write-only compressor used for reading");
SCYLLA_ASSERT(!_cdict && "Write-only compressor used for reading");
return ZSTD_decompressDCtx(dctx, output, output_len, input, input_len);
}
});
@@ -311,7 +310,7 @@ size_t zstd_processor::compress(const char* input, size_t input_len, char* outpu
if (_cdict) {
return ZSTD_compress_usingCDict(cctx, output, output_len, input, input_len, _cdict->dict());
} else {
scylla_assert(!_ddict && "Read-only compressor used for writing");
SCYLLA_ASSERT(!_ddict && "Read-only compressor used for writing");
return ZSTD_compressCCtx(cctx, output, output_len, input, input_len, _compression_level);
}
});
@@ -628,7 +627,7 @@ size_t lz4_processor::uncompress(const char* input, size_t input_len,
if (_ddict) {
ret = LZ4_decompress_safe_usingDict(input, output, input_len, output_len, reinterpret_cast<const char*>(_ddict->raw().data()), _ddict->raw().size());
} else {
scylla_assert(!_cdict && "Write-only compressor used for reading");
SCYLLA_ASSERT(!_cdict && "Write-only compressor used for reading");
ret = LZ4_decompress_safe(input, output, input_len, output_len);
}
if (ret < 0) {
@@ -658,7 +657,7 @@ size_t lz4_processor::compress(const char* input, size_t input_len,
LZ4_resetStream_fast(ctx);
}
} else {
scylla_assert(!_ddict && "Read-only compressor used for writing");
SCYLLA_ASSERT(!_ddict && "Read-only compressor used for writing");
ret = LZ4_compress_default(input, output + 4, input_len, LZ4_compressBound(input_len));
}
if (ret == 0) {
@@ -1269,7 +1268,7 @@ lz4_cdict::~lz4_cdict() {
}
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread() {
scylla_assert(thread::running_in_thread());
SCYLLA_ASSERT(thread::running_in_thread());
struct wrapper : sstable_compressor_factory {
using impl = default_sstable_compressor_factory;
sharded<impl> _impl;

View File

@@ -44,14 +44,14 @@ public:
* @return A list of `sampling_level` unique indices between 0 and `sampling_level`
*/
static const std::vector<int>& get_sampling_pattern(int sampling_level) {
scylla_assert(sampling_level > 0 && sampling_level <= BASE_SAMPLING_LEVEL);
SCYLLA_ASSERT(sampling_level > 0 && sampling_level <= BASE_SAMPLING_LEVEL);
auto& entry = _sample_pattern_cache[sampling_level-1];
if (!entry.empty()) {
return entry;
}
if (sampling_level <= 1) {
scylla_assert(_sample_pattern_cache[0].empty());
SCYLLA_ASSERT(_sample_pattern_cache[0].empty());
_sample_pattern_cache[0].push_back(0);
return _sample_pattern_cache[0];
}
@@ -96,7 +96,7 @@ public:
* @return a list of original indexes for current summary entries
*/
static const std::vector<int>& get_original_indexes(int sampling_level) {
scylla_assert(sampling_level > 0 && sampling_level <= BASE_SAMPLING_LEVEL);
SCYLLA_ASSERT(sampling_level > 0 && sampling_level <= BASE_SAMPLING_LEVEL);
auto& entry = _original_index_cache[sampling_level-1];
if (!entry.empty()) {
return entry;
@@ -128,7 +128,7 @@ public:
* @return the number of partitions before the next index summary entry, inclusive on one end
*/
static int get_effective_index_interval_after_index(int index, int sampling_level, int min_index_interval) {
scylla_assert(index >= -1);
SCYLLA_ASSERT(index >= -1);
const std::vector<int>& original_indexes = get_original_indexes(sampling_level);
if (index == -1) {
return original_indexes[0] * min_index_interval;

View File

@@ -31,7 +31,7 @@ public:
[[noreturn]] void on_parse_error(sstring message, std::optional<component_name> filename);
[[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos);
// Use this instead of scylla_assert() or assert() in code that is used while parsing SSTables.
// Use this instead of SCYLLA_ASSERT() or assert() in code that is used while parsing SSTables.
// SSTables can be corrupted either by ScyllaDB itself or by a freak accident like cosmic background
// radiation hitting the disk the wrong way. Either way a corrupt SSTable should not bring down the
// whole server. This method will call on_internal_error() if the condition is false.

View File

@@ -129,7 +129,7 @@ public:
/// way to determine that is overlapping its partition-ranges with the shard's
/// owned ranges.
static bool maybe_owned_by_this_shard(const sstables::generation_type& gen) {
scylla_assert(bool(gen));
SCYLLA_ASSERT(bool(gen));
int64_t hint = 0;
if (gen.is_uuid_based()) {
hint = std::hash<utils::UUID>{}(gen.as_uuid());

View File

@@ -91,7 +91,7 @@ public:
{}
void increment() {
scylla_assert(_range);
SCYLLA_ASSERT(_range);
if (!_range->next()) {
_range = nullptr;
}
@@ -102,7 +102,7 @@ public:
}
const ValueType dereference() const {
scylla_assert(_range);
SCYLLA_ASSERT(_range);
return _range->get_value();
}
@@ -153,7 +153,7 @@ public:
auto limit = std::min(_serialization_limit_size, _offset + clustering_block::max_block_size);
_current_block = {};
scylla_assert (_offset % clustering_block::max_block_size == 0);
SCYLLA_ASSERT (_offset % clustering_block::max_block_size == 0);
while (_offset < limit) {
auto shift = _offset % clustering_block::max_block_size;
if (_offset < _prefix.size(_schema)) {
@@ -280,7 +280,7 @@ public:
++_current_index;
}
} else {
scylla_assert(_mode == encoding_mode::large_encode_missing);
SCYLLA_ASSERT(_mode == encoding_mode::large_encode_missing);
while (_current_index < total_size) {
auto cell = _row.find_cell(_columns[_current_index].get().id);
if (!cell) {
@@ -632,10 +632,6 @@ private:
std::unique_ptr<file_writer> close_writer(std::unique_ptr<file_writer>& w);
void close_data_writer();
void close_index_writer();
void close_rows_writer();
void close_partitions_writer();
void ensure_tombstone_is_written() {
if (!_tombstone_written) {
consume(tombstone());
@@ -948,16 +944,17 @@ void writer::init_file_writers() {
_sst._schema->get_compressor_params(),
std::move(compressor)), _sst.get_filename());
}
if (_sst.has_component(component_type::Index)) {
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Index).get();
_index_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, _sst.index_filename());
_index_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), _sst.index_filename());
}
if (_sst.has_component(component_type::Partitions) && _sst.has_component(component_type::Rows)) {
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Rows).get();
_rows_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Rows));
_rows_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), component_name(_sst, component_type::Rows));
_bti_row_index_writer = trie::bti_row_index_writer(*_rows_writer);
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Partitions).get();
_partitions_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Partitions));
_partitions_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), component_name(_sst, component_type::Partitions));
_bti_partition_index_writer = trie::bti_partition_index_writer(*_partitions_writer);
}
if (_delayed_filter) {
@@ -985,41 +982,6 @@ void writer::close_data_writer() {
}
}
void writer::close_index_writer() {
if (_index_writer) {
auto writer = close_writer(_index_writer);
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
_sst.get_components_digests().index_digest = chksum_wr->full_checksum();
}
}
void writer::close_partitions_writer() {
if (_partitions_writer) {
_sst._partitions_db_footer = std::move(*_bti_partition_index_writer).finish(
_sst.get_version(),
_first_key.value(),
_last_key.value());
auto writer = close_writer(_partitions_writer);
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
_sst.get_components_digests().partitions_digest = chksum_wr->full_checksum();
}
}
void writer::close_rows_writer() {
if (_rows_writer) {
// Append some garbage padding to the file just to ensure that it's never empty.
// (Otherwise it would be empty if the sstable contains only small partitions).
// This is a hack to work around some bad interactions between zero-sized files
// and object storage. (It seems that e.g. minio considers a zero-sized file
// upload to be a no-op, which breaks some assumptions).
uint32_t garbage = seastar::cpu_to_be(0x13371337);
_rows_writer->write(reinterpret_cast<const char*>(&garbage), sizeof(garbage));
auto writer = close_writer(_rows_writer);
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
_sst.get_components_digests().rows_digest = chksum_wr->full_checksum();
}
}
void writer::consume_new_partition(const dht::decorated_key& dk) {
_c_stats.start_offset = _data_writer->offset();
_prev_row_start = _data_writer->offset();
@@ -1180,7 +1142,7 @@ void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clus
if (cdef.is_counter()) {
if (!is_deleted) {
scylla_assert(!cell.is_counter_update());
SCYLLA_ASSERT(!cell.is_counter_update());
auto ccv = counter_cell_view(cell);
write_counter_value(ccv, writer, _sst.get_version(), [] (bytes_ostream& out, uint32_t value) {
return write_vint(out, value);
@@ -1489,7 +1451,7 @@ template <typename W>
requires Writer<W>
static void write_clustering_prefix(sstable_version_types v, W& writer, bound_kind_m kind,
const schema& s, const clustering_key_prefix& clustering) {
scylla_assert(kind != bound_kind_m::static_clustering);
SCYLLA_ASSERT(kind != bound_kind_m::static_clustering);
write(v, writer, kind);
auto is_ephemerally_full = ephemerally_full_prefix{s.is_compact_table()};
if (kind != bound_kind_m::clustering) {
@@ -1668,10 +1630,27 @@ void writer::consume_end_of_stream() {
_collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
}
close_index_writer();
if (_index_writer) {
close_writer(_index_writer);
}
close_partitions_writer();
close_rows_writer();
if (_partitions_writer) {
_sst._partitions_db_footer = std::move(*_bti_partition_index_writer).finish(
_sst.get_version(),
_first_key.value(),
_last_key.value());
close_writer(_partitions_writer);
}
if (_rows_writer) {
// Append some garbage padding to the file just to ensure that it's never empty.
// (Otherwise it would be empty if the sstable contains only small partitions).
// This is a hack to work around some bad interactions between zero-sized files
// and object storage. (It seems that e.g. minio considers a zero-sized file
// upload to be a no-op, which breaks some assumptions).
uint32_t garbage = seastar::cpu_to_be(0x13371337);
_rows_writer->write(reinterpret_cast<const char*>(&garbage), sizeof(garbage));
close_writer(_rows_writer);
}
if (_hashes_writer) {
close_writer(_hashes_writer);

View File

@@ -59,7 +59,7 @@ private:
// Live entry_ptr should keep the entry alive, except when the entry failed on loading.
// In that case, entry_ptr holders are not supposed to use the pointer, so it's safe
// to nullify those entry_ptrs.
scylla_assert(!ready());
SCYLLA_ASSERT(!ready());
}
}

View File

@@ -496,7 +496,7 @@ sstable_directory::move_foreign_sstables(sharded<sstable_directory>& source_dire
return make_ready_future<>();
}
// Should be empty, since an SSTable that belongs to this shard is not remote.
scylla_assert(shard_id != this_shard_id());
SCYLLA_ASSERT(shard_id != this_shard_id());
dirlog.debug("Moving {} unshared SSTables of {}.{} to shard {} ", info_vec.size(), _schema->ks_name(), _schema->cf_name(), shard_id);
return source_directory.invoke_on(shard_id, &sstables::sstable_directory::load_foreign_sstables, std::move(info_vec));
});
@@ -540,7 +540,7 @@ sstable_directory::collect_output_unshared_sstables(std::vector<sstables::shared
dirlog.debug("Collecting {} output SSTables (remote={})", resharded_sstables.size(), remote_ok);
return parallel_for_each(std::move(resharded_sstables), [this, remote_ok] (sstables::shared_sstable sst) {
auto shards = sst->get_shards_for_this_sstable();
scylla_assert(shards.size() == 1);
SCYLLA_ASSERT(shards.size() == 1);
auto shard = shards[0];
if (shard == this_shard_id()) {

View File

@@ -283,7 +283,7 @@ bool partitioned_sstable_set::store_as_unleveled(const shared_sstable& sst) cons
}
sstlog.info("SSTable {}, as_unleveled={}, expect_unleveled={}, sst_tr={}, overlap_ratio={}",
sst->generation(), as_unleveled, expect_unleveled, sst_tr, dht::overlap_ratio(_token_range, sst_tr));
scylla_assert(as_unleveled == expect_unleveled);
SCYLLA_ASSERT(as_unleveled == expect_unleveled);
});
return as_unleveled;
@@ -712,8 +712,8 @@ public:
// by !empty(bound) and `_it` invariant:
// _it != _end, _it->first <= bound, and filter(*_it->second) == true
scylla_assert(_cmp(_it->first, bound) <= 0);
// we don't scylla_assert(filter(*_it->second)) due to the requirement that `filter` is called at most once for each sstable
SCYLLA_ASSERT(_cmp(_it->first, bound) <= 0);
// we don't SCYLLA_ASSERT(filter(*_it->second)) due to the requirement that `filter` is called at most once for each sstable
// Find all sstables with the same position as `_it` (they form a contiguous range in the container).
auto next = std::find_if(std::next(_it), _end, [this] (const value_t& v) { return _cmp(v.first, _it->first) != 0; });
@@ -1301,7 +1301,7 @@ sstable_set::create_single_key_sstable_reader(
mutation_reader::forwarding fwd_mr,
const sstable_predicate& predicate,
sstables::integrity_check integrity) const {
scylla_assert(pr.is_singular() && pr.start()->value().has_key());
SCYLLA_ASSERT(pr.is_singular() && pr.start()->value().has_key());
return _impl->create_single_key_sstable_reader(cf, std::move(schema),
std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate, integrity);
}
@@ -1408,7 +1408,7 @@ sstable_set::make_local_shard_sstable_reader(
{
auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator, &predicate, integrity]
(shared_sstable& sst, const dht::partition_range& pr) mutable {
scylla_assert(!sst->is_shared());
SCYLLA_ASSERT(!sst->is_shared());
if (!predicate(*sst)) {
return make_empty_mutation_reader(s, permit);
}

Some files were not shown because too many files have changed in this diff Show More