Compare commits

..

4 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
2b9b1584ce Pass formattable object to logger instead of intermediate string
Instead of using std::function<std::string()> which creates intermediate
strings, introduce allocating_section_namer class that formats directly
to fmt::memory_buffer. This avoids string allocation overhead when
logging allocation failures.

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-05 14:44:35 +00:00
copilot-swe-agent[bot]
ac695b6986 Make allocating_section name lazily constructed from table reference
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-03 20:16:13 +00:00
copilot-swe-agent[bot]
f23f6c5dcd Add name field to allocating_section and update logging
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-03 20:01:38 +00:00
copilot-swe-agent[bot]
bd01b669d0 Initial plan 2025-12-03 19:48:31 +00:00
87 changed files with 630 additions and 1822 deletions

View File

@@ -1,194 +0,0 @@
# Clustering Range to Position Range Migration - Summary
## Problem Statement
The `clustering_range` type (alias for `interval<clustering_key_prefix>`) has known correctness issues with operations like `intersection()` and `deoverlap()`. These operations can return incorrect results due to the complex semantics of comparing clustering key prefixes with different bound inclusiveness.
**Related Issues:**
- #22817 - `interval<clustering_key_prefix>::deoverlap` can return incorrect results
- #21604 - Problems with clustering range operations
- #8157 - `interval<clustering_key_prefix_view>::intersection` can return incorrect results
## Solution Approach
The `position_range` class represents clustering ranges as a pair of `position_in_partition` objects, avoiding the problematic interval semantics. The migration strategy involves:
1. **Fix critical bugs immediately** - Use `clustering_interval_set` which internally uses `position_range`
2. **Add infrastructure** - Feature flags, IDL support, utility functions
3. **Gradual internal migration** - Replace internal uses of `clustering_range` with `position_range`
4. **RPC compatibility** - Maintain backward compatibility with feature-gated new verbs
## What Has Been Done
### 1. Feature Flag ✅
Added `gms::feature position_range` to `gms/feature_service.hh` for cluster-wide feature detection.
### 2. IDL Support ✅
Added `position_range` to `idl/position_in_partition.idl.hh` for RPC serialization:
```idl
class position_range {
position_in_partition start();
position_in_partition end();
};
```
### 3. Critical Bug Fixes ✅
#### Fixed in `cql3/statements/cas_request.cc`:
```cpp
// OLD (buggy):
ranges = query::clustering_range::deoverlap(std::move(ranges), clustering_key::tri_compare(*_schema));
// NEW (fixed):
clustering_interval_set interval_set(*_schema, ranges);
ranges = interval_set.to_clustering_row_ranges();
```
#### Fixed in `db/view/view.cc`:
```cpp
// OLD (buggy):
auto deoverlapped_ranges = interval<clustering_key_prefix_view>::deoverlap(std::move(row_ranges), cmp);
// NEW (fixed):
clustering_interval_set interval_set(base, temp_ranges);
return interval_set.to_clustering_row_ranges();
```
### 4. Utility Functions ✅
Created `query/position_range_utils.hh` with safe range operation helpers:
- `clustering_row_ranges_to_position_ranges()` - Batch conversion
- `position_ranges_to_clustering_row_ranges()` - Batch conversion back
- `deoverlap_clustering_row_ranges()` - Safe deoverlap using clustering_interval_set
- `intersect_clustering_row_ranges()` - Safe intersection using clustering_interval_set
### 5. Tests ✅
Added comprehensive unit tests in `test/boost/position_range_utils_test.cc`:
- Test deoverlap with overlapping and non-overlapping ranges
- Test conversion between clustering_range and position_range
- Test intersection operations
- Validate correctness of utility functions
### 6. Documentation ✅
- **Migration guide**: `docs/dev/clustering-range-to-position-range-migration.md`
- Overview of the problem and solution
- Conversion utilities and patterns
- Implementation checklist
- **RPC migration plan**: `docs/dev/position-range-rpc-migration.md`
- Detailed plan for backward-compatible RPC migration
- IDL type definitions for v2 types
- Feature-gated verb selection logic
- Phased rollout strategy
## What Remains To Be Done
### Phase 1: RPC Migration (High Priority)
1. Define `partition_slice_v2` with `std::vector<position_range>`
2. Define `read_command_v2` using `partition_slice_v2`
3. Add new RPC verbs: `read_data_v2`, `read_mutation_data_v2`, `read_digest_v2`
4. Implement conversion between v1 and v2 types
5. Add feature-gated verb selection in RPC clients
6. Test backward compatibility
### Phase 2: Internal Refactoring (Ongoing)
1. Identify internal data structures using `clustering_range`
2. Refactor to use `position_range` where appropriate
3. Update mutation readers and iterators
4. Modify query processing logic
5. Update cache structures
### Phase 3: Validation (Continuous)
1. Build and run existing tests
2. Add more tests for edge cases
3. Performance benchmarking
4. Rolling upgrade testing
## Files Changed
### Core Changes
- `gms/feature_service.hh` - Added position_range feature flag
- `idl/position_in_partition.idl.hh` - Added position_range IDL definition
- `cql3/statements/cas_request.cc` - Fixed deoverlap bug
- `db/view/view.cc` - Fixed deoverlap bug, enhanced documentation
### New Files
- `query/position_range_utils.hh` - Utility functions for safe range operations
- `test/boost/position_range_utils_test.cc` - Unit tests for utilities
### Documentation
- `docs/dev/clustering-range-to-position-range-migration.md` - Migration guide
- `docs/dev/position-range-rpc-migration.md` - RPC migration plan
- `CLUSTERING_RANGE_MIGRATION.md` - This summary document
## Impact and Benefits
### Immediate Benefits ✅
- **Fixed critical bugs**: Two production code bugs in `cas_request.cc` and `view.cc` that could cause incorrect query results
- **Safe operations**: Developers can now use utility functions that guarantee correct deoverlap and intersection
- **Future-proof**: Infrastructure is in place for gradual migration
### Future Benefits 🔄
- **Correctness**: All clustering range operations will be correct by construction
- **Maintainability**: Clearer code using position_range instead of complex interval semantics
- **Performance**: Potential optimizations from simpler position-based comparisons
## Testing Strategy
### Unit Tests ✅
- `test/boost/position_range_utils_test.cc` validates utility functions
- Existing tests in `test/boost/mutation_test.cc` use clustering_interval_set
- Tests in `test/boost/mvcc_test.cc` validate clustering_interval_set behavior
### Integration Testing (To Do)
- Test RPC backward compatibility during rolling upgrades
- Test mixed-version clusters
- Validate query correctness with position_range
### Performance Testing (To Do)
- Benchmark conversion overhead
- Compare memory usage
- Measure query latency impact
## Migration Timeline
- **Week 1-2**: ✅ Foundation and critical bug fixes (COMPLETED)
- Feature flag
- IDL support
- Bug fixes in cas_request.cc and view.cc
- Utility functions and tests
- Documentation
- **Week 3-4**: 🔄 RPC migration (IN PROGRESS)
- Define v2 IDL types
- Implement new RPC verbs
- Add feature-gated selection
- **Week 5-8**: 🔄 Internal refactoring (PLANNED)
- Systematic replacement in internal code
- Update readers and iterators
- Performance validation
- **Week 9+**: 🔄 Validation and rollout (PLANNED)
- Comprehensive testing
- Rolling upgrade validation
- Production deployment
## Key Takeaways
1. **clustering_interval_set is your friend**: When working with clustering ranges, use clustering_interval_set for set operations instead of raw interval operations.
2. **Use utility functions**: The helpers in `query/position_range_utils.hh` provide safe alternatives to buggy operations.
3. **RPC requires care**: Backward compatibility is critical. Always use feature flags for RPC changes.
4. **Incremental approach**: This is a large refactoring. Do it incrementally, with tests at each step.
5. **Document as you go**: Good documentation (like this) helps future developers understand the context and rationale.
## References
- `mutation/position_in_partition.hh` - position_range definition
- `keys/clustering_interval_set.hh` - Safe clustering range operations
- `query/query-request.hh` - clustering_range definition and warnings
- Issues: #22817, #21604, #8157
- Feature service: `gms/feature_service.hh`

View File

@@ -100,12 +100,10 @@ future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& r
}
future<> cache::prune_all() noexcept {
for (auto it = _roles.begin(); it != _roles.end(); ) {
for (auto it = _roles.begin(); it != _roles.end(); it++) {
if (it->second->version != _current_version) {
_roles.erase(it++);
_roles.erase(it);
co_await coroutine::maybe_yield();
} else {
++it;
}
}
co_return;

View File

@@ -445,7 +445,6 @@ ldap_tests = set([
scylla_tests = set([
'test/boost/combined_tests',
'test/boost/UUID_test',
'test/boost/url_parse_test',
'test/boost/advanced_rpc_compressor_test',
'test/boost/allocation_strategy_test',
'test/boost/alternator_unit_test',
@@ -1648,7 +1647,6 @@ deps['test/boost/bytes_ostream_test'] = [
]
deps['test/boost/input_stream_test'] = ['test/boost/input_stream_test.cc']
deps['test/boost/UUID_test'] = ['clocks-impl.cc', 'utils/UUID_gen.cc', 'test/boost/UUID_test.cc', 'utils/uuid.cc', 'utils/dynamic_bitset.cc', 'utils/hashers.cc', 'utils/on_internal_error.cc']
deps['test/boost/url_parse_test'] = ['utils/http.cc', 'test/boost/url_parse_test.cc', ]
deps['test/boost/murmur_hash_test'] = ['bytes.cc', 'utils/murmur_hash.cc', 'test/boost/murmur_hash_test.cc']
deps['test/boost/allocation_strategy_test'] = ['test/boost/allocation_strategy_test.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc', 'utils/labels.cc']
deps['test/boost/log_heap_test'] = ['test/boost/log_heap_test.cc']

View File

@@ -575,15 +575,6 @@ usingTimeoutServiceLevelClauseObjective[std::unique_ptr<cql3::attributes::raw>&
| serviceLevel sl_name=serviceLevelOrRoleName { attrs->service_level = std::move(sl_name); }
;
usingTimeoutConcurrencyClause[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_USING usingTimeoutConcurrencyClauseObjective[attrs] ( K_AND usingTimeoutConcurrencyClauseObjective[attrs] )*
;
usingTimeoutConcurrencyClauseObjective[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_TIMEOUT to=term { attrs->timeout = std::move(to); }
| K_CONCURRENCY c=term { attrs->concurrency = std::move(c); }
;
/**
* UPDATE <CF>
* USING TIMESTAMP <long>
@@ -675,7 +666,7 @@ pruneMaterializedViewStatement returns [std::unique_ptr<raw::select_statement> e
auto attrs = std::make_unique<cql3::attributes::raw>();
expression wclause = conjunction{};
}
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingTimeoutConcurrencyClause[attrs] )?
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingClause[attrs] )?
{
auto params = make_lw_shared<raw::select_statement::parameters>(std::move(orderings), is_distinct, allow_filtering, statement_subtype, bypass_cache);
return std::make_unique<raw::select_statement>(std::move(cf), std::move(params),
@@ -2379,7 +2370,6 @@ K_LIKE: L I K E;
K_TIMEOUT: T I M E O U T;
K_PRUNE: P R U N E;
K_CONCURRENCY: C O N C U R R E N C Y;
K_EXECUTE: E X E C U T E;

View File

@@ -20,21 +20,19 @@
namespace cql3 {
std::unique_ptr<attributes> attributes::none() {
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}, {}}};
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}}};
}
attributes::attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency)
std::optional<sstring> service_level)
: _timestamp_unset_guard(timestamp)
, _timestamp{std::move(timestamp)}
, _time_to_live_unset_guard(time_to_live)
, _time_to_live{std::move(time_to_live)}
, _timeout{std::move(timeout)}
, _service_level(std::move(service_level))
, _concurrency{std::move(concurrency)}
{ }
bool attributes::is_timestamp_set() const {
@@ -53,10 +51,6 @@ bool attributes::is_service_level_set() const {
return bool(_service_level);
}
bool attributes::is_concurrency_set() const {
return bool(_concurrency);
}
int64_t attributes::get_timestamp(int64_t now, const query_options& options) {
if (!_timestamp.has_value() || _timestamp_unset_guard.is_unset(options)) {
return now;
@@ -129,27 +123,6 @@ qos::service_level_options attributes::get_service_level(qos::service_level_cont
return sl_controller.get_service_level(sl_name).slo;
}
std::optional<int32_t> attributes::get_concurrency(const query_options& options) const {
if (!_concurrency.has_value()) {
return std::nullopt;
}
cql3::raw_value concurrency_raw = expr::evaluate(*_concurrency, options);
if (concurrency_raw.is_null()) {
throw exceptions::invalid_request_exception("Invalid null value of concurrency");
}
int32_t concurrency;
try {
concurrency = concurrency_raw.view().validate_and_deserialize<int32_t>(*int32_type);
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception("Invalid concurrency value");
}
if (concurrency <= 0) {
throw exceptions::invalid_request_exception("Concurrency must be a positive integer");
}
return concurrency;
}
void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timestamp.has_value()) {
expr::fill_prepare_context(*_timestamp, ctx);
@@ -160,13 +133,10 @@ void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timeout.has_value()) {
expr::fill_prepare_context(*_timeout, ctx);
}
if (_concurrency.has_value()) {
expr::fill_prepare_context(*_concurrency, ctx);
}
}
std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const {
std::optional<expr::expression> ts, ttl, to, conc;
std::optional<expr::expression> ts, ttl, to;
if (timestamp.has_value()) {
ts = prepare_expression(*timestamp, db, ks_name, nullptr, timestamp_receiver(ks_name, cf_name));
@@ -183,12 +153,7 @@ std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database d
verify_no_aggregate_functions(*timeout, "USING clause");
}
if (concurrency.has_value()) {
conc = prepare_expression(*concurrency, db, ks_name, nullptr, concurrency_receiver(ks_name, cf_name));
verify_no_aggregate_functions(*concurrency, "USING clause");
}
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level), std::move(conc)}};
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level)}};
}
lw_shared_ptr<column_specification> attributes::raw::timestamp_receiver(const sstring& ks_name, const sstring& cf_name) const {
@@ -203,8 +168,4 @@ lw_shared_ptr<column_specification> attributes::raw::timeout_receiver(const sstr
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[timeout]", true), duration_type);
}
lw_shared_ptr<column_specification> attributes::raw::concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const {
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[concurrency]", true), data_type_for<int32_t>());
}
}

View File

@@ -36,15 +36,13 @@ private:
std::optional<cql3::expr::expression> _time_to_live;
std::optional<cql3::expr::expression> _timeout;
std::optional<sstring> _service_level;
std::optional<cql3::expr::expression> _concurrency;
public:
static std::unique_ptr<attributes> none();
private:
attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency);
std::optional<sstring> service_level);
public:
bool is_timestamp_set() const;
@@ -54,8 +52,6 @@ public:
bool is_service_level_set() const;
bool is_concurrency_set() const;
int64_t get_timestamp(int64_t now, const query_options& options);
std::optional<int32_t> get_time_to_live(const query_options& options);
@@ -64,8 +60,6 @@ public:
qos::service_level_options get_service_level(qos::service_level_controller& sl_controller) const;
std::optional<int32_t> get_concurrency(const query_options& options) const;
void fill_prepare_context(prepare_context& ctx);
class raw final {
@@ -74,7 +68,6 @@ public:
std::optional<cql3::expr::expression> time_to_live;
std::optional<cql3::expr::expression> timeout;
std::optional<sstring> service_level;
std::optional<cql3::expr::expression> concurrency;
std::unique_ptr<attributes> prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const;
private:
@@ -83,8 +76,6 @@ public:
lw_shared_ptr<column_specification> time_to_live_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> timeout_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const;
};
};

View File

@@ -19,7 +19,6 @@
#include "types/map.hh"
#include "service/storage_proxy.hh"
#include "cql3/query_processor.hh"
#include "keys/clustering_interval_set.hh"
namespace cql3::statements {
@@ -88,9 +87,8 @@ lw_shared_ptr<query::read_command> cas_request::read_command(query_processor& qp
ranges.emplace_back(query::clustering_range::make_open_ended_both_sides());
max_rows = 1;
} else {
// Use clustering_interval_set to correctly deoverlap ranges (fixes scylladb#22817 and scylladb#21604)
clustering_interval_set interval_set(*_schema, ranges);
ranges = interval_set.to_clustering_row_ranges();
// WARNING: clustering_range::deoverlap can return incorrect results - refer to scylladb#22817 and scylladb#21604
ranges = query::clustering_range::deoverlap(std::move(ranges), clustering_key::tri_compare(*_schema));
}
auto options = update_parameters::options;
options.set(query::partition_slice::option::always_return_static_content);

View File

@@ -279,15 +279,11 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
}
// Regular secondary indexes require rf-rack-validity.
// Custom indexes need to validate this property themselves, if they need it.
if (!_properties || !_properties->custom_class) {
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
validate_for_local_index(*schema);

View File

@@ -21,7 +21,7 @@ namespace cql3 {
namespace statements {
static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges, std::vector<query::clustering_range> clustering_bounds, view_ptr view,
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration, size_t concurrency) {
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration) {
auto key_columns = std::ranges::to<std::vector<const column_definition*>>(
view->all_columns()
| std::views::filter([] (const column_definition& cdef) { return cdef.is_primary_key(); })
@@ -35,7 +35,7 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges,
tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges);
auto p = service::pager::query_pagers::ghost_row_deleting_pager(schema_ptr(view), selection, state,
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration, concurrency);
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration);
int32_t page_size = std::max(options.get_page_size(), 1000);
auto now = gc_clock::now();
@@ -62,8 +62,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> prune_materialized
auto timeout_duration = get_timeout(state.get_client_state(), options);
dht::partition_range_vector key_ranges = _restrictions->get_partition_key_ranges(options);
std::vector<query::clustering_range> clustering_bounds = _restrictions->get_clustering_bounds(options);
size_t concurrency = _attrs->is_concurrency_set() ? _attrs->get_concurrency(options).value() : 1;
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration, concurrency).then([] {
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(::make_shared<cql_transport::messages::result_message::void_message>());
});
}

View File

@@ -1172,17 +1172,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"* default_weight: (Default: 1 **) How many requests are handled during each turn of the RoundRobin.\n"
"* weights: (Default: Keyspace: 1) Takes a list of keyspaces. It sets how many requests are handled during each turn of the RoundRobin, based on the request_scheduler_id.")
/**
* @Group Vector search settings
* @GroupDescription Settings for configuring and tuning vector search functionality.
*/
, vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in `vector_store_primary_uri` and `vector_store_secondary_uri`. The available options are:\n"
"* truststore: (Default: <not set, use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
/**
* @Group Security properties
* @GroupDescription Server and client security settings.
*/
@@ -1470,6 +1459,13 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
, vector_store_primary_uri(
this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "", "A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in vector_store_primary_uri and vector_store_secondary_uri. The available options are:\n"
"* truststore: (Default: <not set. use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")

View File

@@ -344,9 +344,6 @@ public:
named_value<sstring> request_scheduler;
named_value<sstring> request_scheduler_id;
named_value<string_map> request_scheduler_options;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<string_map> vector_store_encryption_options;
named_value<sstring> authenticator;
named_value<sstring> internode_authenticator;
named_value<sstring> authorizer;
@@ -474,6 +471,10 @@ public:
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<string_map> vector_store_encryption_options;
named_value<bool> abort_on_ebadf;
named_value<bool> sanitizer_report_backtrace;

View File

@@ -1287,6 +1287,15 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker,
, _partitions(dht::raw_token_less_comparator{})
, _underlying(src())
, _snapshot_source(std::move(src))
, _update_section(logalloc::allocating_section_namer([this] (fmt::memory_buffer& buf) {
fmt::format_to(std::back_inserter(buf), "{}.{}_update", _schema->ks_name(), _schema->cf_name());
}))
, _populate_section(logalloc::allocating_section_namer([this] (fmt::memory_buffer& buf) {
fmt::format_to(std::back_inserter(buf), "{}.{}_populate", _schema->ks_name(), _schema->cf_name());
}))
, _read_section(logalloc::allocating_section_namer([this] (fmt::memory_buffer& buf) {
fmt::format_to(std::back_inserter(buf), "{}.{}_read", _schema->ks_name(), _schema->cf_name());
}))
{
try {
with_allocator(_tracker.allocator(), [this, cont] {

View File

@@ -9,8 +9,6 @@
#include "query/query-result-reader.hh"
#include "replica/database_fwd.hh"
#include "db/timeout_clock.hh"
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
namespace service {
class storage_proxy;
@@ -27,14 +25,8 @@ class delete_ghost_rows_visitor {
replica::table& _view_table;
schema_ptr _base_schema;
std::optional<partition_key> _view_pk;
db::timeout_semaphore _concurrency_semaphore;
seastar::gate _gate;
std::exception_ptr& _ex;
public:
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex);
delete_ghost_rows_visitor(delete_ghost_rows_visitor&&) = default;
~delete_ghost_rows_visitor() noexcept;
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration);
void add_value(const column_definition& def, query::result_row_view::iterator_type& i) {
}
@@ -53,9 +45,6 @@ public:
uint32_t accept_partition_end(const query::result_row_view& static_row) {
return 0;
}
private:
future<> do_accept_new_row(partition_key pk, clustering_key ck);
};
} //namespace db::view

View File

@@ -78,7 +78,6 @@
#include "readers/multishard.hh"
#include "readers/filtering.hh"
#include "delete_ghost_rows_visitor.hh"
#include "keys/clustering_interval_set.hh"
#include "locator/host_id.hh"
#include "cartesian_product.hh"
#include "idl/view.dist.hh"
@@ -1659,10 +1658,7 @@ future<query::clustering_row_ranges> calculate_affected_clustering_ranges(data_d
const dht::decorated_key& key,
const mutation_partition& mp,
const std::vector<view_ptr>& views) {
// FIXME: This function should be refactored to use position_range and clustering_interval_set
// instead of interval<clustering_key_prefix_view> to avoid issues with intersection and deoverlap.
// See scylladb#22817, scylladb#21604, and scylladb#8157 for details.
// The current implementation uses unsafe operations that can return incorrect results.
// WARNING: interval<clustering_key_prefix_view> is unsafe - refer to scylladb#22817 and scylladb#21604
utils::chunked_vector<interval<clustering_key_prefix_view>> row_ranges;
utils::chunked_vector<interval<clustering_key_prefix_view>> view_row_ranges;
clustering_key_prefix_view::tri_compare cmp(base);
@@ -1688,10 +1684,7 @@ future<query::clustering_row_ranges> calculate_affected_clustering_ranges(data_d
bound_view::to_interval_bound<interval>(rt.start_bound()),
bound_view::to_interval_bound<interval>(rt.end_bound()));
for (auto&& vr : view_row_ranges) {
// FIXME: interval<clustering_key_prefix_view>::intersection can return incorrect results
// (scylladb#8157, scylladb#21604). This should be refactored to use position_range.
// Proper fix: Convert to position_range, check overlap using position_range::overlaps(),
// compute intersection manually with position_in_partition comparisons.
// WARNING: interval<clustering_key_prefix_view>::intersection can return incorrect results - refer to scylladb#8157 and scylladb#21604
auto overlap = rtr.intersection(vr, cmp);
if (overlap) {
row_ranges.push_back(std::move(overlap).value());
@@ -1715,18 +1708,15 @@ future<query::clustering_row_ranges> calculate_affected_clustering_ranges(data_d
// content, in case the view includes a column that is not included in
// this mutation.
// FIXME: interval<clustering_key_prefix_view>::deoverlap can return incorrect results (scylladb#22817)
// Proper fix: Convert row_ranges to clustering_row_ranges, then use clustering_interval_set
// which handles deoverlapping correctly via position_range internally.
query::clustering_row_ranges temp_ranges;
temp_ranges.reserve(row_ranges.size());
for (auto&& r : row_ranges) {
temp_ranges.emplace_back(std::move(r).transform([] (auto&& ckv) { return clustering_key_prefix(ckv); }));
query::clustering_row_ranges result_ranges;
// FIXME: scylladb#22817 - interval<clustering_key_prefix_view>::deoverlap can return incorrect results
auto deoverlapped_ranges = interval<clustering_key_prefix_view>::deoverlap(std::move(row_ranges), cmp);
result_ranges.reserve(deoverlapped_ranges.size());
for (auto&& r : deoverlapped_ranges) {
result_ranges.emplace_back(std::move(r).transform([] (auto&& ckv) { return clustering_key_prefix(ckv); }));
co_await coroutine::maybe_yield();
}
// Use clustering_interval_set for correct deoverlapping (fixes scylladb#22817)
clustering_interval_set interval_set(base, temp_ranges);
co_return interval_set.to_clustering_row_ranges();
co_return result_ranges;
}
bool needs_static_row(const mutation_partition& mp, const std::vector<view_ptr>& views) {
@@ -3607,7 +3597,7 @@ view_updating_consumer::view_updating_consumer(view_update_generator& gen, schem
})
{ }
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex)
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration)
: _proxy(proxy)
, _state(state)
, _timeout_duration(timeout_duration)
@@ -3615,20 +3605,8 @@ delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& pro
, _view_table(_proxy.get_db().local().find_column_family(view))
, _base_schema(_proxy.get_db().local().find_schema(_view->view_info()->base_id()))
, _view_pk()
, _concurrency_semaphore(concurrency)
, _ex(ex)
{}
delete_ghost_rows_visitor::~delete_ghost_rows_visitor() noexcept {
try {
_gate.close().get();
} catch (...) {
// Closing the gate should never throw, but if it does anyway, capture the exception.
_ex = std::current_exception();
}
}
void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, uint32_t row_count) {
SCYLLA_ASSERT(thread::running_in_thread());
_view_pk = key;
@@ -3636,18 +3614,7 @@ void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, u
// Assumes running in seastar::thread
void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row) {
auto units = get_units(_concurrency_semaphore, 1).get();
(void)seastar::try_with_gate(_gate, [this, pk = _view_pk.value(), units = std::move(units), ck] () mutable {
return do_accept_new_row(std::move(pk), std::move(ck)).then_wrapped([this, units = std::move(units)] (future<>&& f) mutable {
if (f.failed()) {
_ex = f.get_exception();
}
});
});
}
future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clustering_key ck) {
auto view_exploded_pk = pk.explode();
auto view_exploded_pk = _view_pk->explode();
auto view_exploded_ck = ck.explode();
std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
@@ -3682,17 +3649,17 @@ future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clusteri
_proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
auto timeout = db::timeout_clock::now() + _timeout_duration;
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
auto base_qr = co_await _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts);
auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get();
query::result& result = *base_qr.query_result;
auto delete_ghost_row = [&]() -> future<> {
mutation m(_view, pk);
auto delete_ghost_row = [&]() {
mutation m(_view, *_view_pk);
auto& row = m.partition().clustered_row(*_view, ck);
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
timeout = db::timeout_clock::now() + _timeout_duration;
return _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no);
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
};
if (result.row_count().value_or(0) == 0) {
co_await delete_ghost_row();
delete_ghost_row();
} else if (!view_key_cols_not_in_base_key.empty()) {
if (result.row_count().value_or(0) != 1) {
on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name()));
@@ -3702,7 +3669,7 @@ future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clusteri
for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) {
const data_value* base_val = base_row.get_data_value(col_def->name_as_text());
if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) {
co_await delete_ghost_row();
delete_ghost_row();
break;
}
}

View File

@@ -2,6 +2,7 @@ etc/default/scylla-server
etc/default/scylla-housekeeping
etc/scylla.d/*.conf
etc/bash_completion.d/nodetool-completion
opt/scylladb/share/p11-kit/modules/*
opt/scylladb/share/doc/scylla/*
opt/scylladb/share/doc/scylla/licenses/
usr/lib/systemd/system/*.timer

View File

@@ -122,6 +122,7 @@ ln -sfT /etc/scylla /var/lib/scylla/conf
%config(noreplace) %{_sysconfdir}/sysconfig/scylla-housekeeping
%attr(0755,root,root) %dir %{_sysconfdir}/scylla.d
%config(noreplace) %{_sysconfdir}/scylla.d/*.conf
/opt/scylladb/share/p11-kit/modules/*
/opt/scylladb/share/doc/scylla/*
%{_unitdir}/scylla-fstrim.service
%{_unitdir}/scylla-housekeeping-daily.service

View File

@@ -1,18 +1,6 @@
### a dictionary of redirections
#old path: new path
# Move the diver information to another project
/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html
/stable/using-scylla/drivers/dynamo-drivers/index.html: https://docs.scylladb.com/stable/drivers/dynamo-drivers.html
/stable/using-scylla/drivers/cql-drivers/index.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-python-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-java-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-go-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-gocqlx-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-cpp-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
/stable/using-scylla/drivers/cql-drivers/scylla-rust-driver.html: https://docs.scylladb.com/stable/drivers/cql-drivers.html
# Redirect 2025.1 upgrade guides that are not on master but were indexed by Google (404 reported)
/master/upgrade/upgrade-guides/upgrade-guide-from-2024.x-to-2025.1/upgrade-guide-from-2024.x-to-2025.1.html: https://docs.scylladb.com/manual/stable/upgrade/index.html

View File

@@ -106,15 +106,6 @@ which is recommended in order to make the operation less heavyweight
and allow for running multiple parallel pruning statements for non-overlapping
token ranges.
By default, the PRUNE MATERIALIZED VIEW statement is relatively slow, only
performing one base read or write at a time. This can be changed with the
USING CONCURRENCY clause. If the clause is used, the concurrency of reads
and writes from the base table will be allowed to increase up to the specified
value. For example, to run the PRUNE with 100 parallel reads/writes, you can use:
```cql
PRUNE MATERIALIZED VIEW my_view WHERE v = 19 USING CONCURRENCY 100;
```
## Synchronous materialized views
Usually, when a table with materialized views is updated, the update to the

View File

@@ -1,156 +0,0 @@
# Clustering Range to Position Range Migration
## Background
The `clustering_range` type (alias for `interval<clustering_key_prefix>`) has known issues with operations like `intersection()` and `deoverlap()` that can return incorrect results due to the complexity of comparing clustering key prefixes with different inclusiveness on bounds.
See issues:
- #22817 - `interval<clustering_key_prefix>::deoverlap` can return incorrect results
- #21604 - Problems with clustering range operations
- #8157 - `interval<clustering_key_prefix_view>::intersection` can return incorrect results
The `position_range` class was introduced as a safer alternative that represents clustering ranges as a pair of `position_in_partition` objects, avoiding the problematic interval semantics.
## Migration Strategy
### 1. Feature Flag
A new `gms::feature` called `"POSITION_RANGE"` has been added to `gms/feature_service.hh`. This feature gates the use of position_range in RPC interfaces to ensure backward compatibility during rolling upgrades.
### 2. IDL Support
The `position_range` class has been added to `idl/position_in_partition.idl.hh` to support serialization in RPC verbs.
### 3. Internal Code Migration
Internal code should be migrated to use `position_range` instead of `clustering_range` wherever possible. This migration should be done incrementally:
#### Priority Areas
1. **Functions with known problematic operations**:
- Any code using `clustering_range::intersection()`
- Any code using `clustering_range::deoverlap()`
- See marked locations in:
- `db/view/view.cc` (lines 1687-1713)
- `cql3/statements/cas_request.cc` (line 90-91)
2. **Internal data structures**:
- Readers and iterators that track position ranges
- Cache structures
- Query processing internals
3. **Utility functions**:
- Helper functions that operate on ranges
- Range manipulation and transformation functions
#### Conversion Utilities
Existing converters:
- `position_range::from_range(const query::clustering_range&)` - Convert clustering_range to position_range
- `position_range_to_clustering_range(const position_range&, const schema&)` - Convert position_range to clustering_range (returns optional)
The `clustering_interval_set` class already demonstrates best practices - it uses `position_range` internally and provides conversion methods to/from `clustering_row_ranges`.
Helper utilities in `query/position_range_utils.hh`:
- `clustering_row_ranges_to_position_ranges()` - Batch convert clustering ranges to position ranges
- `position_ranges_to_clustering_row_ranges()` - Batch convert position ranges to clustering ranges
- `deoverlap_clustering_row_ranges()` - Safely deoverlap ranges using clustering_interval_set
- `intersect_clustering_row_ranges()` - Safely intersect ranges using clustering_interval_set
#### Migration Pattern
```cpp
// OLD CODE (problematic):
void process_ranges(const query::clustering_row_ranges& ranges) {
auto deoverlapped = query::clustering_range::deoverlap(ranges, cmp);
// ... use deoverlapped ranges
}
// NEW CODE (using position_range):
void process_ranges(const schema& s, const query::clustering_row_ranges& ranges) {
clustering_interval_set interval_set(s, ranges);
// interval_set handles deoverlapping correctly internally
for (const position_range& r : interval_set) {
// ... use position ranges
}
// Convert back if needed for compatibility
auto result_ranges = interval_set.to_clustering_row_ranges();
}
```
### 4. RPC Interface Migration
RPC interfaces must maintain backward compatibility. The strategy is:
1. Keep existing RPC verbs that use `clustering_range` (in IDL: `std::vector<interval<clustering_key_prefix>>`)
2. Add new RPC verbs that use `position_range`
3. Use the new verbs when `feature_service.position_range` is enabled
#### Example RPC Migration
In `idl/storage_proxy.idl.hh`:
```cpp
// Existing verb (keep for compatibility)
verb [[with_client_info, with_timeout]] read_data (
query::read_command cmd [[ref]],
::compat::wrapping_partition_range pr,
...
) -> query::result [[lw_shared_ptr]], ...;
// New verb using position_range (to be added)
verb [[with_client_info, with_timeout]] read_data_v2 (
query::read_command_v2 cmd [[ref]],
::compat::wrapping_partition_range pr,
...
) -> query::result [[lw_shared_ptr]], ...;
```
Where `read_command_v2` would use a `partition_slice_v2` that contains position ranges instead of clustering ranges.
#### Feature-Gated RPC Selection
```cpp
future<query::result> storage_proxy::query_data(...) {
if (_features.position_range) {
return rpc_verb_read_data_v2(...);
} else {
return rpc_verb_read_data(...);
}
}
```
### 5. Testing
Tests should verify:
1. Correct conversion between clustering_range and position_range
2. Correct behavior of position_range operations
3. RPC compatibility with both old and new verbs
4. Feature flag behavior during rolling upgrades
### 6. Known Limitations
- Not all clustering_range uses can be eliminated - some external interfaces may require them
- The conversion from position_range to clustering_range can return `nullopt` for empty ranges
- Performance implications should be measured for hot paths
## Implementation Checklist
- [x] Add `position_range` feature flag to `gms/feature_service.hh`
- [x] Add `position_range` IDL definition to `idl/position_in_partition.idl.hh`
- [ ] Create new RPC verbs using position_range
- [ ] Add feature-gated RPC selection logic
- [ ] Migrate high-priority problematic code paths:
- [ ] Fix intersection in `db/view/view.cc:1687`
- [ ] Fix deoverlap in `db/view/view.cc:1712`
- [ ] Fix deoverlap in `cql3/statements/cas_request.cc:90`
- [ ] Migrate internal data structures systematically
- [ ] Add comprehensive tests
- [ ] Performance benchmarking
- [ ] Documentation updates
## References
- `mutation/position_in_partition.hh` - position_range definition
- `keys/clustering_interval_set.hh` - Example of correct position_range usage
- Issues: #22817, #21604, #8157

View File

@@ -1,271 +0,0 @@
# Position Range RPC Migration Plan
## Overview
This document outlines the plan for migrating RPC interfaces from `clustering_range` to `position_range` in a backward-compatible manner using feature flags.
## Background
The current RPC interfaces use `clustering_range` (defined as `interval<clustering_key_prefix>`) in structures like `partition_slice` and `read_command`. To enable the use of `position_range` internally while maintaining backward compatibility, we need to:
1. Create new RPC message types that use `position_range`
2. Add new RPC verbs that accept these new types
3. Feature-gate the use of these new verbs based on cluster capabilities
## Feature Flag
A new feature flag `position_range` has been added to `gms::feature_service`:
```cpp
gms::feature position_range { *this, "POSITION_RANGE"sv };
```
This feature will be enabled when all nodes in the cluster support the new RPC verbs.
## IDL Changes
### Already Added
The `position_range` class has been added to `idl/position_in_partition.idl.hh`:
```idl
class position_range {
position_in_partition start();
position_in_partition end();
};
```
### To Be Added
New IDL types need to be created for RPC migration:
#### 1. partition_slice_v2 (in `idl/read_command.idl.hh`)
```idl
namespace query {
class partition_slice_v2 {
std::vector<position_range> default_row_ranges();
utils::small_vector<uint32_t, 8> static_columns;
utils::small_vector<uint32_t, 8> regular_columns;
query::partition_slice::option_set options;
std::unique_ptr<query::specific_ranges> get_specific_ranges();
cql_serialization_format cql_format();
uint32_t partition_row_limit_low_bits();
uint32_t partition_row_limit_high_bits();
};
class read_command_v2 {
table_id cf_id;
table_schema_version schema_version;
query::partition_slice_v2 slice;
uint32_t row_limit_low_bits;
std::chrono::time_point<gc_clock, gc_clock::duration> timestamp;
std::optional<tracing::trace_info> trace_info;
uint32_t partition_limit;
query_id query_uuid;
query::is_first_page is_first_page;
std::optional<query::max_result_size> max_result_size;
uint32_t row_limit_high_bits;
uint64_t tombstone_limit;
};
}
```
#### 2. New RPC Verbs (in `idl/storage_proxy.idl.hh`)
```idl
// New verbs using position_range (to be used when position_range feature is enabled)
verb [[with_client_info, with_timeout]] read_data_v2 (
query::read_command_v2 cmd [[ref]],
::compat::wrapping_partition_range pr,
query::digest_algorithm digest,
db::per_partition_rate_limit::info rate_limit_info,
service::fencing_token fence
) -> query::result [[lw_shared_ptr]],
cache_temperature,
replica::exception_variant;
verb [[with_client_info, with_timeout]] read_mutation_data_v2 (
query::read_command_v2 cmd [[ref]],
::compat::wrapping_partition_range pr,
service::fencing_token fence
) -> reconcilable_result [[lw_shared_ptr]],
cache_temperature,
replica::exception_variant;
verb [[with_client_info, with_timeout]] read_digest_v2 (
query::read_command_v2 cmd [[ref]],
::compat::wrapping_partition_range pr,
query::digest_algorithm digest,
db::per_partition_rate_limit::info rate_limit_info,
service::fencing_token fence
) -> query::result_digest,
api::timestamp_type,
cache_temperature,
replica::exception_variant,
std::optional<full_position>;
```
## Implementation Changes
### 1. C++ Type Definitions
Create C++ implementations for the new IDL types:
```cpp
// In query/query-request.hh or a new header
namespace query {
class partition_slice_v2 {
std::vector<position_range> _row_ranges;
// ... other members same as partition_slice
public:
// Constructors
partition_slice_v2(std::vector<position_range> row_ranges, ...);
// Conversion methods
static partition_slice_v2 from_legacy(const partition_slice& legacy);
partition_slice to_legacy(const schema& s) const;
// Accessors
const std::vector<position_range>& row_ranges() const { return _row_ranges; }
};
class read_command_v2 {
partition_slice_v2 slice;
// ... other members same as read_command
public:
// Constructors
read_command_v2(...);
// Conversion methods
static read_command_v2 from_legacy(const read_command& legacy);
read_command to_legacy(const schema& s) const;
};
}
```
### 2. RPC Handler Implementation
In `service/storage_proxy.cc`, add handlers for the new verbs:
```cpp
future<rpc::tuple<query::result_lw_shared_ptr, cache_temperature, replica::exception_variant>>
storage_proxy::read_data_v2_handler(
query::read_command_v2&& cmd,
compat::wrapping_partition_range&& pr,
query::digest_algorithm da,
db::per_partition_rate_limit::info rate_limit_info,
service::fencing_token fence) {
// Convert to legacy format if needed internally
// Or better: refactor internal implementation to work with position_range
auto legacy_cmd = cmd.to_legacy(*get_schema(cmd.cf_id));
// Call existing implementation
return read_data_handler(std::move(legacy_cmd), std::move(pr), da, rate_limit_info, fence);
}
```
### 3. RPC Client Selection
In code that invokes RPCs (e.g., `storage_proxy::query_result`), add feature detection:
```cpp
future<query::result> storage_proxy::query_data(...) {
if (_features.position_range) {
// Use new verb with position_range
auto cmd_v2 = read_command_v2::from_legacy(cmd);
return rpc_verb_read_data_v2(std::move(cmd_v2), ...);
} else {
// Use legacy verb with clustering_range
return rpc_verb_read_data(std::move(cmd), ...);
}
}
```
## Migration Strategy
### Phase 1: Foundation (Complete)
- [x] Add `position_range` feature flag
- [x] Add `position_range` IDL definition
- [x] Fix critical clustering_range bugs using clustering_interval_set
### Phase 2: RPC Infrastructure (To Do)
- [ ] Add `partition_slice_v2` IDL definition
- [ ] Add `read_command_v2` IDL definition
- [ ] Add new RPC verbs (`read_data_v2`, etc.)
- [ ] Implement conversion methods between v1 and v2 types
- [ ] Add RPC handlers for new verbs
### Phase 3: Client Migration (To Do)
- [ ] Update RPC clients to check feature flag
- [ ] Add logic to select appropriate verb based on feature availability
- [ ] Test backward compatibility during rolling upgrades
### Phase 4: Internal Refactoring (To Do)
- [ ] Gradually refactor internal implementations to use position_range natively
- [ ] Remove conversion overhead once both versions are established
- [ ] Update documentation and examples
### Phase 5: Deprecation (Future)
- [ ] Once all production clusters are upgraded, consider deprecating v1 verbs
- [ ] Remove legacy code after sufficient time has passed
## Testing
### Unit Tests
- Test conversion between partition_slice and partition_slice_v2
- Test conversion between read_command and read_command_v2
- Verify that converted types produce equivalent results
### Integration Tests
- Test RPC calls using both old and new verbs
- Verify feature flag behavior during rolling upgrades
- Test mixed-version clusters
### Backward Compatibility Tests
- Ensure old clients can still communicate with new servers
- Ensure new clients fall back to old verbs when feature is disabled
## Performance Considerations
1. **Conversion Overhead**: During the transition period, conversions between v1 and v2 types add overhead. This should be measured and minimized.
2. **Memory Usage**: position_range may have different memory characteristics than clustering_range. Monitor memory usage after migration.
3. **Serialization Size**: Compare wire format sizes to ensure no significant increase in network traffic.
## Risks and Mitigation
### Risk: Conversion Bugs
**Mitigation**: Comprehensive unit tests for all conversion paths, particularly edge cases like empty ranges and open-ended ranges.
### Risk: Feature Flag Synchronization
**Mitigation**: Use standard Scylla feature propagation mechanisms. Ensure feature is only enabled when all nodes support it.
### Risk: Performance Regression
**Mitigation**: Performance benchmarks comparing old and new implementations. Have rollback plan if issues are discovered.
## Alternative Approaches Considered
### 1. Direct Migration Without Feature Flag
**Rejected**: Too risky for rolling upgrades. Would require all-at-once cluster upgrade.
### 2. Transparent Conversion in IDL Layer
**Rejected**: Would hide the distinction between old and new formats, making debugging harder.
### 3. Maintain Both Forever
**Rejected**: Increases maintenance burden without clear benefit once migration is complete.
## References
- Main migration guide: `docs/dev/clustering-range-to-position-range-migration.md`
- Issues: #22817, #21604, #8157
- Feature service: `gms/feature_service.hh`
- IDL definitions: `idl/position_in_partition.idl.hh`, `idl/read_command.idl.hh`

View File

@@ -37,7 +37,7 @@ Getting Started
:id: "getting-started"
:class: my-panel
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
* :doc:`ScyllaDB Drivers</using-scylla/drivers/index>`
* `Get Started Lesson on ScyllaDB University <https://university.scylladb.com/courses/scylla-essentials-overview/lessons/quick-wins-install-and-run-scylla/>`_
* :doc:`CQL Reference </cql/index>`
* :doc:`cqlsh - the CQL shell </cql/cqlsh/>`

View File

@@ -35,7 +35,7 @@ Documentation Highlights
* :doc:`Cluster Management Procedures </operating-scylla/procedures/cluster-management/index>`
* :doc:`Upgrade ScyllaDB </upgrade/index>`
* :doc:`CQL Reference </cql/index>`
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`
* :doc:`Features </features/index>`
ScyllaDB Support

View File

@@ -172,7 +172,7 @@ For example:
* `ScyllaDB Java Driver <https://github.com/scylladb/java-driver/tree/3.7.1-scylla/manual/compression>`_
* `Go Driver <https://godoc.org/github.com/gocql/gocql#Compressor>`_
Refer to `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ for more drivers.
Refer to the :doc:`Drivers Page </using-scylla/drivers/index>` for more drivers.
.. _internode-compression:

View File

@@ -206,7 +206,7 @@ This is 19% of the latency compared to no batching.
Driver Guidelines
-----------------
Use the `ScyllaDB drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ that are available for Java, Python, Go, and C/C++.
Use the :doc:`ScyllaDB drivers </using-scylla/drivers/index>` that are available for Java, Python, Go, and C/C++.
They provide much better performance than third-party drivers because they are shard aware &emdash; they can route requests to the right CPU core (shard).
When the driver starts, it gets the topology of the cluster and therefore it knows exactly which CPU core should get a request.
Our latest shard-aware drivers also improve the efficiency of our Change Data Capture (CDC) feature.

View File

@@ -121,7 +121,7 @@ Driver Compression
This refers to compressing traffic between the client and ScyllaDB.
Verify your client driver is using compressed traffic when connected to ScyllaDB.
As compression is driver settings dependent, please check your client driver manual. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
As compression is driver settings dependent, please check your client driver manual or :doc:`ScyllaDB Drivers </using-scylla/drivers/index>`.
Connectivity
@@ -130,7 +130,7 @@ Connectivity
Drivers Settings
================
* Use shard aware drivers wherever possible. `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ (not third-party drivers) are shard aware.
* Use shard aware drivers wherever possible. :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` (not third-party drivers) are shard aware.
* Configure connection pool - open more connections (>3 per shard) and/Or more clients. See `this blog <https://www.scylladb.com/2019/11/20/maximizing-performance-via-concurrency-while-minimizing-timeouts-in-distributed-databases/>`_.
Management

View File

@@ -25,8 +25,8 @@ Actions
If your cluster is having timeouts during overload, check first if you are not making the overload situation worse through retries, and pay attention to the following:
* Make sure the client retries only after the server has already timed out. Depending on the application this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver, check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ about parameters and defaults. For the server-side timeout, the ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver, check your `driver documentation <https://docs.scylladb.com/stable/drivers/index.html>`_ about parameters and defaults.
* Make sure the client retries only after the server has already timed out. Depending on the application this may mean increasing the client-side timeout or decreasing the server-side timeout. Client timeouts are configured by the driver, check your :doc:`driver documentation </using-scylla/drivers/index>` about parameters and defaults. For the server-side timeout, the ``/etc/scylla/scylla.yaml`` has request-specific timeout settings like ``read_request_timeout_in_ms`` and ``write_request_timeout_in_ms``
* Make sure the client neither runs a speculative retry nor runs it very aggressively. Client-side speculative retry is configured by the driver, check your :doc:`driver documentation </using-scylla/drivers/index>` about parameters and defaults.
* Make sure the server neither runs speculative retry nor runs it based on percentiles (as those can fluctuate aggressively). Server-side speculative retries are a per-table setting that can be changed with the ALTER TABLE command. See the :ref:`documentation <speculative-retry-options>` for details.

View File

@@ -9,19 +9,9 @@ To ensure a successful upgrade, follow
the :doc:`documented upgrade procedures <upgrade-guides/index>` tested by
ScyllaDB. This means that:
* You should follow the upgrade policy:
* Starting with version **2025.4**, upgrades can skip minor versions as long
as they remain within the same major version (for example, upgrading directly
from 2025.1 → 2025.4 is supported).
* For versions **prior to 2025.4**, upgrades must be performed consecutively—
each successive X.Y version must be installed in order, **without skipping
any major or minor version** (for example, upgrading directly from 2025.1 → 2025.3
is not supported).
* You cannot skip major versions. Upgrades must move from one major version to
the next using the documented major-version upgrade path.
* You should upgrade to a supported version of ScyllaDB.
See `ScyllaDB Version Support <https://docs.scylladb.com/stable/versioning/version-support.html>`_.
* You should perform the upgrades consecutively - to each successive X.Y
version, **without skipping any major or minor version**, unless there is
a documented upgrade procedure to bypass a version.
* Before you upgrade to the next version, the whole cluster (each node) must
be upgraded to the previous version.
* You cannot perform an upgrade by replacing the nodes in the cluster with new

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

View File

@@ -0,0 +1,141 @@
=====================
ScyllaDB CQL Drivers
=====================
.. toctree::
:titlesonly:
:hidden:
scylla-python-driver
scylla-java-driver
scylla-go-driver
scylla-gocqlx-driver
scylla-cpp-driver
scylla-rust-driver
ScyllaDB Drivers
-----------------
The following ScyllaDB drivers are available:
* :doc:`Python Driver</using-scylla/drivers/cql-drivers/scylla-python-driver>`
* :doc:`Java Driver </using-scylla/drivers/cql-drivers/scylla-java-driver>`
* :doc:`Go Driver </using-scylla/drivers/cql-drivers/scylla-go-driver>`
* :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
* :doc:`C++ Driver </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
* `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
* :doc:`Rust Driver </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
* `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
We recommend using ScyllaDB drivers. All ScyllaDB drivers are shard-aware and provide additional
benefits over third-party drivers.
ScyllaDB supports the CQL binary protocol version 3, so any Apache Cassandra/CQL driver that implements
the same version works with ScyllaDB.
CDC Integration with ScyllaDB Drivers
-------------------------------------------
The following table specifies which ScyllaDB drivers include a library for
:doc:`CDC </features/cdc/cdc-intro>`.
.. list-table::
:widths: 40 60
:header-rows: 1
* - ScyllaDB Driver
- CDC Connector
* - :doc:`Python </using-scylla/drivers/cql-drivers/scylla-python-driver>`
- |x|
* - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
- |v|
* - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
- |v|
* - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
- |x|
* - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
- |x|
* - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
- |x|
* - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
- |v|
* - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
- |x|
Support for Tablets
-------------------------
The following table specifies which ScyllaDB drivers support
:doc:`tablets </architecture/tablets>` and since which version.
.. list-table::
:widths: 30 35 35
:header-rows: 1
* - ScyllaDB Driver
- Support for Tablets
- Since Version
* - :doc:`Python</using-scylla/drivers/cql-drivers/scylla-python-driver>`
- |v|
- 3.26.5
* - :doc:`Java </using-scylla/drivers/cql-drivers/scylla-java-driver>`
- |v|
- 4.18.0 (Java Driver 4.x)
3.11.5.2 (Java Driver 3.x)
* - :doc:`Go </using-scylla/drivers/cql-drivers/scylla-go-driver>`
- |v|
- 1.13.0
* - :doc:`Go Extension </using-scylla/drivers/cql-drivers/scylla-gocqlx-driver>`
- |x|
- N/A
* - :doc:`C++ </using-scylla/drivers/cql-drivers/scylla-cpp-driver>`
- |x|
- N/A
* - `CPP-over-Rust Driver <https://cpp-rust-driver.docs.scylladb.com/>`_
- |v|
- All versions
* - :doc:`Rust </using-scylla/drivers/cql-drivers/scylla-rust-driver>`
- |v|
- 0.13.0
* - `C# Driver <https://csharp-driver.docs.scylladb.com/>`_
- |v|
- All versions
Driver Support Policy
-------------------------------
We support the **two most recent minor releases** of our drivers.
* We test and validate the latest two minor versions.
* We typically patch only the latest minor release.
We recommend staying up to date with the latest supported versions to receive
updates and fixes.
At a minimum, upgrade your driver when upgrading to a new ScyllaDB version
to ensure compatibility between the driver and the database.
Third-party Drivers
----------------------
You can find the third-party driver documentation on the GitHub pages for each driver:
* `DataStax Java Driver <https://github.com/datastax/java-driver/>`_
* `DataStax Python Driver <https://github.com/datastax/python-driver/>`_
* `DataStax C# Driver <https://github.com/datastax/csharp-driver/>`_
* `DataStax Ruby Driver <https://github.com/datastax/ruby-driver/>`_
* `DataStax Node.js Driver <https://github.com/datastax/nodejs-driver/>`_
* `DataStax C++ Driver <https://github.com/datastax/cpp-driver/>`_
* `DataStax PHP Driver (Supported versions: 7.1) <https://github.com/datastax/php-driver>`_
* `He4rt PHP Driver (Supported versions: 8.1 and 8.2) <https://github.com/he4rt/scylladb-php-driver/>`_
* `Scala Phantom Project <https://github.com/outworkers/phantom>`_
* `Xandra Elixir Driver <https://github.com/lexhide/xandra>`_
* `Exandra Elixir Driver <https://github.com/vinniefranco/exandra>`_
Learn about ScyllaDB Drivers on ScyllaDB University
----------------------------------------------------
The free `Using ScyllaDB Drivers course <https://university.scylladb.com/courses/using-scylla-drivers/>`_
on ScyllaDB University covers the use of drivers in multiple languages to interact with a ScyllaDB
cluster. The languages covered include Java, CPP, Rust, Golang, Python, Node.JS, Scala, and others.

View File

@@ -0,0 +1,16 @@
===================
ScyllaDB C++ Driver
===================
The ScyllaDB C++ driver is a modern, feature-rich and **shard-aware** C/C++ client library for ScyllaDB using exclusively Cassandras binary protocol and Cassandra Query Language v3.
This driver is forked from Datastax cpp-driver.
Read the `documentation <https://cpp-driver.docs.scylladb.com>`_ to get started or visit the Github project `ScyllaDB C++ driver <https://github.com/scylladb/cpp-driver>`_.
More Information
----------------
* `C++ Driver Documentation <https://cpp-driver.docs.scylladb.com>`_
* `C/C++ Driver course at ScyllaDB University <https://university.scylladb.com/courses/using-scylla-drivers/lessons/cpp-driver-part-1/>`_
* `Blog: A Shard-Aware ScyllaDB C/C++ Driver <https://www.scylladb.com/2021/03/18/a-shard-aware-scylla-c-c-driver/>`_

View File

@@ -0,0 +1,28 @@
==================
ScyllaDB Go Driver
==================
The `ScyllaDB Go driver <https://github.com/scylladb/gocql>`_ is shard aware and contains extensions for a tokenAwareHostPolicy supported by ScyllaDB 2.3 and onwards.
It is is a fork of the `GoCQL Driver <https://github.com/gocql/gocql>`_ but has been enhanced with capabilities that take advantage of ScyllaDB's unique architecture.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
The protocol extension spec is `available here <https://github.com/scylladb/scylla/blob/master/docs/dev/protocol-extensions.md>`_.
The ScyllaDB Go Driver is a drop-in replacement for gocql.
As such, no code changes are needed to use this driver.
All you need to do is rebuild using the ``replace`` directive in your ``mod`` file.
**To download and install the driver**, visit the `Github project <https://github.com/scylladb/gocql>`_.
Using CDC with Go
-----------------
When writing applications, you can now use our `Go Library <https://github.com/scylladb/scylla-cdc-go>`_ to simplify writing applications that read from ScyllaDB CDC.
More information
----------------
* `ScyllaDB Gocql Driver project page on GitHub <https://github.com/scylladb/gocql>`_ - contains the source code as well as a readme and documentation files.
* `ScyllaDB University: Golang and ScyllaDB <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-1/>`_
A three-part lesson with in-depth examples from executing a few basic CQL statements with a ScyllaDB cluster using the Gocql driver, to the different data types that you can use in your database tables and how to store these binary files in ScyllaDB with a simple Go application.

View File

@@ -0,0 +1,16 @@
=========================
ScyllaDB Gocql Extension
=========================
The ScyllaDB Gocqlx is an extension to gocql that provides usability features.
With gocqlx, you can bind the query parameters from maps and structs, use named query parameters (``:identifier``), and scan the query results into structs and slices.
The driver includes a fluent and flexible CQL query builder and a database migrations module.
More information
----------------
* `ScyllaDB Gocqlx Driver project page on GitHub <https://github.com/scylladb/gocqlx>`_ - contains the source code as well as a readme and documentation files.
* `ScyllaDB University: Golang and ScyllaDB Part 3 GoCQLX <https://university.scylladb.com/courses/using-scylla-drivers/lessons/golang-and-scylla-part-3-gocqlx/>`_ - part three of the Golang three-part course which focuses on how to create a sample Go application that executes a few basic CQL statements with a ScyllaDB cluster using the GoCQLX package

View File

@@ -0,0 +1,31 @@
=====================
ScyllaDB Java Driver
=====================
ScyllaDB Java Driver is forked from `DataStax Java Driver <https://github.com/datastax/java-driver>`_ with enhanced capabilities, taking advantage of ScyllaDB's unique architecture.
The ScyllaDB Java driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
Use the ScyllaDB Java driver for better compatibility and support for ScyllaDB with Java-based applications.
Read the `documentation <https://java-driver.docs.scylladb.com/>`_ to get started or visit the `Github project <https://github.com/scylladb/java-driver>`_.
The driver architecture is based on layers. At the bottom lies the driver core.
This core handles everything related to the connections to a ScyllaDB cluster (for example, connection pool, discovering new nodes, etc.) and exposes a simple, relatively low-level API on top of which higher-level layers can be built.
The ScyllaDB Java Driver is a drop-in replacement for the DataStax Java Driver.
As such, no code changes are needed to use this driver.
Using CDC with Java
-------------------
When writing applications, you can now use our `Java Library <https://github.com/scylladb/scylla-cdc-java>`_ to simplify writing applications that read from ScyllaDB CDC.
More information
----------------
* `ScyllaDB Java Driver Docs <https://java-driver.docs.scylladb.com/>`_
* `ScyllaDB Java Driver project page on GitHub <https://github.com/scylladb/java-driver/>`_ - Source Code
* `ScyllaDB University: Coding with Java <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-java-part-1/>`_ - a three-part lesson with in-depth examples from executing a few basic CQL statements with a ScyllaDB cluster using the Java driver, to the different data types that you can use in your database tables and how to store these binary files in ScyllaDB with a simple Java application.

View File

@@ -0,0 +1,20 @@
======================
ScyllaDB Python Driver
======================
The ScyllaDB Python driver is shard aware and contains extensions for a ``tokenAwareHostPolicy``.
Using this policy, the driver can select a connection to a particular shard based on the shards token.
As a result, latency is significantly reduced because there is no need to pass data between the shards.
Read the `documentation <https://python-driver.docs.scylladb.com/>`_ to get started or visit the Github project `ScyllaDB Python driver <https://github.com/scylladb/python-driver/>`_.
As the ScyllaDB Python Driver is a drop-in replacement for DataStax Python Driver, no code changes are needed to use the driver.
Use the ScyllaDB Python driver for better compatibility and support for ScyllaDB with Python-based applications.
More information
----------------
* `ScyllaDB Python Driver Documentation <https://python-driver.docs.scylladb.com/>`_
* `ScyllaDB Python Driver on GitHub <https://github.com/scylladb/python-driver/>`_
* `ScyllaDB University: Coding with Python <https://university.scylladb.com/courses/using-scylla-drivers/lessons/coding-with-python/>`_

View File

@@ -0,0 +1,24 @@
=====================
ScyllaDB Rust Driver
=====================
The ScyllaDB Rust driver is a client-side, shard-aware driver written in pure Rust with a fully async API using Tokio.
Optimized for ScyllaDB, the driver is also compatible with Apache Cassandra®.
.. image:: ./images/monster-rust.png
:width: 150pt
**To download and install the driver**, visit the `Github project <https://github.com/scylladb/scylla-rust-driver>`_.
Read the `Documentation <https://rust-driver.docs.scylladb.com>`_.
Using CDC with Rust
----------------------
When writing applications, you can use ScyllaDB's `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_
to simplify writing applications that read from ScyllaDB's CDC.
Use `Rust CDC Library <https://github.com/scylladb/scylla-cdc-rust>`_ to read
:doc:`ScyllaDB's CDC </features/cdc/index>` update streams.

View File

@@ -0,0 +1,9 @@
========================
AWS DynamoDB Drivers
========================
ScyllaDB AWS DynamoDB Compatible API can be used with any AWS DynamoDB Driver.
For a list of AWS AWS DynamoDB drivers see `here <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GettingStarted.html>`_

View File

@@ -0,0 +1,21 @@
================
ScyllaDB Drivers
================
.. toctree::
:titlesonly:
:hidden:
ScyllaDB CQL Drivers <cql-drivers/index>
ScyllaDB DynamoDB Drivers <dynamo-drivers/index>
You can use ScyllaDB with:
* :doc:`Apache Cassandra CQL Compatible Drivers <cql-drivers/index>`
* :doc:`Amazon DynamoDB Compatible API Drivers <dynamo-drivers/index>`
Additional drivers coming soon!
If you are looking for a ScyllaDB Integration Solution or a Connector, refer to :doc:`ScyllaDB Integrations </using-scylla/integrations/index>`.

View File

@@ -9,7 +9,7 @@ ScyllaDB for Developers
Tutorials and Example Projects <https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html>
Learn to Use ScyllaDB <https://docs.scylladb.com/stable/get-started/learn-resources/index.html>
ScyllaDB Alternator <alternator/index>
ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>
ScyllaDB Drivers <drivers/index>
.. panel-box::
@@ -26,7 +26,7 @@ ScyllaDB for Developers
:id: "getting-started"
:class: my-panel
* `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_ - ScyllaDB and third-party drivers for CQL and DynamoDB
* :doc:`ScyllaDB Drivers </using-scylla/drivers/index>` - ScyllaDB and third-party drivers for CQL and DynamoDB
* :doc:`ScyllaDB Alternator </using-scylla/alternator/index>` - The Open Source DynamoDB-compatible API
* :doc:`CQL Reference </cql/index>` - Reference for the Apache Cassandra Query Language (CQL) and its ScyllaDB extensions

View File

@@ -28,7 +28,7 @@ ScyllaDB Integrations and Connectors
:class: my-panel
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`).
Any application which uses a CQL driver will work with ScyllaDB.
The list below contains links to integration projects using ScyllaDB with third-party projects.

View File

@@ -2,7 +2,7 @@
Integrate ScyllaDB with Databricks
==================================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB, for example, Databricks Spark cluster.
Resource list
-------------

View File

@@ -3,7 +3,7 @@ Integrate ScyllaDB with Elasticsearch
=====================================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
The list below contains integration projects using ScyllaDB with Elasticsearch. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.

View File

@@ -13,11 +13,11 @@ The Jaeger Query service offers a web-based UI and API for users to explore, vis
Jaeger also supports integration with other observability tools like Prometheus and Grafana,
making it a popular choice for monitoring modern distributed applications.
Jaeger Server `can also be run <https://www.jaegertracing.io/docs/2.11/storage/cassandra/#compatible-backends>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
Jaeger Server `can also be run <https://github.com/jaegertracing/jaeger/tree/main/plugin/storage/scylladb>`_ with ScyllaDB as the storage backend, thanks to ScyllaDB's compatibility with Cassandra.
As a drop-in replacement for Cassandra, ScyllaDB implements the same protocol and provides a high-performance,
low-latency alternative. This compatibility allows Jaeger users to easily switch to ScyllaDB without making significant changes to their setup.
Using ScyllaDB as the storage backend for Jaeger Server can offer additional benefits,
such as improved performance, scalability, and resource efficiency.
This makes Jaeger even more effective for monitoring and troubleshooting distributed applications,
especially in high-traffic, demanding environments where a high-performance storage solution is critical.
especially in high-traffic, demanding environments where a high-performance storage solution is critical.

View File

@@ -3,7 +3,7 @@ Integrate ScyllaDB with Spark
=============================
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB. See `ScyllaDB Drivers <https://docs.scylladb.com/stable/drivers/index.html>`_. Any application which uses a CQL driver will work with ScyllaDB.
ScyllaDB is Apache Cassandra compatible at the CQL binary protocol level, and any driver which uses CQL will work with ScyllaDB (more :doc:`here </using-scylla/drivers/index>`). Any application which uses a CQL driver will work with ScyllaDB.
The list below contains integration projects using ScyllaDB with Spark. If you have tested your application with ScyllaDB and want to publish the results, contact us using the `community forum <https://forum.scylladb.com>`_.

View File

@@ -15,7 +15,6 @@
#include "db/config.hh"
#include "utils/log.hh"
#include "utils/hash.hh"
#include "utils/http.hh"
#include "utils/rjson.hh"
#include "utils/base64.hh"
#include "utils/loading_cache.hh"
@@ -268,6 +267,7 @@ std::tuple<std::string, std::string> azure_host::impl::parse_key(std::string_vie
std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std::string_view vault) {
static const boost::regex vault_name_re(R"([a-zA-Z0-9-]+)");
static const boost::regex vault_endpoint_re(R"((https?)://([^/:]+)(?::(\d+))?)");
boost::smatch match;
std::string tmp{vault};
@@ -277,12 +277,16 @@ std::tuple<std::string, std::string, unsigned> azure_host::impl::parse_vault(std
return {"https", fmt::format(AKV_HOST_TEMPLATE, vault), 443};
}
try {
auto info = utils::http::parse_simple_url(tmp);
return {info.scheme, info.host, info.port};
} catch (...) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault)));
if (boost::regex_match(tmp, match, vault_endpoint_re)) {
std::string scheme = match[1];
std::string host = match[2];
std::string port_str = match[3];
unsigned port = (port_str.empty()) ? (scheme == "https" ? 443 : 80) : std::stoi(port_str);
return {scheme, host, port};
}
throw std::invalid_argument(fmt::format("Invalid vault '{}'. Must be either a name or an endpoint in format: http(s)://<host>[:port]", vault));
}
future<shared_ptr<tls::certificate_credentials>> azure_host::impl::make_creds() {

View File

@@ -816,7 +816,6 @@ public:
future<data_sink> wrap_sink(const sstables::sstable& sst, sstables::component_type type, data_sink sink) override {
switch (type) {
case sstables::component_type::Scylla:
case sstables::component_type::TemporaryScylla:
case sstables::component_type::TemporaryTOC:
case sstables::component_type::TOC:
co_return sink;
@@ -845,7 +844,6 @@ public:
sstables::component_type type,
data_source src) override {
switch (type) {
case sstables::component_type::TemporaryScylla:
case sstables::component_type::Scylla:
case sstables::component_type::TemporaryTOC:
case sstables::component_type::TOC:

View File

@@ -36,7 +36,6 @@
#include "encryption_exceptions.hh"
#include "symmetric_key.hh"
#include "utils.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/UUID.hh"
@@ -164,8 +163,6 @@ private:
shared_ptr<seastar::tls::certificate_credentials> _creds;
std::unordered_map<bytes, shared_ptr<symmetric_key>> _cache;
bool _initialized = false;
abort_source _as;
};
template<typename T, typename C>
@@ -254,50 +251,24 @@ future<rjson::value> encryption::gcp_host::impl::gcp_auth_post_with_retry(std::s
auto& creds = i->second;
static constexpr auto max_retries = 10;
exponential_backoff_retry exr(10ms, 10000ms);
bool do_backoff = false;
bool did_auth_retry = false;
for (int retry = 0; ; ++retry) {
if (std::exchange(do_backoff, false)) {
co_await exr.retry(_as);
}
bool refreshing = true;
int retries = 0;
for (;;) {
try {
co_await creds.refresh(KMS_SCOPE, _certs);
refreshing = false;
} catch (...) {
std::throw_with_nested(permission_error("Error refreshing credentials"));
}
try {
auto res = co_await send_request(uri, _certs, body, httpd::operation_type::POST, key_values({
{ utils::gcp::AUTHORIZATION, utils::gcp::format_bearer(creds.token) },
}), &_as);
}));
co_return res;
} catch (httpd::unexpected_status_error& e) {
gcp_log.debug("{}: Got unexpected response: {}", uri, e.status());
switch (e.status()) {
default:
if (http::reply::classify_status(e.status()) != http::reply::status_class::server_error) {
break;
}
[[fallthrough]];
case httpclient::reply_status::request_timeout:
if (retry < max_retries) {
// service unavailable etc -> backoff + retry
do_backoff = true;
did_auth_retry = false; // reset this, since we might cause expiration due to backoff (not really, but...)
continue;
}
break;
}
if (refreshing) {
std::throw_with_nested(permission_error("Error refreshing credentials"));
}
if (e.status() == http::reply::status_type::unauthorized && retry < max_retries && !did_auth_retry) {
// refresh access token and retry. no backoff
did_auth_retry = true;
if (e.status() == http::reply::status_type::unauthorized && retries++ < 3) {
// refresh access token and retry.
continue;
}
if (e.status() == http::reply::status_type::unauthorized) {
@@ -351,7 +322,6 @@ future<> encryption::gcp_host::impl::init() {
}
future<> encryption::gcp_host::impl::stop() {
_as.request_abort();
co_await _attr_cache.stop();
co_await _id_cache.stop();
}

View File

@@ -38,7 +38,6 @@
#include "utils/loading_cache.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/http.hh"
#include "marshal_exception.hh"
#include "db/config.hh"
@@ -323,26 +322,17 @@ future<> kmip_host::impl::connection::connect() {
f = f.then([this, cred] {
return cred->set_x509_trust_file(_options.truststore, seastar::tls::x509_crt_format::PEM);
});
} else {
f = f.then([cred] {
return cred->set_system_trust();
});
}
return f.then([this, cred] {
// TODO, find if we should do hostname verification
// TODO: connect all failovers already?
// Use the URL parser to handle ipv6 etc proper.
// Turn host arg into a URL.
auto info = utils::http::parse_simple_url("kmip://" + _host);
auto name = info.host;
auto port = info.port != 80 ? info.port : kmip_port;
auto i = _host.find_last_of(':');
auto name = _host.substr(0, i);
auto port = i != sstring::npos ? std::stoul(_host.substr(i + 1)) : kmip_port;
return seastar::net::dns::resolve_name(name).then([this, cred, port, name](seastar::net::inet_address addr) {
kmip_log.debug("Try connect {}:{}", addr, port);
// TODO: should we verify non-numeric hosts here? (opts.server_name)
// Adding this might break existing users with half-baked certs.
return seastar::tls::connect(cred, seastar::socket_address{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
return seastar::net::dns::resolve_name(name).then([this, cred, port](seastar::net::inet_address addr) {
return seastar::tls::connect(cred, seastar::ipv4_addr{addr, uint16_t(port)}).then([this](seastar::connected_socket s) {
kmip_log.debug("Successfully connected {}", _host);
// #998 Set keepalive to try avoiding connection going stale in between commands.
s.set_keepalive_parameters(net::tcp_keepalive_params{60s, 60s, 10});

View File

@@ -35,7 +35,6 @@
#include "utils/exponential_backoff_retry.hh"
#include "utils/hash.hh"
#include "utils/loading_cache.hh"
#include "utils/http.hh"
#include "utils/UUID.hh"
#include "utils/UUID_gen.hh"
#include "utils/rjson.hh"
@@ -152,10 +151,15 @@ public:
{
// check if we have an explicit endpoint set.
if (!_options.endpoint.empty()) {
auto info = utils::http::parse_simple_url(_options.endpoint);
_options.https = info.is_https();
_options.host = info.host;
_options.port = info.port;
static std::regex simple_url(R"foo((https?):\/\/(?:([\w\.]+)|\[([\w:]+)\]):?(\d+)?\/?)foo");
std::transform(_options.endpoint.begin(), _options.endpoint.end(), _options.endpoint.begin(), ::tolower);
std::smatch m;
if (!std::regex_match(_options.endpoint, m, simple_url)) {
throw std::invalid_argument(fmt::format("Could not parse URL: {}", _options.endpoint));
}
_options.https = m[1].str() == "https";
_options.host = m[2].length() > 0 ? m[2].str() : m[3].str();
_options.port = m[4].length() > 0 ? std::stoi(m[4].str()) : 0;
}
if (_options.endpoint.empty() && _options.host.empty() && _options.aws_region.empty() && !_options.aws_use_ec2_region) {
throw std::invalid_argument("No AWS region or endpoint specified");

View File

@@ -176,8 +176,6 @@ public:
gms::feature rack_list_rf { *this, "RACK_LIST_RF"sv };
gms::feature driver_service_level { *this, "DRIVER_SERVICE_LEVEL"sv };
gms::feature strongly_consistent_tables { *this, "STRONGLY_CONSISTENT_TABLES"sv };
// Enable position_range in RPC interfaces instead of clustering_range
gms::feature position_range { *this, "POSITION_RANGE"sv };
public:
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;

View File

@@ -26,8 +26,3 @@ class position_in_partition {
bound_weight get_bound_weight();
std::optional<clustering_key_prefix> get_clustering_key_prefix();
};
class position_range {
position_in_partition start();
position_in_partition end();
};

View File

@@ -55,7 +55,6 @@ debian_base_packages=(
librapidxml-dev
libcrypto++-dev
libxxhash-dev
zlib1g-dev
slapd
ldap-utils
libcpp-jwt-dev
@@ -118,7 +117,6 @@ fedora_packages=(
makeself
libzstd-static libzstd-devel
lz4-static lz4-devel
zlib-ng-compat-devel
rpm-build
devscripts
debhelper

View File

@@ -157,7 +157,6 @@ adjust_bin() {
export GNUTLS_SYSTEM_PRIORITY_FILE="\${GNUTLS_SYSTEM_PRIORITY_FILE-$prefix/libreloc/gnutls.config}"
export LD_LIBRARY_PATH="$prefix/libreloc"
export UBSAN_OPTIONS="${UBSAN_OPTIONS:+$UBSAN_OPTIONS:}suppressions=$prefix/libexec/ubsan-suppressions.supp"
${p11_trust_paths:+export SCYLLA_P11_TRUST_PATHS="$p11_trust_paths"}
exec -a "\$0" "$prefix/libexec/$bin" "\$@"
EOF
chmod 755 "$root/$prefix/bin/$bin"
@@ -331,6 +330,7 @@ if ! $nonroot; then
rsysconfdir=$(realpath -m "$root/$sysconfdir")
rusr=$(realpath -m "$root/usr")
rsystemd=$(realpath -m "$rusr/lib/systemd/system")
rshare="$rprefix/share"
rdoc="$rprefix/share/doc"
rdata=$(realpath -m "$root/var/lib/scylla")
rhkdata=$(realpath -m "$root/var/lib/scylla-housekeeping")
@@ -338,6 +338,7 @@ else
retc="$rprefix/etc"
rsysconfdir="$rprefix/$sysconfdir"
rsystemd="$HOME/.config/systemd/user"
rshare="$rprefix/share"
rdoc="$rprefix/share/doc"
rdata="$rprefix"
fi
@@ -521,6 +522,16 @@ PRODUCT="$product"
EOS
chmod 644 "$rprefix"/scripts/scylla_product.py
install -d -m755 "$rshare"/p11-kit/modules
cat << EOS > "$rshare"/p11-kit/modules/p11-kit-trust.module
module: $prefix/libreloc/pkcs11/p11-kit-trust.so
priority: 1
trust-policy: yes
x-trust-lookup: pkcs11:library-description=PKCS%2311%20Kit%20Trust%20Module
disable-in: p11-kit-proxy
x-init-reserved: paths=$p11_trust_paths
EOS
if ! $nonroot && ! $without_systemd; then
install -d -m755 "$retc"/systemd/system/scylla-server.service.d
install -m644 dist/common/systemd/scylla-server.service.d/dependencies.conf -Dt "$retc"/systemd/system/scylla-server.service.d

41
main.cc
View File

@@ -10,8 +10,6 @@
#include <functional>
#include <fmt/ranges.h>
#include <gnutls/pkcs11.h>
#include <seastar/util/closeable.hh>
#include <seastar/core/abort_source.hh>
#include "db/view/view_building_worker.hh"
@@ -125,6 +123,11 @@
#include "tools/utils.hh"
#define P11_KIT_FUTURE_UNSTABLE_API
extern "C" {
#include <p11-kit/p11-kit.h>
}
namespace fs = std::filesystem;
#include <seastar/core/metrics_api.hh>
#include <seastar/core/relabel_config.hh>
@@ -706,6 +709,14 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
print_starting_message(ac, av, parsed_opts);
}
// We have to override p11-kit config path before p11-kit initialization.
// And the initialization will invoke on seastar initialization, so it has to
// be before app.run()
auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe"));
auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
auto p11_modules_str = p11_modules.string<char>();
::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
sharded<locator::shared_token_metadata> token_metadata;
sharded<locator::effective_replication_map_factory> erm_factory;
sharded<service::migration_notifier> mm_notifier;
@@ -1823,6 +1834,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
ss.stop().get();
});
api::set_server_storage_service(ctx, ss, group0_client).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});
checkpoint(stop_signal, "initializing query processor remote part");
// TODO: do this together with proxy.start_remote(...)
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(mapreduce_service),
@@ -2177,11 +2193,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// This will also disable migration manager schema pulls if needed.
group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get();
api::set_server_storage_service(ctx, ss, group0_client).get();
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
api::unset_server_storage_service(ctx).get();
});
with_scheduling_group(maintenance_scheduling_group, [&] {
return messaging.invoke_on_all([&] (auto& ms) {
return ms.start_listen(token_metadata.local(), [&gossiper] (gms::inet_address ip) {
@@ -2685,15 +2696,13 @@ int main(int ac, char** av) {
// #3583 - need to potentially ensure this for tools as well, since at least
// sstable* might need crypto libraries.
auto scylla_path = fs::read_symlink(fs::path("/proc/self/exe")); // could just be argv[0] I guess...
auto p11_trust_paths_from_env = std::getenv("SCYLLA_P11_TRUST_PATHS");
auto trust_module_path = scylla_path.parent_path().parent_path().append("libreloc/pkcs11/p11-kit-trust.so");
if (fs::exists(trust_module_path) && p11_trust_paths_from_env) {
gnutls_pkcs11_init(GNUTLS_PKCS11_FLAG_MANUAL, nullptr);
auto trust_config = fmt::format("p11-kit:paths={} trusted=yes", p11_trust_paths_from_env);
auto ret = gnutls_pkcs11_add_provider(trust_module_path.string().c_str(), trust_config.c_str());
if (ret != GNUTLS_E_SUCCESS) {
startlog.warn("Could not initialize p11-kit trust module: {}\n", gnutls_strerror(ret));
}
auto p11_modules = scylla_path.parent_path().parent_path().append("share/p11-kit/modules");
// Note: must be in scope for application lifetime. p11_kit_override_system_files does _not_
// copy input strings.
auto p11_modules_str = p11_modules.string<char>();
// #3392 only do this if we are actually packaged and the path exists.
if (fs::exists(p11_modules)) {
::p11_kit_override_system_files(NULL, NULL, p11_modules_str.c_str(), NULL, NULL);
}
return main_func(ac, av);

View File

@@ -1,70 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "mutation/position_in_partition.hh"
#include "query/query-request.hh"
#include "keys/clustering_interval_set.hh"
namespace query {
/// Helper utilities for working with position_range and migrating from clustering_range.
///
/// These utilities support the gradual migration from clustering_range (interval<clustering_key_prefix>)
/// to position_range. See docs/dev/clustering-range-to-position-range-migration.md for details.
/// Convert a vector of clustering_ranges to a vector of position_ranges
inline std::vector<position_range> clustering_row_ranges_to_position_ranges(const clustering_row_ranges& ranges) {
std::vector<position_range> result;
result.reserve(ranges.size());
for (const auto& r : ranges) {
result.emplace_back(position_range::from_range(r));
}
return result;
}
/// Convert a vector of position_ranges to a vector of clustering_ranges
/// Note: Empty position ranges (those that don't contain any keys) are skipped
inline clustering_row_ranges position_ranges_to_clustering_row_ranges(const std::vector<position_range>& ranges, const schema& s) {
clustering_row_ranges result;
result.reserve(ranges.size());
for (const auto& r : ranges) {
if (auto cr = position_range_to_clustering_range(r, s)) {
result.emplace_back(std::move(*cr));
}
}
return result;
}
/// Deoverlap clustering_row_ranges correctly using clustering_interval_set.
/// This avoids the known bugs with clustering_range::deoverlap (see scylladb#22817, #21604, #8157).
inline clustering_row_ranges deoverlap_clustering_row_ranges(const schema& s, const clustering_row_ranges& ranges) {
clustering_interval_set interval_set(s, ranges);
return interval_set.to_clustering_row_ranges();
}
/// Intersect two clustering_row_ranges correctly using clustering_interval_set.
/// This avoids the known bugs with clustering_range::intersection (see scylladb#22817, #21604, #8157).
inline clustering_row_ranges intersect_clustering_row_ranges(const schema& s,
const clustering_row_ranges& ranges1,
const clustering_row_ranges& ranges2) {
clustering_interval_set set1(s, ranges1);
clustering_interval_set set2(s, ranges2);
clustering_interval_set result;
for (const auto& r : set1) {
if (set2.overlaps(s, r)) {
result.add(s, r);
}
}
return result.to_clustering_row_ranges();
}
} // namespace query

View File

@@ -2329,7 +2329,11 @@ future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_m
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now()- start);
rlogger.info("repair[{}]: Finished tablet repair for table={}.{} range={} duration={} replicas={} global_tablet_id={} flush_time={}",
id.uuid(), keyspace_name, table_name, range, duration, replicas, gid, flush_time);
co_return flush_time;
if (!flush_time.has_value()) {
throw std::runtime_error(format("Batchlog reply failed for table={}.{} range={} replicas={} global_tablet_id={}",
id.uuid(), keyspace_name, table_name, range, replicas, gid));
}
co_return flush_time.value();
}
tasks::is_user_task repair::tablet_repair_task_impl::is_user_task() const noexcept {
@@ -2406,11 +2410,9 @@ future<> repair::tablet_repair_task_impl::run() {
});
auto parent_shard = this_shard_id();
auto flush_time = _flush_time;
auto res = rs.container().map_reduce0([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_info = sched_info, ranges_parallelism = _ranges_parallelism, parent_shard, topo_guard = _topo_guard, skip_flush = _skip_flush] (repair_service& rs) -> future<std::pair<gc_clock::time_point, bool>> {
std::vector<std::optional<gc_clock::time_point>> flush_times(smp::count, gc_clock::time_point{});
rs.container().invoke_on_all([&idx, &flush_times, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_info = sched_info, ranges_parallelism = _ranges_parallelism, parent_shard, topo_guard = _topo_guard, skip_flush = _skip_flush] (repair_service& rs) -> future<> {
std::exception_ptr error;
gc_clock::time_point shard_flush_time;
bool flush_failed = false;
for (auto& m : metas) {
if (m.master_shard_id != this_shard_id()) {
continue;
@@ -2464,24 +2466,27 @@ future<> repair::tablet_repair_task_impl::run() {
error = std::move(ep);
}
}
auto time = task->get_flush_time();
shard_flush_time = shard_flush_time == gc_clock::time_point() ? time : std::min(shard_flush_time, time);
flush_failed = flush_failed || (needs_flush_before_repair && !hints_batchlog_flushed);
auto current = flush_times[this_shard_id()];
if ((needs_flush_before_repair &&!hints_batchlog_flushed) || !current.has_value()) {
flush_times[this_shard_id()] = std::nullopt;
} else {
auto time = task->get_flush_time();
flush_times[this_shard_id()] = current == gc_clock::time_point() ? time : std::min(current.value(), time);
}
}
if (error) {
co_await coroutine::return_exception_ptr(std::move(error));
}
co_return std::make_pair(shard_flush_time, flush_failed);
}, std::make_pair<gc_clock::time_point, bool>(std::move(flush_time), false), [] (const auto& p1, const auto& p2) {
auto& [time1, failed1] = p1;
auto& [time2, failed2] = p2;
auto flush_time = time1 == gc_clock::time_point() ? time2 :
(time2 == gc_clock::time_point() ? time1 : std::min(time1, time2));
auto failed = failed1 || failed2;
return std::make_pair(flush_time, failed);
}).get();
_flush_time = res.first;
_should_flush_and_flush_failed = res.second;
for (auto& time : flush_times) {
if (!time.has_value()) {
_flush_time = std::nullopt;
break;
}
if (time != gc_clock::time_point()) {
_flush_time = _flush_time == gc_clock::time_point() ? time : std::min(_flush_time.value(), time.value());
}
}
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start_time);
rlogger.info("repair[{}]: Finished user-requested repair for tablet keyspace={} tables={} repair_id={} tablets_repaired={} duration={}",
id.uuid(), _keyspace, _tables, id.id, _metas.size(), duration);

View File

@@ -2529,7 +2529,7 @@ future<repair_update_system_table_response> repair_service::repair_update_system
}
}
if (req.range.end()) {
if (!req.range.end()->is_inclusive() && req.range.end()->value() != dht::maximum_token()) {
if (!req.range.end()->is_inclusive()) {
is_valid_range = false;
}
}

View File

@@ -112,8 +112,7 @@ private:
optimized_optional<abort_source::subscription> _abort_subscription;
std::optional<int> _ranges_parallelism;
size_t _metas_size = 0;
gc_clock::time_point _flush_time = gc_clock::time_point();
bool _should_flush_and_flush_failed = false;
std::optional<gc_clock::time_point> _flush_time = gc_clock::time_point();
service::frozen_topology_guard _topo_guard;
bool _skip_flush;
public:
@@ -135,12 +134,7 @@ public:
return tasks::is_abortable(!_abort_subscription);
}
gc_clock::time_point get_flush_time() const {
if (_should_flush_and_flush_failed) {
throw std::runtime_error(fmt::format("Flush is needed for repair {} with parent {}, but failed", id(), _parent_id));
}
return _flush_time;
}
std::optional<gc_clock::time_point> get_flush_time() const { return _flush_time; }
tasks::is_user_task is_user_task() const noexcept override;
virtual future<> release_resources() noexcept override;

View File

@@ -3704,7 +3704,7 @@ future<utils::chunked_vector<temporary_buffer<char>>> database::sample_data_file
}), std::ref(state));
// [1, 2, 3, 0] --> [0, 1, 3, 6]
std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), uint64_t(0), std::plus());
std::exclusive_scan(global_offset.begin(), global_offset.end(), global_offset.begin(), 0, std::plus());
// We can't generate random non-negative integers smaller than 0,
// so let's just deal with the `total_chunks == 0` case with an early return.

View File

@@ -301,7 +301,6 @@ protected:
class ghost_row_deleting_query_pager : public service::pager::query_pager {
service::storage_proxy& _proxy;
db::timeout_clock::duration _timeout_duration;
size_t _concurrency;
public:
ghost_row_deleting_query_pager(schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
service::query_state& state,
@@ -310,12 +309,10 @@ public:
dht::partition_range_vector ranges,
cql3::cql_stats& stats,
service::storage_proxy& proxy,
db::timeout_clock::duration timeout_duration,
size_t concurrency)
db::timeout_clock::duration timeout_duration)
: query_pager(proxy, s, selection, state, options, std::move(cmd), std::move(ranges), std::nullopt)
, _proxy(proxy)
, _timeout_duration(timeout_duration)
, _concurrency(concurrency)
{}
virtual ~ghost_row_deleting_query_pager() {}
@@ -325,12 +322,8 @@ public:
_query_read_repair_decision = qr.read_repair_decision;
qr.query_result->ensure_counts();
return seastar::async([this, query_result = std::move(qr.query_result), page_size, now] () mutable -> result<> {
std::exception_ptr ex;
handle_result(db::view::delete_ghost_rows_visitor{_proxy, _state, view_ptr(_query_schema), _timeout_duration, _concurrency, ex},
handle_result(db::view::delete_ghost_rows_visitor{_proxy, _state, view_ptr(_query_schema), _timeout_duration},
std::move(query_result), page_size, now);
if (ex) {
std::rethrow_exception(ex);
}
return bo::success();
});
}));
@@ -510,8 +503,7 @@ std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager
dht::partition_range_vector ranges,
cql3::cql_stats& stats,
storage_proxy& proxy,
db::timeout_clock::duration duration,
size_t concurrency) {
db::timeout_clock::duration duration) {
return ::make_shared<ghost_row_deleting_query_pager>(std::move(s), std::move(selection), state,
options, std::move(cmd), std::move(ranges), stats, proxy, duration, concurrency);
options, std::move(cmd), std::move(ranges), stats, proxy, duration);
}

View File

@@ -47,8 +47,7 @@ public:
dht::partition_range_vector,
cql3::cql_stats& stats,
storage_proxy& proxy,
db::timeout_clock::duration timeout_duration,
size_t concurrency);
db::timeout_clock::duration timeout_duration);
};
}

View File

@@ -1643,27 +1643,25 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
break;
case locator::tablet_transition_stage::cleanup_target:
if (do_barrier()) {
if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
if (!trinfo.pending_replica) {
rtlogger.info("Tablet cleanup of {} skipped because no replicas pending", gid);
return make_ready_future<>();
}
locator::tablet_replica dst = *trinfo.pending_replica;
if (is_excluded(raft::server_id(dst.host.uuid()))) {
rtlogger.info("Tablet cleanup of {} on {} skipped because node is excluded and doesn't need to revert migration", gid, dst);
return make_ready_future<>();
}
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
});
});
})) {
transition_to(locator::tablet_transition_stage::revert_migration);
if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
if (!trinfo.pending_replica) {
rtlogger.info("Tablet cleanup of {} skipped because no replicas pending", gid);
return make_ready_future<>();
}
locator::tablet_replica dst = *trinfo.pending_replica;
if (is_excluded(raft::server_id(dst.host.uuid()))) {
rtlogger.info("Tablet cleanup of {} on {} skipped because node is excluded and doesn't need to revert migration", gid, dst);
return make_ready_future<>();
}
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
});
});
})) {
transition_to(locator::tablet_transition_stage::revert_migration);
}
break;
case locator::tablet_transition_stage::revert_migration:

View File

@@ -27,7 +27,6 @@ enum class component_type {
TemporaryTOC,
TemporaryStatistics,
Scylla,
TemporaryScylla,
Rows,
Partitions,
TemporaryHashes,
@@ -77,8 +76,6 @@ struct fmt::formatter<sstables::component_type> : fmt::formatter<string_view> {
return formatter<string_view>::format("TemporaryStatistics", ctx);
case Scylla:
return formatter<string_view>::format("Scylla", ctx);
case TemporaryScylla:
return formatter<string_view>::format("TemporaryScylla", ctx);
case Partitions:
return formatter<string_view>::format("Partitions", ctx);
case Rows:

View File

@@ -632,10 +632,6 @@ private:
std::unique_ptr<file_writer> close_writer(std::unique_ptr<file_writer>& w);
void close_data_writer();
void close_index_writer();
void close_rows_writer();
void close_partitions_writer();
void ensure_tombstone_is_written() {
if (!_tombstone_written) {
consume(tombstone());
@@ -948,16 +944,17 @@ void writer::init_file_writers() {
_sst._schema->get_compressor_params(),
std::move(compressor)), _sst.get_filename());
}
if (_sst.has_component(component_type::Index)) {
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Index).get();
_index_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, _sst.index_filename());
_index_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), _sst.index_filename());
}
if (_sst.has_component(component_type::Partitions) && _sst.has_component(component_type::Rows)) {
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Rows).get();
_rows_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Rows));
_rows_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), component_name(_sst, component_type::Rows));
_bti_row_index_writer = trie::bti_row_index_writer(*_rows_writer);
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Partitions).get();
_partitions_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Partitions));
_partitions_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), component_name(_sst, component_type::Partitions));
_bti_partition_index_writer = trie::bti_partition_index_writer(*_partitions_writer);
}
if (_delayed_filter) {
@@ -985,41 +982,6 @@ void writer::close_data_writer() {
}
}
void writer::close_index_writer() {
if (_index_writer) {
auto writer = close_writer(_index_writer);
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
_sst.get_components_digests().index_digest = chksum_wr->full_checksum();
}
}
void writer::close_partitions_writer() {
if (_partitions_writer) {
_sst._partitions_db_footer = std::move(*_bti_partition_index_writer).finish(
_sst.get_version(),
_first_key.value(),
_last_key.value());
auto writer = close_writer(_partitions_writer);
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
_sst.get_components_digests().partitions_digest = chksum_wr->full_checksum();
}
}
void writer::close_rows_writer() {
if (_rows_writer) {
// Append some garbage padding to the file just to ensure that it's never empty.
// (Otherwise it would be empty if the sstable contains only small partitions).
// This is a hack to work around some bad interactions between zero-sized files
// and object storage. (It seems that e.g. minio considers a zero-sized file
// upload to be a no-op, which breaks some assumptions).
uint32_t garbage = seastar::cpu_to_be(0x13371337);
_rows_writer->write(reinterpret_cast<const char*>(&garbage), sizeof(garbage));
auto writer = close_writer(_rows_writer);
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
_sst.get_components_digests().rows_digest = chksum_wr->full_checksum();
}
}
void writer::consume_new_partition(const dht::decorated_key& dk) {
_c_stats.start_offset = _data_writer->offset();
_prev_row_start = _data_writer->offset();
@@ -1668,10 +1630,27 @@ void writer::consume_end_of_stream() {
_collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
}
close_index_writer();
if (_index_writer) {
close_writer(_index_writer);
}
close_partitions_writer();
close_rows_writer();
if (_partitions_writer) {
_sst._partitions_db_footer = std::move(*_bti_partition_index_writer).finish(
_sst.get_version(),
_first_key.value(),
_last_key.value());
close_writer(_partitions_writer);
}
if (_rows_writer) {
// Append some garbage padding to the file just to ensure that it's never empty.
// (Otherwise it would be empty if the sstable contains only small partitions).
// This is a hack to work around some bad interactions between zero-sized files
// and object storage. (It seems that e.g. minio considers a zero-sized file
// upload to be a no-op, which breaks some assumptions).
uint32_t garbage = seastar::cpu_to_be(0x13371337);
_rows_writer->write(reinterpret_cast<const char*>(&garbage), sizeof(garbage));
close_writer(_rows_writer);
}
if (_hashes_writer) {
close_writer(_hashes_writer);

View File

@@ -44,7 +44,6 @@ sstable_version_constants::component_map_t sstable_version_constants::create_com
{ component_type::Filter, "Filter.db" },
{ component_type::Statistics, "Statistics.db" },
{ component_type::Scylla, "Scylla.db" },
{ component_type::TemporaryScylla, "Scylla.db.tmp" },
{ component_type::TemporaryTOC, TEMPORARY_TOC_SUFFIX },
{ component_type::TemporaryStatistics, "Statistics.db.tmp" }
};

View File

@@ -956,22 +956,16 @@ future<file_writer> sstable::make_component_file_writer(component_type c, file_o
});
}
future<std::unique_ptr<crc32_digest_file_writer>> sstable::make_digests_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept {
return _storage->make_component_sink(*this, c, oflags, std::move(options)).then([this, comp = component_name(*this, c)] (data_sink sink) mutable {
return std::make_unique<crc32_digest_file_writer>(std::move(sink), sstable_buffer_size, comp);
});
}
void sstable::open_sstable(const sstring& origin) {
_origin = origin;
generate_toc();
_storage->open(*this);
}
void sstable::write_toc(std::unique_ptr<crc32_digest_file_writer> w) {
void sstable::write_toc(file_writer w) {
sstlog.debug("Writing TOC file {} ", toc_filename());
do_write_simple(*w, [&] (version_types v, file_writer& w) {
do_write_simple(std::move(w), [&] (version_types v, file_writer& w) {
for (auto&& key : _recognized_components) {
// new line character is appended to the end of each component name.
auto value = sstable_version_constants::get_component_map(v).at(key) + "\n";
@@ -979,8 +973,6 @@ void sstable::write_toc(std::unique_ptr<crc32_digest_file_writer> w) {
write(v, w, b);
}
});
_components_digests.toc_digest = w->full_checksum();
}
void sstable::write_crc(const checksum& c) {
@@ -997,7 +989,6 @@ void sstable::write_digest(uint32_t full_checksum) {
auto digest = to_sstring<bytes>(full_checksum);
write(v, w, digest);
}, buffer_size);
_components_digests.data_digest = full_checksum;
}
thread_local std::array<std::vector<int>, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache;
@@ -1054,7 +1045,7 @@ future<> sstable::read_simple(T& component) {
});
}
void sstable::do_write_simple(file_writer& writer,
void sstable::do_write_simple(file_writer&& writer,
noncopyable_function<void (version_types, file_writer&)> write_component) {
write_component(_version, writer);
_metadata_size_on_disk += writer.offset();
@@ -1069,7 +1060,7 @@ void sstable::do_write_simple(component_type type,
file_output_stream_options options;
options.buffer_size = buffer_size;
auto w = make_component_file_writer(type, std::move(options)).get();
do_write_simple(w, std::move(write_component));
do_write_simple(std::move(w), std::move(write_component));
}
template <component_type Type, typename T>
@@ -1079,30 +1070,10 @@ void sstable::write_simple(const T& component) {
}, sstable_buffer_size);
}
uint32_t sstable::do_write_simple_with_digest(component_type type,
noncopyable_function<void (version_types version, file_writer& writer)> write_component, unsigned buffer_size) {
auto file_path = filename(type);
sstlog.debug("Writing {} file {}", sstable_version_constants::get_component_map(_version).at(type), file_path);
file_output_stream_options options;
options.buffer_size = buffer_size;
auto w = make_digests_component_file_writer(type, std::move(options)).get();
do_write_simple(*w, std::move(write_component));
return w->full_checksum();
}
template <component_type Type, typename T>
uint32_t sstable::write_simple_with_digest(const T& component) {
return do_write_simple_with_digest(Type, [&component] (version_types v, file_writer& w) {
write(v, w, component);
}, sstable_buffer_size);
}
template future<> sstable::read_simple<component_type::Filter>(sstables::filter& f);
template void sstable::write_simple<component_type::Filter>(const sstables::filter& f);
template void sstable::write_simple<component_type::Summary>(const sstables::summary_ka&);
template uint32_t sstable::write_simple_with_digest<component_type::Summary>(const sstables::summary_ka&);
future<> sstable::read_compression() {
// FIXME: If there is no compression, we should expect a CRC file to be present.
@@ -1121,8 +1092,7 @@ void sstable::write_compression() {
return;
}
uint32_t digest = write_simple_with_digest<component_type::CompressionInfo>(_components->compression);
_components_digests.compression_digest = digest;
write_simple<component_type::CompressionInfo>(_components->compression);
}
void sstable::validate_partitioner() {
@@ -1347,8 +1317,7 @@ future<> sstable::read_partitions_db_footer() {
}
void sstable::write_statistics() {
auto digest = write_simple_with_digest<component_type::Statistics>(_components->statistics);
_components_digests.statistics_digest = digest;
write_simple<component_type::Statistics>(_components->statistics);
}
void sstable::mark_as_being_repaired(const service::session_id& id) {
@@ -1373,23 +1342,10 @@ void sstable::rewrite_statistics() {
file_output_stream_options options;
options.buffer_size = sstable_buffer_size;
auto w = make_digests_component_file_writer(component_type::TemporaryStatistics, std::move(options),
auto w = make_component_file_writer(component_type::TemporaryStatistics, std::move(options),
open_flags::wo | open_flags::create | open_flags::truncate).get();
write(_version, *w, _components->statistics);
w->close();
// When rewriting statistics, we also need to update the scylla component
// because it contains the digest of the statistics component.
if (has_scylla_component()) {
_components_digests.statistics_digest = w->full_checksum();
_components->scylla_metadata->data.set<scylla_metadata_type::ComponentsDigests>(components_digests{_components_digests});
sstlog.debug("Rewriting scylla component of sstable {}", get_filename());
write_simple<component_type::TemporaryScylla>(*_components->scylla_metadata);
// rename() guarantees atomicity when renaming a file into place.
sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryScylla)), fmt::to_string(filename(component_type::Scylla))).get();
}
write(_version, w, _components->statistics);
w.close();
// rename() guarantees atomicity when renaming a file into place.
sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryStatistics)), fmt::to_string(filename(component_type::Statistics))).get();
}
@@ -1583,8 +1539,7 @@ void sstable::write_filter() {
auto&& bs = f->bits();
auto filter_ref = sstables::filter_ref(f->num_hashes(), bs.get_storage());
uint32_t digest = write_simple_with_digest<component_type::Filter>(filter_ref);
_components_digests.filter_digest = digest;
write_simple<component_type::Filter>(filter_ref);
}
void sstable::maybe_rebuild_filter_from_index(uint64_t num_partitions) {
@@ -2043,8 +1998,6 @@ sstable::read_scylla_metadata() noexcept {
}
return read_simple<component_type::Scylla>(*_components->scylla_metadata).then([this] {
_features = _components->scylla_metadata->get_features();
_components_digests = _components->scylla_metadata->get_components_digests();
_components->digest = _components_digests.data_digest;
});
});
}
@@ -2134,7 +2087,6 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
sstable_schema.columns.elements.push_back(sstable_column_description{to_sstable_column_kind(col.kind), {col.name()}, {to_bytes(col.type->name())}});
}
_components->scylla_metadata->data.set<scylla_metadata_type::Schema>(std::move(sstable_schema));
_components->scylla_metadata->data.set<scylla_metadata_type::ComponentsDigests>(components_digests(_components_digests));
write_simple<component_type::Scylla>(*_components->scylla_metadata);
}
@@ -3123,31 +3075,6 @@ void sstable::set_sstable_level(uint32_t new_level) {
s.sstable_level = new_level;
}
std::optional<uint32_t> sstable::get_component_digest(component_type c) const {
switch (c) {
case component_type::Index:
return _components_digests.index_digest;
case component_type::Summary:
return _components_digests.summary_digest;
case component_type::TOC:
return _components_digests.toc_digest;
case component_type::CompressionInfo:
return _components_digests.compression_digest;
case component_type::Filter:
return _components_digests.filter_digest;
case component_type::Partitions:
return _components_digests.partitions_digest;
case component_type::Rows:
return _components_digests.rows_digest;
case component_type::Data:
return _components_digests.data_digest;
case component_type::Statistics:
return _components_digests.statistics_digest;
default:
return std::nullopt;
}
}
future<> sstable::mutate_sstable_level(uint32_t new_level) {
if (!has_component(component_type::Statistics)) {
return make_ready_future<>();

View File

@@ -9,7 +9,6 @@
#pragma once
#include "sstables/writer.hh"
#include "version.hh"
#include "shared_sstable.hh"
#include "open_info.hh"
@@ -628,8 +627,6 @@ private:
// Total memory reclaimed so far from this sstable
size_t _total_memory_reclaimed{0};
bool _unlinked{false};
components_digests _components_digests;
public:
bool has_component(component_type f) const;
sstables_manager& manager() { return _manager; }
@@ -650,18 +647,12 @@ private:
template <component_type Type, typename T>
void write_simple(const T& comp);
void do_write_simple(file_writer& writer,
void do_write_simple(file_writer&& writer,
noncopyable_function<void (version_types, file_writer&)> write_component);
void do_write_simple(component_type type,
noncopyable_function<void (version_types version, file_writer& writer)> write_component,
unsigned buffer_size);
template <component_type Type, typename T>
uint32_t write_simple_with_digest(const T& comp);
uint32_t do_write_simple_with_digest(component_type type,
noncopyable_function<void (version_types version, file_writer& writer)> write_component,
unsigned buffer_size);
void write_crc(const checksum& c);
void write_digest(uint32_t full_checksum);
@@ -672,9 +663,6 @@ private:
future<file_writer> make_component_file_writer(component_type c, file_output_stream_options options,
open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept;
future<std::unique_ptr<crc32_digest_file_writer>> make_digests_component_file_writer(component_type c, file_output_stream_options options,
open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept;
void generate_toc();
void open_sstable(const sstring& origin);
@@ -705,8 +693,7 @@ private:
future<> read_summary() noexcept;
void write_summary() {
uint32_t digest = write_simple_with_digest<component_type::Summary>(_components->summary);
_components_digests.summary_digest = digest;
write_simple<component_type::Summary>(_components->summary);
}
// To be called when we try to load an SSTable that lacks a Summary. Could
@@ -836,7 +823,7 @@ private:
future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept;
// runs in async context (called from storage::open)
void write_toc(std::unique_ptr<crc32_digest_file_writer> w);
void write_toc(file_writer w);
static future<uint32_t> read_digest_from_file(file f);
static future<lw_shared_ptr<checksum>> read_checksum_from_file(file f);
public:
@@ -1026,12 +1013,6 @@ public:
return _components->digest;
}
components_digests& get_components_digests() {
return _components_digests;
}
std::optional<uint32_t> get_component_digest(component_type c) const;
// Gets ratio of droppable tombstone. A tombstone is considered droppable here
// for cells and tombstones expired before the time point "GC before", which
// is the point before which expiring data can be purged.

View File

@@ -204,13 +204,13 @@ void filesystem_storage::open(sstable& sst) {
open_flags::create |
open_flags::exclusive,
options).get();
auto w = std::make_unique<crc32_digest_file_writer>(std::move(sink), sst.sstable_buffer_size, component_name(sst, component_type::TemporaryTOC));
auto w = file_writer(output_stream<char>(std::move(sink)), component_name(sst, component_type::TemporaryTOC));
bool toc_exists = file_exists(fmt::to_string(sst.filename(component_type::TOC))).get();
if (toc_exists) {
// TOC will exist at this point if write_components() was called with
// the generation of a sstable that exists.
w->close();
w.close();
remove_file(fmt::to_string(sst.filename(component_type::TemporaryTOC))).get();
throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {} of {}.{}", sst._generation, sst._schema->ks_name(), sst._schema->cf_name()));
}
@@ -670,10 +670,15 @@ void object_storage_base::open(sstable& sst) {
sst.manager().sstables_registry().create_entry(owner(), status_creating, sst._state, std::move(desc)).get();
memory_data_sink_buffers bufs;
auto out = data_sink(std::make_unique<memory_data_sink>(bufs));
auto w = std::make_unique<crc32_digest_file_writer>(std::move(out), sst.sstable_buffer_size, component_name(sst, component_type::TOC));
sst.write_toc(std::move(w));
sst.write_toc(
file_writer(
output_stream<char>(
data_sink(
std::make_unique<memory_data_sink>(bufs)
)
)
)
);
put_object(make_object_name(sst, component_type::TOC), std::move(bufs)).get();
}

View File

@@ -547,7 +547,6 @@ enum class scylla_metadata_type : uint32_t {
ExtTimestampStats = 9,
SSTableIdentifier = 10,
Schema = 11,
ComponentsDigests = 12,
};
// UUID is used for uniqueness across nodes, such that an imported sstable
@@ -574,24 +573,6 @@ struct sstable_identifier_type {
auto describe_type(sstable_version_types v, Describer f) { return f(value); }
};
// Component digests stored in scylla metadata to track integrity of individual components
struct components_digests {
std::optional<uint32_t> data_digest;
std::optional<uint32_t> compression_digest;
std::optional<uint32_t> filter_digest;
std::optional<uint32_t> statistics_digest;
std::optional<uint32_t> summary_digest;
std::optional<uint32_t> index_digest;
std::optional<uint32_t> toc_digest;
std::optional<uint32_t> partitions_digest;
std::optional<uint32_t> rows_digest;
template <typename Describer>
auto describe_type(sstable_version_types v, Describer f) {
return f(data_digest,compression_digest, filter_digest, statistics_digest, summary_digest, index_digest, toc_digest, partitions_digest, rows_digest);
}
};
// Types of large data statistics.
//
// Note: For extensibility, never reuse an identifier,
@@ -675,8 +656,7 @@ struct scylla_metadata {
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::ScyllaVersion, scylla_version>,
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::ExtTimestampStats, ext_timestamp_stats>,
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::SSTableIdentifier, sstable_identifier>,
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::Schema, sstable_schema>,
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::ComponentsDigests, components_digests>
disk_tagged_union_member<scylla_metadata_type, scylla_metadata_type::Schema, sstable_schema>
> data;
sstable_enabled_features get_features() const {
@@ -711,13 +691,6 @@ struct scylla_metadata {
auto* sid = data.get<scylla_metadata_type::SSTableIdentifier, scylla_metadata::sstable_identifier>();
return sid ? sid->value : sstable_id::create_null_id();
}
const components_digests get_components_digests() const {
auto cd = data.get<scylla_metadata_type::ComponentsDigests, components_digests>();
if (!cd) {
return {};
}
return *cd;
}
template <typename Describer>
auto describe_type(sstable_version_types v, Describer f) { return f(data); }

View File

@@ -65,7 +65,7 @@ serialized_size(sstable_version_types v, const T& object) {
return size;
}
template <typename ChecksumType, bool calculate_chunk_checksums>
template <typename ChecksumType>
requires ChecksumUtils<ChecksumType>
class checksummed_file_data_sink_impl : public data_sink_impl {
data_sink _out;
@@ -92,9 +92,7 @@ public:
per_chunk_checksum = ChecksumType::checksum(per_chunk_checksum, buf.begin() + offset, size);
_full_checksum = checksum_combine_or_feed<ChecksumType>(_full_checksum, per_chunk_checksum, buf.begin() + offset, size);
if constexpr (calculate_chunk_checksums) {
_c.checksums.push_back(per_chunk_checksum);
}
_c.checksums.push_back(per_chunk_checksum);
}
}
return _out.put(std::move(bufs));
@@ -114,29 +112,29 @@ public:
}
};
template <typename ChecksumType, bool calculate_chunk_checksums>
template <typename ChecksumType>
requires ChecksumUtils<ChecksumType>
class checksummed_file_data_sink : public data_sink {
public:
checksummed_file_data_sink(data_sink out, struct checksum& cinfo, uint32_t& full_file_checksum)
: data_sink(std::make_unique<checksummed_file_data_sink_impl<ChecksumType, calculate_chunk_checksums>>(std::move(out), cinfo, full_file_checksum)) {}
: data_sink(std::make_unique<checksummed_file_data_sink_impl<ChecksumType>>(std::move(out), cinfo, full_file_checksum)) {}
};
template <typename ChecksumType, bool calculate_chunk_checksums>
template <typename ChecksumType>
requires ChecksumUtils<ChecksumType>
inline
output_stream<char> make_checksummed_file_output_stream(data_sink out, struct checksum& cinfo, uint32_t& full_file_checksum) {
return output_stream<char>(checksummed_file_data_sink<ChecksumType, calculate_chunk_checksums>(std::move(out), cinfo, full_file_checksum));
return output_stream<char>(checksummed_file_data_sink<ChecksumType>(std::move(out), cinfo, full_file_checksum));
}
template <typename ChecksumType, bool calculate_chunk_checksums>
template <typename ChecksumType>
requires ChecksumUtils<ChecksumType>
class checksummed_file_writer : public file_writer {
checksum _c;
uint32_t _full_checksum;
public:
checksummed_file_writer(data_sink out, size_t buffer_size, component_name c)
: file_writer(make_checksummed_file_output_stream<ChecksumType, calculate_chunk_checksums>(std::move(out), _c, _full_checksum), std::move(c))
: file_writer(make_checksummed_file_output_stream<ChecksumType>(std::move(out), _c, _full_checksum), std::move(c))
, _c(uint32_t(std::min(size_t(DEFAULT_CHUNK_SIZE), buffer_size)), {})
, _full_checksum(ChecksumType::init_checksum()) {}
@@ -154,10 +152,8 @@ public:
}
};
using adler32_checksummed_file_writer = checksummed_file_writer<adler32_utils, true>;
using crc32_checksummed_file_writer = checksummed_file_writer<crc32_utils, true>;
using crc32_digest_file_writer = checksummed_file_writer<crc32_utils, false>;
using adler32_checksummed_file_writer = checksummed_file_writer<adler32_utils>;
using crc32_checksummed_file_writer = checksummed_file_writer<crc32_utils>;
template <typename T, typename W>
requires Writer<W>

View File

@@ -1,7 +1,5 @@
add_scylla_test(UUID_test
KIND BOOST)
add_scylla_test(url_parse_test
KIND BOOST)
add_scylla_test(advanced_rpc_compressor_test
KIND SEASTAR)
add_scylla_test(allocation_strategy_test

View File

@@ -1,126 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/testing/thread_test_case.hh>
#include "test/lib/scylla_test_case.hh"
#include "test/lib/simple_schema.hh"
#include "query/position_range_utils.hh"
#include "keys/clustering_interval_set.hh"
#include "schema/schema_builder.hh"
using namespace query;
SEASTAR_THREAD_TEST_CASE(test_deoverlap_clustering_row_ranges) {
simple_schema s;
// Create overlapping ranges
auto ck1 = s.make_ckey(1);
auto ck2 = s.make_ckey(2);
auto ck3 = s.make_ckey(3);
auto ck4 = s.make_ckey(4);
clustering_row_ranges ranges;
ranges.push_back(clustering_range::make({ck1, true}, {ck3, true})); // [1, 3]
ranges.push_back(clustering_range::make({ck2, true}, {ck4, true})); // [2, 4]
// Deoverlap should merge these into a single range [1, 4]
auto deoverlapped = deoverlap_clustering_row_ranges(*s.schema(), ranges);
BOOST_REQUIRE_EQUAL(deoverlapped.size(), 1);
BOOST_REQUIRE(deoverlapped[0].start());
BOOST_REQUIRE(deoverlapped[0].end());
BOOST_REQUIRE(deoverlapped[0].start()->value().equal(*s.schema(), ck1));
BOOST_REQUIRE(deoverlapped[0].end()->value().equal(*s.schema(), ck4));
}
SEASTAR_THREAD_TEST_CASE(test_deoverlap_with_non_overlapping_ranges) {
simple_schema s;
auto ck1 = s.make_ckey(1);
auto ck2 = s.make_ckey(2);
auto ck3 = s.make_ckey(3);
auto ck4 = s.make_ckey(4);
clustering_row_ranges ranges;
ranges.push_back(clustering_range::make({ck1, true}, {ck2, true})); // [1, 2]
ranges.push_back(clustering_range::make({ck3, true}, {ck4, true})); // [3, 4]
// These don't overlap, should remain as two separate ranges
auto deoverlapped = deoverlap_clustering_row_ranges(*s.schema(), ranges);
BOOST_REQUIRE_EQUAL(deoverlapped.size(), 2);
}
SEASTAR_THREAD_TEST_CASE(test_clustering_row_ranges_conversion) {
simple_schema s;
auto ck1 = s.make_ckey(1);
auto ck2 = s.make_ckey(2);
clustering_row_ranges ranges;
ranges.push_back(clustering_range::make({ck1, true}, {ck2, false})); // [1, 2)
// Convert to position_ranges and back
auto pos_ranges = clustering_row_ranges_to_position_ranges(ranges);
BOOST_REQUIRE_EQUAL(pos_ranges.size(), 1);
auto converted_back = position_ranges_to_clustering_row_ranges(pos_ranges, *s.schema());
BOOST_REQUIRE_EQUAL(converted_back.size(), 1);
// Check that the conversion is correct
BOOST_REQUIRE(converted_back[0].start());
BOOST_REQUIRE(converted_back[0].end());
BOOST_REQUIRE(converted_back[0].start()->value().equal(*s.schema(), ck1));
BOOST_REQUIRE(converted_back[0].start()->is_inclusive());
BOOST_REQUIRE(converted_back[0].end()->value().equal(*s.schema(), ck2));
BOOST_REQUIRE(!converted_back[0].end()->is_inclusive());
}
SEASTAR_THREAD_TEST_CASE(test_intersect_clustering_row_ranges) {
simple_schema s;
auto ck1 = s.make_ckey(1);
auto ck2 = s.make_ckey(2);
auto ck3 = s.make_ckey(3);
auto ck4 = s.make_ckey(4);
clustering_row_ranges ranges1;
ranges1.push_back(clustering_range::make({ck1, true}, {ck3, true})); // [1, 3]
clustering_row_ranges ranges2;
ranges2.push_back(clustering_range::make({ck2, true}, {ck4, true})); // [2, 4]
// Intersection should be [2, 3]
auto intersected = intersect_clustering_row_ranges(*s.schema(), ranges1, ranges2);
BOOST_REQUIRE_EQUAL(intersected.size(), 1);
BOOST_REQUIRE(intersected[0].start());
BOOST_REQUIRE(intersected[0].end());
BOOST_REQUIRE(intersected[0].start()->value().equal(*s.schema(), ck2));
BOOST_REQUIRE(intersected[0].end()->value().equal(*s.schema(), ck3));
}
SEASTAR_THREAD_TEST_CASE(test_intersect_non_overlapping_ranges) {
simple_schema s;
auto ck1 = s.make_ckey(1);
auto ck2 = s.make_ckey(2);
auto ck3 = s.make_ckey(3);
auto ck4 = s.make_ckey(4);
clustering_row_ranges ranges1;
ranges1.push_back(clustering_range::make({ck1, true}, {ck2, true})); // [1, 2]
clustering_row_ranges ranges2;
ranges2.push_back(clustering_range::make({ck3, true}, {ck4, true})); // [3, 4]
// No overlap, should return empty
auto intersected = intersect_clustering_row_ranges(*s.schema(), ranges1, ranges2);
BOOST_REQUIRE_EQUAL(intersected.size(), 0);
}

View File

@@ -2062,90 +2062,6 @@ SEASTAR_TEST_CASE(test_returning_failure_from_ghost_rows_deletion) {
});
}
// We can't verify that concurrency is actually used in a unit test, so check that the USING CONCURRENCY clause
// at least still results in correct ghost row deletions.
SEASTAR_TEST_CASE(test_deleting_ghost_rows_using_concurrency) {
return do_with_cql_env_thread([] (auto& e) {
cquery_nofail(e, "CREATE TABLE t (p int, c int, v int, PRIMARY KEY (p, c))");
cquery_nofail(e, "CREATE MATERIALIZED VIEW tv AS SELECT v, p, c FROM t WHERE v IS NOT NULL AND c IS NOT NULL PRIMARY KEY (v, p, c);");
for (int i = 0; i < 100; i++) {
cquery_nofail(e, format("INSERT INTO t (p,c,v) VALUES ({},{},{})", i, i * 100, i % 10));
}
std::vector<std::vector<bytes_opt>> expected_view_rows;
for (int i = 0; i < 100; i++) {
expected_view_rows.push_back({int32_type->decompose(i % 10), int32_type->decompose(i), int32_type->decompose(i * 100)});
}
auto inject_ghost_row = [&e] (int p, int c, int v) {
e.db().invoke_on_all([p, c, v] (replica::database& db) {
schema_ptr schema = db.find_schema("ks", "tv");
replica::table& t = db.find_column_family(schema);
mutation m(schema, partition_key::from_singular(*schema, v));
auto& row = m.partition().clustered_row(*schema, clustering_key::from_exploded(*schema, {int32_type->decompose(p), int32_type->decompose(c)}));
row.apply(row_marker{api::new_timestamp()});
unsigned shard = t.shard_for_reads(m.token());
if (shard == this_shard_id()) {
t.apply(m);
}
}).get();
};
inject_ghost_row(1, 100, 1111);
eventually([&] {
// The ghost row exists, but it can only be queried from the view, not from the base
auto msg = cquery_nofail(e, "SELECT * FROM tv WHERE v = 1111;");
assert_that(msg).is_rows().with_rows({
{int32_type->decompose(1111), int32_type->decompose(1), int32_type->decompose(100)},
});
});
// Ghost row deletion is attempted for a single view partition
cquery_nofail(e, "PRUNE MATERIALIZED VIEW tv WHERE v = 1111 USING CONCURRENCY 2");
eventually([&] {
// The ghost row is deleted
auto msg = cquery_nofail(e, "SELECT * FROM tv where v = 1111;");
assert_that(msg).is_rows().with_size(0);
});
for (int i = 0; i < 100; ++i) {
inject_ghost_row(i, i * 100, (i + 1) % 10);
}
eventually([&] {
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_size(200);
});
// Ghost row deletion is attempted for the whole table
cquery_nofail(e, "PRUNE MATERIALIZED VIEW tv USING CONCURRENCY 3;");
eventually([&] {
// Ghost rows are deleted
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_rows_ignore_order(expected_view_rows);
});
for (int i = 0; i < 100; ++i) {
inject_ghost_row(i, i * 100 + 1, (i + 2) % 10);
}
eventually([&] {
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_size(200);
});
// Ghost row deletion is attempted with a parallelized table scan
when_all(
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) >= -9223372036854775807 AND token(v) <= 0 USING CONCURRENCY 1"),
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 0 AND token(v) <= 10000000 USING CONCURRENCY 2"),
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 10000000 AND token(v) <= 20000000 USING CONCURRENCY 4"),
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 20000000 AND token(v) <= 30000000 USING CONCURRENCY 100"),
e.execute_cql("PRUNE MATERIALIZED VIEW tv WHERE token(v) > 30000000 AND token(v) <= 9223372036854775807 USING CONCURRENCY 1000")
).get();
eventually([&] {
// Ghost rows are deleted
auto msg = cquery_nofail(e, "SELECT * FROM tv;");
assert_that(msg).is_rows().with_rows_ignore_order(expected_view_rows);
});
});
}
// Reproducer for #18536.
//
// Paged index queries have been reported to cause reactor stalls on the

View File

@@ -15,14 +15,11 @@
#include <seastar/core/smp.hh>
#include <seastar/util/closeable.hh>
#include "sstables/checksum_utils.hh"
#include <seastar/util/short_streams.hh>
#include "sstables/generation_type.hh"
#include "sstables/sstables.hh"
#include "sstables/key.hh"
#include "sstables/open_info.hh"
#include "sstables/version.hh"
#include "test/lib/random_schema.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/scylla_test_case.hh"
@@ -35,7 +32,6 @@
#include "partition_slice_builder.hh"
#include "sstables/sstable_mutation_reader.hh"
#include "sstables/binary_search.hh"
#include "test/lib/random_utils.hh"
#include <boost/range/combine.hpp>
@@ -883,101 +879,3 @@ BOOST_AUTO_TEST_CASE(test_parse_path_bad) {
BOOST_CHECK_THROW(parse_path(path), std::exception);
}
}
using compress_sstable = tests::random_schema_specification::compress_sstable;
static future<> test_component_digest_persistence(component_type component, sstable::version_types version, compress_sstable compress = compress_sstable::no, bool rewrite_statistics = false) {
return test_env::do_with_async([component, version, compress, rewrite_statistics] (test_env& env) mutable {
auto random_spec = tests::make_random_schema_specification(
"ks",
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8),
compress);
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
auto schema = random_schema.schema();
const auto muts = tests::generate_random_mutations(random_schema, 2).get();
auto sst_original = make_sstable_containing(env.make_sstable(schema, version), muts);
auto& components = sstables::test(sst_original).get_components();
bool has_component = components.find(component) != components.end();
BOOST_REQUIRE(has_component);
auto toc_path = fmt::to_string(sst_original->toc_filename());
auto entry_desc = sstables::parse_path(toc_path, schema->ks_name(), schema->cf_name());
auto dir_path = std::filesystem::path(toc_path).parent_path().string();
std::optional<uint32_t> original_digest;
if (rewrite_statistics) {
original_digest = sst_original->get_component_digest(component);
BOOST_REQUIRE(original_digest.has_value());
sst_original->mutate_sstable_level(10).get();
auto new_digest = sst_original->get_component_digest(component);
BOOST_REQUIRE(new_digest.has_value());
BOOST_REQUIRE(original_digest.value() != new_digest.value());
}
sst_original = nullptr;
auto sst_reopened = env.make_sstable(schema, dir_path, entry_desc.generation, entry_desc.version, entry_desc.format);
sst_reopened->load(schema->get_sharder()).get();
auto loaded_digest = sst_reopened->get_component_digest(component);
BOOST_REQUIRE(loaded_digest.has_value());
auto f = open_file_dma(sstables::test(sst_reopened).filename(component).native(), open_flags::ro).get();
auto stream = make_file_input_stream(f);
auto close_stream = deferred_close(stream);
auto component_data = util::read_entire_stream_contiguous(stream).get();
auto calculated_digest = crc32_utils::checksum(component_data.begin(), component_data.size());
BOOST_REQUIRE_EQUAL(calculated_digest, loaded_digest.value());
});
}
SEASTAR_TEST_CASE(test_digest_persistence_index) {
return test_component_digest_persistence(component_type::Index, sstable::version_types::me);
}
SEASTAR_TEST_CASE(test_digest_persistence_partitions) {
return test_component_digest_persistence(component_type::Partitions, sstable::version_types::ms);
}
SEASTAR_TEST_CASE(test_digest_persistence_rows) {
return test_component_digest_persistence(component_type::Rows, sstable::version_types::ms);
}
SEASTAR_TEST_CASE(test_digest_persistence_summary) {
return test_component_digest_persistence(component_type::Summary, sstable::version_types::me);
}
SEASTAR_TEST_CASE(test_digest_persistence_filter) {
return test_component_digest_persistence(component_type::Filter, sstable::version_types::me);
}
SEASTAR_TEST_CASE(test_digest_persistence_compression) {
return test_component_digest_persistence(component_type::CompressionInfo, sstable::version_types::me, compress_sstable::yes);
}
SEASTAR_TEST_CASE(test_digest_persistence_toc) {
return test_component_digest_persistence(component_type::TOC, sstable::version_types::me);
}
SEASTAR_TEST_CASE(test_digest_persistence_statistics) {
return test_component_digest_persistence(component_type::Statistics, sstable::version_types::me);
}
SEASTAR_TEST_CASE(test_digest_persistence_statistics_rewrite) {
return test_component_digest_persistence(component_type::Statistics, sstable::version_types::me, compress_sstable::no, true);
}
SEASTAR_TEST_CASE(test_digest_persistence_data) {
return test_component_digest_persistence(component_type::Data, sstable::version_types::me);
}
SEASTAR_TEST_CASE(test_digest_persistence_data_compressed) {
return test_component_digest_persistence(component_type::Data, sstable::version_types::me, compress_sstable::yes);
}

View File

@@ -1,65 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#define BOOST_TEST_MODULE core
#include <boost/test/unit_test.hpp>
#include "utils/http.hh"
using namespace utils::http;
BOOST_AUTO_TEST_CASE(test_parse_ipv6) {
static const std::string ipv6_addr_str = "2001:db8:4006:812::200e";
auto info = parse_simple_url("http://[" + ipv6_addr_str + "]:8080");
BOOST_CHECK_EQUAL(info.host, ipv6_addr_str);
BOOST_CHECK_EQUAL(info.scheme, "http");
BOOST_CHECK(!info.is_https());
BOOST_CHECK_EQUAL(info.port, 8080);
BOOST_CHECK_EQUAL(info.path, "");
}
BOOST_AUTO_TEST_CASE(test_parse_kmip) {
auto info = parse_simple_url("kmip://127.0.0.1");
BOOST_CHECK_EQUAL(info.host, "127.0.0.1");
BOOST_CHECK_EQUAL(info.scheme, "kmip");
BOOST_CHECK(!info.is_https());
BOOST_CHECK_EQUAL(info.port, 80); // default
BOOST_CHECK_EQUAL(info.path, "");
}
BOOST_AUTO_TEST_CASE(test_parse_https) {
auto info = parse_simple_url("https://127.0.0.1");
BOOST_CHECK_EQUAL(info.host, "127.0.0.1");
BOOST_CHECK_EQUAL(info.scheme, "https");
BOOST_CHECK(info.is_https());
BOOST_CHECK_EQUAL(info.port, 443); // default
info = parse_simple_url("HTTPS://www.apa.org");
BOOST_CHECK_EQUAL(info.host, "www.apa.org");
BOOST_CHECK_EQUAL(info.scheme, "HTTPS");
BOOST_CHECK(info.is_https());
BOOST_CHECK_EQUAL(info.port, 443); // default
BOOST_CHECK_EQUAL(info.path, "");
}
BOOST_AUTO_TEST_CASE(test_parse_path) {
auto info = parse_simple_url("https://127.0.0.1:333/ola/korv");
BOOST_CHECK_EQUAL(info.host, "127.0.0.1");
BOOST_CHECK_EQUAL(info.scheme, "https");
BOOST_CHECK(info.is_https());
BOOST_CHECK_EQUAL(info.port, 333); // default
BOOST_CHECK_EQUAL(info.path, "/ola/korv");
}

View File

@@ -61,7 +61,7 @@ async def test_coordinator_queue_management(manager: ManagerClient):
s = await manager.server_add(start=False)
tasks = [asyncio.create_task(manager.server_start(s.server_id, expected_error="request canceled because some required nodes are dead|received notification of being banned from the cluster from")),
tasks = [asyncio.create_task(manager.server_start(s.server_id, expected_error="request canceled because some required nodes are dead")),
asyncio.create_task(manager.decommission_node(servers[1].server_id, expected_error="Decommission failed. See earlier errors"))]
await wait_for_first_completed([

View File

@@ -65,7 +65,7 @@ async def test_cannot_add_new_node(manager: ManagerClient, raft_op_timeout: int)
manager.server_stop_gracefully(servers[4].server_id))
logger.info("starting a sixth node with no quorum")
await manager.server_add(expected_error="raft operation \\[read_barrier\\] timed out, there is no raft quorum",
await manager.server_add(expected_error="raft operation [read_barrier] timed out, there is no raft quorum",
timeout=60)
logger.info("done")
@@ -96,7 +96,7 @@ async def test_quorum_lost_during_node_join(manager: ManagerClient, raft_op_time
logger.info("starting a fourth node")
fourth_node_future = asyncio.create_task(manager.server_add(
seeds=[servers[0].ip_addr],
expected_error="raft operation \\[add_entry\\] timed out, there is no raft quorum",
expected_error="raft operation [add_entry] timed out, there is no raft quorum",
timeout=60))
logger.info(f"waiting for the leader node {servers[0]} to start handling the join request")
@@ -141,7 +141,7 @@ async def test_quorum_lost_during_node_join_response_handler(manager: ManagerCli
logger.info("starting a fourth node")
fourth_node_future = asyncio.create_task(
manager.server_start(servers[3].server_id,
expected_error="raft operation \\[read_barrier\\] timed out, there is no raft quorum",
expected_error="raft operation [read_barrier] timed out, there is no raft quorum",
timeout=60))
logger.info(

View File

@@ -339,17 +339,3 @@ async def test_repair_timtestamp_difference(manager):
logger.info("Checking timestamps after repair")
check({host1: update2_timestamp, host2: update2_timestamp})
@pytest.mark.asyncio
async def test_small_table_optimization_repair(manager):
servers = await manager.servers_add(2, auto_rack_dc="dc1")
cql = manager.get_cql()
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND TABLETS = {'enabled': false}")
cql.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")
await manager.api.repair(servers[0].ip_addr, "ks", "tbl", small_table_optimization=True)
rows = await cql.run_async(f"SELECT * from system.repair_history")
assert len(rows) == 1

View File

@@ -1,63 +0,0 @@
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
from test.pylib.manager_client import ManagerClient
import asyncio
from datetime import datetime, timedelta
import pytest
import logging
import time
from test.pylib.rest_client import HTTPError
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_rest_api_on_startup(request, manager: ManagerClient):
host = None
stop_loop = False
# Asynchronously keep sending a REST API request in a loop.
# This lets us verify that the server doesn't crash when it is
# started or restarted while requests are ongoing.
async def test_rest_api():
timeout = 60 # seconds
start_time = time.time()
while True:
if time.time() - start_time > timeout:
raise TimeoutError
try:
logger.info(f"Sending raft_topology/reload request")
result = await manager.api.client.post("/storage_service/raft_topology/reload", host=host)
logger.info(f"Received result {result}")
if stop_loop:
return
except Exception as ex:
# Some errors are expected, for example:
# - Initially, `host=None`, so `manager.api.client.post` fails
# - Scylla returns 404 until the `/storage_service/raft_topology/reload` endpoint exists
# - aiohttp raises ClientConnectorError when it cannot connect
# This is okay. The important point is that ScyllaDB does not crash,
# so the final request should succeed.
pass
# Avoid spamming requests to prevent log flooding.
# One request per millisecond should be sufficient to expose issues.
await asyncio.sleep(0.001)
fut = asyncio.create_task(test_rest_api())
logger.info("Starting server")
server = await manager.server_add()
host = server.ip_addr
logger.info("Restarting server")
await manager.server_restart(server.server_id)
logger.info("Stopping the loop")
stop_loop = True
await fut

View File

@@ -9,7 +9,6 @@ from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError, read_barrier
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_info
from test.pylib.util import start_writes
from test.cluster.conftest import skip_mode
from test.cluster.util import wait_for_cql_and_get_hosts, new_test_keyspace, reconnect_driver, wait_for
import time
@@ -128,10 +127,9 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
await make_server("r1")
await make_server("r2")
await make_server("r3")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(256)
@@ -155,30 +153,30 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
logger.info(f"Tablet is on [{replicas}]")
assert len(replicas) == 1 and len(replicas[0].replicas) == 3
assert len(replicas) == 1 and len(replicas[0].replicas) == 2
last_token = replicas[0].last_token
old_replica = None
for r in replicas[0].replicas:
assert r[0] != host_ids[3], "Tablet got migrated to node3"
assert r[0] != host_ids[2], "Tablet got migrated to node2"
if r[0] == host_ids[1]:
old_replica = r
assert old_replica is not None
new_replica = (host_ids[3], 0)
new_replica = (host_ids[2], 0)
logger.info(f"Moving tablet {old_replica} -> {new_replica}")
class node_failer:
def __init__(self, stage, replica, ks):
self.stage = stage
self.replica = replica
self.fail_idx = 1 if self.replica == "source" else 3
self.fail_idx = 1 if self.replica == "source" else 2
self.ks = ks
async def setup(self):
logger.info(f"Will fail {self.stage}")
if self.stage == "streaming":
await manager.api.enable_injection(servers[3].ip_addr, "stream_mutation_fragments", one_shot=True)
self.log = await manager.server_open_log(servers[3].server_id)
await manager.api.enable_injection(servers[2].ip_addr, "stream_mutation_fragments", one_shot=True)
self.log = await manager.server_open_log(servers[2].server_id)
self.mark = await self.log.mark()
elif self.stage in [ "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "end_migration", "do_revert_migration" ]:
await manager.api.enable_injection(servers[self.fail_idx].ip_addr, "raft_topology_barrier_and_drain_fail", one_shot=False,
@@ -190,7 +188,7 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
self.log = await manager.server_open_log(servers[self.fail_idx].server_id)
self.mark = await self.log.mark()
elif self.stage == "cleanup_target":
assert self.fail_idx == 3
assert self.fail_idx == 2
self.stream_fail = node_failer('streaming', 'source', ks)
await self.stream_fail.setup()
self.cleanup_fail = node_failer('cleanup', 'destination', ks)
@@ -224,11 +222,11 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
async def stop(self, via=0):
if self.stage == "cleanup_target":
await self.cleanup_fail.stop(via=4) # removenode of source is happening via node0 already
await self.cleanup_fail.stop(via=3) # removenode of source is happening via node0 already
await self.stream_stop_task
return
if self.stage == "revert_migration":
await self.revert_fail.stop(via=4)
await self.revert_fail.stop(via=3)
await self.wbro_fail_task
return
@@ -238,7 +236,6 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
await manager.remove_node(servers[via].server_id, servers[self.fail_idx].server_id)
logger.info(f"Done with {self.replica} {host_ids[self.fail_idx]}")
finish_writes = await start_writes(cql, ks, "test")
failer = node_failer(fail_stage, fail_replica, ks)
await failer.setup()
@@ -256,8 +253,6 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
for r in replicas[0].replicas:
assert r[0] != host_ids[failer.fail_idx]
await finish_writes()
# For dropping the keyspace after the node failure
await reconnect_driver(manager)

View File

@@ -1,39 +0,0 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import pytest
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
# Regression test for SCYLLADB-81.
#
# Unlike regular secondary indexes, vector indexes do not use materialized views
# in their implementation, therefore they do not need the keyspace
# to be rf-rack-valid in order to function properly.
#
# In this test, we check that creating a vector index without
# rf_rack_valid_keyspaces being set is possible.
@pytest.mark.asyncio
async def test_vector_store_can_be_created_without_rf_rack_valid(manager: ManagerClient):
# Explicitly disable the rf_rack_valid_keyspaces option.
config = {"rf_rack_valid_keyspaces": False}
srv = await manager.server_add(config=config)
cql, _ = await manager.get_ready_cql([srv])
# Explicitly create a keyspace with tablets.
await cql.run_async("CREATE KEYSPACE ks WITH replication = "
"{'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
"AND tablets = {'enabled': true}")
await cql.run_async("CREATE TABLE ks.t (pk int PRIMARY KEY, v vector<float, 3>)")
# Creating a vector store index should succeed.
await cql.run_async("CREATE CUSTOM INDEX ON ks.t(v) USING 'vector_index'")

View File

@@ -457,7 +457,7 @@ class ScyllaRESTAPIClient:
data = await self.client.get_json("/raft/leader_host", host=node_ip, params=params)
return HostID(data)
async def repair(self, node_ip: str, keyspace: str, table: str, ranges: str = '', small_table_optimization: bool = False) -> None:
async def repair(self, node_ip: str, keyspace: str, table: str, ranges: str = '') -> None:
"""Repair the given table and wait for it to complete"""
vnode_keyspaces = await self.client.get_json(f"/storage_service/keyspaces", host=node_ip, params={"replication": "vnodes"})
if keyspace in vnode_keyspaces:
@@ -465,8 +465,6 @@ class ScyllaRESTAPIClient:
params = {"columnFamilies": table, "ranges": ranges}
else:
params = {"columnFamilies": table}
if small_table_optimization:
params["small_table_optimization"] = "true"
sequence_number = await self.client.post_json(f"/storage_service/repair_async/{keyspace}", host=node_ip, params=params)
status = await self.client.get_json(f"/storage_service/repair_status", host=node_ip, params={"id": str(sequence_number)})
if status != 'SUCCESSFUL':

View File

@@ -794,7 +794,7 @@ class ScyllaServer:
if expected_error is not None:
with self.log_filename.open("r", encoding="utf-8") as log_file:
for line in log_file:
if re.search(expected_error, line):
if expected_error in line:
return
await report_error("the node startup failed, but the log file doesn't contain the expected error")
await report_error("failed to start the node")

View File

@@ -1356,7 +1356,6 @@ const char* to_string(sstables::scylla_metadata_type t) {
case sstables::scylla_metadata_type::ExtTimestampStats: return "ext_timestamp_stats";
case sstables::scylla_metadata_type::SSTableIdentifier: return "sstable_identifier";
case sstables::scylla_metadata_type::Schema: return "schema";
case sstables::scylla_metadata_type::ComponentsDigests: return "components_digests";
}
std::abort();
}
@@ -1414,28 +1413,6 @@ public:
}
_writer.EndArray();
}
void operator()(const sstables::components_digests& val) const {
_writer.StartObject();
auto write_digest = [&](std::string_view key, const std::optional<uint32_t>& digest) {
if (digest) {
_writer.Key(key);
_writer.Uint(*digest);
}
};
write_digest("data_digest", val.data_digest);
write_digest("compression_digest", val.compression_digest);
write_digest("filter_digest",val.filter_digest);
write_digest("statistics_digest", val.statistics_digest);
write_digest("summary_digest", val.summary_digest);
write_digest("index_digest", val.index_digest);
write_digest("toc_digest", val.toc_digest);
write_digest("partitions_digest", val.partitions_digest);
write_digest("rows_digest", val.rows_digest);
_writer.EndObject();
}
void operator()(const sstables::sstable_enabled_features& val) const {
std::pair<sstables::sstable_feature, const char*> all_features[] = {
{sstables::sstable_feature::NonCompoundPIEntries, "NonCompoundPIEntries"},

View File

@@ -85,17 +85,10 @@ future<connected_socket> utils::http::dns_connection_factory::make(abort_source*
static const char HTTPS[] = "https";
utils::http::url_info utils::http::parse_simple_url(std::string_view uri) {
/**
* https://en.wikipedia.org/wiki/IPv6#Addressing
* In case a port is included with a numerical ipv6 address,
* the address part is encases in a "[]" wrapper, like
* http://[2001:db8:4006:812::200e]:8080
*/
static boost::regex simple_url(R"foo(([a-zA-Z]+):\/\/((?:\[[^\]]+\])|[^\/:]+)(:\d+)?(\/.*)?)foo");
static boost::regex simple_url(R"foo((https?):\/\/([^\/:]+)(:\d+)?(\/.*)?)foo");
boost::smatch m;
std::string tmp(uri);
if (!boost::regex_match(tmp, m, simple_url)) {
throw std::invalid_argument(fmt::format("Could not parse URI {}", uri));
}
@@ -105,12 +98,8 @@ utils::http::url_info utils::http::parse_simple_url(std::string_view uri) {
auto port = m[3].str();
auto path = m[4].str();
bool https = (strcasecmp(scheme.c_str(), HTTPS) == 0);
bool https = scheme == HTTPS;
// check for numeric ipv6 address + port case
if (host.size() > 2 && host.front() == '[' && host.back() == ']') {
host = host.substr(1, host.size() - 2);
}
return url_info {
.scheme = std::move(scheme),
.host = std::move(host),
@@ -120,6 +109,6 @@ utils::http::url_info utils::http::parse_simple_url(std::string_view uri) {
}
bool utils::http::url_info::is_https() const {
return strcasecmp(scheme.c_str(), HTTPS) == 0;
return scheme == HTTPS;
}

View File

@@ -2948,10 +2948,18 @@ void allocating_section::on_alloc_failure(logalloc::region& r) {
r.allocator().invalidate_references();
if (r.get_tracker().get_impl().segment_pool().allocation_failure_flag()) {
_lsa_reserve *= 2;
llogger.info("LSA allocation failure, increasing reserve in section {} to {} segments; trace: {}", fmt::ptr(this), _lsa_reserve, current_backtrace());
if (!_namer) {
llogger.info("LSA allocation failure, increasing reserve in section {} to {} segments; trace: {}", fmt::ptr(this), _lsa_reserve, current_backtrace());
} else {
llogger.info("LSA allocation failure, increasing reserve in section {} ({}) to {} segments; trace: {}", fmt::ptr(this), _namer, _lsa_reserve, current_backtrace());
}
} else {
_std_reserve *= 2;
llogger.info("Standard allocator failure, increasing head-room in section {} to {} [B]; trace: {}", fmt::ptr(this), _std_reserve, current_backtrace());
if (!_namer) {
llogger.info("Standard allocator failure, increasing head-room in section {} to {} [B]; trace: {}", fmt::ptr(this), _std_reserve, current_backtrace());
} else {
llogger.info("Standard allocator failure, increasing head-room in section {} ({}) to {} [B]; trace: {}", fmt::ptr(this), _namer, _std_reserve, current_backtrace());
}
}
reserve(r.get_tracker().get_impl());
}

View File

@@ -9,6 +9,10 @@
#pragma once
#include <memory>
#include <string>
#include <string_view>
#include <functional>
#include <fmt/format.h>
#include <seastar/core/memory.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/shared_ptr.hh>
@@ -20,6 +24,28 @@
namespace logalloc {
// Forward declaration for use in allocating_section_namer
class schema;
// Helper class to format allocating section names without intermediate string allocation
// This is a formattable object that can be passed directly to fmt logger
class allocating_section_namer {
std::function<void(fmt::memory_buffer&)> _formatter;
public:
allocating_section_namer() = default;
template<typename Func>
explicit allocating_section_namer(Func&& f) : _formatter(std::forward<Func>(f)) {}
void format_to(fmt::memory_buffer& buf) const {
if (_formatter) {
_formatter(buf);
}
}
explicit operator bool() const noexcept { return bool(_formatter); }
};
struct occupancy_stats;
class region;
class region_impl;
@@ -442,6 +468,7 @@ class allocating_section {
size_t _minimum_lsa_emergency_reserve = 0;
int64_t _remaining_std_bytes_until_decay = s_bytes_per_decay;
int _remaining_lsa_segments_until_decay = s_segments_per_decay;
allocating_section_namer _namer; // Optional namer for debugging
private:
struct guard {
tracker::impl& _tracker;
@@ -453,6 +480,8 @@ private:
void maybe_decay_reserve() noexcept;
void on_alloc_failure(logalloc::region&);
public:
allocating_section() = default;
explicit allocating_section(allocating_section_namer namer) : _namer(std::move(namer)) {}
void set_lsa_reserve(size_t) noexcept;
void set_std_reserve(size_t) noexcept;
@@ -542,6 +571,15 @@ future<> use_standard_allocator_segment_pool_backend(size_t available_memory);
}
template <> struct fmt::formatter<logalloc::allocating_section_namer> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const logalloc::allocating_section_namer& namer, fmt::format_context& ctx) const {
fmt::memory_buffer buf;
namer.format_to(buf);
return fmt::format_to(ctx.out(), "{}", std::string_view(buf.data(), buf.size()));
}
};
template <> struct fmt::formatter<logalloc::occupancy_stats> : fmt::formatter<string_view> {
auto format(const logalloc::occupancy_stats& stats, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{:.2f}%, {:d} / {:d} [B]",

View File

@@ -20,7 +20,6 @@
#include <seastar/core/with_timeout.hh>
#include <chrono>
#include <fmt/format.h>
#include <netinet/tcp.h>
using namespace seastar;
using namespace std::chrono_literals;
@@ -46,8 +45,6 @@ public:
socket.set_nodelay(true);
socket.set_keepalive_parameters(get_keepalive_parameters(timeout()));
socket.set_keepalive(true);
unsigned int timeout_ms = timeout().count();
socket.set_sockopt(IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_ms, sizeof(timeout_ms));
co_return socket;
}