Compare commits

63 commits: copilot/fi... → copilot/re...

| SHA1 |
|---|
| b2c75edccd |
| a5c217aef4 |
| 3e8c1e47c8 |
| a4fc85c915 |
| ff155a2c32 |
| e698e89113 |
| efe3e73b5c |
| 13644ff110 |
| 307262ca27 |
| f7e1ca23f7 |
| 254c7e8cc9 |
| f447c4464b |
| 62cda957bc |
| d68f071b91 |
| 866c96f536 |
| 367633270a |
| e97a504775 |
| a5c971d21c |
| a0809f0032 |
| bb6e41f97a |
| 4df6b51ac2 |
| 0c8730ba05 |
| bc2e83bc1f |
| f4c3d5c1b7 |
| e54abde3e8 |
| 9696ee64d0 |
| 8dd69f02a8 |
| d000fa3335 |
| 4e289e8e6a |
| 9d2f7c3f52 |
| e3e81a9a7a |
| b82f92b439 |
| f00e00fde0 |
| b0727d3f2a |
| 4169bdb7a6 |
| c5580399a8 |
| 1d42770936 |
| d287b054b9 |
| 4f803aad22 |
| a54bf50290 |
| 06dd3b2e64 |
| 6163fedd2e |
| 67f1c6d36c |
| 669286b1d6 |
| b9199e8b24 |
| 1ff7f5941b |
| 3b70154f0a |
| 6ae72ed134 |
| a191503ddf |
| 619bf3ac4b |
| 62802b119b |
| 323e5cd171 |
| dd461e0472 |
| 0c9b2e5332 |
| b29c42adce |
| ea3dc0b0de |
| 2a6bef96d6 |
| 19da1cb656 |
| 2cf1ca43b5 |
| 642f468c59 |
| bd7c87731b |
| 4c667e87ec |
| aacf883a8b |
SCYLLA_ASSERT_CONVERSION_SUMMARY.md (new file, 182 lines)

@@ -0,0 +1,182 @@
# SCYLLA_ASSERT to scylla_assert() Conversion Summary

## Objective

Replace crash-inducing `SCYLLA_ASSERT` with exception-throwing `scylla_assert()` to prevent cluster-wide crashes and maintain availability.

## What Was Done

### 1. Infrastructure Implementation ✓

Created new `scylla_assert()` macro in `utils/assert.hh`:

- Based on `on_internal_error()` for exception-based error handling
- Supports optional custom error messages via variadic arguments
- Uses `seastar::format()` for string formatting
- Compatible with the C++23 standard (uses `__VA_OPT__`)

**Key difference from SCYLLA_ASSERT:**

```cpp
// Old: Crashes the process immediately
SCYLLA_ASSERT(condition);

// New: Throws exception (or aborts based on config)
scylla_assert(condition);
scylla_assert(condition, "custom error message: {}", value);
```
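For illustration, here is a minimal standalone sketch of how an exception-throwing assertion macro of this shape can be built with `__VA_OPT__`. The names `my_scylla_assert` and `throw_assert_error` are invented for the example; the real macro in `utils/assert.hh` routes through `on_internal_error()` and formats messages with `seastar::format()`:

```cpp
#include <stdexcept>
#include <string>

// Hypothetical stand-ins: the real implementation reports through
// on_internal_error(), which logs a backtrace and then throws (or aborts,
// depending on configuration).
inline void throw_assert_error(const char* expr) {
    throw std::runtime_error(std::string("assertion failed: ") + expr);
}

inline void throw_assert_error(const char* expr, const std::string& msg) {
    throw std::runtime_error(std::string("assertion failed: ") + expr + ": " + msg);
}

// __VA_OPT__ (standardized in C++20, available in C++23 builds) emits the
// comma only when a message argument is present, so both forms compile:
//   my_scylla_assert(cond);
//   my_scylla_assert(cond, "message");
#define my_scylla_assert(cond, ...) \
    do { \
        if (!(cond)) { \
            throw_assert_error(#cond __VA_OPT__(,) __VA_ARGS__); \
        } \
    } while (0)
```

A caller that can tolerate the failure catches the exception; a caller that cannot lets it propagate to a boundary that logs it, instead of taking the whole process down.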

### 2. Comprehensive Analysis ✓

Analyzed the entire codebase to identify safe vs. unsafe conversion locations.

**Statistics:**

- Total SCYLLA_ASSERT usages: ~1307 (including tests)
- Non-test usages: ~886
- **Unsafe to convert**: 223 usages (25%)
  - In noexcept functions: 187 usages across 50 files
  - In destructors: 36 usages across 25 files
- **Safe to convert**: ~668 usages (75%)
- **Converted in this PR**: 112 usages (16.8% of the safe conversions)

### 3. Documentation ✓

Created comprehensive documentation:

1. **Conversion Guide** (`docs/dev/scylla_assert_conversion.md`)
   - Explains safe vs. unsafe contexts
   - Provides a conversion strategy
   - Lists all completed conversions
   - Includes testing guidance

2. **Unsafe Locations Report** (`docs/dev/unsafe_scylla_assert_locations.md`)
   - Detailed listing of the 223 unsafe locations
   - Organized by file, with line numbers
   - Separated into noexcept and destructor categories

### 4. Sample Conversions ✓

Converted 112 safe SCYLLA_ASSERT usages across 32 files as a demonstration:

| File | Conversions | Context |
|------|------------|---------|
| db/large_data_handler.{cc,hh} | 5 | Future-returning functions |
| db/schema_applier.cc | 1 | Coroutine function |
| db/system_distributed_keyspace.cc | 1 | Regular function |
| db/commitlog/commitlog_replayer.cc | 1 | Coroutine function |
| db/view/row_locking.cc | 2 | Regular function |
| db/size_estimates_virtual_reader.cc | 1 | Lambda in coroutine |
| db/corrupt_data_handler.cc | 2 | Lambdas in future-returning function |
| raft/tracker.cc | 2 | Unreachable code (switch defaults) |
| service/topology_coordinator.cc | 11 | Coroutine functions (topology operations) |
| service/storage_service.cc | 28 | Critical node lifecycle operations |
| sstables/* (22 files) | 58 | SSTable operations (read/write/compress/index) |

All conversions were in **safe contexts** (non-noexcept, non-destructor functions). Three assertions in storage_service.cc remain as SCYLLA_ASSERT because they are in noexcept functions.
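To make the mechanical shape of a safe conversion concrete, here is a simplified before/after sketch. The function, the condition, and the `demo_scylla_assert` macro are invented for illustration; the real conversions swap `SCYLLA_ASSERT(cond)` for `scylla_assert(cond, ...)` in exactly this way:

```cpp
#include <stdexcept>
#include <string>

// Stand-in for the real macro: throws instead of aborting the process.
#define demo_scylla_assert(cond, msg) \
    do { if (!(cond)) { throw std::runtime_error(msg); } } while (0)

// Before (crash on violation):
//     SCYLLA_ASSERT(partition_count > 0);
// After (throw on violation, so a caller can catch it and keep the node up):
long average_partition_size(long total_bytes, long partition_count) {
    demo_scylla_assert(partition_count > 0, "partition_count must be positive");
    return total_bytes / partition_count;
}
```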

## Why These Cannot Be Converted

### Unsafe Context #1: noexcept Functions (187 usages)

**Problem**: Throwing from a noexcept function causes `std::terminate()`, same as a crash.

**Example** (from `locator/production_snitch_base.hh`):

```cpp
virtual bool prefer_local() const noexcept override {
    SCYLLA_ASSERT(_backreference != nullptr); // Cannot convert!
    return _backreference->prefer_local();
}
```

**Solution for these**: Keep as SCYLLA_ASSERT or use `on_fatal_internal_error()`.

### Unsafe Context #2: Destructors (36 usages)

**Problem**: Destructors are implicitly noexcept; throwing from one causes `std::terminate()`.

**Example** (from `utils/file_lock.cc`):

```cpp
~file_lock() noexcept {
    if (_fd.get() != -1) {
        SCYLLA_ASSERT(_fd.get() != -1); // Cannot convert!
        auto r = ::flock(_fd.get(), LOCK_UN);
        SCYLLA_ASSERT(r == 0); // Cannot convert!
    }
}
```

**Solution for these**: Keep as SCYLLA_ASSERT.

## Benefits of scylla_assert()

1. **Prevents Cluster-Wide Crashes**
   - Exception can be caught and handled gracefully
   - Failed node doesn't bring down entire cluster

2. **Maintains Availability**
   - Service can continue with degraded functionality
   - Better than a complete crash

3. **Better Error Reporting**
   - Includes backtrace via `on_internal_error()`
   - Supports custom error messages
   - Configurable abort-on-error for testing

4. **Backward Compatible**
   - SCYLLA_ASSERT still exists for unsafe contexts
   - Can be gradually adopted
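The first benefit depends on a catch at some boundary. The sketch below uses invented names (`demo_scylla_assert`, `serve`, `handle_one_request`); in Scylla the catch would typically sit at a request or task boundary rather than in a simple loop like this, but the shape is the same:

```cpp
#include <cstdio>
#include <stdexcept>

// Stand-in for the real macro: throws instead of aborting.
#define demo_scylla_assert(cond, msg) \
    do { if (!(cond)) { throw std::runtime_error(msg); } } while (0)

void handle_one_request(int request_id) {
    demo_scylla_assert(request_id >= 0, "negative request id");
    std::printf("request %d handled\n", request_id);
}

// Request boundary: one failed assertion is reported and counted,
// and the service keeps running - unlike a SCYLLA_ASSERT abort.
int serve(const int* ids, int n) {
    int failures = 0;
    for (int i = 0; i < n; ++i) {
        try {
            handle_one_request(ids[i]);
        } catch (const std::exception& e) {
            std::printf("request failed: %s\n", e.what());
            ++failures;
        }
    }
    return failures;
}
```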

## Testing

- Created manual test in `test/manual/test_scylla_assert.cc`
- Verifies passing and failing assertions
- Tests custom error messages
- Code review passed with improvements made

## Next Steps (Future Work)

1. **Gradual Conversion**
   - Convert remaining ~556 safe SCYLLA_ASSERT usages incrementally
   - Prioritize high-impact code paths first

2. **Review noexcept Functions**
   - Evaluate if some can be made non-noexcept
   - Consider using `on_fatal_internal_error()` where appropriate

3. **Integration Testing**
   - Run full test suite with conversions
   - Monitor for any unexpected behavior
   - Validate exception propagation

4. **Automated Analysis Tool**
   - Create tool to identify safe conversion candidates
   - Generate conversion patches automatically
   - Track conversion progress

## Files Modified in This PR

### Core Implementation

- `utils/assert.hh` - Added the scylla_assert() macro

### Conversions

- `db/large_data_handler.cc`
- `db/large_data_handler.hh`
- `db/schema_applier.cc`
- `db/system_distributed_keyspace.cc`
- `db/commitlog/commitlog_replayer.cc`
- `db/view/row_locking.cc`
- `db/size_estimates_virtual_reader.cc`
- `db/corrupt_data_handler.cc`
- `raft/tracker.cc`
- `service/topology_coordinator.cc`
- `service/storage_service.cc`
- `sstables/` (22 files across trie/, mx/, and core sstables)

### Documentation

- `docs/dev/scylla_assert_conversion.md`
- `docs/dev/unsafe_scylla_assert_locations.md`
- `test/manual/test_scylla_assert.cc`

## Conclusion

This PR establishes the infrastructure and methodology for replacing SCYLLA_ASSERT with scylla_assert() to improve cluster availability. The sample conversions demonstrate the approach, while comprehensive documentation enables future work.

**Key Achievement**: Provided a safe path forward for converting 75% (~668) of SCYLLA_ASSERT usages to exception-based assertions, while clearly documenting the 25% (~223) that must remain as crash-inducing assertions due to language constraints. Converted 112 usages as a demonstration (16.8% of the safe conversions), prioritizing critical files such as storage_service.cc (node lifecycle) and all sstables files (data persistence), with ~556 remaining.

@@ -9,6 +9,7 @@ target_sources(scylla_auth
     allow_all_authorizer.cc
     authenticated_user.cc
     authenticator.cc
+    cache.cc
     certificate_authenticator.cc
     common.cc
     default_authorizer.cc

@@ -23,6 +23,7 @@ static const class_registrator<
     cql3::query_processor&,
     ::service::raft_group0_client&,
     ::service::migration_manager&,
+    cache&,
     utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");

 }

@@ -12,6 +12,7 @@

 #include "auth/authenticated_user.hh"
 #include "auth/authenticator.hh"
+#include "auth/cache.hh"
 #include "auth/common.hh"
 #include "utils/alien_worker.hh"

@@ -29,7 +30,7 @@ extern const std::string_view allow_all_authenticator_name;

 class allow_all_authenticator final : public authenticator {
 public:
-    allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&) {
+    allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
     }

     virtual future<> start() override {

auth/cache.cc (new file, 180 lines)

@@ -0,0 +1,180 @@
/*
 * Copyright (C) 2017-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "auth/cache.hh"
#include "auth/common.hh"
#include "auth/roles-metadata.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>

namespace auth {

logging::logger logger("auth-cache");

cache::cache(cql3::query_processor& qp) noexcept
    : _current_version(0)
    , _qp(qp) {
}

lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
    auto it = _roles.find(role);
    if (it == _roles.end()) {
        return {};
    }
    return it->second;
}

future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& role) const {
    auto rec = make_lw_shared<role_record>();
    rec->version = _current_version;

    auto fetch = [this, &role](const sstring& q) {
        return _qp.execute_internal(q, db::consistency_level::LOCAL_ONE,
                internal_distributed_query_state(), {role},
                cql3::query_processor::cache_internal::yes);
    };
    // roles
    {
        static const sstring q = format("SELECT * FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, meta::roles_table::name);
        auto rs = co_await fetch(q);
        if (!rs->empty()) {
            auto& r = rs->one();
            rec->is_superuser = r.get_or<bool>("is_superuser", false);
            rec->can_login = r.get_or<bool>("can_login", false);
            rec->salted_hash = r.get_or<sstring>("salted_hash", "");
            if (r.has("member_of")) {
                auto mo = r.get_set<sstring>("member_of");
                rec->member_of.insert(
                        std::make_move_iterator(mo.begin()),
                        std::make_move_iterator(mo.end()));
            }
        } else {
            // role got deleted
            co_return nullptr;
        }
    }
    // members
    {
        static const sstring q = format("SELECT role, member FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_MEMBERS_CF);
        auto rs = co_await fetch(q);
        for (const auto& r : *rs) {
            rec->members.insert(r.get_as<sstring>("member"));
            co_await coroutine::maybe_yield();
        }
    }
    // attributes
    {
        static const sstring q = format("SELECT role, name, value FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_ATTRIBUTES_CF);
        auto rs = co_await fetch(q);
        for (const auto& r : *rs) {
            rec->attributes[r.get_as<sstring>("name")] =
                    r.get_as<sstring>("value");
            co_await coroutine::maybe_yield();
        }
    }
    // permissions
    {
        static const sstring q = format("SELECT role, resource, permissions FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, PERMISSIONS_CF);
        auto rs = co_await fetch(q);
        for (const auto& r : *rs) {
            auto resource = r.get_as<sstring>("resource");
            auto perms_strings = r.get_set<sstring>("permissions");
            std::unordered_set<sstring> perms_set(perms_strings.begin(), perms_strings.end());
            auto pset = permissions::from_strings(perms_set);
            rec->permissions[std::move(resource)] = std::move(pset);
            co_await coroutine::maybe_yield();
        }
    }
    co_return rec;
}

future<> cache::prune_all() noexcept {
    for (auto it = _roles.begin(); it != _roles.end(); ) {
        if (it->second->version != _current_version) {
            _roles.erase(it++);
            co_await coroutine::maybe_yield();
        } else {
            ++it;
        }
    }
    co_return;
}

future<> cache::load_all() {
    if (legacy_mode(_qp)) {
        co_return;
    }
    SCYLLA_ASSERT(this_shard_id() == 0);
    ++_current_version;

    logger.info("Loading all roles");
    const uint32_t page_size = 128;
    auto loader = [this](const cql3::untyped_result_set::row& r) -> future<stop_iteration> {
        const auto name = r.get_as<sstring>("role");
        auto role = co_await fetch_role(name);
        if (role) {
            _roles[name] = role;
        }
        co_return stop_iteration::no;
    };
    co_await _qp.query_internal(format("SELECT * FROM {}.{}",
            db::system_keyspace::NAME, meta::roles_table::name),
            db::consistency_level::LOCAL_ONE, {}, page_size, loader);

    co_await prune_all();
    for (const auto& [name, role] : _roles) {
        co_await distribute_role(name, role);
    }
    co_await container().invoke_on_others([this](cache& c) -> future<> {
        c._current_version = _current_version;
        co_await c.prune_all();
    });
}

future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
    if (legacy_mode(_qp)) {
        co_return;
    }
    for (const auto& name : roles) {
        logger.info("Loading role {}", name);
        auto role = co_await fetch_role(name);
        if (role) {
            _roles[name] = role;
        } else {
            _roles.erase(name);
        }
        co_await distribute_role(name, role);
    }
}

future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
    auto role_ptr = role.get();
    co_await container().invoke_on_others([&name, role_ptr](cache& c) {
        if (!role_ptr) {
            c._roles.erase(name);
            return;
        }
        auto role_copy = make_lw_shared<role_record>(*role_ptr);
        c._roles[name] = std::move(role_copy);
    });
}

bool cache::includes_table(const table_id& id) noexcept {
    return id == db::system_keyspace::roles()->id()
        || id == db::system_keyspace::role_members()->id()
        || id == db::system_keyspace::role_attributes()->id()
        || id == db::system_keyspace::role_permissions()->id();
}

} // namespace auth

auth/cache.hh (new file, 61 lines)

@@ -0,0 +1,61 @@
/*
 * Copyright (C) 2025-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

#include <unordered_set>
#include <unordered_map>

#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>

#include <absl/container/flat_hash_map.h>

#include "auth/permission.hh"
#include "auth/common.hh"

namespace cql3 { class query_processor; }

namespace auth {

class cache : public peering_sharded_service<cache> {
public:
    using role_name_t = sstring;
    using version_tag_t = char;

    struct role_record {
        bool can_login = false;
        bool is_superuser = false;
        std::unordered_set<role_name_t> member_of;
        std::unordered_set<role_name_t> members;
        sstring salted_hash;
        std::unordered_map<sstring, sstring> attributes;
        std::unordered_map<sstring, permission_set> permissions;
        version_tag_t version; // used for seamless cache reloads
    };

    explicit cache(cql3::query_processor& qp) noexcept;
    lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
    future<> load_all();
    future<> load_roles(std::unordered_set<role_name_t> roles);
    static bool includes_table(const table_id&) noexcept;

private:
    using roles_map = absl::flat_hash_map<role_name_t, lw_shared_ptr<role_record>>;
    roles_map _roles;
    version_tag_t _current_version;
    cql3::query_processor& _qp;

    future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
    future<> prune_all() noexcept;
    future<> distribute_role(const role_name_t& name, const lw_shared_ptr<role_record> role);
};

} // namespace auth
@@ -48,6 +48,10 @@ extern constinit const std::string_view AUTH_PACKAGE_NAME;

 } // namespace meta

+constexpr std::string_view PERMISSIONS_CF = "role_permissions";
+constexpr std::string_view ROLE_MEMBERS_CF = "role_members";
+constexpr std::string_view ROLE_ATTRIBUTES_CF = "role_attributes";
+
 // This is a helper to check whether auth-v2 is on.
 bool legacy_mode(cql3::query_processor& qp);
@@ -37,7 +37,6 @@ std::string_view default_authorizer::qualified_java_name() const {
 static constexpr std::string_view ROLE_NAME = "role";
 static constexpr std::string_view RESOURCE_NAME = "resource";
 static constexpr std::string_view PERMISSIONS_NAME = "permissions";
-static constexpr std::string_view PERMISSIONS_CF = "role_permissions";

 static logging::logger alogger("default_authorizer");
@@ -83,17 +83,18 @@ static const class_registrator<
     ldap_role_manager,
     cql3::query_processor&,
     ::service::raft_group0_client&,
-    ::service::migration_manager&> registration(ldap_role_manager_full_name);
+    ::service::migration_manager&,
+    cache&> registration(ldap_role_manager_full_name);

 ldap_role_manager::ldap_role_manager(
     std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
-    cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
-    : _std_mgr(qp, rg0c, mm), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
+    cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
+    : _std_mgr(qp, rg0c, mm, cache), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
     , _bind_password(bind_password)
     , _connection_factory(bind(std::mem_fn(&ldap_role_manager::reconnect), std::ref(*this))) {
 }

-ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
+ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
     : ldap_role_manager(
         qp.db().get_config().ldap_url_template(),
         qp.db().get_config().ldap_attr_role(),
@@ -101,7 +102,8 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
         qp.db().get_config().ldap_bind_passwd(),
         qp,
         rg0c,
-        mm) {
+        mm,
+        cache) {
 }

 std::string_view ldap_role_manager::qualified_java_name() const noexcept {
@@ -14,6 +14,7 @@

 #include "ent/ldap/ldap_connection.hh"
 #include "standard_role_manager.hh"
+#include "auth/cache.hh"

 namespace auth {

@@ -43,12 +44,13 @@ class ldap_role_manager : public role_manager {
         std::string_view bind_password, ///< LDAP bind credentials.
         cql3::query_processor& qp, ///< Passed to standard_role_manager.
         ::service::raft_group0_client& rg0c, ///< Passed to standard_role_manager.
-        ::service::migration_manager& mm ///< Passed to standard_role_manager.
+        ::service::migration_manager& mm, ///< Passed to standard_role_manager.
+        cache& cache ///< Passed to standard_role_manager.
     );

     /// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
     /// class_registrator<role_manager>.
-    ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm);
+    ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);

     /// Thrown when query-template parsing fails.
     struct url_error : public std::runtime_error {
@@ -11,6 +11,7 @@
 #include <seastar/core/future.hh>
 #include <stdexcept>
 #include <string_view>
+#include "auth/cache.hh"
 #include "cql3/description.hh"
 #include "utils/class_registrator.hh"

@@ -23,7 +24,8 @@ static const class_registrator<
     maintenance_socket_role_manager,
     cql3::query_processor&,
     ::service::raft_group0_client&,
-    ::service::migration_manager&> registration(sstring{maintenance_socket_role_manager_name});
+    ::service::migration_manager&,
+    cache&> registration(sstring{maintenance_socket_role_manager_name});

 std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {
@@ -8,6 +8,7 @@

 #pragma once

+#include "auth/cache.hh"
 #include "auth/resource.hh"
 #include "auth/role_manager.hh"
 #include <seastar/core/future.hh>

@@ -29,7 +30,7 @@ extern const std::string_view maintenance_socket_role_manager_name;
 // system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
 class maintenance_socket_role_manager final : public role_manager {
 public:
-    maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {}
+    maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}

     virtual std::string_view qualified_java_name() const noexcept override;
@@ -49,6 +49,7 @@ static const class_registrator<
     cql3::query_processor&,
     ::service::raft_group0_client&,
     ::service::migration_manager&,
+    cache&,
     utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");

 static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());

@@ -63,10 +64,11 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
 password_authenticator::~password_authenticator() {
 }

-password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
+password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
     : _qp(qp)
     , _group0_client(g0)
     , _migration_manager(mm)
+    , _cache(cache)
     , _stopped(make_ready_future<>())
     , _superuser(default_superuser(qp.db().get_config()))
     , _hashing_worker(hashing_worker)

@@ -315,11 +317,20 @@ future<authenticated_user> password_authenticator::authenticate(
     const sstring password = credentials.at(PASSWORD_KEY);

     try {
-        const std::optional<sstring> salted_hash = co_await get_password_hash(username);
-        if (!salted_hash) {
-            throw exceptions::authentication_exception("Username and/or password are incorrect");
+        std::optional<sstring> salted_hash;
+        if (legacy_mode(_qp)) {
+            salted_hash = co_await get_password_hash(username);
+            if (!salted_hash) {
+                throw exceptions::authentication_exception("Username and/or password are incorrect");
+            }
+        } else {
+            auto role = _cache.get(username);
+            if (!role || role->salted_hash.empty()) {
+                throw exceptions::authentication_exception("Username and/or password are incorrect");
+            }
+            salted_hash = role->salted_hash;
         }
-        const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash = std::move(salted_hash)]{
+        const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
             return passwords::check(password, *salted_hash);
         });
         if (!password_match) {
@@ -16,6 +16,7 @@
 #include "db/consistency_level_type.hh"
 #include "auth/authenticator.hh"
 #include "auth/passwords.hh"
+#include "auth/cache.hh"
 #include "service/raft/raft_group0_client.hh"
 #include "utils/alien_worker.hh"

@@ -41,6 +42,7 @@ class password_authenticator : public authenticator {
     cql3::query_processor& _qp;
     ::service::raft_group0_client& _group0_client;
     ::service::migration_manager& _migration_manager;
+    cache& _cache;
     future<> _stopped;
     abort_source _as;
     std::string _superuser; // default superuser name from the config (may or may not be present in roles table)

@@ -53,7 +55,7 @@ public:
     static db::consistency_level consistency_for_user(std::string_view role_name);
     static std::string default_superuser(const db::config&);

-    password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
+    password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);

     ~password_authenticator();
@@ -35,9 +35,10 @@ static const class_registrator<
     cql3::query_processor&,
     ::service::raft_group0_client&,
     ::service::migration_manager&,
+    cache&,
     utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");

-saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
+saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
     : _socket_path(qp.db().get_config().saslauthd_socket_path())
 {}

@@ -11,6 +11,7 @@
 #pragma once

 #include "auth/authenticator.hh"
+#include "auth/cache.hh"
 #include "utils/alien_worker.hh"

 namespace cql3 {

@@ -29,7 +30,7 @@ namespace auth {
 class saslauthd_authenticator : public authenticator {
     sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
 public:
-    saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
+    saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);

     future<> start() override;
@@ -17,6 +17,7 @@
 #include <chrono>

 #include <seastar/core/future-util.hh>
+#include <seastar/core/shard_id.hh>
 #include <seastar/core/sharded.hh>
 #include <seastar/core/shared_ptr.hh>

@@ -157,6 +158,7 @@ static future<> validate_role_exists(const service& ser, std::string_view role_n

 service::service(
     utils::loading_cache_config c,
+    cache& cache,
     cql3::query_processor& qp,
     ::service::raft_group0_client& g0,
     ::service::migration_notifier& mn,

@@ -166,6 +168,7 @@ service::service(
     maintenance_socket_enabled used_by_maintenance_socket)
     : _loading_cache_config(std::move(c))
     , _permissions_cache(nullptr)
+    , _cache(cache)
     , _qp(qp)
     , _group0_client(g0)
     , _mnotifier(mn)

@@ -188,15 +191,17 @@ service::service(
     ::service::migration_manager& mm,
     const service_config& sc,
     maintenance_socket_enabled used_by_maintenance_socket,
+    cache& cache,
     utils::alien_worker& hashing_worker)
     : service(
         std::move(c),
+        cache,
         qp,
         g0,
         mn,
         create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
-        create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, hashing_worker),
-        create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm),
+        create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
+        create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
         used_by_maintenance_socket) {
 }

@@ -232,6 +237,9 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
     auto auth_version = co_await sys_ks.get_auth_version();
     // version is set in query processor to be easily available in various places we call auth::legacy_mode check.
     _qp.auth_version = auth_version;
+    if (this_shard_id() == 0) {
+        co_await _cache.load_all();
+    }
     if (!_used_by_maintenance_socket) {
         // this legacy keyspace is only used by cqlsh
         // it's needed when executing `list roles` or `list users`
@@ -21,6 +21,7 @@
 #include "auth/authorizer.hh"
 #include "auth/permission.hh"
 #include "auth/permissions_cache.hh"
+#include "auth/cache.hh"
 #include "auth/role_manager.hh"
 #include "auth/common.hh"
 #include "cql3/description.hh"

@@ -77,6 +78,7 @@ public:
 class service final : public seastar::peering_sharded_service<service> {
     utils::loading_cache_config _loading_cache_config;
     std::unique_ptr<permissions_cache> _permissions_cache;
+    cache& _cache;

     cql3::query_processor& _qp;

@@ -107,6 +109,7 @@ class service final : public seastar::peering_sharded_service<service> {
 public:
     service(
         utils::loading_cache_config,
+        cache& cache,
         cql3::query_processor&,
         ::service::raft_group0_client&,
         ::service::migration_notifier&,

@@ -128,6 +131,7 @@ public:
         ::service::migration_manager&,
         const service_config&,
         maintenance_socket_enabled,
+        cache&,
         utils::alien_worker&);

     future<> start(::service::migration_manager&, db::system_keyspace&);
@@ -41,21 +41,6 @@

 namespace auth {

-namespace meta {
-
-namespace role_members_table {
-
-constexpr std::string_view name{"role_members", 12};
-
-}
-
-namespace role_attributes_table {
-
-constexpr std::string_view name{"role_attributes", 15};
-
-}
-
-}

 static logging::logger log("standard_role_manager");

@@ -64,7 +49,8 @@ static const class_registrator<
     standard_role_manager,
     cql3::query_processor&,
     ::service::raft_group0_client&,
-    ::service::migration_manager&> registration("org.apache.cassandra.auth.CassandraRoleManager");
+    ::service::migration_manager&,
+    cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");

 struct record final {
     sstring name;
@@ -121,10 +107,11 @@ static bool has_can_login(const cql3::untyped_result_set_row& row) {
     return row.has("can_login") && !(boolean_type->deserialize(row.get_blob_unfragmented("can_login")).is_null());
 }

-standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
+standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
     : _qp(qp)
     , _group0_client(g0)
     , _migration_manager(mm)
+    , _cache(cache)
     , _stopped(make_ready_future<>())
     , _superuser(password_authenticator::default_superuser(qp.db().get_config()))
 {}
@@ -136,7 +123,7 @@ std::string_view standard_role_manager::qualified_java_name() const noexcept {
 const resource_set& standard_role_manager::protected_resources() const {
     static const resource_set resources({
         make_data_resource(meta::legacy::AUTH_KS, meta::roles_table::name),
-        make_data_resource(meta::legacy::AUTH_KS, meta::role_members_table::name)});
+        make_data_resource(meta::legacy::AUTH_KS, ROLE_MEMBERS_CF)});

     return resources;
 }
@@ -160,7 +147,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
         " PRIMARY KEY (role, member)"
         ")",
         meta::legacy::AUTH_KS,
-        meta::role_members_table::name);
+        ROLE_MEMBERS_CF);
     static const sstring create_role_attributes_query = seastar::format(
         "CREATE TABLE {}.{} ("
         " role text,"
@@ -169,7 +156,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
         " PRIMARY KEY(role, name)"
         ")",
         meta::legacy::AUTH_KS,
-        meta::role_attributes_table::name);
+        ROLE_ATTRIBUTES_CF);
     return when_all_succeed(
         create_legacy_metadata_table_if_missing(
             meta::roles_table::name,
@@ -177,12 +164,12 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
             create_roles_query,
             _migration_manager),
         create_legacy_metadata_table_if_missing(
-            meta::role_members_table::name,
+            ROLE_MEMBERS_CF,
             _qp,
             create_role_members_query,
             _migration_manager),
         create_legacy_metadata_table_if_missing(
-            meta::role_attributes_table::name,
+            ROLE_ATTRIBUTES_CF,
             _qp,
             create_role_attributes_query,
             _migration_manager)).discard_result();
@@ -429,7 +416,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
     const auto revoke_from_members = [this, role_name, &mc] () -> future<> {
         const sstring query = seastar::format("SELECT member FROM {}.{} WHERE role = ?",
             get_auth_ks_name(_qp),
-            meta::role_members_table::name);
+            ROLE_MEMBERS_CF);
         const auto members = co_await _qp.execute_internal(
             query,
             consistency_for_role(role_name),
@@ -461,7 +448,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
     const auto remove_attributes_of = [this, role_name, &mc] () -> future<> {
         const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ?",
             get_auth_ks_name(_qp),
-            meta::role_attributes_table::name);
+            ROLE_ATTRIBUTES_CF);
         if (legacy_mode(_qp)) {
             co_await _qp.execute_internal(query, {sstring(role_name)},
                 cql3::query_processor::cache_internal::yes).discard_result();
@@ -517,7 +504,7 @@ standard_role_manager::legacy_modify_membership(
     case membership_change::add: {
         const sstring insert_query = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
             get_auth_ks_name(_qp),
-            meta::role_members_table::name);
+            ROLE_MEMBERS_CF);
         co_return co_await _qp.execute_internal(
             insert_query,
             consistency_for_role(role_name),
@@ -529,7 +516,7 @@ standard_role_manager::legacy_modify_membership(
     case membership_change::remove: {
         const sstring delete_query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
             get_auth_ks_name(_qp),
-            meta::role_members_table::name);
+            ROLE_MEMBERS_CF);
         co_return co_await _qp.execute_internal(
             delete_query,
             consistency_for_role(role_name),
@@ -567,12 +554,12 @@ standard_role_manager::modify_membership(
     case membership_change::add:
         modify_role_members = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
             get_auth_ks_name(_qp),
-            meta::role_members_table::name);
+            ROLE_MEMBERS_CF);
         break;
     case membership_change::remove:
         modify_role_members = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
             get_auth_ks_name(_qp),
-            meta::role_members_table::name);
+            ROLE_MEMBERS_CF);
         break;
     default:
         on_internal_error(log, format("unknown membership_change value: {}", int(ch)));
@@ -666,7 +653,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
 future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
     const sstring query = seastar::format("SELECT * FROM {}.{}",
         get_auth_ks_name(_qp),
-        meta::role_members_table::name);
+        ROLE_MEMBERS_CF);

     const auto results = co_await _qp.execute_internal(
         query,
@@ -731,15 +718,21 @@ future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
 }

 future<bool> standard_role_manager::can_login(std::string_view role_name) {
-    return require_record(_qp, role_name).then([](record r) {
-        return r.can_login;
-    });
+    if (legacy_mode(_qp)) {
+        const auto r = co_await require_record(_qp, role_name);
+        co_return r.can_login;
+    }
+    auto role = _cache.get(sstring(role_name));
+    if (!role) {
+        throw nonexistant_role(role_name);
+    }
+    co_return role->can_login;
 }

 future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
     const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
         get_auth_ks_name(_qp),
-        meta::role_attributes_table::name);
+        ROLE_ATTRIBUTES_CF);
     const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
     if (!result_set->empty()) {
         const cql3::untyped_result_set_row& row = result_set->one();
@@ -770,7 +763,7 @@ future<> standard_role_manager::set_attribute(std::string_view role_name, std::s
     }
     const sstring query = seastar::format("INSERT INTO {}.{} (role, name, value) VALUES (?, ?, ?)",
         get_auth_ks_name(_qp),
-        meta::role_attributes_table::name);
+        ROLE_ATTRIBUTES_CF);
     if (legacy_mode(_qp)) {
         co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, cql3::query_processor::cache_internal::yes).discard_result();
     } else {
@@ -785,7 +778,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
     }
     const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND name = ?",
         get_auth_ks_name(_qp),
-        meta::role_attributes_table::name);
+        ROLE_ATTRIBUTES_CF);
     if (legacy_mode(_qp)) {
         co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).discard_result();
     } else {
@@ -10,6 +10,7 @@

 #include "auth/common.hh"
 #include "auth/role_manager.hh"
+#include "auth/cache.hh"

 #include <string_view>

@@ -36,13 +37,14 @@ class standard_role_manager final : public role_manager {
     cql3::query_processor& _qp;
     ::service::raft_group0_client& _group0_client;
     ::service::migration_manager& _migration_manager;
+    cache& _cache;
     future<> _stopped;
     abort_source _as;
     std::string _superuser;
     shared_promise<> _superuser_created_promise;

 public:
-    standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
+    standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);

     virtual std::string_view qualified_java_name() const noexcept override;
@@ -13,6 +13,7 @@
 #include "auth/authorizer.hh"
 #include "auth/default_authorizer.hh"
 #include "auth/password_authenticator.hh"
+#include "auth/cache.hh"
 #include "auth/permission.hh"
 #include "service/raft/raft_group0_client.hh"
 #include "utils/class_registrator.hh"
@@ -37,8 +38,8 @@ class transitional_authenticator : public authenticator {
 public:
     static const sstring PASSWORD_AUTHENTICATOR_NAME;

-    transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
-        : transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, hashing_worker)) {
+    transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
+        : transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
     }
     transitional_authenticator(std::unique_ptr<authenticator> a)
         : _authenticator(std::move(a)) {
@@ -240,6 +241,7 @@ static const class_registrator<
     cql3::query_processor&,
     ::service::raft_group0_client&,
     ::service::migration_manager&,
+    auth::cache&,
     utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");

 static const class_registrator<
@@ -445,6 +445,7 @@ ldap_tests = set([
 scylla_tests = set([
     'test/boost/combined_tests',
     'test/boost/UUID_test',
     'test/boost/url_parse_test',
     'test/boost/advanced_rpc_compressor_test',
     'test/boost/allocation_strategy_test',
     'test/boost/alternator_unit_test',
@@ -1195,6 +1196,7 @@ scylla_core = (['message/messaging_service.cc',
     'auth/allow_all_authorizer.cc',
     'auth/authenticated_user.cc',
     'auth/authenticator.cc',
+    'auth/cache.cc',
     'auth/common.cc',
     'auth/default_authorizer.cc',
     'auth/resource.cc',
@@ -1646,6 +1648,7 @@ deps['test/boost/bytes_ostream_test'] = [
 ]
 deps['test/boost/input_stream_test'] = ['test/boost/input_stream_test.cc']
 deps['test/boost/UUID_test'] = ['clocks-impl.cc', 'utils/UUID_gen.cc', 'test/boost/UUID_test.cc', 'utils/uuid.cc', 'utils/dynamic_bitset.cc', 'utils/hashers.cc', 'utils/on_internal_error.cc']
 deps['test/boost/url_parse_test'] = ['utils/http.cc', 'test/boost/url_parse_test.cc', ]
 deps['test/boost/murmur_hash_test'] = ['bytes.cc', 'utils/murmur_hash.cc', 'test/boost/murmur_hash_test.cc']
 deps['test/boost/allocation_strategy_test'] = ['test/boost/allocation_strategy_test.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc', 'utils/labels.cc']
 deps['test/boost/log_heap_test'] = ['test/boost/log_heap_test.cc']
cql3/Cql.g (12 changed lines)

@@ -575,6 +575,15 @@ usingTimeoutServiceLevelClauseObjective[std::unique_ptr<cql3::attributes::raw>&
     | serviceLevel sl_name=serviceLevelOrRoleName { attrs->service_level = std::move(sl_name); }
     ;

+usingTimeoutConcurrencyClause[std::unique_ptr<cql3::attributes::raw>& attrs]
+    : K_USING usingTimeoutConcurrencyClauseObjective[attrs] ( K_AND usingTimeoutConcurrencyClauseObjective[attrs] )*
+    ;
+
+usingTimeoutConcurrencyClauseObjective[std::unique_ptr<cql3::attributes::raw>& attrs]
+    : K_TIMEOUT to=term { attrs->timeout = std::move(to); }
+    | K_CONCURRENCY c=term { attrs->concurrency = std::move(c); }
+    ;
+
 /**
  * UPDATE <CF>
  * USING TIMESTAMP <long>
@@ -666,7 +675,7 @@ pruneMaterializedViewStatement returns [std::unique_ptr<raw::select_statement> e
     auto attrs = std::make_unique<cql3::attributes::raw>();
     expression wclause = conjunction{};
 }
-    : K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingClause[attrs] )?
+    : K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingTimeoutConcurrencyClause[attrs] )?
     {
         auto params = make_lw_shared<raw::select_statement::parameters>(std::move(orderings), is_distinct, allow_filtering, statement_subtype, bypass_cache);
         return std::make_unique<raw::select_statement>(std::move(cf), std::move(params),
@@ -2370,6 +2379,7 @@ K_LIKE: L I K E;

 K_TIMEOUT: T I M E O U T;
 K_PRUNE: P R U N E;
+K_CONCURRENCY: C O N C U R R E N C Y;

 K_EXECUTE: E X E C U T E;
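The grammar hunks above restrict the `USING` clause of `PRUNE MATERIALIZED VIEW` to `TIMEOUT` and the new `CONCURRENCY` option. A hypothetical invocation (keyspace and view names are illustrative):

```
PRUNE MATERIALIZED VIEW ks.mv_index USING TIMEOUT 5m AND CONCURRENCY 4;
```

Because the two objectives can repeat joined by `AND`, either option may appear alone or both together, in either order.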
@@ -20,19 +20,21 @@
 namespace cql3 {

 std::unique_ptr<attributes> attributes::none() {
-    return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}}};
+    return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}, {}}};
 }

 attributes::attributes(std::optional<cql3::expr::expression>&& timestamp,
                        std::optional<cql3::expr::expression>&& time_to_live,
                        std::optional<cql3::expr::expression>&& timeout,
-                       std::optional<sstring> service_level)
+                       std::optional<sstring> service_level,
+                       std::optional<cql3::expr::expression>&& concurrency)
     : _timestamp_unset_guard(timestamp)
     , _timestamp{std::move(timestamp)}
     , _time_to_live_unset_guard(time_to_live)
     , _time_to_live{std::move(time_to_live)}
     , _timeout{std::move(timeout)}
     , _service_level(std::move(service_level))
+    , _concurrency{std::move(concurrency)}
 { }

 bool attributes::is_timestamp_set() const {
@@ -51,6 +53,10 @@ bool attributes::is_service_level_set() const {
     return bool(_service_level);
 }

+bool attributes::is_concurrency_set() const {
+    return bool(_concurrency);
+}
+
 int64_t attributes::get_timestamp(int64_t now, const query_options& options) {
     if (!_timestamp.has_value() || _timestamp_unset_guard.is_unset(options)) {
         return now;
@@ -123,6 +129,27 @@ qos::service_level_options attributes::get_service_level(qos::service_level_cont
     return sl_controller.get_service_level(sl_name).slo;
 }

+std::optional<int32_t> attributes::get_concurrency(const query_options& options) const {
+    if (!_concurrency.has_value()) {
+        return std::nullopt;
+    }
+
+    cql3::raw_value concurrency_raw = expr::evaluate(*_concurrency, options);
+    if (concurrency_raw.is_null()) {
+        throw exceptions::invalid_request_exception("Invalid null value of concurrency");
+    }
+    int32_t concurrency;
+    try {
+        concurrency = concurrency_raw.view().validate_and_deserialize<int32_t>(*int32_type);
+    } catch (marshal_exception& e) {
+        throw exceptions::invalid_request_exception("Invalid concurrency value");
+    }
+    if (concurrency <= 0) {
+        throw exceptions::invalid_request_exception("Concurrency must be a positive integer");
+    }
+    return concurrency;
+}
+
 void attributes::fill_prepare_context(prepare_context& ctx) {
     if (_timestamp.has_value()) {
         expr::fill_prepare_context(*_timestamp, ctx);
@@ -133,10 +160,13 @@ void attributes::fill_prepare_context(prepare_context& ctx) {
     if (_timeout.has_value()) {
         expr::fill_prepare_context(*_timeout, ctx);
     }
+    if (_concurrency.has_value()) {
+        expr::fill_prepare_context(*_concurrency, ctx);
+    }
 }

 std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const {
-    std::optional<expr::expression> ts, ttl, to;
+    std::optional<expr::expression> ts, ttl, to, conc;

     if (timestamp.has_value()) {
         ts = prepare_expression(*timestamp, db, ks_name, nullptr, timestamp_receiver(ks_name, cf_name));
@@ -153,7 +183,12 @@ std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database d
         verify_no_aggregate_functions(*timeout, "USING clause");
     }

-    return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level)}};
+    if (concurrency.has_value()) {
+        conc = prepare_expression(*concurrency, db, ks_name, nullptr, concurrency_receiver(ks_name, cf_name));
+        verify_no_aggregate_functions(*concurrency, "USING clause");
+    }
+
+    return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level), std::move(conc)}};
 }

 lw_shared_ptr<column_specification> attributes::raw::timestamp_receiver(const sstring& ks_name, const sstring& cf_name) const {
@@ -168,4 +203,8 @@ lw_shared_ptr<column_specification> attributes::raw::timeout_receiver(const sstr
     return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[timeout]", true), duration_type);
 }

+lw_shared_ptr<column_specification> attributes::raw::concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const {
+    return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[concurrency]", true), data_type_for<int32_t>());
+}
+
 }
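The validation rules in `get_concurrency()` can be modeled in isolation. The following is a minimal sketch, not Scylla code: `parse_concurrency` and its string input stand in for the bound-value evaluation and `validate_and_deserialize` above, but the three rejection cases mirror the ones in the hunk (null value, non-integer value, non-positive value):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for attributes::get_concurrency(): a null value,
// a non-integer value, and a non-positive value are all rejected, matching
// the checks in the patch above. A set, valid value is returned as-is.
std::optional<int32_t> parse_concurrency(const std::optional<std::string>& raw) {
    if (!raw) {
        throw std::invalid_argument("Invalid null value of concurrency");
    }
    int32_t concurrency;
    try {
        size_t pos = 0;
        concurrency = std::stoi(*raw, &pos);
        if (pos != raw->size()) {
            throw std::invalid_argument("trailing characters");
        }
    } catch (const std::exception&) {
        throw std::invalid_argument("Invalid concurrency value");
    }
    if (concurrency <= 0) {
        throw std::invalid_argument("Concurrency must be a positive integer");
    }
    return concurrency;
}
```

Note that an *unset* attribute yields `std::nullopt` in the real code (the statement then falls back to a default), while a *null* bound value is an error; the sketch only models the latter path.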
@@ -36,13 +36,15 @@ private:
     std::optional<cql3::expr::expression> _time_to_live;
     std::optional<cql3::expr::expression> _timeout;
     std::optional<sstring> _service_level;
+    std::optional<cql3::expr::expression> _concurrency;
 public:
     static std::unique_ptr<attributes> none();
 private:
     attributes(std::optional<cql3::expr::expression>&& timestamp,
                std::optional<cql3::expr::expression>&& time_to_live,
                std::optional<cql3::expr::expression>&& timeout,
-               std::optional<sstring> service_level);
+               std::optional<sstring> service_level,
+               std::optional<cql3::expr::expression>&& concurrency);
 public:
     bool is_timestamp_set() const;

@@ -52,6 +54,8 @@ public:

     bool is_service_level_set() const;

+    bool is_concurrency_set() const;
+
     int64_t get_timestamp(int64_t now, const query_options& options);

     std::optional<int32_t> get_time_to_live(const query_options& options);
@@ -60,6 +64,8 @@ public:

     qos::service_level_options get_service_level(qos::service_level_controller& sl_controller) const;

+    std::optional<int32_t> get_concurrency(const query_options& options) const;
+
     void fill_prepare_context(prepare_context& ctx);

     class raw final {
@@ -68,6 +74,7 @@ public:
         std::optional<cql3::expr::expression> time_to_live;
         std::optional<cql3::expr::expression> timeout;
         std::optional<sstring> service_level;
+        std::optional<cql3::expr::expression> concurrency;

         std::unique_ptr<attributes> prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const;
     private:
@@ -76,6 +83,8 @@ public:
         lw_shared_ptr<column_specification> time_to_live_receiver(const sstring& ks_name, const sstring& cf_name) const;

         lw_shared_ptr<column_specification> timeout_receiver(const sstring& ks_name, const sstring& cf_name) const;

+        lw_shared_ptr<column_specification> concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const;
     };
 };
@@ -279,11 +279,15 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
         throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
     }

-    try {
-        db::view::validate_view_keyspace(db, keyspace());
-    } catch (const std::exception& e) {
-        // The type of the thrown exception is not specified, so we need to wrap it here.
-        throw exceptions::invalid_request_exception(e.what());
+    // Regular secondary indexes require rf-rack-validity.
+    // Custom indexes need to validate this property themselves, if they need it.
+    if (!_properties || !_properties->custom_class) {
+        try {
+            db::view::validate_view_keyspace(db, keyspace());
+        } catch (const std::exception& e) {
+            // The type of the thrown exception is not specified, so we need to wrap it here.
+            throw exceptions::invalid_request_exception(e.what());
+        }
     }

     validate_for_local_index(*schema);
@@ -21,7 +21,7 @@ namespace cql3 {
 namespace statements {

 static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges, std::vector<query::clustering_range> clustering_bounds, view_ptr view,
-        service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration) {
+        service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration, size_t concurrency) {
     auto key_columns = std::ranges::to<std::vector<const column_definition*>>(
         view->all_columns()
         | std::views::filter([] (const column_definition& cdef) { return cdef.is_primary_key(); })
@@ -35,7 +35,7 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges,
     tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges);

     auto p = service::pager::query_pagers::ghost_row_deleting_pager(schema_ptr(view), selection, state,
-        options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration);
+        options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration, concurrency);

     int32_t page_size = std::max(options.get_page_size(), 1000);
     auto now = gc_clock::now();
@@ -62,7 +62,8 @@ future<::shared_ptr<cql_transport::messages::result_message>> prune_materialized
     auto timeout_duration = get_timeout(state.get_client_state(), options);
     dht::partition_range_vector key_ranges = _restrictions->get_partition_key_ranges(options);
     std::vector<query::clustering_range> clustering_bounds = _restrictions->get_clustering_bounds(options);
-    return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration).then([] {
+    size_t concurrency = _attrs->is_concurrency_set() ? _attrs->get_concurrency(options).value() : 1;
+    return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration, concurrency).then([] {
         return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(::make_shared<cql_transport::messages::result_message::void_message>());
     });
 }
@@ -165,7 +165,7 @@ future<> db::commitlog_replayer::impl::init() {

 future<db::commitlog_replayer::impl::stats>
 db::commitlog_replayer::impl::recover(const commitlog::descriptor& d, const commitlog::replay_state& rpstate) const {
-    SCYLLA_ASSERT(_column_mappings.local_is_initialized());
+    scylla_assert(_column_mappings.local_is_initialized());

     replay_position rp{d};
     auto gp = min_pos(rp.shard_id());
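Per the summary at the top, the difference between the two macros is abort-versus-throw. A minimal sketch of the throwing variant follows; the real macro reports through `on_internal_error()` and formats an optional custom message, so everything below is illustrative rather than the actual `utils/assert.hh` implementation:

```cpp
#include <stdexcept>
#include <string>

// Illustrative only: on a failed check, raise an exception the caller can
// catch instead of aborting the whole process the way SCYLLA_ASSERT does.
// The trailing variadic slot models the optional custom-message arguments.
#define scylla_assert(cond, ...) \
    do { \
        if (!(cond)) { \
            throw std::runtime_error(std::string("internal error: failed check: ") + #cond); \
        } \
    } while (0)
```

A caller such as `recover()` above then surfaces the failure as an errored future for one request, rather than crashing the node.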
db/config.cc (18 changed lines)

@@ -1172,6 +1172,17 @@ db::config::config(std::shared_ptr<db::extensions> exts)
         "* default_weight: (Default: 1 **) How many requests are handled during each turn of the RoundRobin.\n"
         "* weights: (Default: Keyspace: 1) Takes a list of keyspaces. It sets how many requests are handled during each turn of the RoundRobin, based on the request_scheduler_id.")
+    /**
+     * @Group Vector search settings
+     * @GroupDescription Settings for configuring and tuning vector search functionality.
+     */
+    , vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "",
+        "A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
+    , vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
+        "A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
+    , vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
+        "Options for encrypted connections to the vector store. These options are used for HTTPS URIs in `vector_store_primary_uri` and `vector_store_secondary_uri`. The available options are:\n"
+        "* truststore: (Default: <not set, use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
     /**
      * @Group Security properties
      * @GroupDescription Server and client security settings.
      */
@@ -1459,13 +1470,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
     , alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
     , alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
         "Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
-    , vector_store_primary_uri(
-        this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "", "A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
-    , vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
-        "A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
-    , vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
-        "Options for encrypted connections to the vector store. These options are used for HTTPS URIs in vector_store_primary_uri and vector_store_secondary_uri. The available options are:\n"
-        "* truststore: (Default: <not set. use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
     , abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
     , sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
         "In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")
@@ -344,6 +344,9 @@ public:
     named_value<sstring> request_scheduler;
     named_value<sstring> request_scheduler_id;
     named_value<string_map> request_scheduler_options;
+    named_value<sstring> vector_store_primary_uri;
+    named_value<sstring> vector_store_secondary_uri;
+    named_value<string_map> vector_store_encryption_options;
     named_value<sstring> authenticator;
     named_value<sstring> internode_authenticator;
     named_value<sstring> authorizer;
@@ -471,10 +474,6 @@ public:
     named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
     named_value<uint64_t> alternator_max_users_query_size_in_trace_output;

-    named_value<sstring> vector_store_primary_uri;
-    named_value<sstring> vector_store_secondary_uri;
-    named_value<string_map> vector_store_encryption_options;
-
     named_value<bool> abort_on_ebadf;

     named_value<bool> sanitizer_report_backtrace;
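For reference, the three relocated options sit together in scylla.yaml. A hypothetical fragment (hosts, ports, and the truststore path are placeholders, not values from this change):

```
# Preferred vector store nodes, tried first.
vector_store_primary_uri: https://vs-zone-a-1.example.com:6080,https://vs-zone-a-2.example.com:6080
# Fallback nodes, typically in another availability zone.
vector_store_secondary_uri: https://vs-zone-b-1.example.com:6080
vector_store_encryption_options:
    truststore: /etc/scylla/vector-store-ca.pem
```

The encryption options apply only when the URIs use HTTPS; with no truststore set, the system truststore is used.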
@@ -10,6 +10,7 @@
 #include "reader_concurrency_semaphore.hh"
 #include "replica/database.hh"
 #include "utils/UUID_gen.hh"
+#include "utils/assert.hh"

 static logging::logger corrupt_data_logger("corrupt_data");

@@ -75,14 +76,14 @@ future<corrupt_data_handler::entry_id> system_table_corrupt_data_handler::do_rec

     auto set_cell_raw = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, managed_bytes cell_value) {
         auto cdef = corrupt_data_schema->get_column_definition(cell_name);
-        SCYLLA_ASSERT(cdef);
+        scylla_assert(cdef);

         entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value, _entry_ttl));
     };

     auto set_cell = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, data_value cell_value) {
         auto cdef = corrupt_data_schema->get_column_definition(cell_name);
-        SCYLLA_ASSERT(cdef);
+        scylla_assert(cdef);

         entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value.serialize_nonnull(), _entry_ttl));
     };
**db/large_data_handler.cc**

```diff
@@ -39,7 +39,7 @@ large_data_handler::large_data_handler(uint64_t partition_threshold_bytes, uint6
 }

 future<large_data_handler::partition_above_threshold> large_data_handler::maybe_record_large_partitions(const sstables::sstable& sst, const sstables::key& key, uint64_t partition_size, uint64_t rows, uint64_t range_tombstones, uint64_t dead_rows) {
-    SCYLLA_ASSERT(running());
+    scylla_assert(running());
     partition_above_threshold above_threshold{partition_size > _partition_threshold_bytes, rows > _rows_count_threshold};
     static_assert(std::is_same_v<decltype(above_threshold.size), bool>);
     _stats.partitions_bigger_than_threshold += above_threshold.size; // increment if true
@@ -83,7 +83,7 @@ sstring large_data_handler::sst_filename(const sstables::sstable& sst) {
 }

 future<> large_data_handler::maybe_delete_large_data_entries(sstables::shared_sstable sst) {
-    SCYLLA_ASSERT(running());
+    scylla_assert(running());
     auto schema = sst->get_schema();
     auto filename = sst_filename(*sst);
     using ldt = sstables::large_data_type;
@@ -247,7 +247,7 @@ future<> cql_table_large_data_handler::record_large_rows(const sstables::sstable

 future<> cql_table_large_data_handler::delete_large_data_entries(const schema& s, sstring sstable_name, std::string_view large_table_name) const {
     auto sys_ks = _sys_ks.get_permit();
-    SCYLLA_ASSERT(sys_ks);
+    scylla_assert(sys_ks);
     const sstring req =
         seastar::format("DELETE FROM system.{} WHERE keyspace_name = ? AND table_name = ? AND sstable_name = ?",
             large_table_name);
```
**db/large_data_handler.hh**

```diff
@@ -80,7 +80,7 @@ public:

     future<bool> maybe_record_large_rows(const sstables::sstable& sst, const sstables::key& partition_key,
             const clustering_key_prefix* clustering_key, uint64_t row_size) {
-        SCYLLA_ASSERT(running());
+        scylla_assert(running());
         if (row_size > _row_threshold_bytes) [[unlikely]] {
             return with_sem([&sst, &partition_key, clustering_key, row_size, this] {
                 return record_large_rows(sst, partition_key, clustering_key, row_size);
@@ -100,7 +100,7 @@ public:

     future<bool> maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
             const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements) {
-        SCYLLA_ASSERT(running());
+        scylla_assert(running());
         if (cell_size > _cell_threshold_bytes || collection_elements > _collection_elements_count_threshold) [[unlikely]] {
             return with_sem([&sst, &partition_key, clustering_key, &cdef, cell_size, collection_elements, this] {
                 return record_large_cells(sst, partition_key, clustering_key, cdef, cell_size, collection_elements);
```
**db/schema_applier.cc**

```diff
@@ -1121,7 +1121,7 @@ future<> schema_applier::commit() {
     // Run func first on shard 0
     // to allow "seeding" of the effective_replication_map
     // with a new e_r_m instance.
-    SCYLLA_ASSERT(this_shard_id() == 0);
+    scylla_assert(this_shard_id() == 0);
     commit_on_shard(sharded_db.local());
     co_await sharded_db.invoke_on_others([this] (replica::database& db) {
         commit_on_shard(db);
```
**db/size_estimates_virtual_reader.cc**

```diff
@@ -187,7 +187,7 @@ static future<std::vector<token_range>> get_local_ranges(replica::database& db,
     auto ranges = db.get_token_metadata().get_primary_ranges_for(std::move(tokens));
     std::vector<token_range> local_ranges;
     auto to_bytes = [](const std::optional<dht::token_range::bound>& b) {
-        SCYLLA_ASSERT(b);
+        scylla_assert(b);
         return utf8_type->decompose(b->value().to_sstring());
     };
     // We merge the ranges to be compatible with how Cassandra shows it's size estimates table.
```
**db/system_distributed_keyspace.cc**

```diff
@@ -231,7 +231,7 @@ static schema_ptr get_current_service_levels(data_dictionary::database db) {
 }

 static schema_ptr get_updated_service_levels(data_dictionary::database db, bool workload_prioritization_enabled) {
-    SCYLLA_ASSERT(this_shard_id() == 0);
+    scylla_assert(this_shard_id() == 0);
     auto schema = get_current_service_levels(db);
     schema_builder b(schema);
     for (const auto& col : new_service_levels_columns(workload_prioritization_enabled)) {
```
```diff
@@ -9,6 +9,8 @@
 #include "query/query-result-reader.hh"
 #include "replica/database_fwd.hh"
 #include "db/timeout_clock.hh"
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>

 namespace service {
 class storage_proxy;
@@ -25,8 +27,14 @@ class delete_ghost_rows_visitor {
     replica::table& _view_table;
     schema_ptr _base_schema;
     std::optional<partition_key> _view_pk;
+    db::timeout_semaphore _concurrency_semaphore;
+    seastar::gate _gate;
+    std::exception_ptr& _ex;

 public:
     delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration);
+    delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex);
+    delete_ghost_rows_visitor(delete_ghost_rows_visitor&&) = default;
+    ~delete_ghost_rows_visitor() noexcept;

     void add_value(const column_definition& def, query::result_row_view::iterator_type& i) {
     }
@@ -45,6 +53,9 @@ public:
     uint32_t accept_partition_end(const query::result_row_view& static_row) {
         return 0;
     }

+private:
+    future<> do_accept_new_row(partition_key pk, clustering_key ck);
 };

 } //namespace db::view
```
**db/view/row_locking.cc**

```diff
@@ -153,14 +153,14 @@ row_locker::unlock(const dht::decorated_key* pk, bool partition_exclusive,
         mylog.error("column_family::local_base_lock_holder::~local_base_lock_holder() can't find lock for partition", *pk);
         return;
     }
-    SCYLLA_ASSERT(&pli->first == pk);
+    scylla_assert(&pli->first == pk);
     if (cpk) {
         auto rli = pli->second._row_locks.find(*cpk);
         if (rli == pli->second._row_locks.end()) {
             mylog.error("column_family::local_base_lock_holder::~local_base_lock_holder() can't find lock for row", *cpk);
             return;
         }
-        SCYLLA_ASSERT(&rli->first == cpk);
+        scylla_assert(&rli->first == cpk);
         mylog.debug("releasing {} lock for row {} in partition {}", (row_exclusive ? "exclusive" : "shared"), *cpk, *pk);
         auto& lock = rli->second;
         if (row_exclusive) {
```
**db/view/view.cc**

```diff
@@ -3597,7 +3597,7 @@ view_updating_consumer::view_updating_consumer(view_update_generator& gen, schem
     })
 { }

-delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration)
+delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex)
     : _proxy(proxy)
     , _state(state)
     , _timeout_duration(timeout_duration)
@@ -3605,8 +3605,20 @@ delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& pro
     , _view_table(_proxy.get_db().local().find_column_family(view))
     , _base_schema(_proxy.get_db().local().find_schema(_view->view_info()->base_id()))
     , _view_pk()
+    , _concurrency_semaphore(concurrency)
+    , _ex(ex)
 {}

+delete_ghost_rows_visitor::~delete_ghost_rows_visitor() noexcept {
+    try {
+        _gate.close().get();
+    } catch (...) {
+        // Closing the gate should never throw, but if it does anyway, capture the exception.
+        _ex = std::current_exception();
+    }
+}
+
 void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, uint32_t row_count) {
     SCYLLA_ASSERT(thread::running_in_thread());
     _view_pk = key;
@@ -3614,7 +3626,18 @@ void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, u

 // Assumes running in seastar::thread
 void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row) {
-    auto view_exploded_pk = _view_pk->explode();
+    auto units = get_units(_concurrency_semaphore, 1).get();
+    (void)seastar::try_with_gate(_gate, [this, pk = _view_pk.value(), units = std::move(units), ck] () mutable {
+        return do_accept_new_row(std::move(pk), std::move(ck)).then_wrapped([this, units = std::move(units)] (future<>&& f) mutable {
+            if (f.failed()) {
+                _ex = f.get_exception();
+            }
+        });
+    });
+}
+
+future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clustering_key ck) {
+    auto view_exploded_pk = pk.explode();
     auto view_exploded_ck = ck.explode();
     std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
     std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
@@ -3649,17 +3672,17 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
         _proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
     auto timeout = db::timeout_clock::now() + _timeout_duration;
     service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
-    auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get();
+    auto base_qr = co_await _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts);
     query::result& result = *base_qr.query_result;
-    auto delete_ghost_row = [&]() {
-        mutation m(_view, *_view_pk);
+    auto delete_ghost_row = [&]() -> future<> {
+        mutation m(_view, pk);
         auto& row = m.partition().clustered_row(*_view, ck);
         row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
         timeout = db::timeout_clock::now() + _timeout_duration;
-        _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
+        return _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no);
     };
     if (result.row_count().value_or(0) == 0) {
-        delete_ghost_row();
+        co_await delete_ghost_row();
     } else if (!view_key_cols_not_in_base_key.empty()) {
         if (result.row_count().value_or(0) != 1) {
             on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name()));
@@ -3669,7 +3692,7 @@ void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const q
         for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) {
             const data_value* base_val = base_row.get_data_value(col_def->name_as_text());
             if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) {
-                delete_ghost_row();
+                co_await delete_ghost_row();
                 break;
             }
         }
```
**dist/debian/debian/scylla-server.install** (vendored, 1 line changed)

```diff
@@ -2,7 +2,6 @@ etc/default/scylla-server
 etc/default/scylla-housekeeping
 etc/scylla.d/*.conf
 etc/bash_completion.d/nodetool-completion
 opt/scylladb/share/p11-kit/modules/*
 opt/scylladb/share/doc/scylla/*
 opt/scylladb/share/doc/scylla/licenses/
 usr/lib/systemd/system/*.timer
```
**dist/redhat/scylla.spec** (vendored, 1 line changed)

```diff
@@ -122,7 +122,6 @@ ln -sfT /etc/scylla /var/lib/scylla/conf
 %config(noreplace) %{_sysconfdir}/sysconfig/scylla-housekeeping
 %attr(0755,root,root) %dir %{_sysconfdir}/scylla.d
 %config(noreplace) %{_sysconfdir}/scylla.d/*.conf
 /opt/scylladb/share/p11-kit/modules/*
 /opt/scylladb/share/doc/scylla/*
 %{_unitdir}/scylla-fstrim.service
 %{_unitdir}/scylla-housekeeping-daily.service
```
```diff
@@ -1,6 +1,18 @@
 ### a dictionary of redirections
 #old path: new path

+# Move the driver information to another project
+
+/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html
+/stable/using-scylla/drivers/dynamo-drivers/index.html: https://docs.scylladb.com/stable/drivers/dynamo-drivers.html
+/stable/using-scylla/drivers/cql-drivers/index.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
+/stable/using-scylla/drivers/cql-drivers/scylla-python-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
+/stable/using-scylla/drivers/cql-drivers/scylla-java-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
+/stable/using-scylla/drivers/cql-drivers/scylla-go-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
+/stable/using-scylla/drivers/cql-drivers/scylla-gocqlx-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
+/stable/using-scylla/drivers/cql-drivers/scylla-cpp-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
+/stable/using-scylla/drivers/cql-drivers/scylla-rust-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
+
 # Redirect 2025.1 upgrade guides that are not on master but were indexed by Google (404 reported)

 /master/upgrade/upgrade-guides/upgrade-guide-from-2024.x-to-2025.1/upgrade-guide-from-2024.x-to-2025.1.html: https://docs.scylladb.com/manual/stable/upgrade/index.html
```
````diff
@@ -106,6 +106,15 @@ which is recommended in order to make the operation less heavyweight
 and allow for running multiple parallel pruning statements for non-overlapping
 token ranges.

+By default, the PRUNE MATERIALIZED VIEW statement is relatively slow, only
+performing one base read or write at a time. This can be changed with the
+USING CONCURRENCY clause. If the clause is used, the concurrency of reads
+and writes from the base table will be allowed to increase up to the specified
+value. For example, to run the PRUNE with 100 parallel reads/writes, you can use:
+```cql
+PRUNE MATERIALIZED VIEW my_view WHERE v = 19 USING CONCURRENCY 100;
+```
+
 ## Synchronous materialized views

 Usually, when a table with materialized views is updated, the update to the
````
**docs/dev/scylla_assert_conversion.md** (new file, 198 lines)

# SCYLLA_ASSERT to scylla_assert() Conversion Guide

## Overview

This document tracks the conversion of `SCYLLA_ASSERT` to the new `scylla_assert()` macro based on `on_internal_error()`. The new macro throws exceptions instead of crashing the process, preventing cluster-wide crashes and loss of availability.

## Status Summary

- **Total SCYLLA_ASSERT usages**: ~1307 (including tests)
- **Non-test usages**: ~886
- **Unsafe conversions (noexcept)**: ~187
- **Unsafe conversions (destructors)**: ~36
- **Safe conversions possible**: ~668
- **Converted so far**: 112

## Safe vs Unsafe Contexts

### Safe to Convert ✓

- Regular functions (non-noexcept)
- Coroutine functions (returning `future<T>`)
- Member functions without a noexcept specifier
- Functions where exception propagation is acceptable

### Unsafe to Convert ✗

1. **noexcept functions** - throwing exceptions from noexcept causes `std::terminate()`
2. **Destructors** - destructors are implicitly noexcept
3. **noexcept lambdas and callbacks**
4. **Code with explicit exception-safety requirements** that cannot handle exceptions
## Files with Unsafe Conversions

### Files with SCYLLA_ASSERT in noexcept contexts (examples)

1. **reader_concurrency_semaphore.cc**
   - Lines with noexcept functions containing SCYLLA_ASSERT
   - Must remain as SCYLLA_ASSERT

2. **db/large_data_handler.cc**
   - Line 86: `maybe_delete_large_data_entries()` - marked noexcept but contains SCYLLA_ASSERT
   - Analysis shows this is actually safe (not truly noexcept)

3. **db/row_cache.cc**
   - Multiple SCYLLA_ASSERT usages in noexcept member functions

4. **db/schema_tables.cc**
   - SCYLLA_ASSERT in noexcept contexts

5. **raft/server.cc**
   - Multiple noexcept functions with SCYLLA_ASSERT

### Files with SCYLLA_ASSERT in destructors

1. **reader_concurrency_semaphore.cc**
   - Line 1116: SCYLLA_ASSERT in destructor

2. **api/column_family.cc**
   - Line 102: SCYLLA_ASSERT in destructor

3. **utils/logalloc.cc**
   - Line 1991: SCYLLA_ASSERT in destructor

4. **utils/file_lock.cc**
   - Lines 34, 36: SCYLLA_ASSERT in destructor

5. **utils/disk_space_monitor.cc**
   - Line 66: SCYLLA_ASSERT in destructor

## Conversion Strategy

### Phase 1: Infrastructure (Completed)
- Created `scylla_assert()` macro in `utils/assert.hh`
- Uses `on_internal_error()` for exception-based error handling
- Supports optional message parameters

### Phase 2: Safe Conversions
Convert SCYLLA_ASSERT to scylla_assert in contexts where:
- The function is not noexcept
- The call is not in a destructor
- Exception propagation is safe

### Phase 3: Document Remaining Uses
For contexts that cannot be converted:
- Add comments explaining why SCYLLA_ASSERT must remain
- Consider alternative approaches (e.g., using `on_fatal_internal_error()` in noexcept)
## Converted Files

### Completed Conversions

1. **db/large_data_handler.cc** (3 conversions)
   - Line 42: `maybe_record_large_partitions()`
   - Line 86: `maybe_delete_large_data_entries()`
   - Line 250: `delete_large_data_entries()`

2. **db/large_data_handler.hh** (2 conversions)
   - Line 83: `maybe_record_large_rows()`
   - Line 103: `maybe_record_large_cells()`

3. **db/schema_applier.cc** (1 conversion)
   - Line 1124: `commit()` coroutine

4. **db/system_distributed_keyspace.cc** (1 conversion)
   - Line 234: `get_updated_service_levels()`

5. **db/commitlog/commitlog_replayer.cc** (1 conversion)
   - Line 168: `recover()` coroutine

6. **db/view/row_locking.cc** (2 conversions)
   - Line 156: `unlock()` - partition lock check
   - Line 163: `unlock()` - row lock check

7. **db/size_estimates_virtual_reader.cc** (1 conversion)
   - Line 190: Lambda in `get_local_ranges()`

8. **db/corrupt_data_handler.cc** (2 conversions)
   - Line 78: `set_cell_raw` lambda
   - Line 85: `set_cell` lambda

9. **raft/tracker.cc** (2 conversions)
   - Line 49: Switch default case with descriptive error
   - Line 90: Switch default case with descriptive error

10. **service/topology_coordinator.cc** (11 conversions)
    - Line 363: Node lookup assertion in `retake_node()`
    - Line 2313: Bootstrapping state ring check
    - Line 2362: Replacing state ring check
    - Line 2365: Normal nodes lookup assertion
    - Line 2366: Node ring and state validation
    - Line 3025: Join request ring check
    - Line 3036: Leave request ring check
    - Line 3049: Remove request ring check
    - Line 3061: Replace request ring check
    - Line 3166: Transition nodes empty check
    - Line 4016: Barrier validation in `stop()`

11. **service/storage_service.cc** (28 conversions, 3 unsafe kept as SCYLLA_ASSERT)
    - Lines 603, 691, 857, 901, 969: Core service operations
    - Lines 1523, 1575, 1844, 2086, 2170, 2195: Bootstrap and join operations
    - Lines 2319, 2352, 2354: Replacement operations
    - Lines 3003, 3028, 3228: Cluster join and drain operations
    - Lines 3995, 4047, 4353: Decommission and removenode operations
    - Lines 4473, 5787, 5834, 5958: CDC and topology change operations
    - Lines 6490, 6491: Tablet streaming operations
    - Line 7512: Join node response handler
    - **Unsafe (kept as SCYLLA_ASSERT)**: Lines 3398, 5760, 5775 (noexcept functions)

12. **sstables/** (58 conversions across 22 files)
    - **sstables/trie/bti_node_reader.cc** (6): Node reading operations
    - **sstables/mx/writer.cc** (6): MX format writing
    - **sstables/sstable_set.cc** (5): SSTable set management
    - **sstables/compressor.cc** (5): Compression/decompression
    - **sstables/trie/trie_writer.hh** (4): Trie writing
    - **sstables/downsampling.hh** (4): Downsampling operations
    - **sstables/storage.{cc,hh}** (6): Storage operations
    - **sstables/sstables_manager.{cc,hh}** (6): SSTable lifecycle management
    - **sstables/trie/writer_node.{hh,impl.hh}** (4): Trie node writing
    - **sstables/trie/bti_key_translation.cc** (2): Key translation
    - **sstables/sstable_directory.cc** (2): Directory management
    - **sstables/trie/trie_writer.cc** (1): Trie writer implementation
    - **sstables/trie/trie_traversal.hh** (1): Trie traversal
    - **sstables/sstables.cc** (1): Core SSTable operations
    - **sstables/partition_index_cache.hh** (1): Index caching
    - **sstables/generation_type.hh** (1): Generation management
    - **sstables/compress.{cc,hh}** (2): Compression utilities
    - **sstables/exceptions.hh** (1): Comment update

## Testing

### Manual Testing
Created `test/manual/test_scylla_assert.cc` to verify:
- Passing assertions succeed
- Failing assertions throw exceptions
- Custom messages are properly formatted
### Integration Testing
- Run the existing test suite with converted assertions
- Verify no regressions in error handling
- Confirm exception propagation works correctly

## Future Work

1. **Automated Analysis Tool**
   - Create a tool to identify safe vs unsafe conversion contexts
   - Generate reports of remaining conversions

2. **Gradual Conversion**
   - Convert additional safe usages incrementally
   - Monitor for any unexpected issues

3. **noexcept Review**
   - Review functions marked noexcept that contain SCYLLA_ASSERT
   - Consider whether they should use `on_fatal_internal_error()` instead

## References

- `utils/assert.hh` - Implementation of both SCYLLA_ASSERT and scylla_assert
- `utils/on_internal_error.hh` - Exception-based error handling infrastructure
- GitHub Issue: [Link to original issue tracking this work]
614
docs/dev/unsafe_scylla_assert_locations.md
Normal file
614
docs/dev/unsafe_scylla_assert_locations.md
Normal file
@@ -0,0 +1,614 @@
|
||||
# Unsafe SCYLLA_ASSERT Locations
|
||||
|
||||
This document lists specific locations where SCYLLA_ASSERT cannot be safely converted to scylla_assert().
|
||||
|
||||
## Summary
|
||||
|
||||
- Files with noexcept SCYLLA_ASSERT: 50
|
||||
- Files with destructor SCYLLA_ASSERT: 25
|
||||
- Total unsafe SCYLLA_ASSERT in noexcept: 187
|
||||
- Total unsafe SCYLLA_ASSERT in destructors: 36
|
||||
|
||||
## SCYLLA_ASSERT in noexcept Functions
|
||||
|
||||
### auth/cache.cc
|
||||
|
||||
- Line 118: `SCYLLA_ASSERT(this_shard_id() == 0);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### db/cache_mutation_reader.hh
|
||||
|
||||
- Line 309: `SCYLLA_ASSERT(sr->is_static_row());`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### db/commitlog/commitlog.cc
|
||||
|
||||
- Line 531: `SCYLLA_ASSERT(!*this);`
|
||||
- Line 544: `SCYLLA_ASSERT(!*this);`
|
||||
- Line 662: `SCYLLA_ASSERT(_iter != _end);`
|
||||
- Line 1462: `SCYLLA_ASSERT(i->second >= count);`
|
||||
|
||||
Total: 4 usages
|
||||
|
||||
### db/hints/manager.hh
|
||||
|
||||
- Line 167: `SCYLLA_ASSERT(_ep_managers.empty());`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### db/partition_snapshot_row_cursor.hh
|
||||
|
||||
- Line 384: `SCYLLA_ASSERT(_latest_it);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### db/row_cache.cc
|
||||
|
||||
- Line 1365: `SCYLLA_ASSERT(it->is_last_dummy());`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### db/schema_tables.cc
|
||||
|
||||
- Line 774: `SCYLLA_ASSERT(this_shard_id() == 0);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### db/view/view.cc
|
||||
|
||||
- Line 3623: `SCYLLA_ASSERT(thread::running_in_thread());`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### gms/gossiper.cc
|
||||
|
||||
- Line 876: `SCYLLA_ASSERT(ptr->pid == _permit_id);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### locator/production_snitch_base.hh
|
||||
|
||||
- Line 77: `SCYLLA_ASSERT(_backreference != nullptr);`
|
||||
- Line 82: `SCYLLA_ASSERT(_backreference != nullptr);`
|
||||
- Line 87: `SCYLLA_ASSERT(_backreference != nullptr);`
|
||||
|
||||
Total: 3 usages
|
||||
|
||||
### locator/topology.cc
|
||||
|
||||
- Line 135: `SCYLLA_ASSERT(_shard == this_shard_id());`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### mutation/counters.hh
|
||||
|
||||
- Line 314: `SCYLLA_ASSERT(_cell.is_live());`
|
||||
- Line 315: `SCYLLA_ASSERT(!_cell.is_counter_update());`
|
||||
|
||||
Total: 2 usages
|
||||
|
||||
### mutation/mutation_partition_v2.hh
|
||||
|
||||
- Line 271: `SCYLLA_ASSERT(s.version() == _schema_version);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### mutation/partition_version.cc
|
||||
|
||||
- Line 364: `SCYLLA_ASSERT(!_snapshot->is_locked());`
|
||||
- Line 701: `SCYLLA_ASSERT(!rows.empty());`
|
||||
- Line 703: `SCYLLA_ASSERT(last_dummy.is_last_dummy());`
|
||||
- Line 746: `SCYLLA_ASSERT(!_snapshot->is_locked());`
|
||||
- Line 770: `SCYLLA_ASSERT(at_latest_version());`
|
||||
- Line 777: `SCYLLA_ASSERT(at_latest_version());`
|
||||
|
||||
Total: 6 usages
|
||||
|
||||
### mutation/partition_version.hh
|
||||
|
||||
- Line 211: `SCYLLA_ASSERT(_schema);`
|
||||
- Line 217: `SCYLLA_ASSERT(_schema);`
|
||||
- Line 254: `SCYLLA_ASSERT(!_version->_backref);`
|
||||
- Line 282: `SCYLLA_ASSERT(_version);`
|
||||
- Line 286: `SCYLLA_ASSERT(_version);`
|
||||
- Line 290: `SCYLLA_ASSERT(_version);`
|
||||
- Line 294: `SCYLLA_ASSERT(_version);`
|
||||
|
||||
Total: 7 usages
|
||||
|
||||
### mutation/partition_version_list.hh
|
||||
|
||||
- Line 36: `SCYLLA_ASSERT(!_head->is_referenced_from_entry());`
|
||||
- Line 42: `SCYLLA_ASSERT(!_tail->is_referenced_from_entry());`
|
||||
- Line 70: `SCYLLA_ASSERT(!_head->is_referenced_from_entry());`
|
||||
|
||||
Total: 3 usages
|
||||
|
||||
### mutation/range_tombstone_list.cc
|
||||
|
||||
- Line 412: `SCYLLA_ASSERT (it != rt_list.end());`
|
||||
- Line 422: `SCYLLA_ASSERT (it != rt_list.end());`
|
||||
|
||||
Total: 2 usages
|
||||
|
||||
### raft/server.cc
|
||||
|
||||
- Line 1720: `SCYLLA_ASSERT(_non_joint_conf_commit_promise);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### reader_concurrency_semaphore.cc
|
||||
|
||||
- Line 109: `SCYLLA_ASSERT(_permit == o._permit);`
|
||||
- Line 432: `SCYLLA_ASSERT(_need_cpu_branches);`
|
||||
- Line 455: `SCYLLA_ASSERT(_awaits_branches);`
|
||||
- Line 1257: `SCYLLA_ASSERT(!_stopped);`
|
||||
- Line 1585: `SCYLLA_ASSERT(_stats.need_cpu_permits);`
|
||||
- Line 1587: `SCYLLA_ASSERT(_stats.need_cpu_permits >= _stats.awaits_permits);`
|
||||
- Line 1593: `SCYLLA_ASSERT(_stats.need_cpu_permits >= _stats.awaits_permits);`
|
||||
- Line 1598: `SCYLLA_ASSERT(_stats.awaits_permits);`
|
||||
|
||||
Total: 8 usages
|
||||
|
||||
### readers/multishard.cc
|
||||
|
||||
- Line 296: `SCYLLA_ASSERT(!_irh);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### repair/repair.cc
|
||||
|
||||
- Line 1073: `SCYLLA_ASSERT(table_names().size() == table_ids.size());`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### replica/database.cc
|
||||
|
||||
- Line 3299: `SCYLLA_ASSERT(!_cf_lock.try_write_lock()); // lock should be acquired before the`
|
||||
- Line 3304: `SCYLLA_ASSERT(!_cf_lock.try_write_lock()); // lock should be acquired before the`
|
||||
|
||||
Total: 2 usages
|
||||
|
||||
### replica/database.hh
|
||||
|
||||
- Line 1971: `SCYLLA_ASSERT(_user_sstables_manager);`
|
||||
- Line 1976: `SCYLLA_ASSERT(_system_sstables_manager);`
|
||||
|
||||
Total: 2 usages
|
||||
|
||||
### replica/dirty_memory_manager.cc
|
||||
|
||||
- Line 67: `SCYLLA_ASSERT(!child->_heap_handle);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### replica/dirty_memory_manager.hh
|
||||
|
||||
- Line 261: `SCYLLA_ASSERT(_shutdown_requested);`
|
||||
|
||||
Total: 1 usages
|
||||
|
||||
### replica/memtable.cc
- Line 563: `SCYLLA_ASSERT(_mt._flushed_memory <= static_cast<int64_t>(_mt.occupancy().total_`
- Line 860: `SCYLLA_ASSERT(!reclaiming_enabled());`

Total: 2 usages

### replica/table.cc

- Line 2829: `SCYLLA_ASSERT(!trange.start()->is_inclusive() && trange.end()->is_inclusive());`

Total: 1 usage

### schema/schema.hh

- Line 1022: `SCYLLA_ASSERT(_schema->is_view());`

Total: 1 usage

### schema/schema_registry.cc

- Line 257: `SCYLLA_ASSERT(_state >= state::LOADED);`
- Line 262: `SCYLLA_ASSERT(_state >= state::LOADED);`
- Line 329: `SCYLLA_ASSERT(o._cpu_of_origin == current);`

Total: 3 usages

### service/direct_failure_detector/failure_detector.cc

- Line 628: `SCYLLA_ASSERT(alive != endpoint_liveness.marked_alive);`

Total: 1 usage

### service/storage_service.cc

- Line 3398: `SCYLLA_ASSERT(this_shard_id() == 0);`
- Line 5760: `SCYLLA_ASSERT(this_shard_id() == 0);`
- Line 5775: `SCYLLA_ASSERT(this_shard_id() == 0);`
- Line 5787: `SCYLLA_ASSERT(this_shard_id() == 0);`

Total: 4 usages

### sstables/generation_type.hh

- Line 132: `SCYLLA_ASSERT(bool(gen));`

Total: 1 usage

### sstables/partition_index_cache.hh

- Line 62: `SCYLLA_ASSERT(!ready());`

Total: 1 usage

### sstables/sstables_manager.hh

- Line 244: `SCYLLA_ASSERT(_sstables_registry && "sstables_registry is not plugged");`

Total: 1 usage

### sstables/storage.hh

- Line 86: `SCYLLA_ASSERT(false && "Changing directory not implemented");`
- Line 89: `SCYLLA_ASSERT(false && "Direct links creation not implemented");`
- Line 92: `SCYLLA_ASSERT(false && "Direct move not implemented");`

Total: 3 usages

### sstables_loader.cc

- Line 735: `SCYLLA_ASSERT(p);`

Total: 1 usage

### tasks/task_manager.cc

- Line 56: `SCYLLA_ASSERT(inserted);`
- Line 76: `SCYLLA_ASSERT(child->get_status().progress_units == progress_units);`
- Line 454: `SCYLLA_ASSERT(this_shard_id() == 0);`

Total: 3 usages

### tools/schema_loader.cc

- Line 281: `SCYLLA_ASSERT(p);`

Total: 1 usage

### utils/UUID.hh

- Line 59: `SCYLLA_ASSERT(is_timestamp());`

Total: 1 usage

### utils/bptree.hh

- Line 289: `SCYLLA_ASSERT(n.is_leftmost());`
- Line 301: `SCYLLA_ASSERT(n.is_rightmost());`
- Line 343: `SCYLLA_ASSERT(leaf->is_leaf());`
- Line 434: `SCYLLA_ASSERT(d->attached());`
- Line 453: `SCYLLA_ASSERT(n._num_keys > 0);`
- Line 505: `SCYLLA_ASSERT(n->is_leftmost());`
- Line 511: `SCYLLA_ASSERT(n->is_rightmost());`
- Line 517: `SCYLLA_ASSERT(n->is_root());`
- Line 557: `SCYLLA_ASSERT(!is_end());`
- Line 566: `SCYLLA_ASSERT(!is_end());`
- Line 613: `SCYLLA_ASSERT(n->_num_keys > 0);`
- Line 833: `SCYLLA_ASSERT(_left->_num_keys > 0);`
- Line 926: `SCYLLA_ASSERT(rl == rb);`
- Line 927: `SCYLLA_ASSERT(rl <= nr);`
- Line 1037: `SCYLLA_ASSERT(is_leaf());`
- Line 1042: `SCYLLA_ASSERT(is_leaf());`
- Line 1047: `SCYLLA_ASSERT(is_leaf());`
- Line 1052: `SCYLLA_ASSERT(is_leaf());`
- Line 1062: `SCYLLA_ASSERT(t->_right == this);`
- Line 1083: `SCYLLA_ASSERT(t->_left == this);`
- Line 1091: `SCYLLA_ASSERT(t->_right == this);`
- Line 1103: `SCYLLA_ASSERT(false);`
- Line 1153: `SCYLLA_ASSERT(i <= _num_keys);`
- Line 1212: `SCYLLA_ASSERT(off <= _num_keys);`
- Line 1236: `SCYLLA_ASSERT(from._num_keys > 0);`
- Line 1389: `SCYLLA_ASSERT(!is_root());`
- Line 1450: `SCYLLA_ASSERT(_num_keys == NodeSize);`
- Line 1563: `SCYLLA_ASSERT(_num_keys < NodeSize);`
- Line 1577: `SCYLLA_ASSERT(i != 0 || left_kid_sorted(k, less));`
- Line 1647: `SCYLLA_ASSERT(nodes.empty());`
- Line 1684: `SCYLLA_ASSERT(_num_keys > 0);`
- Line 1686: `SCYLLA_ASSERT(p._kids[i].n == this);`
- Line 1788: `SCYLLA_ASSERT(_num_keys == 0);`
- Line 1789: `SCYLLA_ASSERT(is_root() || !is_leaf() || (get_prev() == this && get_next() == th`
- Line 1821: `SCYLLA_ASSERT(_parent->_kids[i].n == &other);`
- Line 1841: `SCYLLA_ASSERT(i <= _num_keys);`
- Line 1856: `SCYLLA_ASSERT(!_nodes.empty());`
- Line 1938: `SCYLLA_ASSERT(!attached());`
- Line 1943: `SCYLLA_ASSERT(attached());`

Total: 39 usages

### utils/cached_file.hh

- Line 104: `SCYLLA_ASSERT(!_use_count);`

Total: 1 usage

### utils/compact-radix-tree.hh

- Line 1026: `SCYLLA_ASSERT(check_capacity(head, ni));`
- Line 1027: `SCYLLA_ASSERT(!_data.has(ni));`
- Line 1083: `SCYLLA_ASSERT(next_cap > head._capacity);`
- Line 1149: `SCYLLA_ASSERT(capacity != 0);`
- Line 1239: `SCYLLA_ASSERT(i < Size);`
- Line 1240: `SCYLLA_ASSERT(_idx[i] == unused_node_index);`
- Line 1470: `SCYLLA_ASSERT(kid != nullptr);`
- Line 1541: `SCYLLA_ASSERT(ret.first != nullptr);`
- Line 1555: `SCYLLA_ASSERT(leaf_depth >= depth);`
- Line 1614: `SCYLLA_ASSERT(n->check_prefix(key, depth));`
- Line 1850: `SCYLLA_ASSERT(_root.is(nil_root));`

Total: 11 usages

### utils/cross-shard-barrier.hh

- Line 134: `SCYLLA_ASSERT(w.has_value());`

Total: 1 usage

### utils/double-decker.hh

- Line 200: `SCYLLA_ASSERT(!hint.match);`
- Line 366: `SCYLLA_ASSERT(nb == end._bucket);`

Total: 2 usages

### utils/intrusive-array.hh

- Line 217: `SCYLLA_ASSERT(!is_single_element());`
- Line 218: `SCYLLA_ASSERT(pos < max_len);`
- Line 225: `SCYLLA_ASSERT(pos > 0);`
- Line 238: `SCYLLA_ASSERT(train_len < max_len);`
- Line 329: `SCYLLA_ASSERT(idx < max_len); // may the force be with us...`

Total: 5 usages

### utils/intrusive_btree.hh

- Line 148: `SCYLLA_ASSERT(to.num_keys == 0);`
- Line 157: `SCYLLA_ASSERT(!attached());`
- Line 227: `SCYLLA_ASSERT(n->is_inline());`
- Line 232: `SCYLLA_ASSERT(n->is_inline());`
- Line 288: `SCYLLA_ASSERT(n.is_root());`
- Line 294: `SCYLLA_ASSERT(n.is_leftmost());`
- Line 302: `SCYLLA_ASSERT(n.is_rightmost());`
- Line 368: `SCYLLA_ASSERT(_root->is_leaf());`
- Line 371: `SCYLLA_ASSERT(_inline.empty());`
- Line 601: `SCYLLA_ASSERT(n->is_leaf());`
- Line 673: `SCYLLA_ASSERT(!is_end());`
- Line 674: `SCYLLA_ASSERT(h->attached());`
- Line 677: `SCYLLA_ASSERT(_idx < cur.n->_base.num_keys);`
- Line 679: `SCYLLA_ASSERT(_hook->attached());`
- Line 690: `SCYLLA_ASSERT(!is_end());`
- Line 764: `SCYLLA_ASSERT(n->num_keys > 0);`
- Line 994: `SCYLLA_ASSERT(!_it.is_end());`
- Line 1178: `SCYLLA_ASSERT(is_leaf());`
- Line 1183: `SCYLLA_ASSERT(is_root());`
- Line 1261: `SCYLLA_ASSERT(!is_root());`
- Line 1268: `SCYLLA_ASSERT(p->_base.num_keys > 0 && p->_kids[0] == this);`
- Line 1275: `SCYLLA_ASSERT(p->_base.num_keys > 0 && p->_kids[p->_base.num_keys] == this);`
- Line 1286: `SCYLLA_ASSERT(false);`
- Line 1291: `SCYLLA_ASSERT(!nb->is_inline());`
- Line 1296: `SCYLLA_ASSERT(!nb->is_inline());`
- Line 1338: `SCYLLA_ASSERT(_base.num_keys == 0);`
- Line 1373: `SCYLLA_ASSERT(!(is_leftmost() || is_rightmost()));`
- Line 1378: `SCYLLA_ASSERT(p->_kids[i] != this);`
- Line 1396: `SCYLLA_ASSERT(!is_leaf());`
- Line 1537: `SCYLLA_ASSERT(src != _base.num_keys); // need more keys for the next leaf`
- Line 1995: `SCYLLA_ASSERT(_parent.n->_base.num_keys > 0);`
- Line 2135: `SCYLLA_ASSERT(is_leaf());`
- Line 2144: `SCYLLA_ASSERT(_base.num_keys != 0);`
- Line 2160: `SCYLLA_ASSERT(_base.num_keys != 0);`
- Line 2172: `SCYLLA_ASSERT(!empty());`
- Line 2198: `SCYLLA_ASSERT(leaf == ret->is_leaf());`

Total: 36 usages

### utils/loading_shared_values.hh

- Line 203: `SCYLLA_ASSERT(!_set.size());`

Total: 1 usage

### utils/logalloc.cc

- Line 544: `SCYLLA_ASSERT(!_background_reclaimer);`
- Line 926: `SCYLLA_ASSERT(idx < _segments.size());`
- Line 933: `SCYLLA_ASSERT(idx < _segments.size());`
- Line 957: `SCYLLA_ASSERT(i != _segments.end());`
- Line 1323: `SCYLLA_ASSERT(_lsa_owned_segments_bitmap.test(idx_from_segment(seg)));`
- Line 1366: `SCYLLA_ASSERT(desc._region);`
- Line 1885: `SCYLLA_ASSERT(desc._buf_pointers.empty());`
- Line 1911: `SCYLLA_ASSERT(&desc == old_ptr->_desc);`
- Line 2105: `SCYLLA_ASSERT(seg);`
- Line 2116: `SCYLLA_ASSERT(seg);`
- Line 2341: `SCYLLA_ASSERT(pool.current_emergency_reserve_goal() >= n_segments);`

Total: 11 usages

### utils/logalloc.hh

- Line 307: `SCYLLA_ASSERT(this_shard_id() == _cpu);`

Total: 1 usage

### utils/reusable_buffer.hh

- Line 60: `SCYLLA_ASSERT(_refcount == 0);`

Total: 1 usage

## SCYLLA_ASSERT in Destructors

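The destructor call sites below are listed separately because they cannot be converted mechanically. C++ destructors are `noexcept` by default, so an exception escaping one calls `std::terminate()`, which would crash the node just like the original assert. A minimal standalone sketch of the problem and one possible mitigation (`sketch_assert` and `guard` are illustrative names, not ScyllaDB APIs):

```cpp
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>

// Illustrative stand-in for an exception-throwing check (not the
// actual scylla_assert()/on_internal_error() API).
inline void sketch_assert(bool cond, const std::string& msg) {
    if (!cond) {
        throw std::runtime_error("internal error: " + msg);
    }
}

// Hypothetical RAII type whose destructor checks an invariant. Because
// destructors are implicitly noexcept, letting sketch_assert() throw out
// of ~guard() would call std::terminate() - the very crash the
// conversion is trying to avoid - so the destructor must swallow the
// exception (or keep the original process-aborting assert).
struct guard {
    bool stopped = false;
    ~guard() {
        try {
            sketch_assert(stopped, "guard destroyed before stop()");
        } catch (const std::exception& e) {
            std::cerr << e.what() << '\n';  // report without terminating
        }
    }
};
```

Each destructor site therefore needs an individual decision: keep the aborting assert, or catch and log as sketched above.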
### api/column_family.cc

- Line 102: `SCYLLA_ASSERT(this_shard_id() == 0);`

Total: 1 usage

### cdc/generation.cc

- Line 846: `SCYLLA_ASSERT(_stopped);`

Total: 1 usage

### cdc/log.cc

- Line 173: `SCYLLA_ASSERT(_stopped);`

Total: 1 usage

### compaction/compaction_manager.cc

- Line 1074: `SCYLLA_ASSERT(_state == state::none || _state == state::stopped);`

Total: 1 usage

### db/hints/internal/hint_endpoint_manager.cc

- Line 188: `SCYLLA_ASSERT(stopped());`

Total: 1 usage

### mutation/partition_version.cc

- Line 347: `SCYLLA_ASSERT(!_snapshot->is_locked());`

Total: 1 usage

### reader_concurrency_semaphore.cc

- Line 1116: `SCYLLA_ASSERT(!_stats.waiters);`
- Line 1125: `SCYLLA_ASSERT(_inactive_reads.empty() && !_close_readers_gate.get_count() && !_p`

Total: 2 usages

### repair/row_level.cc

- Line 3647: `SCYLLA_ASSERT(_state == state::none || _state == state::stopped);`

Total: 1 usage

### replica/cell_locking.hh

- Line 371: `SCYLLA_ASSERT(_partitions.empty());`

Total: 1 usage

### replica/distributed_loader.cc

- Line 305: `SCYLLA_ASSERT(_sstable_directories.empty());`

Total: 1 usage

### schema/schema_registry.cc

- Line 45: `SCYLLA_ASSERT(!_schema);`

Total: 1 usage

### service/direct_failure_detector/failure_detector.cc

- Line 378: `SCYLLA_ASSERT(_ping_fiber.available());`
- Line 379: `SCYLLA_ASSERT(_notify_fiber.available());`
- Line 701: `SCYLLA_ASSERT(_shard_workers.empty());`
- Line 702: `SCYLLA_ASSERT(_destroy_subscriptions.available());`
- Line 703: `SCYLLA_ASSERT(_update_endpoint_fiber.available());`
- Line 707: `SCYLLA_ASSERT(!_impl);`

Total: 6 usages

### service/load_broadcaster.hh

- Line 37: `SCYLLA_ASSERT(_stopped);`

Total: 1 usage

### service/paxos/paxos_state.cc

- Line 323: `SCYLLA_ASSERT(_stopped);`

Total: 1 usage

### service/storage_proxy.cc

- Line 281: `SCYLLA_ASSERT(_stopped);`
- Line 3207: `SCYLLA_ASSERT(!_remote);`

Total: 2 usages

### service/tablet_allocator.cc

- Line 3288: `SCYLLA_ASSERT(_stopped);`

Total: 1 usage

### sstables/compressor.cc

- Line 1271: `SCYLLA_ASSERT(thread::running_in_thread());`

Total: 1 usage

### sstables/sstables_manager.cc

- Line 58: `SCYLLA_ASSERT(_closing);`
- Line 59: `SCYLLA_ASSERT(_active.empty());`
- Line 60: `SCYLLA_ASSERT(_undergoing_close.empty());`

Total: 3 usages

### sstables/sstables_manager.hh

- Line 188: `SCYLLA_ASSERT(_storage != nullptr);`

Total: 1 usage

### utils/cached_file.hh

- Line 477: `SCYLLA_ASSERT(_cache.empty());`

Total: 1 usage

### utils/disk_space_monitor.cc

- Line 66: `SCYLLA_ASSERT(_poller_fut.available());`

Total: 1 usage

### utils/file_lock.cc

- Line 34: `SCYLLA_ASSERT(_fd.get() != -1);`
- Line 36: `SCYLLA_ASSERT(r == 0);`

Total: 2 usages

### utils/logalloc.cc

- Line 1991: `SCYLLA_ASSERT(desc.is_empty());`
- Line 1996: `SCYLLA_ASSERT(segment_pool().descriptor(_active).is_empty());`

Total: 2 usages

### utils/lru.hh

- Line 41: `SCYLLA_ASSERT(!_lru_link.is_linked());`

Total: 1 usage

### utils/replicator.hh

- Line 221: `SCYLLA_ASSERT(_stopped);`

Total: 1 usage

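For the non-destructor call sites above, the conversion is mechanical: the condition stays the same and only the failure path changes from aborting the process to throwing an exception that the request path can turn into an error response. A minimal standalone sketch of the idea (`SKETCH_SCYLLA_ASSERT` and `handle_request` are illustrative stand-ins; the real macro is built on `on_internal_error()` and supports formatted messages):

```cpp
#include <stdexcept>
#include <string>

// Illustrative stand-in for the exception-throwing scylla_assert();
// the real implementation reports through on_internal_error().
#define SKETCH_SCYLLA_ASSERT(cond, msg)                                    \
    do {                                                                   \
        if (!(cond)) {                                                     \
            throw std::runtime_error(std::string("assertion failed: ") +   \
                                     #cond + ": " + (msg));                \
        }                                                                  \
    } while (0)

// Hypothetical request handler: a failed check now surfaces as an
// exception the caller can map to an error reply, instead of abort()
// taking down the whole node.
inline int handle_request(int shard) {
    SKETCH_SCYLLA_ASSERT(shard == 0, "must run on shard 0");
    return 42;
}
```

A caller would wrap `handle_request()` in a try/catch and fail only the offending request, while the process keeps serving the rest of its traffic.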
@@ -37,7 +37,7 @@ Getting Started

:id: "getting-started"
:class: my-panel

* :doc:`ScyllaDB Drivers</using-scylla/drivers/index>`
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
* `Get Started Lesson on ScyllaDB University <https://university.scylladb.com/courses/scylla-essentials-overview/lessons/quick-wins-install-and-run-scylla/>`_
* :doc:`CQL Reference </cql/index>`
* :doc:`cqlsh - the CQL shell </cql/cqlsh/>`

@@ -35,7 +35,7 @@ Documentation Highlights

* :doc:`Cluster Management Procedures </operating-scylla/procedures/cluster-management/index>`
* :doc:`Upgrade ScyllaDB </upgrade/index>`
* :doc:`CQL Reference </cql/index>`
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
* :doc:`Features </features/index>`

ScyllaDB Support

@@ -172,7 +172,7 @@ For example:

* `ScyllaDB Java Driver <https://github.com/scylladb/java-driver/tree/3.7.1-scylla/manual/compression>`_
* `Go Driver <https://godoc.org/github.com/gocql/gocql#Compressor>`_

Refer to the :doc:`Drivers Page </using-scylla/drivers/index>` for more drivers.
Refer to `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ for more drivers.

.. _internode-compression:

@@ -206,7 +206,7 @@ This is 19% of the latency compared to no batching.

Driver Guidelines
-----------------

Use the :doc:`ScyllaDB drivers </using-scylla/drivers/index>` that are available for Java, Python, Go, and C/C++.
Use the `ScyllaDB drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ that are available for Java, Python, Go, and C/C++.
They provide much better performance than third-party drivers because they are shard aware: they can route requests to the right CPU core (shard).
When the driver starts, it gets the topology of the cluster and therefore knows exactly which CPU core should get a request.
Our latest shard-aware drivers also improve the efficiency of our Change Data Capture (CDC) feature.

@@ -121,7 +121,7 @@ Driver Compression

This refers to compressing traffic between the client and ScyllaDB.
Verify your client driver is using compressed traffic when connected to ScyllaDB.
As compression depends on driver settings, please check your client driver manual or :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`.
As compression depends on driver settings, please check your client driver manual. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.

Connectivity

@@ -130,7 +130,7 @@ Connectivity

Drivers Settings
================

* Use shard-aware drivers wherever possible. :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` (not third-party drivers) are shard aware.
* Use shard-aware drivers wherever possible. `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ (not third-party drivers) are shard aware.
* Configure the connection pool: open more connections (>3 per shard) and/or more clients. See `this blog <https://www.scylladb.com/2019/11/20/maximizing-performance-via-concurrency-while-minimizing-timeouts-in-distributed-databases/>`_.

Management

@@ -25,8 +25,8 @@ Actions

If your cluster is experiencing timeouts during overload, first check whether you are making the overload situation worse through retries, and pay attention to the following:

* Make sure the client retries only after the server has already timed out. Depending on the application, this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver; check your :doc:`driver documentation </using-scylla/drivers/index>` for parameters and defaults. For the server-side timeout, ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``.
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver; check your :doc:`driver documentation </using-scylla/drivers/index>` for parameters and defaults.
* Make sure the client retries only after the server has already timed out. Depending on the application, this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver; check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ for parameters and defaults. For the server-side timeout, ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``.
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver; check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ for parameters and defaults.
* Make sure the server neither runs speculative retry nor runs it based on percentiles (as those can fluctuate aggressively). Server-side speculative retries are a per-table setting that can be changed with the ALTER TABLE command. See the :ref:`documentation <speculative-retry-options>` for details.

@@ -9,9 +9,19 @@ To ensure a successful upgrade, follow

the :doc:`documented upgrade procedures <upgrade-guides/index>` tested by
ScyllaDB. This means that:

* You should perform the upgrades consecutively - to each successive X.Y
  version, **without skipping any major or minor version**, unless there is
  a documented upgrade procedure to bypass a version.
* You should follow the upgrade policy:

  * Starting with version **2025.4**, upgrades can skip minor versions as long
    as they remain within the same major version (for example, upgrading directly
    from 2025.1 → 2025.4 is supported).
  * For versions **prior to 2025.4**, upgrades must be performed consecutively—
    each successive X.Y version must be installed in order, **without skipping
    any major or minor version** (for example, upgrading directly from 2025.1 → 2025.3
    is not supported).
  * You cannot skip major versions. Upgrades must move from one major version to
    the next using the documented major-version upgrade path.

* You should upgrade to a supported version of ScyllaDB.
  See `ScyllaDB Version Support <https://docs.scylladb.com/stable/versioning/version-support.html>`_.
* Before you upgrade to the next version, the whole cluster (each node) must
  be upgraded to the previous version.
* You cannot perform an upgrade by replacing the nodes in the cluster with new

Binary file not shown.
@@ -1,141 +0,0 @@

=====================
ScyllaDB CQL Drivers
=====================

.. toctree::
   :titlesonly:
   :hidden:

   scylla-python-driver
   scylla-java-driver
   scylla-go-driver
   scylla-gocqlx-driver
   scylla-cpp-driver
   scylla-rust-driver

ScyllaDB Drivers
-----------------

The following ScyllaDB drivers are available:

* :doc:`Python Driver </using-scylla/drivers/cql-drivers/scylla-python-driver>`
* :doc:`Java Driver </using-scylla/drivers/cql-drivers/scylla-java-driver>`
* :doc:`Go Driver </using-scylla/drivers/cql-drivers/scylla-go-driver>`
* :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
* :doc:`C++ Driver </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
* `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
* :doc:`Rust Driver </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
* `C# Driver <https://csharp-driver.docs.scylladb.com/>`_

We recommend using ScyllaDB drivers. All ScyllaDB drivers are shard-aware and provide additional
benefits over third-party drivers.

ScyllaDB supports the CQL binary protocol version 3, so any Apache Cassandra/CQL driver that implements
the same version works with ScyllaDB.

CDC Integration with ScyllaDB Drivers
-------------------------------------------

The following table specifies which ScyllaDB drivers include a library for
:doc:`CDC </features/cdc/cdc-intro>`.

.. list-table::
   :widths: 40 60
   :header-rows: 1

   * - ScyllaDB Driver
     - CDC Connector
   * - :doc:`Python </using-scylla/drivers/cql-drivers/scylla-python-driver>`
     - |x|
   * - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
     - |v|
   * - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
     - |v|
   * - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
     - |x|
   * - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
     - |x|
   * - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
     - |x|
   * - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
     - |v|
   * - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
     - |x|

Support for Tablets
-------------------------

The following table specifies which ScyllaDB drivers support
:doc:`tablets </architecture/tablets>` and since which version.

.. list-table::
   :widths: 30 35 35
   :header-rows: 1

   * - ScyllaDB Driver
     - Support for Tablets
     - Since Version
   * - :doc:`Python </using-scylla/drivers/cql-drivers/scylla-python-driver>`
     - |v|
     - 3.26.5
   * - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
     - |v|
     - 4.18.0 (Java Driver 4.x)

       3.11.5.2 (Java Driver 3.x)
   * - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
     - |v|
     - 1.13.0
   * - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
     - |x|
     - N/A
   * - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
     - |x|
     - N/A
   * - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
     - |v|
     - All versions
   * - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
     - |v|
     - 0.13.0
   * - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
     - |v|
     - All versions

Driver Support Policy
-------------------------------

We support the **two most recent minor releases** of our drivers.

* We test and validate the latest two minor versions.
* We typically patch only the latest minor release.

We recommend staying up to date with the latest supported versions to receive
updates and fixes.

At a minimum, upgrade your driver when upgrading to a new ScyllaDB version
to ensure compatibility between the driver and the database.

Third-party Drivers
----------------------

You can find the third-party driver documentation on the GitHub pages for each driver:

* `DataStax Java Driver <https://github.com/datastax/java-driver/>`_
* `DataStax Python Driver <https://github.com/datastax/python-driver/>`_
* `DataStax C# Driver <https://github.com/datastax/csharp-driver/>`_
* `DataStax Ruby Driver <https://github.com/datastax/ruby-driver/>`_
* `DataStax Node.js Driver <https://github.com/datastax/nodejs-driver/>`_
* `DataStax C++ Driver <https://github.com/datastax/cpp-driver/>`_
* `DataStax PHP Driver (Supported versions: 7.1) <https://github.com/datastax/php-driver>`_
* `He4rt PHP Driver (Supported versions: 8.1 and 8.2) <https://github.com/he4rt/scylladb-php-driver/>`_
* `Scala Phantom Project <https://github.com/outworkers/phantom>`_
* `Xandra Elixir Driver <https://github.com/lexhide/xandra>`_
* `Exandra Elixir Driver <https://github.com/vinniefranco/exandra>`_

Learn about ScyllaDB Drivers on ScyllaDB University
----------------------------------------------------

The free `Using ScyllaDB Drivers course <https://university.scylladb.com/courses/using-scylla-drivers/>`_
on ScyllaDB University covers the use of drivers in multiple languages to interact with a ScyllaDB
cluster. The languages covered include Java, CPP, Rust, Golang, Python, Node.JS, Scala, and others.

@@ -1,16 +0,0 @@

===================
ScyllaDB C++ Driver
===================

The ScyllaDB C++ driver is a modern, feature-rich and **shard-aware** C/C++ client library for ScyllaDB using exclusively Cassandra’s binary protocol and Cassandra Query Language v3.
This driver is forked from the DataStax cpp-driver.

Read the `documentation <https://cpp-driver.docs.scylladb.com>`_ to get started or visit the GitHub project `ScyllaDB C++ driver <https://github.com/scylladb/cpp-driver>`_.

More Information
----------------

* `C++ Driver Documentation <https://cpp-driver.docs.scylladb.com>`_
* `C/C++ Driver course at ScyllaDB University <https://university.scylladb.com/courses/using-scylla-drivers/lessons/cpp-driver-part-1/>`_
* `Blog: A Shard-Aware ScyllaDB C/C++ Driver <https://www.scylladb.com/2021/03/18/a-shard-aware-scylla-c-c-driver/>`_

@@ -1,28 +0,0 @@

==================
ScyllaDB Go Driver
==================

The `ScyllaDB Go driver <https://github.com/scylladb/gocql>`_ is shard aware and contains extensions for a tokenAwareHostPolicy supported by ScyllaDB 2.3 and onwards.
It is a fork of the `GoCQL Driver <https://github.com/gocql/gocql>`_ but has been enhanced with capabilities that take advantage of ScyllaDB's unique architecture.
Using this policy, the driver can select a connection to a particular shard based on the shard’s token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.

The protocol extension spec is `available here <https://github.com/scylladb/scylla/blob/master/docs/dev/protocol-extensions.md>`_.
The ScyllaDB Go Driver is a drop-in replacement for gocql.
As such, no code changes are needed to use this driver.
All you need to do is rebuild using the ``replace`` directive in your ``mod`` file.

**To download and install the driver**, visit the `GitHub project <https://github.com/scylladb/gocql>`_.

Using CDC with Go
-----------------

When writing applications, you can now use our `Go Library <https://github.com/scylladb/scylla-cdc-go>`_ to simplify writing applications that read from ScyllaDB CDC.

More information
----------------

* `ScyllaDB Gocql Driver project page on GitHub <https://github.com/scylladb/gocql>`_ - contains the source code as well as a readme and documentation files.
* `ScyllaDB University: Golang and ScyllaDB <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-1/>`_ -
  a three-part lesson with in-depth examples, from executing a few basic CQL statements with a ScyllaDB cluster using the Gocql driver, to the different data types that you can use in your database tables and how to store binary files in ScyllaDB with a simple Go application.

@@ -1,16 +0,0 @@

=========================
ScyllaDB Gocql Extension
=========================

ScyllaDB Gocqlx is an extension to gocql that provides usability features.
With gocqlx, you can bind the query parameters from maps and structs, use named query parameters (``:identifier``), and scan the query results into structs and slices.
The driver includes a fluent and flexible CQL query builder and a database migrations module.

More information
----------------

* `ScyllaDB Gocqlx Driver project page on GitHub <https://github.com/scylladb/gocqlx>`_ - contains the source code as well as a readme and documentation files.
* `ScyllaDB University: Golang and ScyllaDB Part 3 – GoCQLX <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-3-gocqlx/>`_ - part three of the three-part Golang course, which focuses on how to create a sample Go application that executes a few basic CQL statements with a ScyllaDB cluster using the GoCQLX package.

@@ -1,31 +0,0 @@

=====================
ScyllaDB Java Driver
=====================

The ScyllaDB Java Driver is forked from the `DataStax Java Driver <https://github.com/datastax/java-driver>`_ with enhanced capabilities that take advantage of ScyllaDB's unique architecture.

The ScyllaDB Java driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
Using this policy, the driver can select a connection to a particular shard based on the shard’s token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.

Use the ScyllaDB Java driver for better compatibility and support for ScyllaDB with Java-based applications.

Read the `documentation <https://java-driver.docs.scylladb.com/>`_ to get started or visit the `GitHub project <https://github.com/scylladb/java-driver>`_.

The driver architecture is based on layers. At the bottom lies the driver core.
This core handles everything related to the connections to a ScyllaDB cluster (for example, connection pool, discovering new nodes, etc.) and exposes a simple, relatively low-level API on top of which higher-level layers can be built.

The ScyllaDB Java Driver is a drop-in replacement for the DataStax Java Driver.
As such, no code changes are needed to use this driver.

Using CDC with Java
-------------------

When writing applications, you can now use our `Java Library <https://github.com/scylladb/scylla-cdc-java>`_ to simplify writing applications that read from ScyllaDB CDC.

More information
----------------

* `ScyllaDB Java Driver Docs <https://java-driver.docs.scylladb.com/>`_
* `ScyllaDB Java Driver project page on GitHub <https://github.com/scylladb/java-driver/>`_ - Source Code
* `ScyllaDB University: Coding with Java <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-java-part-1/>`_ - a three-part lesson with in-depth examples, from executing a few basic CQL statements with a ScyllaDB cluster using the Java driver, to the different data types that you can use in your database tables and how to store binary files in ScyllaDB with a simple Java application.

@@ -1,20 +0,0 @@

======================
ScyllaDB Python Driver
======================

The ScyllaDB Python driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
Using this policy, the driver can select a connection to a particular shard based on the shard’s token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.

Read the `documentation <https://python-driver.docs.scylladb.com/>`_ to get started or visit the GitHub project `ScyllaDB Python driver <https://github.com/scylladb/python-driver/>`_.

As the ScyllaDB Python Driver is a drop-in replacement for the DataStax Python Driver, no code changes are needed to use it.
Use the ScyllaDB Python driver for better compatibility and support for ScyllaDB with Python-based applications.

More information
----------------

* `ScyllaDB Python Driver Documentation <https://python-driver.docs.scylladb.com/>`_
* `ScyllaDB Python Driver on GitHub <https://github.com/scylladb/python-driver/>`_
* `ScyllaDB University: Coding with Python <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-python/>`_

@@ -1,24 +0,0 @@

====================
ScyllaDB Rust Driver
====================

The ScyllaDB Rust driver is a client-side, shard-aware driver written in pure Rust with a fully async API using Tokio.
Optimized for ScyllaDB, the driver is also compatible with Apache Cassandra®.

.. image:: ./images/monster-rust.png
   :width: 150pt

**To download and install the driver**, visit the `GitHub project <https://github.com/scylladb/scylla-rust-driver>`_.

Read the `Documentation <https://rust-driver.docs.scylladb.com>`_.

Using CDC with Rust
-------------------

You can use ScyllaDB's `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_
to simplify writing applications that read from ScyllaDB's CDC.

Use the `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_ to read
:doc:`ScyllaDB's CDC </features/cdc/index>` update streams.
@@ -1,9 +0,0 @@

====================
AWS DynamoDB Drivers
====================

The ScyllaDB AWS DynamoDB Compatible API can be used with any AWS DynamoDB driver.
For a list of AWS DynamoDB drivers, see `here <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStarted.html>`_.
@@ -1,21 +0,0 @@

================
ScyllaDB Drivers
================

.. toctree::
   :titlesonly:
   :hidden:

   ScyllaDB CQL Drivers <cql-drivers/index>
   ScyllaDB DynamoDB Drivers <dynamo-drivers/index>

You can use ScyllaDB with:

* :doc:`Apache Cassandra CQL Compatible Drivers <cql-drivers/index>`
* :doc:`Amazon DynamoDB Compatible API Drivers <dynamo-drivers/index>`

Additional drivers coming soon!

If you are looking for a ScyllaDB Integration Solution or a Connector, refer to :doc:`ScyllaDB Integrations </using-scylla/integrations/index>`.
@@ -9,7 +9,7 @@ ScyllaDB for Developers

   Tutorials and Example Projects <https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html>
   Learn to Use ScyllaDB <https://docs.scylladb.com/stable/get-started/learn-resources/index.html>
   ScyllaDB Alternator <alternator/index>
   ScyllaDB Drivers <drivers/index>
   ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>

.. panel-box::

@@ -26,7 +26,7 @@ ScyllaDB for Developers

   :id: "getting-started"
   :class: my-panel

   * :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` - ScyllaDB and third-party drivers for CQL and DynamoDB
   * `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ - ScyllaDB and third-party drivers for CQL and DynamoDB
   * :doc:`ScyllaDB Alternator </using-scylla/alternator/index>` - The Open Source DynamoDB-compatible API
   * :doc:`CQL Reference </cql/index>` - Reference for the Apache Cassandra Query Language (CQL) and its ScyllaDB extensions
@@ -28,7 +28,7 @@ ScyllaDB Integrations and Connectors

   :class: my-panel

ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`).
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
Any application which uses a CQL driver will work with ScyllaDB.

The list below contains links to integration projects using ScyllaDB with third-party projects.
@@ -2,7 +2,7 @@

Integrate ScyllaDB with Databricks
==================================

ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.

Resource list
-------------
@@ -3,7 +3,7 @@ Integrate ScyllaDB with Elasticsearch
=====================================

ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.

The list below contains integration projects using ScyllaDB with Elasticsearch. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.
@@ -13,11 +13,11 @@ The Jaeger Query service offers a web-based UI and API for users to explore, vis

Jaeger also supports integration with other observability tools like Prometheus and Grafana,
making it a popular choice for monitoring modern distributed applications.

Jaeger Server `can also be run <https://github.com/jaegertracing/jaeger/tree/main/plugin/storage/scylladb>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
Jaeger Server `can also be run <https://www.jaegertracing.io/docs/2.11/storage/cassandra/#compatible-backends>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
As a drop-in replacement for Cassandra, ScyllaDB implements the same protocol and provides a high-performance,
low-latency alternative. This compatibility allows Jaeger users to easily switch to ScyllaDB without making significant changes to their setup.

Using ScyllaDB as the storage backend for Jaeger Server can offer additional benefits,
such as improved performance, scalability, and resource efficiency.
This makes Jaeger even more effective for monitoring and troubleshooting distributed applications,
especially in high-traffic, demanding environments where a high-performance storage solution is critical.
@@ -3,7 +3,7 @@ Integrate ScyllaDB with Spark
=============================

ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.

The list below contains integration projects using ScyllaDB with Spark. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.
@@ -15,6 +15,7 @@

#include "db/config.hh"
#include "utils/log.hh"
#include "utils/hash.hh"
#include "utils/http.hh"
#include "utils/rjson.hh"
#include "utils/base64.hh"
#include "utils/loading_cache.hh"

@@ -267,7 +268,6 @@ std::tuple<std::string, std::string> azure_host::impl::parse_key(std::string_vie

std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std::string_view vault) {
    static const boost::regex vault_name_re(R"([a-zA-Z0-9-]+)");
    static const boost::regex vault_endpoint_re(R"((https?)://([^/:]+)(?::(\d+))?)");

    boost::smatch match;
    std::string tmp{vault};

@@ -277,16 +277,12 @@ std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std

        return {"https", fmt::format(AKV_HOST_TEMPLATE, vault), 443};
    }

    if (boost::regex_match(tmp, match, vault_endpoint_re)) {
        std::string scheme = match[1];
        std::string host = match[2];
        std::string port_str = match[3];

        unsigned port = (port_str.empty()) ? (scheme == "https" ? 443 : 80) : std::stoi(port_str);
        return {scheme, host, port};
    try {
        auto info = utils::http::parse_simple_url(tmp);
        return {info.scheme, info.host, info.port};
    } catch (...) {
        std::throw_with_nested(std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault)));
    }

    throw std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault));
}

future<shared_ptr<tls::certificate_credentials>> azure_host::impl::make_creds() {
@@ -816,6 +816,7 @@ public:

    future<data_sink> wrap_sink(const sstables::sstable& sst, sstables::component_type type, data_sink sink) override {
        switch (type) {
        case sstables::component_type::Scylla:
        case sstables::component_type::TemporaryScylla:
        case sstables::component_type::TemporaryTOC:
        case sstables::component_type::TOC:
            co_return sink;

@@ -844,6 +845,7 @@ public:

        sstables::component_type type,
        data_source src) override {
        switch (type) {
        case sstables::component_type::TemporaryScylla:
        case sstables::component_type::Scylla:
        case sstables::component_type::TemporaryTOC:
        case sstables::component_type::TOC:
@@ -36,6 +36,7 @@

#include "encryption_exceptions.hh"
#include "symmetric_key.hh"
#include "utils.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/UUID.hh"

@@ -163,6 +164,8 @@ private:

    shared_ptr<seastar::tls::certificate_credentials> _creds;
    std::unordered_map<bytes, shared_ptr<symmetric_key>> _cache;
    bool _initialized = false;

    abort_source _as;
};

template<typename T, typename C>

@@ -251,24 +254,50 @@ future<rjson::value> encryption::gcp_host::impl::gcp_auth_post_with_retry(std::s

    auto& creds = i->second;

    int retries = 0;
    static constexpr auto max_retries = 10;

    for (;;) {
        try {
            co_await creds.refresh(KMS_SCOPE, _certs);
        } catch (...) {
            std::throw_with_nested(permission_error("Error refreshing credentials"));
    exponential_backoff_retry exr(10ms, 10000ms);
    bool do_backoff = false;
    bool did_auth_retry = false;

    for (int retry = 0; ; ++retry) {
        if (std::exchange(do_backoff, false)) {
            co_await exr.retry(_as);
        }

        bool refreshing = true;

        try {
            co_await creds.refresh(KMS_SCOPE, _certs);
            refreshing = false;

            auto res = co_await send_request(uri, _certs, body, httpd::operation_type::POST, key_values({
                { utils::gcp::AUTHORIZATION, utils::gcp::format_bearer(creds.token) },
            }));
            }), &_as);
            co_return res;
        } catch (httpd::unexpected_status_error& e) {
            gcp_log.debug("{}: Got unexpected response: {}", uri, e.status());
            if (e.status() == http::reply::status_type::unauthorized && retries++ < 3) {
                // refresh access token and retry.
            switch (e.status()) {
            default:
                if (http::reply::classify_status(e.status()) != http::reply::status_class::server_error) {
                    break;
                }
                [[fallthrough]];
            case httpclient::reply_status::request_timeout:
                if (retry < max_retries) {
                    // service unavailable etc -> backoff + retry
                    do_backoff = true;
                    did_auth_retry = false; // reset this, since we might cause expiration due to backoff (not really, but...)
                    continue;
                }
                break;
            }
            if (refreshing) {
                std::throw_with_nested(permission_error("Error refreshing credentials"));
            }
            if (e.status() == http::reply::status_type::unauthorized && retry < max_retries && !did_auth_retry) {
                // refresh access token and retry. no backoff
                did_auth_retry = true;
                continue;
            }
            if (e.status() == http::reply::status_type::unauthorized) {

@@ -322,6 +351,7 @@ future<> encryption::gcp_host::impl::init() {

}

future<> encryption::gcp_host::impl::stop() {
    _as.request_abort();
    co_await _attr_cache.stop();
    co_await _id_cache.stop();
}
@@ -38,6 +38,7 @@

#include "utils/loading_cache.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/http.hh"
#include "marshal_exception.hh"
#include "db/config.hh"

@@ -322,17 +323,26 @@ future<> kmip_host::impl::connection::connect() {

        f = f.then([this, cred] {
            return cred->set_x509_trust_file(_options.truststore, seastar::tls::x509_crt_format::PEM);
        });
    } else {
        f = f.then([cred] {
            return cred->set_system_trust();
        });
    }
    return f.then([this, cred] {
        // TODO, find if we should do hostname verification
        // TODO: connect all failovers already?

        auto i = _host.find_last_of(':');
        auto name = _host.substr(0, i);
        auto port = i != sstring::npos ? std::stoul(_host.substr(i + 1)) : kmip_port;
        // Use the URL parser to handle ipv6 etc proper.
        // Turn host arg into a URL.
        auto info = utils::http::parse_simple_url("kmip://" + _host);
        auto name = info.host;
        auto port = info.port != 80 ? info.port : kmip_port;

        return seastar::net::dns::resolve_name(name).then([this, cred, port](seastar::net::inet_address addr) {
            return seastar::tls::connect(cred, seastar::ipv4_addr{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
        return seastar::net::dns::resolve_name(name).then([this, cred, port, name](seastar::net::inet_address addr) {
            kmip_log.debug("Try connect {}:{}", addr, port);
            // TODO: should we verify non-numeric hosts here? (opts.server_name)
            // Adding this might break existing users with half-baked certs.
            return seastar::tls::connect(cred, seastar::socket_address{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
                kmip_log.debug("Successfully connected {}", _host);
                // #998 Set keepalive to try avoiding connection going stale in between commands.
                s.set_keepalive_parameters(net::tcp_keepalive_params{60s, 60s, 10});
@@ -35,6 +35,7 @@

#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/http.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/rjson.hh"

@@ -151,15 +152,10 @@ public:

{
    // check if we have an explicit endpoint set.
    if (!_options.endpoint.empty()) {
        static std::regex simple_url(R"foo((https?):\/\/(?:([\w\.]+)|\[([\w:]+)\]):?(\d+)?\/?)foo");
        std::transform(_options.endpoint.begin(), _options.endpoint.end(), _options.endpoint.begin(), ::tolower);
        std::smatch m;
        if (!std::regex_match(_options.endpoint, m, simple_url)) {
            throw std::invalid_argument(fmt::format("Could not parse URL: {}", _options.endpoint));
        }
        _options.https = m[1].str() == "https";
        _options.host = m[2].length() > 0 ? m[2].str() : m[3].str();
        _options.port = m[4].length() > 0 ? std::stoi(m[4].str()) : 0;
        auto info = utils::http::parse_simple_url(_options.endpoint);
        _options.https = info.is_https();
        _options.host = info.host;
        _options.port = info.port;
    }
    if (_options.endpoint.empty() && _options.host.empty() && _options.aws_region.empty() && !_options.aws_use_ec2_region) {
        throw std::invalid_argument("No AWS region or endpoint specified");
@@ -55,6 +55,7 @@ debian_base_packages=(

    librapidxml-dev
    libcrypto++-dev
    libxxhash-dev
    zlib1g-dev
    slapd
    ldap-utils
    libcpp-jwt-dev

@@ -117,6 +118,7 @@ fedora_packages=(

    makeself
    libzstd-static libzstd-devel
    lz4-static lz4-devel
    zlib-ng-compat-devel
    rpm-build
    devscripts
    debhelper
install.sh
@@ -157,6 +157,7 @@ adjust_bin() {

export GNUTLS_SYSTEM_PRIORITY_FILE="\${GNUTLS_SYSTEM_PRIORITY_FILE-$prefix/libreloc/gnutls.config}"
export LD_LIBRARY_PATH="$prefix/libreloc"
export UBSAN_OPTIONS="${UBSAN_OPTIONS:+$UBSAN_OPTIONS:}suppressions=$prefix/libexec/ubsan-suppressions.supp"
${p11_trust_paths:+export SCYLLA_P11_TRUST_PATHS="$p11_trust_paths"}
exec -a "\$0" "$prefix/libexec/$bin" "\$@"
EOF
chmod 755 "$root/$prefix/bin/$bin"

@@ -330,7 +331,6 @@ if ! $nonroot; then

    rsysconfdir=$(realpath -m "$root/$sysconfdir")
    rusr=$(realpath -m "$root/usr")
    rsystemd=$(realpath -m "$rusr/lib/systemd/system")
    rshare="$rprefix/share"
    rdoc="$rprefix/share/doc"
    rdata=$(realpath -m "$root/var/lib/scylla")
    rhkdata=$(realpath -m "$root/var/lib/scylla-housekeeping")

@@ -338,7 +338,6 @@ else

    retc="$rprefix/etc"
    rsysconfdir="$rprefix/$sysconfdir"
    rsystemd="$HOME/.config/systemd/user"
    rshare="$rprefix/share"
    rdoc="$rprefix/share/doc"
    rdata="$rprefix"
fi

@@ -522,16 +521,6 @@ PRODUCT="$product"

EOS
chmod 644 "$rprefix"/scripts/scylla_product.py

install -d -m755 "$rshare"/p11-kit/modules
cat << EOS > "$rshare"/p11-kit/modules/p11-kit-trust.module
module: $prefix/libreloc/pkcs11/p11-kit-trust.so
priority: 1
trust-policy: yes
x-trust-lookup: pkcs11:library-description=PKCS%2311%20Kit%20Trust%20Module
disable-in: p11-kit-proxy
x-init-reserved: paths=$p11_trust_paths
EOS

if ! $nonroot && ! $without_systemd; then
    install -d -m755 "$retc"/systemd/system/scylla-server.service.d
    install -m644 dist/common/systemd/scylla-server.service.d/dependencies.conf -Dt "$retc"/systemd/system/scylla-server.service.d
main.cc
@@ -10,6 +10,8 @@

#include <functional>
#include <fmt/ranges.h>

#include <gnutls/pkcs11.h>

#include <seastar/util/closeable.hh>
#include <seastar/core/abort_source.hh>
#include "db/view/view_building_worker.hh"

@@ -118,15 +120,11 @@

#include "message/dictionary_service.hh"
#include "sstable_dict_autotrainer.hh"
#include "utils/disk_space_monitor.hh"
#include "auth/cache.hh"
#include "utils/labels.hh"
#include "tools/utils.hh"

#define P11_KIT_FUTURE_UNSTABLE_API
extern "C" {
#include <p11-kit/p11-kit.h>
}

namespace fs = std::filesystem;
#include <seastar/core/metrics_api.hh>
#include <seastar/core/relabel_config.hh>

@@ -708,14 +706,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

        print_starting_message(ac, av, parsed_opts);
    }

    // We have to override p11-kit config path before p11-kit initialization.
    // And the initialization will invoke on seastar initialization, so it has to
    // be before app.run()
    auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe"));
    auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
    auto p11_modules_str = p11_modules.string<char>();
    ::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);

    sharded<locator::shared_token_metadata> token_metadata;
    sharded<locator::effective_replication_map_factory> erm_factory;
    sharded<service::migration_notifier> mm_notifier;

@@ -727,6 +717,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

    seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
    service::load_meter load_meter;
    sharded<service::storage_proxy> proxy;
    sharded<auth::cache> auth_cache;
    sharded<service::storage_service> ss;
    sharded<service::migration_manager> mm;
    sharded<tasks::task_manager> task_manager;

@@ -789,7 +780,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

    return seastar::async([&app, cfg, ext, &disk_space_monitor_shard0, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
            &prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service, &gossiper, &snitch,
            &token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
            &repair, &sst_loader, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
            &repair, &sst_loader, &auth_cache, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
            &hashing_worker, &vector_store_client] {
        try {
            if (opts.contains("relabel-config-file") && !opts["relabel-config-file"].as<sstring>().empty()) {

@@ -1802,6 +1793,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

            api::unset_server_stream_manager(ctx).get();
        });

        checkpoint(stop_signal, "starting auth cache");
        auth_cache.start(std::ref(qp)).get();
        auto stop_auth_cache = defer_verbose_shutdown("auth cache", [&] {
            auth_cache.stop().get();
        });

        checkpoint(stop_signal, "initializing storage service");
        debug::the_storage_service = &ss;
        ss.start(std::ref(stop_signal.as_sharded_abort_source()),

@@ -1810,6 +1807,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

            std::ref(messaging), std::ref(repair),
            std::ref(stream_manager), std::ref(lifecycle_notifier), std::ref(bm), std::ref(snitch),
            std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(view_builder), std::ref(view_building_worker), std::ref(qp), std::ref(sl_controller),
            std::ref(auth_cache),
            std::ref(tsm), std::ref(vbsm), std::ref(task_manager), std::ref(gossip_address_map),
            compression_dict_updated_callback,
            only_on_shard0(&*disk_space_monitor_shard0)

@@ -1825,11 +1823,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

            ss.stop().get();
        });

        api::set_server_storage_service(ctx, ss, group0_client).get();
        auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
            api::unset_server_storage_service(ctx).get();
        });

        checkpoint(stop_signal, "initializing query processor remote part");
        // TODO: do this together with proxy.start_remote(...)
        qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(mapreduce_service),

@@ -2070,7 +2063,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

        maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
        maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};

        maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(hashing_worker)).get();
        maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache), std::ref(hashing_worker)).get();

        cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group);

@@ -2184,6 +2177,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

        // This will also disable migration manager schema pulls if needed.
        group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get();

        api::set_server_storage_service(ctx, ss, group0_client).get();
        auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
            api::unset_server_storage_service(ctx).get();
        });

        with_scheduling_group(maintenance_scheduling_group, [&] {
            return messaging.invoke_on_all([&] (auto& ms) {
                return ms.start_listen(token_metadata.local(), [&gossiper] (gms::inet_address ip) {

@@ -2341,7 +2339,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

        auth_config.authenticator_java_name = qualified_authenticator_name;
        auth_config.role_manager_java_name = qualified_role_manager_name;

        auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(hashing_worker)).get();
        auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache), std::ref(hashing_worker)).get();

        std::any stop_auth_service;
        // Has to be called after node joined the cluster (join_cluster())

@@ -2687,13 +2685,15 @@ int main(int ac, char** av) {

    // #3583 - need to potentially ensure this for tools as well, since at least
    // sstable* might need crypto libraries.
    auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe")); // could just be argv[0] I guess...
    auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
    // Note: must be in scope for application lifetime. p11_kit_override_system_files does _not_
    // copy input strings.
    auto p11_modules_str = p11_modules.string<char>();
    // #3392 only do this if we are actually packaged and the path exists.
    if (fs::exists(p11_modules)) {
        ::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
        auto p11_trust_paths_from_env = std::getenv("SCYLLA_P11_TRUST_PATHS");
        auto trust_module_path = scylla_path.parent_path().parent_path().append("libreloc/pkcs11/p11-kit-trust.so");
        if (fs::exists(trust_module_path) && p11_trust_paths_from_env) {
            gnutls_pkcs11_init(GNUTLS_PKCS11_FLAG_MANUAL, nullptr);
            auto trust_config = fmt::format("p11-kit:paths={} trusted=yes", p11_trust_paths_from_env);
            auto ret = gnutls_pkcs11_add_provider(trust_module_path.string().c_str(), trust_config.c_str());
            if (ret != GNUTLS_E_SUCCESS) {
                startlog.warn("Could not initialize p11-kit trust module: {}\n", gnutls_strerror(ret));
            }
        }

    return main_func(ac, av);
@@ -46,7 +46,7 @@ bool follower_progress::is_stray_reject(const append_reply::rejected& rejected)

        // any reject during snapshot transfer is stray one
        return true;
    default:
        SCYLLA_ASSERT(false);
        scylla_assert(false, "invalid follower_progress state: {}", static_cast<int>(state));
    }
    return false;
}

@@ -87,7 +87,7 @@ bool follower_progress::can_send_to() {

        // before starting to sync the log.
        return false;
    }
    SCYLLA_ASSERT(false);
    scylla_assert(false, "invalid follower_progress state in can_send_to: {}", static_cast<int>(state));
    return false;
}
@@ -2329,11 +2329,7 @@ future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_m
|
||||
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now()- start);
|
||||
rlogger.info("repair[{}]: Finished tablet repair for table={}.{} range={} duration={} replicas={} global_tablet_id={} flush_time={}",
|
||||
id.uuid(), keyspace_name, table_name, range, duration, replicas, gid, flush_time);
|
||||
if (!flush_time.has_value()) {
|
||||
throw std::runtime_error(format("Batchlog reply failed for table={}.{} range={} replicas={} global_tablet_id={}",
|
||||
id.uuid(), keyspace_name, table_name, range, replicas, gid));
|
||||
}
|
||||
co_return flush_time.value();
|
||||
co_return flush_time;
|
||||
}
|
||||
|
||||
tasks::is_user_task repair::tablet_repair_task_impl::is_user_task() const noexcept {

@@ -2410,9 +2406,11 @@ future<> repair::tablet_repair_task_impl::run() {
    });

    auto parent_shard = this_shard_id();
    std::vector<std::optional<gc_clock::time_point>> flush_times(smp::count, gc_clock::time_point{});
    rs.container().invoke_on_all([&idx, &flush_times, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_info = sched_info, ranges_parallelism = _ranges_parallelism, parent_shard, topo_guard = _topo_guard, skip_flush = _skip_flush] (repair_service& rs) -> future<> {
    auto flush_time = _flush_time;
    auto res = rs.container().map_reduce0([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_info = sched_info, ranges_parallelism = _ranges_parallelism, parent_shard, topo_guard = _topo_guard, skip_flush = _skip_flush] (repair_service& rs) -> future<std::pair<gc_clock::time_point, bool>> {
        std::exception_ptr error;
        gc_clock::time_point shard_flush_time;
        bool flush_failed = false;
        for (auto& m : metas) {
            if (m.master_shard_id != this_shard_id()) {
                continue;
@@ -2466,27 +2464,24 @@ future<> repair::tablet_repair_task_impl::run() {
                error = std::move(ep);
            }
        }
        auto current = flush_times[this_shard_id()];
        if ((needs_flush_before_repair && !hints_batchlog_flushed) || !current.has_value()) {
            flush_times[this_shard_id()] = std::nullopt;
        } else {
            auto time = task->get_flush_time();
            flush_times[this_shard_id()] = current == gc_clock::time_point() ? time : std::min(current.value(), time);
        }
        auto time = task->get_flush_time();
        shard_flush_time = shard_flush_time == gc_clock::time_point() ? time : std::min(shard_flush_time, time);
        flush_failed = flush_failed || (needs_flush_before_repair && !hints_batchlog_flushed);
        }
        if (error) {
            co_await coroutine::return_exception_ptr(std::move(error));
        }
        co_return std::make_pair(shard_flush_time, flush_failed);
    }, std::make_pair<gc_clock::time_point, bool>(std::move(flush_time), false), [] (const auto& p1, const auto& p2) {
        auto& [time1, failed1] = p1;
        auto& [time2, failed2] = p2;
        auto flush_time = time1 == gc_clock::time_point() ? time2 :
            (time2 == gc_clock::time_point() ? time1 : std::min(time1, time2));
        auto failed = failed1 || failed2;
        return std::make_pair(flush_time, failed);
    }).get();
    for (auto& time : flush_times) {
        if (!time.has_value()) {
            _flush_time = std::nullopt;
            break;
        }
        if (time != gc_clock::time_point()) {
            _flush_time = _flush_time == gc_clock::time_point() ? time : std::min(_flush_time.value(), time.value());
        }
    }
    _flush_time = res.first;
    _should_flush_and_flush_failed = res.second;
    auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start_time);
    rlogger.info("repair[{}]: Finished user-requested repair for tablet keyspace={} tables={} repair_id={} tablets_repaired={} duration={}",
        id.uuid(), _keyspace, _tables, id.id, _metas.size(), duration);

@@ -2529,7 +2529,7 @@ future<repair_update_system_table_response> repair_service::repair_update_system
        }
    }
    if (req.range.end()) {
        if (!req.range.end()->is_inclusive()) {
        if (!req.range.end()->is_inclusive() && req.range.end()->value() != dht::maximum_token()) {
            is_valid_range = false;
        }
    }

@@ -112,7 +112,8 @@ private:
    optimized_optional<abort_source::subscription> _abort_subscription;
    std::optional<int> _ranges_parallelism;
    size_t _metas_size = 0;
    std::optional<gc_clock::time_point> _flush_time = gc_clock::time_point();
    gc_clock::time_point _flush_time = gc_clock::time_point();
    bool _should_flush_and_flush_failed = false;
    service::frozen_topology_guard _topo_guard;
    bool _skip_flush;
public:
@@ -134,7 +135,12 @@ public:
        return tasks::is_abortable(!_abort_subscription);
    }

    std::optional<gc_clock::time_point> get_flush_time() const { return _flush_time; }
    gc_clock::time_point get_flush_time() const {
        if (_should_flush_and_flush_failed) {
            throw std::runtime_error(fmt::format("Flush is needed for repair {} with parent {}, but failed", id(), _parent_id));
        }
        return _flush_time;
    }

    tasks::is_user_task is_user_task() const noexcept override;
    virtual future<> release_resources() noexcept override;

@@ -3704,7 +3704,7 @@ future<utils::chunked_vector<temporary_buffer<char>>> database::sample_data_file
    }), std::ref(state));

    // [1, 2, 3, 0] --> [0, 1, 3, 6]
    std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), 0, std::plus());
    std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), uint64_t(0), std::plus());

    // We can't generate random non-negative integers smaller than 0,
    // so let's just deal with the `total_chunks == 0` case with an early return.

@@ -301,6 +301,7 @@ protected:
class ghost_row_deleting_query_pager : public service::pager::query_pager {
    service::storage_proxy& _proxy;
    db::timeout_clock::duration _timeout_duration;
    size_t _concurrency;
public:
    ghost_row_deleting_query_pager(schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
        service::query_state& state,
@@ -309,10 +310,12 @@ public:
        dht::partition_range_vector ranges,
        cql3::cql_stats& stats,
        service::storage_proxy& proxy,
        db::timeout_clock::duration timeout_duration)
        db::timeout_clock::duration timeout_duration,
        size_t concurrency)
        : query_pager(proxy, s, selection, state, options, std::move(cmd), std::move(ranges), std::nullopt)
        , _proxy(proxy)
        , _timeout_duration(timeout_duration)
        , _concurrency(concurrency)
    {}
    virtual ~ghost_row_deleting_query_pager() {}

@@ -322,8 +325,12 @@ public:
    _query_read_repair_decision = qr.read_repair_decision;
    qr.query_result->ensure_counts();
    return seastar::async([this, query_result = std::move(qr.query_result), page_size, now] () mutable -> result<> {
        handle_result(db::view::delete_ghost_rows_visitor{_proxy, _state, view_ptr(_query_schema), _timeout_duration},
        std::exception_ptr ex;
        handle_result(db::view::delete_ghost_rows_visitor{_proxy, _state, view_ptr(_query_schema), _timeout_duration, _concurrency, ex},
            std::move(query_result), page_size, now);
        if (ex) {
            std::rethrow_exception(ex);
        }
        return bo::success();
    });
}));
@@ -503,7 +510,8 @@ std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager
    dht::partition_range_vector ranges,
    cql3::cql_stats& stats,
    storage_proxy& proxy,
    db::timeout_clock::duration duration) {
    db::timeout_clock::duration duration,
    size_t concurrency) {
    return ::make_shared<ghost_row_deleting_query_pager>(std::move(s), std::move(selection), state,
        options, std::move(cmd), std::move(ranges), stats, proxy, duration);
        options, std::move(cmd), std::move(ranges), stats, proxy, duration, concurrency);
}

@@ -47,7 +47,8 @@ public:
    dht::partition_range_vector,
    cql3::cql_stats& stats,
    storage_proxy& proxy,
    db::timeout_clock::duration timeout_duration);
    db::timeout_clock::duration timeout_duration,
    size_t concurrency);
};

}

@@ -6,6 +6,7 @@
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */
#include "service/raft/group0_state_machine.hh"
#include "auth/cache.hh"
#include "db/schema_tables.hh"
#include "mutation/atomic_cell.hh"
#include "cql3/selection/selection.hh"
@@ -174,6 +175,7 @@ future<> group0_state_machine::reload_modules(modules_to_reload modules) {
    bool update_service_levels_effective_cache = false;
    bool make_view_building_state_transition = false;
    std::unordered_set<table_id> update_cdc_streams;
    std::unordered_set<auth::cache::role_name_t> update_auth_cache_roles;

    for (const auto& m : modules.entries) {
        if (m.table == db::system_keyspace::service_levels_v2()->id()) {
@@ -197,6 +199,12 @@ future<> group0_state_machine::reload_modules(modules_to_reload modules) {
            const auto elements = m.pk.explode(*db::system_keyspace::cdc_streams_history());
            auto cdc_log_table_id = table_id(value_cast<utils::UUID>(uuid_type->deserialize_value(elements.front())));
            update_cdc_streams.insert(cdc_log_table_id);
        } else if (auth::cache::includes_table(m.table)) {
            auto schema = _ss.get_database().find_schema(m.table);
            const auto elements = m.pk.explode(*schema);
            auto role = value_cast<sstring>(schema->partition_key_type()->types().front()->deserialize(elements.front()));
            update_auth_cache_roles.insert(std::move(role));
        }
    }

@@ -209,6 +217,9 @@ future<> group0_state_machine::reload_modules(modules_to_reload modules) {
    if (update_cdc_streams.size()) {
        co_await _ss.load_cdc_streams(std::move(update_cdc_streams));
    }
    if (update_auth_cache_roles.size()) {
        co_await _ss.auth_cache().load_roles(std::move(update_auth_cache_roles));
    }
}

future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merger) {
@@ -375,6 +386,7 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
    if (_feature_service.cdc_with_tablets) {
        co_await _ss.load_cdc_streams();
    }
    co_await _ss.auth_cache().load_all();
    _ss._topology_state_machine.event.broadcast();
    _ss._view_building_state_machine.event.broadcast();
}
@@ -443,6 +455,8 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
        co_await mutate_locally(std::move(raft_snp->mutations), _sp);
    }

    co_await _ss.auth_cache().load_all();

    co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
} catch (const abort_requested_exception&) {
    throw raft::request_aborted(fmt::format(

@@ -18,6 +18,7 @@
#include "compaction/task_manager_module.hh"
#include "gc_clock.hh"
#include "raft/raft.hh"
#include "auth/cache.hh"
#include <ranges>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sleep.hh>
@@ -203,6 +204,7 @@ storage_service::storage_service(abort_source& abort_source,
    sharded<db::view::view_building_worker>& view_building_worker,
    cql3::query_processor& qp,
    sharded<qos::service_level_controller>& sl_controller,
    auth::cache& auth_cache,
    topology_state_machine& topology_state_machine,
    db::view::view_building_state_machine& view_building_state_machine,
    tasks::task_manager& tm,
@@ -221,6 +223,7 @@ storage_service::storage_service(abort_source& abort_source,
    , _stream_manager(stream_manager)
    , _snitch(snitch)
    , _sl_controller(sl_controller)
    , _auth_cache(auth_cache)
    , _group0(nullptr)
    , _async_gate("storage_service")
    , _node_ops_abort_thread(node_ops_abort_thread())
@@ -274,6 +277,10 @@ node_ops::task_manager_module& storage_service::get_node_ops_module() noexcept {
    return *_node_ops_module;
}

auth::cache& storage_service::auth_cache() noexcept {
    return _auth_cache;
}

enum class node_external_status {
    UNKNOWN = 0,
    STARTING = 1,
@@ -593,7 +600,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
        co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip));
        break;
    case node_state::replacing: {
        SCYLLA_ASSERT(_topology_state_machine._topology.req_param.contains(id));
        scylla_assert(_topology_state_machine._topology.req_param.contains(id));
        auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
        auto existing_ip = _address_map.find(locator::host_id{replaced_id.uuid()});
        const auto replaced_host_id = locator::host_id(replaced_id.uuid());
@@ -681,7 +688,7 @@ future<> storage_service::notify_nodes_after_sync(nodes_to_notify_after_sync&& n
future<> storage_service::topology_state_load(state_change_hint hint) {
#ifdef SEASTAR_DEBUG
    static bool running = false;
    SCYLLA_ASSERT(!running); // The function is not re-entrant
    scylla_assert(!running); // The function is not re-entrant
    auto d = defer([] {
        running = false;
    });
@@ -708,11 +715,14 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
        co_return;
    }

    co_await _qp.container().invoke_on_all([] (cql3::query_processor& qp) {
    if (_qp.auth_version < db::system_keyspace::auth_version_t::v2) {
        // auth-v2 gets enabled when consistent topology changes are enabled
        // (see topology::upgrade_state_type::done above) as we use the same migration procedure
        qp.auth_version = db::system_keyspace::auth_version_t::v2;
    });
        co_await _qp.container().invoke_on_all([] (cql3::query_processor& qp) {
            qp.auth_version = db::system_keyspace::auth_version_t::v2;
        });
        co_await auth_cache().load_all();
    }

    co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) {
        sl_controller.upgrade_to_v2(_qp, _group0->client());
@@ -844,7 +854,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
}

future<> storage_service::topology_transition(state_change_hint hint) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);
    co_await topology_state_load(std::move(hint)); // reload new state

    _topology_state_machine.event.broadcast();
@@ -888,7 +898,7 @@ future<> storage_service::view_building_state_load() {
}

future<> storage_service::view_building_transition() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);
    co_await view_building_state_load();

    _view_building_state_machine.event.broadcast();
@@ -956,7 +966,7 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
}

future<> storage_service::update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache, qos::query_context ctx) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);
    if (_sl_controller.local().is_v2()) {
        // Skip cache update unless the topology upgrade is done
        co_await _sl_controller.local().update_cache(update_only_effective_cache, ctx);
@@ -1510,7 +1520,7 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
}

future<> storage_service::start_upgrade_to_raft_topology() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);

    if (_topology_state_machine._topology.upgrade_state != topology::upgrade_state_type::not_upgraded) {
        co_return;
@@ -1562,7 +1572,7 @@ future<> storage_service::start_upgrade_to_raft_topology() {
}

topology::upgrade_state_type storage_service::get_topology_upgrade_state() const {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);
    return _topology_state_machine._topology.upgrade_state;
}
@@ -1831,7 +1841,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
        slogger.info("Nodes {} are alive", get_sync_nodes());
    }

    SCYLLA_ASSERT(_group0);
    scylla_assert(_group0);

    join_node_request_params join_params {
        .host_id = _group0->load_my_id(),
@@ -2073,7 +2083,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,

    if (!_sys_ks.local().bootstrap_complete()) {
        // If we're not bootstrapping then we shouldn't have chosen a CDC streams timestamp yet.
        SCYLLA_ASSERT(should_bootstrap() || !cdc_gen_id);
        scylla_assert(should_bootstrap() || !cdc_gen_id);

        // Don't try rewriting CDC stream description tables.
        // See cdc.md design notes, `Streams description table V1 and rewriting` section, for explanation.
@@ -2157,7 +2167,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
        throw std::runtime_error(err);
    }

    SCYLLA_ASSERT(_group0);
    scylla_assert(_group0);
    co_await _group0->finish_setup_after_join(*this, _qp, _migration_manager.local(), false);
    co_await _cdc_gens.local().after_join(std::move(cdc_gen_id));

@@ -2182,7 +2192,7 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
}

future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded<service::storage_proxy>& proxy) {
    SCYLLA_ASSERT(_group0);
    scylla_assert(_group0);

    while (true) {
        _group0_as.check();
@@ -2306,7 +2316,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,

    // After we pick a generation timestamp, we start gossiping it, and we stick with it.
    // We don't do any other generation switches (unless we crash before completing bootstrap).
    SCYLLA_ASSERT(!cdc_gen_id);
    scylla_assert(!cdc_gen_id);

    cdc_gen_id = _cdc_gens.local().legacy_make_new_generation(bootstrap_tokens, !is_first_node()).get();

@@ -2339,9 +2349,9 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
    slogger.debug("Removing replaced endpoint {} from system.peers", replace_addr);
    _sys_ks.local().remove_endpoint(replace_addr).get();

    SCYLLA_ASSERT(replaced_host_id);
    scylla_assert(replaced_host_id);
    auto raft_id = raft::server_id{replaced_host_id.uuid()};
    SCYLLA_ASSERT(_group0);
    scylla_assert(_group0);
    bool raft_available = _group0->wait_for_raft().get();
    if (raft_available) {
        slogger.info("Replace: removing {}/{} from group 0...", replace_addr, raft_id);
@@ -2990,7 +3000,7 @@ future<> storage_service::stop_transport() {
}

future<> storage_service::drain_on_shutdown() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);
    return (_operation_mode == mode::DRAINING || _operation_mode == mode::DRAINED) ?
        _drain_finished.get_future() : do_drain();
}
@@ -3015,7 +3025,7 @@ bool storage_service::is_topology_coordinator_enabled() const {

future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
        start_hint_manager start_hm, gms::generation_type new_generation) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);

    if (_sys_ks.local().was_decommissioned()) {
        auto msg = sstring("This node was decommissioned and will not rejoin the ring unless "
@@ -3215,7 +3225,7 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
}

future<token_metadata_change> storage_service::prepare_token_metadata_change(mutable_token_metadata_ptr tmptr, const schema_getter& schema_getter) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);
    std::exception_ptr ex;
    token_metadata_change change;

@@ -3982,7 +3992,7 @@ future<> storage_service::decommission() {
    slogger.info("DECOMMISSIONING: starts");
    ctl.req.leaving_nodes = std::list<gms::inet_address>{endpoint};

    SCYLLA_ASSERT(ss._group0);
    scylla_assert(ss._group0);
    bool raft_available = ss._group0->wait_for_raft().get();

    try {
@@ -4034,7 +4044,7 @@ future<> storage_service::decommission() {

    if (raft_available && left_token_ring) {
        slogger.info("decommission[{}]: leaving Raft group 0", uuid);
        SCYLLA_ASSERT(ss._group0);
        scylla_assert(ss._group0);
        ss._group0->leave_group0().get();
        slogger.info("decommission[{}]: left Raft group 0", uuid);
    }
@@ -4340,7 +4350,7 @@ future<> storage_service::removenode(locator::host_id host_id, locator::host_id_
    auto stop_ctl = deferred_stop(ctl);
    auto uuid = ctl.uuid();
    const auto& tmptr = ctl.tmptr;
    SCYLLA_ASSERT(ss._group0);
    scylla_assert(ss._group0);
    auto raft_id = raft::server_id{host_id.uuid()};
    bool raft_available = ss._group0->wait_for_raft().get();
    bool is_group0_member = raft_available && ss._group0->is_member(raft_id, false);
@@ -4460,7 +4470,7 @@ future<> storage_service::removenode(locator::host_id host_id, locator::host_id_
}

future<> storage_service::check_and_repair_cdc_streams() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);

    if (!_cdc_gens.local_is_initialized()) {
        return make_exception_future<>(std::runtime_error("CDC generation service not initialized yet"));
@@ -5774,7 +5784,7 @@ future<> storage_service::mutate_token_metadata(std::function<future<> (mutable_
}

future<> storage_service::update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);

    try {
        locator::dc_rack_fn get_dc_rack_by_host_id([this, &tm = *tmptr] (locator::host_id host_id) -> std::optional<locator::endpoint_dc_rack> {
@@ -5821,7 +5831,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
}

future<locator::mutable_token_metadata_ptr> storage_service::prepare_tablet_metadata(const locator::tablet_metadata_change_hint& hint, mutable_token_metadata_ptr pending_token_metadata) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);
    if (hint) {
        co_await replica::update_tablet_metadata(_db.local(), _qp, pending_token_metadata->tablets(), hint);
    } else {
@@ -5945,7 +5955,7 @@ void storage_service::start_tablet_split_monitor() {
}

future<> storage_service::snitch_reconfigured() {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);
    auto& snitch = _snitch.local();
    co_await mutate_token_metadata([&snitch] (mutable_token_metadata_ptr tmptr) -> future<> {
        // re-read local rack and DC info
@@ -6477,8 +6487,8 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
    co_await utils::get_local_injector().inject("block_tablet_streaming", [this, &tablet] (auto& handler) -> future<> {
        const auto keyspace = handler.get("keyspace");
        const auto table = handler.get("table");
        SCYLLA_ASSERT(keyspace);
        SCYLLA_ASSERT(table);
        scylla_assert(keyspace);
        scylla_assert(table);
        auto s = _db.local().find_column_family(tablet.table).schema();
        bool should_block = s->ks_name() == *keyspace && s->cf_name() == *table;
        while (should_block && !handler.poll_for_message() && !_async_gate.is_closed()) {
@@ -7499,7 +7509,7 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
}

future<join_node_response_result> storage_service::join_node_response_handler(join_node_response_params params) {
    SCYLLA_ASSERT(this_shard_id() == 0);
    scylla_assert(this_shard_id() == 0);

    // Usually this handler will only run once, but there are some cases where we might get more than one RPC,
    // possibly happening at the same time, e.g.:

@@ -114,6 +114,8 @@ namespace replica {
class tablet_mutation_builder;
}

namespace auth { class cache; }

namespace utils {
class disk_space_monitor;
}
@@ -199,6 +201,7 @@ private:
    sharded<streaming::stream_manager>& _stream_manager;
    sharded<locator::snitch_ptr>& _snitch;
    sharded<qos::service_level_controller>& _sl_controller;
    auth::cache& _auth_cache;

    // Engaged on shard 0 before `join_cluster`.
    service::raft_group0* _group0;
@@ -265,6 +268,7 @@ public:
    sharded<db::view::view_building_worker>& view_building_worker,
    cql3::query_processor& qp,
    sharded<qos::service_level_controller>& sl_controller,
    auth::cache& auth_cache,
    topology_state_machine& topology_state_machine,
    db::view::view_building_state_machine& view_building_state_machine,
    tasks::task_manager& tm,
@@ -998,6 +1002,8 @@ public:
    // update_both_cache_levels::no - update only effective service levels cache
    future<> update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache = qos::update_both_cache_levels::yes, qos::query_context ctx = qos::query_context::unspecified);

    auth::cache& auth_cache() noexcept;

    // Should be called whenever new compression dictionaries are published to system.dicts.
    // This is an arbitrary callback passed through the constructor,
    // but its intended usage is to set up the RPC connections to use the new dictionaries.

@@ -360,7 +360,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
    auto& topo = _topo_sm._topology;

    auto it = topo.find(id);
    SCYLLA_ASSERT(it);
    scylla_assert(it);

    std::optional<topology_request> req;
    auto rit = topo.requests.find(id);
@@ -1643,25 +1643,27 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
    }
    break;
case locator::tablet_transition_stage::cleanup_target:
    if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
        if (!trinfo.pending_replica) {
            rtlogger.info("Tablet cleanup of {} skipped because no replicas pending", gid);
            return make_ready_future<>();
        }
        locator::tablet_replica dst = *trinfo.pending_replica;
        if (is_excluded(raft::server_id(dst.host.uuid()))) {
            rtlogger.info("Tablet cleanup of {} on {} skipped because node is excluded and doesn't need to revert migration", gid, dst);
            return make_ready_future<>();
        }
        rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
        return do_with(gids, [this, dst] (const auto& gids) {
            return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
                return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
                    dst.host, _as, raft::server_id(dst.host.uuid()), gid);
    if (do_barrier()) {
        if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
            if (!trinfo.pending_replica) {
                rtlogger.info("Tablet cleanup of {} skipped because no replicas pending", gid);
                return make_ready_future<>();
            }
            locator::tablet_replica dst = *trinfo.pending_replica;
            if (is_excluded(raft::server_id(dst.host.uuid()))) {
                rtlogger.info("Tablet cleanup of {} on {} skipped because node is excluded and doesn't need to revert migration", gid, dst);
                return make_ready_future<>();
            }
            rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
            return do_with(gids, [this, dst] (const auto& gids) {
                return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
                    return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
                        dst.host, _as, raft::server_id(dst.host.uuid()), gid);
                });
            });
        });
    })) {
        transition_to(locator::tablet_transition_stage::revert_migration);
        })) {
            transition_to(locator::tablet_transition_stage::revert_migration);
        }
    }
    break;
case locator::tablet_transition_stage::revert_migration:
@@ -2308,7 +2310,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {

    switch (node.rs->state) {
    case node_state::bootstrapping: {
        SCYLLA_ASSERT(!node.rs->ring);
        scylla_assert(!node.rs->ring);
        auto num_tokens = std::get<join_param>(node.req_param.value()).num_tokens;
        auto tokens_string = std::get<join_param>(node.req_param.value()).tokens_string;

@@ -2357,11 +2359,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
        }
        break;
    case node_state::replacing: {
        SCYLLA_ASSERT(!node.rs->ring);
        scylla_assert(!node.rs->ring);
        auto replaced_id = std::get<replace_param>(node.req_param.value()).replaced_id;
        auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
        SCYLLA_ASSERT(it != _topo_sm._topology.normal_nodes.end());
        SCYLLA_ASSERT(it->second.ring && it->second.state == node_state::normal);
        scylla_assert(it != _topo_sm._topology.normal_nodes.end());
        scylla_assert(it->second.ring && it->second.state == node_state::normal);

        topology_mutation_builder builder(node.guard.write_timestamp());

@@ -3020,7 +3022,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
    rtbuilder.set("start_time", db_clock::now());
    switch (node.request.value()) {
    case topology_request::join: {
        SCYLLA_ASSERT(!node.rs->ring);
        scylla_assert(!node.rs->ring);
        // Write chosen tokens through raft.
        builder.set_transition_state(topology::transition_state::join_group0)
            .with_node(node.id)
@@ -3031,7 +3033,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
        break;
    }
    case topology_request::leave:
        SCYLLA_ASSERT(node.rs->ring);
        scylla_assert(node.rs->ring);
        // start decommission and put tokens of decommissioning nodes into write_both_read_old state
        // meaning that reads will go to the replica being decommissioned
        // but writes will go to new owner as well
@@ -3044,7 +3046,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
            "start decommission");
        break;
    case topology_request::remove: {
        SCYLLA_ASSERT(node.rs->ring);
        scylla_assert(node.rs->ring);

        builder.set_transition_state(topology::transition_state::tablet_draining)
            .set_version(_topo_sm._topology.version + 1)
@@ -3056,7 +3058,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
        break;
    }
    case topology_request::replace: {
        SCYLLA_ASSERT(!node.rs->ring);
        scylla_assert(!node.rs->ring);

        builder.set_transition_state(topology::transition_state::join_group0)
            .with_node(node.id)
@@ -3161,7 +3163,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {

    auto id = node.id;

    SCYLLA_ASSERT(!_topo_sm._topology.transition_nodes.empty());
    scylla_assert(!_topo_sm._topology.transition_nodes.empty());

    release_node(std::move(node));

@@ -4011,7 +4013,7 @@ future<> topology_coordinator::stop() {
    // but let's check all of them because we never reset these holders
    // once they are added as barriers
    for (auto& [stage, barrier]: tablet_state.barriers) {
        SCYLLA_ASSERT(barrier.has_value());
        scylla_assert(barrier.has_value());
        co_await stop_background_action(barrier, gid, [stage] { return format("at stage {}", tablet_transition_stage_to_string(stage)); });
    }

@@ -27,6 +27,7 @@ enum class component_type {
|
||||
TemporaryTOC,
|
||||
TemporaryStatistics,
|
||||
Scylla,
|
||||
TemporaryScylla,
|
||||
Rows,
|
||||
Partitions,
|
||||
TemporaryHashes,
|
||||
@@ -76,6 +77,8 @@ struct fmt::formatter<sstables::component_type> : fmt::formatter<string_view> {
|
||||
return formatter<string_view>::format("TemporaryStatistics", ctx);
|
||||
case Scylla:
|
||||
return formatter<string_view>::format("Scylla", ctx);
|
||||
case TemporaryScylla:
|
||||
return formatter<string_view>::format("TemporaryScylla", ctx);
|
||||
case Partitions:
|
||||
return formatter<string_view>::format("Partitions", ctx);
|
||||
case Rows:
|
||||
|
||||
@@ -251,7 +251,7 @@ void compression::discard_hidden_options() {
 }

 compressor& compression::get_compressor() const {
-    SCYLLA_ASSERT(_compressor);
+    scylla_assert(_compressor);
     return *_compressor.get();
 }

@@ -170,7 +170,7 @@ struct compression {
     const_iterator(const const_iterator& other) = default;

     const_iterator& operator=(const const_iterator& other) {
-        SCYLLA_ASSERT(&_offsets == &other._offsets);
+        scylla_assert(&_offsets == &other._offsets);
         _index = other._index;
         return *this;
     }

@@ -24,6 +24,7 @@
 #include "sstables/sstable_compressor_factory.hh"
 #include "compressor.hh"
 #include "exceptions/exceptions.hh"
+#include "utils/assert.hh"
 #include "utils/config_file_impl.hh"
 #include "utils/class_registrator.hh"
 #include "gms/feature_service.hh"
@@ -295,7 +296,7 @@ size_t zstd_processor::uncompress(const char* input, size_t input_len, char* out
     if (_ddict) {
         return ZSTD_decompress_usingDDict(dctx, output, output_len, input, input_len, _ddict->dict());
     } else {
-        SCYLLA_ASSERT(!_cdict && "Write-only compressor used for reading");
+        scylla_assert(!_cdict && "Write-only compressor used for reading");
         return ZSTD_decompressDCtx(dctx, output, output_len, input, input_len);
     }
 });
@@ -310,7 +311,7 @@ size_t zstd_processor::compress(const char* input, size_t input_len, char* outpu
     if (_cdict) {
         return ZSTD_compress_usingCDict(cctx, output, output_len, input, input_len, _cdict->dict());
     } else {
-        SCYLLA_ASSERT(!_ddict && "Read-only compressor used for writing");
+        scylla_assert(!_ddict && "Read-only compressor used for writing");
         return ZSTD_compressCCtx(cctx, output, output_len, input, input_len, _compression_level);
     }
 });
@@ -627,7 +628,7 @@ size_t lz4_processor::uncompress(const char* input, size_t input_len,
     if (_ddict) {
         ret = LZ4_decompress_safe_usingDict(input, output, input_len, output_len, reinterpret_cast<const char*>(_ddict->raw().data()), _ddict->raw().size());
     } else {
-        SCYLLA_ASSERT(!_cdict && "Write-only compressor used for reading");
+        scylla_assert(!_cdict && "Write-only compressor used for reading");
         ret = LZ4_decompress_safe(input, output, input_len, output_len);
     }
     if (ret < 0) {
@@ -657,7 +658,7 @@ size_t lz4_processor::compress(const char* input, size_t input_len,
             LZ4_resetStream_fast(ctx);
         }
     } else {
-        SCYLLA_ASSERT(!_ddict && "Read-only compressor used for writing");
+        scylla_assert(!_ddict && "Read-only compressor used for writing");
         ret = LZ4_compress_default(input, output + 4, input_len, LZ4_compressBound(input_len));
     }
     if (ret == 0) {
@@ -1268,7 +1269,7 @@ lz4_cdict::~lz4_cdict() {
 }

 std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread() {
-    SCYLLA_ASSERT(thread::running_in_thread());
+    scylla_assert(thread::running_in_thread());
     struct wrapper : sstable_compressor_factory {
         using impl = default_sstable_compressor_factory;
         sharded<impl> _impl;

@@ -44,14 +44,14 @@ public:
     * @return A list of `sampling_level` unique indices between 0 and `sampling_level`
     */
    static const std::vector<int>& get_sampling_pattern(int sampling_level) {
-       SCYLLA_ASSERT(sampling_level > 0 && sampling_level <= BASE_SAMPLING_LEVEL);
+       scylla_assert(sampling_level > 0 && sampling_level <= BASE_SAMPLING_LEVEL);
        auto& entry = _sample_pattern_cache[sampling_level-1];
        if (!entry.empty()) {
            return entry;
        }

        if (sampling_level <= 1) {
-           SCYLLA_ASSERT(_sample_pattern_cache[0].empty());
+           scylla_assert(_sample_pattern_cache[0].empty());
            _sample_pattern_cache[0].push_back(0);
            return _sample_pattern_cache[0];
        }
@@ -96,7 +96,7 @@ public:
     * @return a list of original indexes for current summary entries
     */
    static const std::vector<int>& get_original_indexes(int sampling_level) {
-       SCYLLA_ASSERT(sampling_level > 0 && sampling_level <= BASE_SAMPLING_LEVEL);
+       scylla_assert(sampling_level > 0 && sampling_level <= BASE_SAMPLING_LEVEL);
        auto& entry = _original_index_cache[sampling_level-1];
        if (!entry.empty()) {
            return entry;
@@ -128,7 +128,7 @@ public:
     * @return the number of partitions before the next index summary entry, inclusive on one end
     */
    static int get_effective_index_interval_after_index(int index, int sampling_level, int min_index_interval) {
-       SCYLLA_ASSERT(index >= -1);
+       scylla_assert(index >= -1);
        const std::vector<int>& original_indexes = get_original_indexes(sampling_level);
        if (index == -1) {
            return original_indexes[0] * min_index_interval;

@@ -31,7 +31,7 @@ public:
    [[noreturn]] void on_parse_error(sstring message, std::optional<component_name> filename);
    [[noreturn, gnu::noinline]] void on_bti_parse_error(uint64_t pos);

-   // Use this instead of SCYLLA_ASSERT() or assert() in code that is used while parsing SSTables.
+   // Use this instead of scylla_assert() or assert() in code that is used while parsing SSTables.
    // SSTables can be corrupted either by ScyllaDB itself or by a freak accident like cosmic background
    // radiation hitting the disk the wrong way. Either way a corrupt SSTable should not bring down the
    // whole server. This method will call on_internal_error() if the condition is false.

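The comment in the hunk above states the rationale behind the whole conversion: a failed internal check should surface as a catchable error on one operation instead of aborting the process. A minimal, self-contained sketch of an exception-throwing assertion in that spirit (the names `throwing_assert` and `internal_error_exception` are made up for illustration; the real `scylla_assert()` is built on `on_internal_error()` as described in the summary):

```cpp
#include <sstream>
#include <stdexcept>

// Hypothetical stand-in for the real macro: on a failed condition,
// throw a catchable exception instead of calling abort(), so one
// broken invariant fails a single operation, not the whole node.
struct internal_error_exception : std::runtime_error {
    using std::runtime_error::runtime_error;
};

#define throwing_assert(cond)                                           \
    do {                                                                \
        if (!(cond)) {                                                  \
            std::ostringstream os;                                      \
            os << "internal error: assertion '" << #cond                \
               << "' failed at " << __FILE__ << ":" << __LINE__;        \
            throw internal_error_exception(os.str());                   \
        }                                                               \
    } while (0)
```

A caller at the request boundary can catch `internal_error_exception`, log it, and fail only that request, which is the availability argument the conversion summary makes.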
@@ -129,7 +129,7 @@ public:
    /// way to determine that is overlapping its partition-ranges with the shard's
    /// owned ranges.
    static bool maybe_owned_by_this_shard(const sstables::generation_type& gen) {
-       SCYLLA_ASSERT(bool(gen));
+       scylla_assert(bool(gen));
        int64_t hint = 0;
        if (gen.is_uuid_based()) {
            hint = std::hash<utils::UUID>{}(gen.as_uuid());

@@ -91,7 +91,7 @@ public:
    {}

    void increment() {
-       SCYLLA_ASSERT(_range);
+       scylla_assert(_range);
        if (!_range->next()) {
            _range = nullptr;
        }
@@ -102,7 +102,7 @@ public:
    }

    const ValueType dereference() const {
-       SCYLLA_ASSERT(_range);
+       scylla_assert(_range);
        return _range->get_value();
    }

@@ -153,7 +153,7 @@ public:
        auto limit = std::min(_serialization_limit_size, _offset + clustering_block::max_block_size);

        _current_block = {};
-       SCYLLA_ASSERT (_offset % clustering_block::max_block_size == 0);
+       scylla_assert (_offset % clustering_block::max_block_size == 0);
        while (_offset < limit) {
            auto shift = _offset % clustering_block::max_block_size;
            if (_offset < _prefix.size(_schema)) {
@@ -280,7 +280,7 @@ public:
                ++_current_index;
            }
        } else {
-           SCYLLA_ASSERT(_mode == encoding_mode::large_encode_missing);
+           scylla_assert(_mode == encoding_mode::large_encode_missing);
            while (_current_index < total_size) {
                auto cell = _row.find_cell(_columns[_current_index].get().id);
                if (!cell) {

@@ -632,6 +632,10 @@ private:
    std::unique_ptr<file_writer> close_writer(std::unique_ptr<file_writer>& w);

    void close_data_writer();
+   void close_index_writer();
+   void close_rows_writer();
+   void close_partitions_writer();
+
    void ensure_tombstone_is_written() {
        if (!_tombstone_written) {
            consume(tombstone());
@@ -944,17 +948,16 @@ void writer::init_file_writers() {
            _sst._schema->get_compressor_params(),
            std::move(compressor)), _sst.get_filename());
    }

    if (_sst.has_component(component_type::Index)) {
        out = _sst._storage->make_data_or_index_sink(_sst, component_type::Index).get();
-       _index_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), _sst.index_filename());
+       _index_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, _sst.index_filename());
    }
    if (_sst.has_component(component_type::Partitions) && _sst.has_component(component_type::Rows)) {
        out = _sst._storage->make_data_or_index_sink(_sst, component_type::Rows).get();
-       _rows_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), component_name(_sst, component_type::Rows));
+       _rows_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Rows));
        _bti_row_index_writer = trie::bti_row_index_writer(*_rows_writer);
        out = _sst._storage->make_data_or_index_sink(_sst, component_type::Partitions).get();
-       _partitions_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), component_name(_sst, component_type::Partitions));
+       _partitions_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Partitions));
        _bti_partition_index_writer = trie::bti_partition_index_writer(*_partitions_writer);
    }
    if (_delayed_filter) {
@@ -982,6 +985,41 @@ void writer::close_data_writer() {
    }
 }

+void writer::close_index_writer() {
+    if (_index_writer) {
+        auto writer = close_writer(_index_writer);
+        auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
+        _sst.get_components_digests().index_digest = chksum_wr->full_checksum();
+    }
+}
+
+void writer::close_partitions_writer() {
+    if (_partitions_writer) {
+        _sst._partitions_db_footer = std::move(*_bti_partition_index_writer).finish(
+            _sst.get_version(),
+            _first_key.value(),
+            _last_key.value());
+        auto writer = close_writer(_partitions_writer);
+        auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
+        _sst.get_components_digests().partitions_digest = chksum_wr->full_checksum();
+    }
+}
+
+void writer::close_rows_writer() {
+    if (_rows_writer) {
+        // Append some garbage padding to the file just to ensure that it's never empty.
+        // (Otherwise it would be empty if the sstable contains only small partitions).
+        // This is a hack to work around some bad interactions between zero-sized files
+        // and object storage. (It seems that e.g. minio considers a zero-sized file
+        // upload to be a no-op, which breaks some assumptions).
+        uint32_t garbage = seastar::cpu_to_be(0x13371337);
+        _rows_writer->write(reinterpret_cast<const char*>(&garbage), sizeof(garbage));
+        auto writer = close_writer(_rows_writer);
+        auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
+        _sst.get_components_digests().rows_digest = chksum_wr->full_checksum();
+    }
+}
+
 void writer::consume_new_partition(const dht::decorated_key& dk) {
    _c_stats.start_offset = _data_writer->offset();
    _prev_row_start = _data_writer->offset();
@@ -1142,7 +1180,7 @@ void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clus

    if (cdef.is_counter()) {
        if (!is_deleted) {
-           SCYLLA_ASSERT(!cell.is_counter_update());
+           scylla_assert(!cell.is_counter_update());
            auto ccv = counter_cell_view(cell);
            write_counter_value(ccv, writer, _sst.get_version(), [] (bytes_ostream& out, uint32_t value) {
                return write_vint(out, value);
@@ -1451,7 +1489,7 @@ template <typename W>
 requires Writer<W>
 static void write_clustering_prefix(sstable_version_types v, W& writer, bound_kind_m kind,
        const schema& s, const clustering_key_prefix& clustering) {
-   SCYLLA_ASSERT(kind != bound_kind_m::static_clustering);
+   scylla_assert(kind != bound_kind_m::static_clustering);
    write(v, writer, kind);
    auto is_ephemerally_full = ephemerally_full_prefix{s.is_compact_table()};
    if (kind != bound_kind_m::clustering) {
@@ -1630,27 +1668,10 @@ void writer::consume_end_of_stream() {
        _collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
    }

-   if (_index_writer) {
-       close_writer(_index_writer);
-   }
+   close_index_writer();

-   if (_partitions_writer) {
-       _sst._partitions_db_footer = std::move(*_bti_partition_index_writer).finish(
-           _sst.get_version(),
-           _first_key.value(),
-           _last_key.value());
-       close_writer(_partitions_writer);
-   }
-   if (_rows_writer) {
-       // Append some garbage padding to the file just to ensure that it's never empty.
-       // (Otherwise it would be empty if the sstable contains only small partitions).
-       // This is a hack to work around some bad interactions between zero-sized files
-       // and object storage. (It seems that e.g. minio considers a zero-sized file
-       // upload to be a no-op, which breaks some assumptions).
-       uint32_t garbage = seastar::cpu_to_be(0x13371337);
-       _rows_writer->write(reinterpret_cast<const char*>(&garbage), sizeof(garbage));
-       close_writer(_rows_writer);
-   }
+   close_partitions_writer();
+   close_rows_writer();

    if (_hashes_writer) {
        close_writer(_hashes_writer);

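The `close_*_writer()` helpers in the hunk above all share one pattern: each component is written through a checksumming writer, and `full_checksum()` is read back at close time to record the component's digest. A minimal sketch of that accumulate-while-writing idea (the class name and the in-memory sink are inventions for illustration; the real `crc32_digest_file_writer` wraps an sstable data sink):

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical sketch of a digest-computing writer: wrap a byte sink
// and fold every written buffer into a running CRC-32, so the final
// checksum is available when the component is closed. Uses the
// standard reflected CRC-32 (poly 0xEDB88320, init/xorout 0xFFFFFFFF).
class crc32_accumulating_writer {
    uint32_t _crc = 0xFFFFFFFFu;
    std::string _sink; // stands in for the real data_sink
public:
    void write(const char* buf, size_t n) {
        _sink.append(buf, n);
        for (size_t i = 0; i < n; ++i) {
            _crc ^= static_cast<unsigned char>(buf[i]);
            for (int b = 0; b < 8; ++b) {
                // conditionally XOR the polynomial when the low bit is set
                _crc = (_crc >> 1) ^ (0xEDB88320u & (0u - (_crc & 1u)));
            }
        }
    }
    uint32_t full_checksum() const { return _crc ^ 0xFFFFFFFFu; }
    const std::string& data() const { return _sink; }
};
```

Because the CRC folds in each buffer as it is written, writing a component in several chunks yields the same digest as a one-shot computation; the standard CRC-32 check value for the string "123456789" is 0xCBF43926, which is a convenient sanity test for any such writer.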
@@ -59,7 +59,7 @@ private:
            // Live entry_ptr should keep the entry alive, except when the entry failed on loading.
            // In that case, entry_ptr holders are not supposed to use the pointer, so it's safe
            // to nullify those entry_ptrs.
-           SCYLLA_ASSERT(!ready());
+           scylla_assert(!ready());
        }
    }

@@ -496,7 +496,7 @@ sstable_directory::move_foreign_sstables(sharded<sstable_directory>& source_directory) {
            return make_ready_future<>();
        }
        // Should be empty, since an SSTable that belongs to this shard is not remote.
-       SCYLLA_ASSERT(shard_id != this_shard_id());
+       scylla_assert(shard_id != this_shard_id());
        dirlog.debug("Moving {} unshared SSTables of {}.{} to shard {} ", info_vec.size(), _schema->ks_name(), _schema->cf_name(), shard_id);
        return source_directory.invoke_on(shard_id, &sstables::sstable_directory::load_foreign_sstables, std::move(info_vec));
    });
@@ -540,7 +540,7 @@ sstable_directory::collect_output_unshared_sstables(std::vector<sstables::shared_sstable> resharded_sstables) {
    dirlog.debug("Collecting {} output SSTables (remote={})", resharded_sstables.size(), remote_ok);
    return parallel_for_each(std::move(resharded_sstables), [this, remote_ok] (sstables::shared_sstable sst) {
        auto shards = sst->get_shards_for_this_sstable();
-       SCYLLA_ASSERT(shards.size() == 1);
+       scylla_assert(shards.size() == 1);
        auto shard = shards[0];

        if (shard == this_shard_id()) {

@@ -283,7 +283,7 @@ bool partitioned_sstable_set::store_as_unleveled(const shared_sstable& sst) const {
        }
        sstlog.info("SSTable {}, as_unleveled={}, expect_unleveled={}, sst_tr={}, overlap_ratio={}",
            sst->generation(), as_unleveled, expect_unleveled, sst_tr, dht::overlap_ratio(_token_range, sst_tr));
-       SCYLLA_ASSERT(as_unleveled == expect_unleveled);
+       scylla_assert(as_unleveled == expect_unleveled);
    });

    return as_unleveled;
@@ -712,8 +712,8 @@ public:

        // by !empty(bound) and `_it` invariant:
        // _it != _end, _it->first <= bound, and filter(*_it->second) == true
-       SCYLLA_ASSERT(_cmp(_it->first, bound) <= 0);
-       // we don't SCYLLA_ASSERT(filter(*_it->second)) due to the requirement that `filter` is called at most once for each sstable
+       scylla_assert(_cmp(_it->first, bound) <= 0);
+       // we don't scylla_assert(filter(*_it->second)) due to the requirement that `filter` is called at most once for each sstable

        // Find all sstables with the same position as `_it` (they form a contiguous range in the container).
        auto next = std::find_if(std::next(_it), _end, [this] (const value_t& v) { return _cmp(v.first, _it->first) != 0; });
@@ -1301,7 +1301,7 @@ sstable_set::create_single_key_sstable_reader(
        mutation_reader::forwarding fwd_mr,
        const sstable_predicate& predicate,
        sstables::integrity_check integrity) const {
-   SCYLLA_ASSERT(pr.is_singular() && pr.start()->value().has_key());
+   scylla_assert(pr.is_singular() && pr.start()->value().has_key());
    return _impl->create_single_key_sstable_reader(cf, std::move(schema),
        std::move(permit), sstable_histogram, pr, slice, std::move(trace_state), fwd, fwd_mr, predicate, integrity);
 }
@@ -1408,7 +1408,7 @@ sstable_set::make_local_shard_sstable_reader(
 {
    auto reader_factory_fn = [s, permit, &slice, trace_state, fwd, fwd_mr, &monitor_generator, &predicate, integrity]
            (shared_sstable& sst, const dht::partition_range& pr) mutable {
-       SCYLLA_ASSERT(!sst->is_shared());
+       scylla_assert(!sst->is_shared());
        if (!predicate(*sst)) {
            return make_empty_mutation_reader(s, permit);
        }

@@ -44,6 +44,7 @@ sstable_version_constants::component_map_t sstable_version_constants::create_component_map() {
        { component_type::Filter, "Filter.db" },
        { component_type::Statistics, "Statistics.db" },
        { component_type::Scylla, "Scylla.db" },
+       { component_type::TemporaryScylla, "Scylla.db.tmp" },
        { component_type::TemporaryTOC, TEMPORARY_TOC_SUFFIX },
        { component_type::TemporaryStatistics, "Statistics.db.tmp" }
    };

@@ -36,6 +36,7 @@

 #include "utils/error_injection.hh"
 #include "utils/to_string.hh"
+#include "utils/assert.hh"
 #include "data_dictionary/storage_options.hh"
 #include "dht/sharder.hh"
 #include "writer.hh"
@@ -956,16 +957,22 @@ future<file_writer> sstable::make_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept {
    });
 }

+future<std::unique_ptr<crc32_digest_file_writer>> sstable::make_digests_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept {
+    return _storage->make_component_sink(*this, c, oflags, std::move(options)).then([this, comp = component_name(*this, c)] (data_sink sink) mutable {
+        return std::make_unique<crc32_digest_file_writer>(std::move(sink), sstable_buffer_size, comp);
+    });
+}
+
 void sstable::open_sstable(const sstring& origin) {
    _origin = origin;
    generate_toc();
    _storage->open(*this);
 }

-void sstable::write_toc(file_writer w) {
+void sstable::write_toc(std::unique_ptr<crc32_digest_file_writer> w) {
    sstlog.debug("Writing TOC file {} ", toc_filename());

-   do_write_simple(std::move(w), [&] (version_types v, file_writer& w) {
+   do_write_simple(*w, [&] (version_types v, file_writer& w) {
        for (auto&& key : _recognized_components) {
            // new line character is appended to the end of each component name.
            auto value = sstable_version_constants::get_component_map(v).at(key) + "\n";
@@ -973,6 +980,8 @@ void sstable::write_toc(file_writer w) {
            write(v, w, b);
        }
    });
+
+   _components_digests.toc_digest = w->full_checksum();
 }

 void sstable::write_crc(const checksum& c) {
@@ -989,6 +998,7 @@ void sstable::write_digest(uint32_t full_checksum) {
        auto digest = to_sstring<bytes>(full_checksum);
        write(v, w, digest);
    }, buffer_size);
+   _components_digests.data_digest = full_checksum;
 }

 thread_local std::array<std::vector<int>, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache;
@@ -1045,7 +1055,7 @@ future<> sstable::read_simple(T& component) {
    });
 }

-void sstable::do_write_simple(file_writer&& writer,
+void sstable::do_write_simple(file_writer& writer,
        noncopyable_function<void (version_types, file_writer&)> write_component) {
    write_component(_version, writer);
    _metadata_size_on_disk += writer.offset();
@@ -1060,7 +1070,7 @@ void sstable::do_write_simple(component_type type,
    file_output_stream_options options;
    options.buffer_size = buffer_size;
    auto w = make_component_file_writer(type, std::move(options)).get();
-   do_write_simple(std::move(w), std::move(write_component));
+   do_write_simple(w, std::move(write_component));
 }

 template <component_type Type, typename T>
@@ -1070,10 +1080,30 @@ void sstable::write_simple(const T& component) {
    }, sstable_buffer_size);
 }

+uint32_t sstable::do_write_simple_with_digest(component_type type,
+       noncopyable_function<void (version_types version, file_writer& writer)> write_component, unsigned buffer_size) {
+    auto file_path = filename(type);
+    sstlog.debug("Writing {} file {}", sstable_version_constants::get_component_map(_version).at(type), file_path);
+
+    file_output_stream_options options;
+    options.buffer_size = buffer_size;
+    auto w = make_digests_component_file_writer(type, std::move(options)).get();
+    do_write_simple(*w, std::move(write_component));
+    return w->full_checksum();
+}
+
+template <component_type Type, typename T>
+uint32_t sstable::write_simple_with_digest(const T& component) {
+    return do_write_simple_with_digest(Type, [&component] (version_types v, file_writer& w) {
+        write(v, w, component);
+    }, sstable_buffer_size);
+}
+
 template future<> sstable::read_simple<component_type::Filter>(sstables::filter& f);
 template void sstable::write_simple<component_type::Filter>(const sstables::filter& f);

 template void sstable::write_simple<component_type::Summary>(const sstables::summary_ka&);
+template uint32_t sstable::write_simple_with_digest<component_type::Summary>(const sstables::summary_ka&);

 future<> sstable::read_compression() {
    // FIXME: If there is no compression, we should expect a CRC file to be present.
@@ -1092,7 +1122,8 @@ void sstable::write_compression() {
        return;
    }

-   write_simple<component_type::CompressionInfo>(_components->compression);
+   uint32_t digest = write_simple_with_digest<component_type::CompressionInfo>(_components->compression);
+   _components_digests.compression_digest = digest;
 }

 void sstable::validate_partitioner() {
@@ -1317,7 +1348,8 @@ future<> sstable::read_partitions_db_footer() {
 }

 void sstable::write_statistics() {
-   write_simple<component_type::Statistics>(_components->statistics);
+   auto digest = write_simple_with_digest<component_type::Statistics>(_components->statistics);
+   _components_digests.statistics_digest = digest;
 }

 void sstable::mark_as_being_repaired(const service::session_id& id) {
@@ -1342,10 +1374,23 @@ void sstable::rewrite_statistics() {

    file_output_stream_options options;
    options.buffer_size = sstable_buffer_size;
-   auto w = make_component_file_writer(component_type::TemporaryStatistics, std::move(options),
+   auto w = make_digests_component_file_writer(component_type::TemporaryStatistics, std::move(options),
        open_flags::wo | open_flags::create | open_flags::truncate).get();
-   write(_version, w, _components->statistics);
-   w.close();
+   write(_version, *w, _components->statistics);
+   w->close();

+   // When rewriting statistics, we also need to update the scylla component
+   // because it contains the digest of the statistics component.
+   if (has_scylla_component()) {
+       _components_digests.statistics_digest = w->full_checksum();
+       _components->scylla_metadata->data.set<scylla_metadata_type::ComponentsDigests>(components_digests{_components_digests});
+       sstlog.debug("Rewriting scylla component of sstable {}", get_filename());
+       write_simple<component_type::TemporaryScylla>(*_components->scylla_metadata);
+
+       // rename() guarantees atomicity when renaming a file into place.
+       sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryScylla)), fmt::to_string(filename(component_type::Scylla))).get();
+   }
+
    // rename() guarantees atomicity when renaming a file into place.
    sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryStatistics)), fmt::to_string(filename(component_type::Statistics))).get();
 }
@@ -1539,7 +1584,8 @@ void sstable::write_filter() {

    auto&& bs = f->bits();
    auto filter_ref = sstables::filter_ref(f->num_hashes(), bs.get_storage());
-   write_simple<component_type::Filter>(filter_ref);
+   uint32_t digest = write_simple_with_digest<component_type::Filter>(filter_ref);
+   _components_digests.filter_digest = digest;
 }

 void sstable::maybe_rebuild_filter_from_index(uint64_t num_partitions) {
@@ -1998,6 +2044,8 @@ sstable::read_scylla_metadata() noexcept {
    }
    return read_simple<component_type::Scylla>(*_components->scylla_metadata).then([this] {
        _features = _components->scylla_metadata->get_features();
+       _components_digests = _components->scylla_metadata->get_components_digests();
+       _components->digest = _components_digests.data_digest;
    });
 });
 }
@@ -2087,6 +2135,7 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
        sstable_schema.columns.elements.push_back(sstable_column_description{to_sstable_column_kind(col.kind), {col.name()}, {to_bytes(col.type->name())}});
    }
    _components->scylla_metadata->data.set<scylla_metadata_type::Schema>(std::move(sstable_schema));
+   _components->scylla_metadata->data.set<scylla_metadata_type::ComponentsDigests>(components_digests(_components_digests));

    write_simple<component_type::Scylla>(*_components->scylla_metadata);
 }
@@ -3075,6 +3124,31 @@ void sstable::set_sstable_level(uint32_t new_level) {
    s.sstable_level = new_level;
 }

+std::optional<uint32_t> sstable::get_component_digest(component_type c) const {
+    switch (c) {
+    case component_type::Index:
+        return _components_digests.index_digest;
+    case component_type::Summary:
+        return _components_digests.summary_digest;
+    case component_type::TOC:
+        return _components_digests.toc_digest;
+    case component_type::CompressionInfo:
+        return _components_digests.compression_digest;
+    case component_type::Filter:
+        return _components_digests.filter_digest;
+    case component_type::Partitions:
+        return _components_digests.partitions_digest;
+    case component_type::Rows:
+        return _components_digests.rows_digest;
+    case component_type::Data:
+        return _components_digests.data_digest;
+    case component_type::Statistics:
+        return _components_digests.statistics_digest;
+    default:
+        return std::nullopt;
+    }
+}
+
 future<> sstable::mutate_sstable_level(uint32_t new_level) {
    if (!has_component(component_type::Statistics)) {
        return make_ready_future<>();
@@ -4088,7 +4162,7 @@ future<data_sink> file_io_extension::wrap_sink(const sstable& sst, component_type c, data_sink sink) {
 }

 future<data_source> file_io_extension::wrap_source(const sstable& sst, component_type c, data_source) {
-   SCYLLA_ASSERT(0 && "You are not supposed to get here, file_io_extension::wrap_source() is not implemented");
+   scylla_assert(0 && "You are not supposed to get here, file_io_extension::wrap_source() is not implemented");
 }

 namespace trie {
Some files were not shown because too many files have changed in this diff.