Compare commits

..

4 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
bb1ab98fc9 Remove unused unimplemented::cause enum values and document remaining ones
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-05 22:26:58 +00:00
copilot-swe-agent[bot]
bdbc47a333 Restore unimplemented::cause::SUPER - still needed for error reporting
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-05 21:01:46 +00:00
copilot-swe-agent[bot]
5407b6b43d Remove dead code for super table handling
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-05 20:59:18 +00:00
copilot-swe-agent[bot]
8582156257 Initial plan 2025-12-05 20:48:35 +00:00
949 changed files with 12911 additions and 38185 deletions

9
.github/CODEOWNERS vendored
View File

@@ -1,5 +1,5 @@
# AUTH
auth/* @nuivall
auth/* @nuivall @ptrsmrn
# CACHE
row_cache* @tgrabiec
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
transport/*
# CQL QUERY LANGUAGE
cql3/* @tgrabiec @nuivall
cql3/* @tgrabiec @nuivall @ptrsmrn
# COUNTERS
counters* @nuivall
tests/counter_test* @nuivall
counters* @nuivall @ptrsmrn
tests/counter_test* @nuivall @ptrsmrn
# DOCS
docs/* @annastuchlik @tzach
@@ -57,6 +57,7 @@ repair/* @tgrabiec @asias
# SCHEMA MANAGEMENT
db/schema_tables* @tgrabiec
db/legacy_schema_migrator* @tgrabiec
service/migration* @tgrabiec
schema* @tgrabiec

View File

@@ -84,14 +84,3 @@ ninja build/<mode>/scylla
- Strive for simplicity and clarity, add complexity only when clearly justified
- Question requests: don't blindly implement requests - evaluate trade-offs, identify issues, and suggest better alternatives when appropriate
- Consider different approaches, weigh pros and cons, and recommend the best fit for the specific context
## Test Philosophy
- Performance matters. Tests should run as quickly as possible. Sleeps in the code are highly discouraged and should be avoided, to reduce run time and flakiness.
- Stability matters. Tests should be stable. New tests should be executed 100 times at least to ensure they pass 100 out of 100 times. (use --repeat 100 --max-failures 1 when running it)
- Unit tests should ideally test one thing and one thing only.
- Tests for bug fixes should run before the fix - and show the failure and after the fix - and show they now pass.
- Tests for bug fixes should have in their comments which bug fixes (GitHub or JIRA issue) they test.
- Tests in debug are always slower, so if needed, reduce number of iterations, rows, data used, cycles, etc. in debug mode.
- Tests should strive to be repeatable, and not use random input that will make their results unpredictable.
- Tests should consume as little resources as possible. Prefer running tests on a single node if it is sufficient, for example.

View File

@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
if is_draft:
labels_to_add.append("conflicts")
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
pr_comment += "Please resolve them and mark this PR as ready for review"
backport_pr.create_issue_comment(pr_comment)
# Apply all labels at once if we have any

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|(?:https://scylladb\\.atlassian\\.net/browse/)?([A-Z]+-\\d+))`;
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {

View File

@@ -0,0 +1,12 @@
name: Call Jira Status In Progress
on:
pull_request_target:
types: [opened]
jobs:
call-jira-status-in-progress:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_progress.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,12 @@
name: Call Jira Status In Review
on:
pull_request_target:
types: [ready_for_review, review_requested]
jobs:
call-jira-status-in-review:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,12 @@
name: Call Jira Status Ready For Merge
on:
pull_request_target:
types: [labeled]
jobs:
call-jira-status-update:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_ready_for_merge.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,41 +0,0 @@
name: Sync Jira Based on PR Events
on:
pull_request_target:
types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]
permissions:
contents: read
pull-requests: write
issues: write
jobs:
jira-sync-pr-opened:
if: github.event.action == 'opened'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-in-review:
if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-add-label:
if: github.event.action == 'labeled'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-status-remove-label:
if: github.event.action == 'unlabeled'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-status-pr-closed:
if: github.event.action == 'closed'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,22 +0,0 @@
name: Sync Jira Based on PR Milestone Events
on:
pull_request_target:
types: [milestoned, demilestoned]
permissions:
contents: read
pull-requests: read
jobs:
jira-sync-milestone-set:
if: github.event.action == 'milestoned'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_milestone_set.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-milestone-removed:
if: github.event.action == 'demilestoned'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_milestone_removed.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,14 +0,0 @@
name: Call Jira release creation for new milestone
on:
milestone:
types: [created]
jobs:
sync-milestone-to-jira:
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
with:
# Comma-separated list of Jira project keys
jira_project_keys: "SCYLLADB,CUSTOMER,SMI"
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,13 +0,0 @@
name: validate_pr_author_email
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
jobs:
validate_pr_author_email:
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main

View File

@@ -1,62 +0,0 @@
name: Close issues created by Scylla associates
on:
issues:
types: [opened, reopened]
permissions:
issues: write
jobs:
comment-and-close:
runs-on: ubuntu-latest
steps:
- name: Comment and close if author email is scylladb.com
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
const issue = context.payload.issue;
const actor = context.actor;
// Get user data (only public email is available)
const { data: user } = await github.rest.users.getByUsername({
username: actor,
});
const email = user.email || "";
console.log(`Actor: ${actor}, public email: ${email || "<none>"}`);
// Only continue if email exists and ends with @scylladb.com
if (!email || !email.toLowerCase().endsWith("@scylladb.com")) {
console.log("User is not a scylladb.com email (or email not public); skipping.");
return;
}
const owner = context.repo.owner;
const repo = context.repo.repo;
const issue_number = issue.number;
const body = "Issues in this repository are closed automatically. Scylla associates should use Jira to manage issues.\nPlease move this issue to Jira https://scylladb.atlassian.net/jira/software/c/projects/SCYLLADB/list";
// Add the comment
await github.rest.issues.createComment({
owner,
repo,
issue_number,
body,
});
console.log(`Comment added to #${issue_number}`);
// Close the issue
await github.rest.issues.update({
owner,
repo,
issue_number,
state: "closed",
state_reason: "not_planned"
});
console.log(`Issue #${issue_number} closed.`);

View File

@@ -13,5 +13,5 @@ jobs:
- uses: codespell-project/actions-codespell@master
with:
only_warn: 1
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison,iif,tread"
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison"
skip: "./.git,./build,./tools,*.js,*.lock,./test,./licenses,./redis/lolwut.cc,*.svg"

View File

@@ -18,8 +18,6 @@ on:
jobs:
release:
permissions:
contents: write
runs-on: ubuntu-latest
steps:
- name: Checkout

View File

@@ -2,9 +2,6 @@ name: "Docs / Build PR"
# For more information,
# see https://sphinx-theme.scylladb.com/stable/deployment/production.html#available-workflows
permissions:
contents: read
env:
FLAG: ${{ github.repository == 'scylladb/scylla-enterprise' && 'enterprise' || 'opensource' }}

View File

@@ -1,8 +1,5 @@
name: Docs / Validate metrics
permissions:
contents: read
on:
pull_request:
branches:
@@ -10,7 +7,7 @@ on:
- enterprise
paths:
- '**/*.cc'
- 'scripts/metrics-config.yml'
- 'scripts/metrics-config.yml'
- 'scripts/get_description.py'
- 'docs/_ext/scylladb_metrics.py'
@@ -18,20 +15,20 @@ jobs:
validate-metrics:
runs-on: ubuntu-latest
name: Check metrics documentation coverage
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.10'
- name: Install dependencies
run: pip install PyYAML
- name: Validate metrics
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml

View File

@@ -14,8 +14,7 @@ env:
CLEANER_DIRS: test/unit exceptions alternator api auth cdc compaction db dht gms index lang message mutation mutation_writer node_ops raft redis replica service
SEASTAR_BAD_INCLUDE_OUTPUT_PATH: build/seastar-bad-include.log
permissions:
contents: read
permissions: {}
# cancel the in-progress run upon a repush
concurrency:
@@ -35,6 +34,8 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- run: |
sudo dnf -y install clang-tools-extra
- name: Generate compilation database
run: |
cmake \

View File

@@ -10,8 +10,6 @@ on:
jobs:
read-toolchain:
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
image: ${{ steps.read.outputs.image }}
steps:

View File

@@ -3,40 +3,19 @@ name: Trigger Scylla CI Route
on:
issue_comment:
types: [created]
pull_request_target:
types:
- unlabeled
jobs:
trigger-jenkins:
if: (github.event_name == 'issue_comment' && github.event.comment.user.login != 'scylladbbot') || github.event.label.name == 'conflicts'
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Validate Comment Trigger
if: github.event_name == 'issue_comment'
id: verify_comment
shell: bash
run: |
BODY=$(cat << 'EOF'
${{ github.event.comment.body }}
EOF
)
CLEAN_BODY=$(echo "$BODY" | grep -v '^[[:space:]]*>')
if echo "$CLEAN_BODY" | grep -qi '@scylladbbot' && echo "$CLEAN_BODY" | grep -qi 'trigger-ci'; then
echo "trigger=true" >> $GITHUB_OUTPUT
else
echo "trigger=false" >> $GITHUB_OUTPUT
fi
- name: Trigger Scylla-CI-Route Jenkins Job
if: github.event_name == 'pull_request_target' || steps.verify_comment.outputs.trigger == 'true'
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
run: |
PR_NUMBER=${{ github.event.issue.number || github.event.pull_request.number }}
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
curl -X POST "$JENKINS_URL/job/releng/job/Scylla-CI-Route/buildWithParameters?PR_NUMBER=$PR_NUMBER&PR_REPO_NAME=$PR_REPO_NAME" \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v

View File

@@ -1,197 +0,0 @@
# Implementation Summary: Error Injection Event Stream
## Problem Statement
Tests using error injections had to rely on log parsing to detect when injection points were hit:
```python
mark, _ = await log.wait_for('topology_coordinator_pause_before_processing_backlog: waiting', from_mark=mark)
```
This approach was:
- **Slow**: Required waiting for log flushes and buffer processing
- **Unreliable**: Regex matching could fail or match wrong lines
- **Fragile**: Changes to log messages broke tests
## Solution
Implemented a Server-Sent Events (SSE) API that sends real-time notifications when error injection points are triggered.
## Implementation
### 1. Backend Event System (`utils/error_injection.hh`)
**Added**:
- `error_injection_event_callback` type for event notifications
- `_event_callbacks` vector to store registered callbacks
- `notify_event()` method called by all `inject()` methods
- `register_event_callback()` / `clear_event_callbacks()` methods
- Cross-shard registration via `register_event_callback_on_all()`
**Modified**:
- All `inject()` methods now call `notify_event()` after logging
- Changed log level from DEBUG to INFO for better visibility
- Both enabled/disabled template specializations updated
### 2. SSE API Endpoint (`api/error_injection.cc`)
**Added**:
- `GET /v2/error_injection/events` endpoint
- Streams events in SSE format: `data: {"injection":"name","type":"handler","shard":0}\n\n`
- Cross-shard event collection using `foreign_ptr` and `smp::submit_to()`
- Automatic cleanup on client disconnect
**Architecture**:
1. Client connects → queue created on handler shard
2. Callbacks registered on ALL shards
3. When injection fires → event sent via `smp::submit_to()` to queue
4. Queue → SSE stream → client
5. Client disconnect → callbacks cleared on all shards
### 3. Python Client (`test/pylib/rest_client.py`)
**Added**:
- `InjectionEventStream` class:
- `wait_for_injection(name, timeout)` - wait for specific injection
- Background task reads SSE stream
- Queue-based event delivery
- `injection_event_stream()` context manager for lifecycle
- Full async/await support
**Usage**:
```python
async with injection_event_stream(server_ip) as stream:
await api.enable_injection(server_ip, "my_injection", one_shot=True)
# ... trigger operation ...
event = await stream.wait_for_injection("my_injection", timeout=30)
```
### 4. Tests (`test/cluster/test_error_injection_events.py`)
**Added**:
- `test_injection_event_stream_basic` - basic functionality
- `test_injection_event_stream_multiple_injections` - multiple tracking
- `test_injection_event_vs_log_parsing_comparison` - old vs new
### 5. Documentation (`docs/dev/error_injection_events.md`)
Complete documentation covering:
- Architecture and design
- Usage examples
- Migration guide from log parsing
- Thread safety and cleanup
## Key Design Decisions
### Why SSE instead of WebSocket?
- **Unidirectional**: We only need server → client events
- **Simpler**: Built on HTTP, easier to implement
- **Standard**: Well-supported in Python (aiohttp)
- **Sufficient**: No need for bidirectional communication
### Why Thread-Local Callbacks?
- **Performance**: No cross-shard synchronization overhead
- **Simplicity**: Each shard independent
- **Safety**: No shared mutable state
- Event delivery handled by `smp::submit_to()`
### Why Info Level Logging?
- **Visibility**: Events should be visible in logs AND via SSE
- **Debugging**: Easier to correlate events with log context
- **Consistency**: Matches importance of injection triggers
## Benefits
### Performance
- **Instant notification**: No waiting for log flushes
- **No regex matching**: Direct event delivery
- **Parallel processing**: Events from all shards
### Reliability
- **Type-safe**: Structured JSON events
- **No missed events**: Queue-based delivery
- **Automatic cleanup**: RAII ensures no leaks
### Developer Experience
- **Clean API**: Simple async/await pattern
- **Better errors**: Timeout on specific injection name
- **Metadata**: Event includes type and shard ID
- **Backward compatible**: Existing tests unchanged
## Testing
### Security
✅ CodeQL scan: **0 alerts** (Python)
### Validation Needed
Due to build environment limitations, the following validations are recommended:
- [ ] Build C++ code in dev mode
- [ ] Run example tests: `./test.py --mode=dev test/cluster/test_error_injection_events.py`
- [ ] Verify SSE connection lifecycle (connect, disconnect, reconnect)
- [ ] Test with multiple concurrent clients
- [ ] Verify cross-shard event delivery
- [ ] Performance comparison with log parsing
## Files Changed
```
api/api-doc/error_injection.json | 15 +++
api/error_injection.cc | 82 ++++++++++++++
docs/dev/error_injection_events.md | 132 +++++++++++++++++++++
test/cluster/test_error_injection_events.py | 140 ++++++++++++++++++++++
test/pylib/rest_client.py | 144 ++++++++++++++++++++++
utils/error_injection.hh | 81 +++++++++++++
6 files changed, 587 insertions(+), 7 deletions(-)
```
## Migration Guide
### Old Approach
```python
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
await manager.api.enable_injection(server.ip_addr, "my_injection", one_shot=True)
# ... trigger operation ...
mark, _ = await log.wait_for('my_injection: waiting', from_mark=mark)
```
### New Approach
```python
async with injection_event_stream(server.ip_addr) as stream:
await manager.api.enable_injection(server.ip_addr, "my_injection", one_shot=True)
# ... trigger operation ...
event = await stream.wait_for_injection("my_injection", timeout=30)
```
### Backward Compatibility
- ✅ All existing log-based tests continue to work
- ✅ Logging still happens (now at INFO level)
- ✅ No breaking changes to existing APIs
- ✅ SSE is opt-in for new tests
## Future Enhancements
Possible improvements:
1. Server-side filtering by injection name (query parameter)
2. Include injection parameters in events
3. Add event timestamps
4. Event history/replay support
5. Multiple concurrent SSE clients per server
6. WebSocket support if bidirectional communication needed
## Conclusion
This implementation successfully addresses the problem statement:
- ✅ Eliminates log parsing
- ✅ Faster tests
- ✅ More reliable detection
- ✅ Clean API
- ✅ Backward compatible
- ✅ Well documented
- ✅ Security validated
The solution follows ScyllaDB best practices:
- RAII for resource management
- Seastar async patterns (coroutines, futures)
- Cross-shard communication via `smp::submit_to()`
- Thread-local state, no locks
- Comprehensive error handling

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.2.0-dev
VERSION=2026.1.0-dev
if test -f version
then

View File

@@ -0,0 +1,109 @@
# Analysis of unimplemented::cause Enum Values
This document provides an analysis of the `unimplemented::cause` enum values after cleanup.
## Removed Unused Enum Values (20 values removed)
The following enum values had **zero usages** in the codebase and have been removed:
- `LWT` - Lightweight transactions
- `PAGING` - Query result paging
- `AUTH` - Authentication
- `PERMISSIONS` - Permission checking
- `COUNTERS` - Counter columns
- `MIGRATIONS` - Schema migrations
- `GOSSIP` - Gossip protocol
- `TOKEN_RESTRICTION` - Token-based restrictions
- `LEGACY_COMPOSITE_KEYS` - Legacy composite key handling
- `COLLECTION_RANGE_TOMBSTONES` - Collection range tombstones
- `RANGE_DELETES` - Range deletion operations
- `COMPRESSION` - Compression features
- `NONATOMIC` - Non-atomic operations
- `CONSISTENCY` - Consistency level handling
- `WRAP_AROUND` - Token wrap-around handling
- `STORAGE_SERVICE` - Storage service operations
- `SCHEMA_CHANGE` - Schema change operations
- `MIXED_CF` - Mixed column family operations
- `SSTABLE_FORMAT_M` - SSTable format M
## Remaining Enum Values (8 values kept)
### 1. `API` (4 usages)
**Impact**: REST API features that are not fully implemented.
**Usages**:
- `api/column_family.cc:1052` - Fails when `split_output` parameter is used in major compaction
- `api/compaction_manager.cc:100,146,216` - Warns when force_user_defined_compaction or related operations are called
**User Impact**: Some REST API endpoints for compaction management are stubs and will warn or fail.
### 2. `INDEXES` (6 usages)
**Impact**: Secondary index features not fully supported.
**Usages**:
- `api/column_family.cc:433,440,449,456` - Warns about index-related operations
- `cql3/restrictions/statement_restrictions.cc:1158` - Fails when attempting filtering on collection columns without proper indexing
- `cql3/statements/update_statement.cc:149` - Warns about index operations
**User Impact**: Some advanced secondary index features (especially filtering on collections) are not available.
### 3. `TRIGGERS` (2 usages)
**Impact**: Trigger support is not implemented.
**Usages**:
- `db/schema_tables.cc:2017` - Warns when loading trigger metadata from schema tables
- `service/storage_proxy.cc:4166` - Warns when processing trigger-related operations
**User Impact**: Cassandra triggers (stored procedures that execute on data changes) are not supported.
### 4. `METRICS` (1 usage)
**Impact**: Some query processor metrics are not collected.
**Usages**:
- `cql3/query_processor.cc:585` - Warns about missing metrics implementation
**User Impact**: Minor - some internal metrics may not be available.
### 5. `VALIDATION` (4 usages)
**Impact**: Schema validation checks are partially implemented.
**Usages**:
- `cql3/functions/token_fct.hh:38` - Warns about validation in token functions
- `cql3/statements/drop_keyspace_statement.cc:40` - Warns when dropping keyspace
- `cql3/statements/truncate_statement.cc:87` - Warns when truncating table
- `service/migration_manager.cc:750` - Warns during schema migrations
**User Impact**: Some schema validation checks are skipped (with warnings logged).
### 6. `REVERSED` (1 usage)
**Impact**: Reversed type support in CQL protocol.
**Usages**:
- `transport/server.cc:2085` - Fails when trying to use reversed types in CQL protocol
**User Impact**: Reversed types are not supported in the CQL protocol implementation.
### 7. `HINT` (1 usage)
**Impact**: Hint replaying is not implemented.
**Usages**:
- `db/batchlog_manager.cc:251` - Warns when attempting to replay hints
**User Impact**: Cassandra hints (temporary storage of writes when nodes are down) are not supported.
### 8. `SUPER` (2 usages)
**Impact**: Super column families are not supported.
**Usages**:
- `db/legacy_schema_migrator.cc:157` - Fails when encountering super column family in legacy schema
- `db/schema_tables.cc:2288` - Fails when encountering super column family in schema tables
**User Impact**: Super column families (legacy Cassandra feature) will cause errors if encountered in legacy data or schema migrations.
## Summary
- **Removed**: 20 unused enum values (76% reduction)
- **Kept**: 8 actively used enum values (24% remaining)
- **Total lines removed**: ~40 lines from enum definition and switch statement
The remaining enum values represent actual unimplemented features that users may encounter, with varying impacts ranging from warnings (TRIGGERS, METRICS, VALIDATION, HINT) to failures (API split_output, INDEXES on collections, REVERSED types, SUPER tables).

View File

@@ -18,7 +18,6 @@ target_sources(alternator
consumed_capacity.cc
ttl.cc
parsed_expression_cache.cc
http_compression.cc
${cql_grammar_srcs})
target_include_directories(alternator
PUBLIC

View File

@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
if (!comparison_operator.IsString()) {
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
}
std::string op = rjson::to_string(comparison_operator);
std::string op = comparison_operator.GetString();
auto it = ops.find(op);
if (it == ops.end()) {
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
}
if (kv1.name == "S") {
return cmp(rjson::to_string_view(kv1.value),
rjson::to_string_view(kv2.value));
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
}
if (kv1.name == "B") {
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
}
if (kv_v.name == "S") {
return check_BETWEEN(rjson::to_string_view(kv_v.value),
rjson::to_string_view(kv_lb.value),
rjson::to_string_view(kv_ub.value),
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
bounds_from_query);
}
if (kv_v.name == "B") {
@@ -618,7 +618,7 @@ conditional_operator_type get_conditional_operator(const rjson::value& req) {
// Check if the existing values of the item (previous_item) match the
// conditions given by the Expected and ConditionalOperator parameters
// (if they exist) in the request (an UpdateItem, PutItem or DeleteItem).
// This function can throw a ValidationException API error if there
// This function can throw an ValidationException API error if there
// are errors in the format of the condition itself.
bool verify_expected(const rjson::value& req, const rjson::value* previous_item) {
const rjson::value* expected = rjson::find(req, "Expected");

View File

@@ -8,8 +8,6 @@
#include "consumed_capacity.hh"
#include "error.hh"
#include "utils/rjson.hh"
#include <fmt/format.h>
namespace alternator {
@@ -34,18 +32,18 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
if (!return_consumed->IsString()) {
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
}
std::string_view consumed = rjson::to_string_view(*return_consumed);
std::string consumed = return_consumed->GetString();
if (consumed == "INDEXES") {
throw api_error::validation("INDEXES consumed capacity is not supported");
}
if (consumed != "TOTAL") {
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
throw api_error::validation("Unknown consumed capacity "+ consumed);
}
return true;
}
void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept {
if (_should_add_to_response) {
if (_should_add_to_reponse) {
auto consumption = rjson::empty_object();
rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units());
rjson::add(response, "ConsumedCapacity", std::move(consumption));

View File

@@ -28,9 +28,9 @@ namespace alternator {
class consumed_capacity_counter {
public:
consumed_capacity_counter() = default;
consumed_capacity_counter(bool should_add_to_response) : _should_add_to_response(should_add_to_response){}
consumed_capacity_counter(bool should_add_to_reponse) : _should_add_to_reponse(should_add_to_reponse){}
bool operator()() const noexcept {
return _should_add_to_response;
return _should_add_to_reponse;
}
consumed_capacity_counter& operator +=(uint64_t bytes);
@@ -44,7 +44,7 @@ public:
uint64_t _total_bytes = 0;
static bool should_add_capacity(const rjson::value& request);
protected:
bool _should_add_to_response = false;
bool _should_add_to_reponse = false;
};
class rcu_consumed_capacity_counter : public consumed_capacity_counter {

View File

@@ -28,7 +28,6 @@ static logging::logger logger("alternator_controller");
controller::controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
@@ -40,7 +39,6 @@ controller::controller(
: protocol_server(sg)
, _gossiper(gossiper)
, _proxy(proxy)
, _ss(ss)
, _mm(mm)
, _sys_dist_ks(sys_dist_ks)
, _cdc_gen_svc(cdc_gen_svc)
@@ -91,7 +89,7 @@ future<> controller::start_server() {
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
return cfg.alternator_timeout_in_ms;
};
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
@@ -105,23 +103,11 @@ future<> controller::start_server() {
alternator_port = _config.alternator_port();
_listen_addresses.push_back({addr, *alternator_port});
}
std::optional<uint16_t> alternator_port_proxy_protocol;
if (_config.alternator_port_proxy_protocol()) {
alternator_port_proxy_protocol = _config.alternator_port_proxy_protocol();
_listen_addresses.push_back({addr, *alternator_port_proxy_protocol});
}
std::optional<uint16_t> alternator_https_port;
std::optional<uint16_t> alternator_https_port_proxy_protocol;
std::optional<tls::credentials_builder> creds;
if (_config.alternator_https_port() || _config.alternator_https_port_proxy_protocol()) {
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
_listen_addresses.push_back({addr, *alternator_https_port});
}
if (_config.alternator_https_port_proxy_protocol()) {
alternator_https_port_proxy_protocol = _config.alternator_https_port_proxy_protocol();
_listen_addresses.push_back({addr, *alternator_https_port_proxy_protocol});
}
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
_listen_addresses.push_back({addr, *alternator_https_port});
creds.emplace();
auto opts = _config.alternator_encryption_options();
if (opts.empty()) {
@@ -147,29 +133,20 @@ future<> controller::start_server() {
}
}
_server.invoke_on_all(
[this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds,
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds,
_config.alternator_enforce_authorization,
_config.alternator_warn_authorization,
_config.alternator_max_users_query_size_in_trace_output,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);
}).handle_exception([this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] (std::exception_ptr ep) {
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}, proxy-protocol port {}, TLS proxy-protocol port {}: {}",
addr,
alternator_port ? std::to_string(*alternator_port) : "OFF",
alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF",
ep);
}).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}: {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF", ep);
return stop_server().then([ep = std::move(ep)] { return make_exception_future<>(ep); });
}).then([addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}, proxy-protocol port {}, TLS proxy-protocol port {}",
addr,
alternator_port ? std::to_string(*alternator_port) : "OFF",
alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF");
}).then([addr, alternator_port, alternator_https_port] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
}).get();
});
}
@@ -192,7 +169,7 @@ future<> controller::request_stop_server() {
});
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
future<utils::chunked_vector<client_data>> controller::get_client_data() {
return _server.local().get_client_data();
}

View File

@@ -15,7 +15,6 @@
namespace service {
class storage_proxy;
class storage_service;
class migration_manager;
class memory_limiter;
}
@@ -58,7 +57,6 @@ class server;
class controller : public protocol_server {
sharded<gms::gossiper>& _gossiper;
sharded<service::storage_proxy>& _proxy;
sharded<service::storage_service>& _ss;
sharded<service::migration_manager>& _mm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<cdc::generation_service>& _cdc_gen_svc;
@@ -76,7 +74,6 @@ public:
controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
@@ -96,7 +93,7 @@ public:
// This virtual function is called (on each shard separately) when the
// virtual table "system.clients" is read. It is expected to generate a
// list of clients connected to this server (on this shard).
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
};
}

View File

@@ -17,7 +17,6 @@
#include "auth/service.hh"
#include "db/config.hh"
#include "db/view/view_build_status.hh"
#include "locator/tablets.hh"
#include "mutation/tombstone.hh"
#include "locator/abstract_replication_strategy.hh"
#include "utils/log.hh"
@@ -68,14 +67,6 @@ using namespace std::chrono_literals;
logging::logger elogger("alternator-executor");
namespace std {
template <> struct hash<std::pair<sstring, sstring>> {
size_t operator () (const std::pair<sstring, sstring>& p) const {
return std::hash<sstring>()(p.first) * 1009 + std::hash<sstring>()(p.second) * 3;
}
};
}
namespace alternator {
// Alternator-specific table properties stored as hidden table tags:
@@ -237,7 +228,7 @@ static void validate_is_object(const rjson::value& value, const char* caller) {
}
// This function assumes the given value is an object and returns requested member value.
// If it is not possible, an api_error::validation is thrown.
// If it is not possible an api_error::validation is thrown.
static const rjson::value& get_member(const rjson::value& obj, const char* member_name, const char* caller) {
validate_is_object(obj, caller);
const rjson::value* ret = rjson::find(obj, member_name);
@@ -249,7 +240,7 @@ static const rjson::value& get_member(const rjson::value& obj, const char* membe
// This function assumes the given value is an object with a single member, and returns this member.
// In case the requirements are not met, an api_error::validation is thrown.
// In case the requirements are not met an api_error::validation is thrown.
static const rjson::value::Member& get_single_member(const rjson::value& v, const char* caller) {
if (!v.IsObject() || v.MemberCount() != 1) {
throw api_error::validation(format("{}: expected an object with a single member.", caller));
@@ -257,66 +248,14 @@ static const rjson::value::Member& get_single_member(const rjson::value& v, cons
return *(v.MemberBegin());
}
class executor::describe_table_info_manager : public service::migration_listener::empty_listener {
executor &_executor;
struct table_info {
utils::simple_value_with_expiry<std::uint64_t> size_in_bytes;
};
std::unordered_map<std::pair<sstring, sstring>, table_info> info_for_tables;
bool active = false;
public:
describe_table_info_manager(executor& executor) : _executor(executor) {
_executor._proxy.data_dictionary().real_database_ptr()->get_notifier().register_listener(this);
active = true;
}
describe_table_info_manager(const describe_table_info_manager &) = delete;
describe_table_info_manager(describe_table_info_manager&&) = delete;
~describe_table_info_manager() {
if (active) {
on_fatal_internal_error(elogger, "describe_table_info_manager was not stopped before destruction");
}
}
describe_table_info_manager &operator = (const describe_table_info_manager &) = delete;
describe_table_info_manager &operator = (describe_table_info_manager&&) = delete;
static std::chrono::high_resolution_clock::time_point now() {
return std::chrono::high_resolution_clock::now();
}
std::optional<std::uint64_t> get_cached_size_in_bytes(const sstring &ks_name, const sstring &cf_name) const {
auto it = info_for_tables.find({ks_name, cf_name});
if (it != info_for_tables.end()) {
return it->second.size_in_bytes.get();
}
return std::nullopt;
}
void cache_size_in_bytes(sstring ks_name, sstring cf_name, std::uint64_t size_in_bytes, std::chrono::high_resolution_clock::time_point expiry) {
info_for_tables[{std::move(ks_name), std::move(cf_name)}].size_in_bytes.set_if_longer_expiry(size_in_bytes, expiry);
}
future<> stop() {
co_await _executor._proxy.data_dictionary().real_database_ptr()->get_notifier().unregister_listener(this);
active = false;
co_return;
}
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
if (!ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX)) return;
info_for_tables.erase({ks_name, cf_name});
}
};
executor::executor(gms::gossiper& gossiper,
service::storage_proxy& proxy,
service::storage_service& ss,
service::migration_manager& mm,
db::system_distributed_keyspace& sdks,
cdc::metadata& cdc_metadata,
smp_service_group ssg,
utils::updateable_value<uint32_t> default_timeout_in_ms)
: _gossiper(gossiper),
_ss(ss),
_proxy(proxy),
_mm(mm),
_sdks(sdks),
@@ -329,7 +268,6 @@ executor::executor(gms::gossiper& gossiper,
_stats))
{
s_default_timeout_in_ms = std::move(default_timeout_in_ms);
_describe_table_info_manager = std::make_unique<describe_table_info_manager>(*this);
register_metrics(_metrics, _stats);
}
@@ -481,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
if (!table_name_value->IsString()) {
throw api_error::validation("Non-string TableName field in request");
}
std::string table_name = rjson::to_string(*table_name_value);
std::string table_name = table_name_value->GetString();
return table_name;
}
@@ -608,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
// does exist but the index does not (ValidationException).
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
throw api_error::validation(
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
} else {
throw api_error::resource_not_found(
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
@@ -649,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
attribute_name, value));
}
return rjson::to_string(*attribute_value);
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
}
// Convenience function for getting the value of a boolean attribute, or a
@@ -682,7 +620,7 @@ static std::optional<int> get_int_attribute(const rjson::value& value, std::stri
}
// Sets a KeySchema object inside the given JSON parent describing the key
// attributes of the given schema as being either HASH or RANGE keys.
// attributes of the the given schema as being either HASH or RANGE keys.
// Additionally, adds to a given map mappings between the key attribute
// names and their type (as a DynamoDB type string).
void executor::describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>* attribute_types, const std::map<sstring, sstring> *tags) {
@@ -814,44 +752,12 @@ static future<bool> is_view_built(
}
future<> executor::cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl) {
auto expiry = describe_table_info_manager::now() + ttl;
return container().invoke_on_all(
[schema, size_in_bytes, expiry] (executor& exec) {
exec._describe_table_info_manager->cache_size_in_bytes(schema->ks_name(), schema->cf_name(), size_in_bytes, expiry);
});
}
future<> executor::fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting) {
auto cached_size = _describe_table_info_manager->get_cached_size_in_bytes(schema->ks_name(), schema->cf_name());
std::uint64_t total_size = 0;
if (cached_size) {
total_size = *cached_size;
} else {
// there's no point in trying to estimate value of table that is being deleted, as other nodes more often than not might
// move forward with deletion faster than we calculate the size
if (!deleting) {
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
// Note: we don't care when the notification of other shards will finish, as long as it will be done
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
// which is also fine, as the specification doesn't give precision guarantees of any kind.
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
}
}
rjson::add(table_description, "TableSizeBytes", total_size);
}
future<rjson::value> executor::fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
static future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
{
rjson::value table_description = rjson::empty_object();
auto tags_ptr = db::get_tags_of_table(schema);
rjson::add(table_description, "TableName", rjson::from_string(schema->cf_name()));
co_await fill_table_size(table_description, schema, tbl_status == table_status::deleting);
auto creation_timestamp = get_table_creation_time(*schema);
@@ -895,7 +801,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", wcu);
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
data_dictionary::table t = _proxy.data_dictionary().find_column_family(schema);
data_dictionary::table t = proxy.data_dictionary().find_column_family(schema);
if (tbl_status != table_status::deleting) {
rjson::add(table_description, "CreationDateTime", rjson::value(creation_timestamp));
@@ -916,7 +824,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
sstring index_name = cf_name.substr(delim_it + 1);
rjson::add(view_entry, "IndexName", rjson::from_string(index_name));
rjson::add(view_entry, "IndexArn", generate_arn_for_index(*schema, index_name));
// Add index's KeySchema and collect types for AttributeDefinitions:
// Add indexes's KeySchema and collect types for AttributeDefinitions:
executor::describe_key_schema(view_entry, *vptr, key_attribute_types, db::get_tags_of_table(vptr));
// Add projection type
rjson::value projection = rjson::empty_object();
@@ -932,7 +840,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
// (for a built view) or CREATING+Backfilling (if view building
// is in progress).
if (!is_lsi) {
if (co_await is_view_built(vptr, _proxy, client_state, trace_state, permit)) {
if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) {
rjson::add(view_entry, "IndexStatus", "ACTIVE");
} else {
rjson::add(view_entry, "IndexStatus", "CREATING");
@@ -960,8 +868,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
}
rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions));
}
executor::supplement_table_stream_info(table_description, *schema, _proxy);
executor::supplement_table_stream_info(table_description, *schema, proxy);
// FIXME: still missing some response fields (issue #5026)
co_return table_description;
}
@@ -981,7 +890,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
rjson::value table_description = co_await fill_table_description(schema, table_status::active, client_state, trace_state, permit);
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
rjson::value response = rjson::empty_object();
rjson::add(response, "Table", std::move(table_description));
elogger.trace("returning {}", response);
@@ -1084,7 +993,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
auto& p = _proxy.container();
schema_ptr schema = get_table(_proxy, request);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, client_state, trace_state, permit);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
size_t retries = mm.get_concurrent_ddl_retries();
@@ -1171,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
}
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == name) {
auto type = attribute_info["AttributeType"].GetString();
data_type dt = parse_key_type(type);
if (computed_column) {
// Computed column for GSI (doesn't choose a real column as-is
@@ -1207,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First element of KeySchema must be an object");
}
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
throw api_error::validation("First key in KeySchema must be a HASH key");
}
v = rjson::find((*key_schema)[0], "AttributeName");
@@ -1215,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First key in KeySchema must have string AttributeName");
}
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
std::string hash_key = rjson::to_string(*v);
std::string hash_key = v->GetString();
std::string range_key;
if (key_schema->Size() == 2) {
if (!(*key_schema)[1].IsObject()) {
throw api_error::validation("Second element of KeySchema must be an object");
}
v = rjson::find((*key_schema)[1], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
throw api_error::validation("Second key in KeySchema must be a RANGE key");
}
v = rjson::find((*key_schema)[1], "AttributeName");
@@ -1648,7 +1557,8 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
}
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
@@ -1835,7 +1745,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
if (executor::add_stream_options(*stream_specification, builder, _proxy)) {
if (executor::add_stream_options(*stream_specification, builder, sp)) {
validate_cdc_log_name_length(builder.cf_name());
}
}
@@ -1854,7 +1764,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
set_table_creation_time(tags_map, db_clock::now());
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, _stats);
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
schema_ptr schema = builder.build();
for (auto& view_builder : view_builders) {
@@ -1870,49 +1780,33 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
}
size_t retries = _mm.get_concurrent_ddl_retries();
size_t retries = mm.get_concurrent_ddl_retries();
for (;;) {
auto group0_guard = co_await _mm.start_group0_operation();
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
utils::chunked_vector<mutation> schema_mutations;
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
if (stream_specification && stream_specification->IsObject()) {
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
const auto& topo = sp.local_db().get_token_metadata().get_topology();
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
if (rs->uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
"If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
}
}
}
// Creating an index in tablets mode requires the keyspace to be RF-rack-valid.
// GSI and LSI indexes are based on materialized views which require RF-rack-validity to avoid consistency issues.
if (!view_builders.empty() || _proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
try {
locator::assert_rf_rack_valid_keyspace(keyspace_name, _proxy.local_db().get_token_metadata_ptr(), *rs);
} catch (const std::invalid_argument& ex) {
if (!view_builders.empty()) {
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes and LocalSecondaryIndexes on a table "
"using tablets require the number of racks in the cluster to be either 1 or 3"));
} else {
co_return api_error::validation(fmt::format("Cannot create table '{}' with tablets: the configuration "
"option 'rf_rack_valid_keyspaces' is enabled, which enforces that tables using tablets can only be created in clusters "
"that have either 1 or 3 racks", table_name));
}
}
}
try {
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
schema_mutations = service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
} catch (exceptions::already_exists_exception&) {
if (_proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
co_return api_error::resource_in_use(fmt::format("Table {} already exists", table_name));
}
}
if (_proxy.data_dictionary().try_find_table(schema->id())) {
if (sp.data_dictionary().try_find_table(schema->id())) {
// This should never happen, the ID is supposed to be unique
co_return api_error::internal(format("Table with ID {} already exists", schema->id()));
}
@@ -1921,9 +1815,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
for (schema_builder& view_builder : view_builders) {
schemas.push_back(view_builder.build());
}
co_await service::prepare_new_column_families_announcement(schema_mutations, _proxy, *ksm, schemas, ts);
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
if (ksm->uses_tablets()) {
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, _proxy);
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
}
// If a role is allowed to create a table, we must give it permissions to
@@ -1948,7 +1842,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
std::tie(schema_mutations, group0_guard) = co_await std::move(mc).extract();
try {
co_await _mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
break;
} catch (const service::group0_concurrent_modification& ex) {
elogger.info("Failed to execute CreateTable {} due to concurrent schema modifications. {}.",
@@ -1960,9 +1854,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
}
co_await _mm.wait_for_schema_agreement(_proxy.local_db(), db::timeout_clock::now() + 10s, nullptr);
co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
rjson::value status = rjson::empty_object();
executor::supplement_table_info(request, *schema, _proxy);
executor::supplement_table_info(request, *schema, sp);
rjson::add(status, "TableDescription", std::move(request));
co_return rjson::print(std::move(status));
}
@@ -1971,11 +1865,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
// `invoke_on` hopped us to shard 0, but `this` points to `executor` is from 'old' shard, we need to hop it too.
co_return co_await e.local().create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), enforce_authorization, warn_authorization, std::move(tablets_mode));
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
});
}
@@ -1994,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string def_type = type_to_string(def.type);
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
auto type = attribute_info["AttributeType"].GetString();
if (type != def_type) {
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
}
@@ -2126,13 +2019,6 @@ future<executor::request_return_type> executor::update_table(client_state& clien
co_return api_error::validation(fmt::format(
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
}
try {
locator::assert_rf_rack_valid_keyspace(keyspace_name, p.local().local_db().get_token_metadata_ptr(),
p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy());
} catch (const std::invalid_argument& ex) {
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes on a table "
"using tablets require the number of racks in the cluster to be either 1 or 3"));
}
elogger.trace("Adding GSI {}", index_name);
// FIXME: read and handle "Projection" parameter. This will
@@ -2337,12 +2223,12 @@ void validate_value(const rjson::value& v, const char* caller) {
// The put_or_delete_item class builds the mutations needed by the PutItem and
// DeleteItem operations - either as stand-alone commands or part of a list
// of commands in BatchWriteItem.
// of commands in BatchWriteItems.
// put_or_delete_item splits each operation into two stages: Constructing the
// object parses and validates the user input (throwing exceptions if there
// are input errors). Later, build() generates the actual mutation, with a
// specified timestamp. This split is needed because of the peculiar needs of
// BatchWriteItem and LWT. BatchWriteItem needs all parsing to happen before
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
@@ -2435,7 +2321,7 @@ std::unordered_map<bytes, std::string> si_key_attributes(data_dictionary::table
// case, this function simply won't be called for this attribute.)
//
// This function checks if the given attribute update is an update to some
// GSI's key, and if the value is unsuitable, an api_error::validation is
// GSI's key, and if the value is unsuitable, a api_error::validation is
// thrown. The checking here is similar to the checking done in
// get_key_from_typed_value() for the base table's key columns.
//
@@ -2476,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
@@ -2853,7 +2739,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
auto read_command = needs_read_before_write ?
previous_item_read_command(proxy, schema(), _ck, selection) :
nullptr;
return proxy.cas(schema(), std::move(*cas_shard), *this, read_command, to_partition_ranges(*schema(), _pk),
return proxy.cas(schema(), std::move(*cas_shard), shared_from_this(), read_command, to_partition_ranges(*schema(), _pk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM, timeout, timeout, true, std::move(cdc_opts)).then([this, read_command, &wcu_total] (bool is_applied) mutable {
if (!is_applied) {
@@ -2897,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
return;
}
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
if (!used.contains(rjson::to_string(it->name))) {
if (!used.contains(it->name.GetString())) {
throw api_error::validation(
format("{} has spurious '{}', not used in {}",
field_name, rjson::to_string_view(it->name), operation));
field_name, it->name.GetString(), operation));
}
}
}
@@ -3114,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
}
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
try {
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
} catch(data_dictionary::no_such_column_family&) {
@@ -3140,20 +3026,17 @@ struct primary_key_equal {
};
// This is a cas_request subclass for applying given put_or_delete_items to
// one partition using LWT as part as BatchWriteItem. This is a write-only
// one partition using LWT as part as BatchWriteItems. This is a write-only
// operation, not needing the previous value of the item (the mutation to be
// done is known prior to starting the operation). Nevertheless, we want to
// do this mutation via LWT to ensure that it is serialized with other LWT
// mutations to the same partition.
//
// The std::vector<put_or_delete_item> must remain alive until the
// storage_proxy::cas() future is resolved.
class put_or_delete_item_cas_request : public service::cas_request {
schema_ptr schema;
const std::vector<put_or_delete_item>& _mutation_builders;
std::vector<put_or_delete_item> _mutation_builders;
public:
put_or_delete_item_cas_request(schema_ptr s, const std::vector<put_or_delete_item>& b) :
schema(std::move(s)), _mutation_builders(b) { }
put_or_delete_item_cas_request(schema_ptr s, std::vector<put_or_delete_item>&& b) :
schema(std::move(s)), _mutation_builders(std::move(b)) { }
virtual ~put_or_delete_item_cas_request() = default;
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override {
std::optional<mutation> ret;
@@ -3169,48 +3052,20 @@ public:
}
};
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit)
{
if (!cas_shard.this_shard()) {
_stats.shard_bounce_for_lwt++;
return container().invoke_on(cas_shard.shard(), _ssg,
[cs = client_state.move_to_other_shard(),
&mb = mutation_builders,
&dk,
ks = schema->ks_name(),
cf = schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(executor& self) mutable {
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt), &self]
(service::client_state& client_state) mutable {
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
});
}
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, dht::decorated_key dk, std::vector<put_or_delete_item>&& mutation_builders,
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
auto timeout = executor::default_timeout();
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
auto* op_ptr = op.get();
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
return proxy.cas(schema, std::move(cas_shard), op, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
timeout, timeout, true, std::move(cdc_opts)).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
// does not need to support conditional updates.
}
@@ -3232,11 +3087,13 @@ struct schema_decorated_key_equal {
// FIXME: if we failed writing some of the mutations, need to return a list
// of these failed mutations rather than fail the whole write (issue #5650).
future<> executor::do_batch_write(
static future<> do_batch_write(service::storage_proxy& proxy,
smp_service_group ssg,
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit) {
service_permit permit,
stats& stats) {
if (mutation_builders.empty()) {
return make_ready_future<>();
}
@@ -3258,7 +3115,7 @@ future<> executor::do_batch_write(
mutations.push_back(b.second.build(b.first, now));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
return proxy.mutate(std::move(mutations),
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(),
trace_state,
@@ -3267,48 +3124,55 @@ future<> executor::do_batch_write(
false,
cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
});
} else {
// Do the write via LWT:
// Multiple mutations may be destined for the same partition, adding
// or deleting different items of one partition. Join them together
// because we can do them in one cas() call.
using map_type = std::unordered_map<schema_decorated_key,
std::vector<put_or_delete_item>,
schema_decorated_key_hash,
schema_decorated_key_equal>;
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto&& b : std::move(mutation_builders)) {
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
.schema = b.first,
.dk = dht::decorate_key(*b.first, b.second.pk())
});
std::unordered_map<schema_decorated_key, std::vector<put_or_delete_item>, schema_decorated_key_hash, schema_decorated_key_equal>
key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto& b : mutation_builders) {
auto dk = dht::decorate_key(*b.first, b.second.pk());
auto [it, added] = key_builders.try_emplace(schema_decorated_key{b.first, dk});
it->second.push_back(std::move(b.second));
}
auto* key_builders_ptr = key_builders.get();
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
_stats.write_using_lwt++;
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
stats.write_using_lwt++;
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
auto s = e.first.schema;
if (desired_shard.this_shard()) {
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, std::move(e.second), client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
return proxy.container().invoke_on(desired_shard.shard(), ssg,
[cs = client_state.move_to_other_shard(),
mb = e.second,
dk = e.first.dk,
ks = e.first.schema->ks_name(),
cf = e.first.schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(service::storage_proxy& proxy) mutable {
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt)]
(service::client_state& client_state) mutable {
auto schema = proxy.data_dictionary().find_schema(ks, cf);
static const auto* injection_name = "alternator_executor_batch_write_wait";
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
const auto ks = handler.get("keyspace");
const auto cf = handler.get("table");
const auto shard = std::atoll(handler.get("shard")->data());
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
elogger.info("{}: hit", injection_name);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
elogger.info("{}: continue", injection_name);
}
}).then([&e, desired_shard = std::move(desired_shard),
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
{
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
});
}).finally([key_builders = std::move(key_builders)]{});
// The desired_shard on the original shard remains alive for the duration
// of cas_write on this shard and prevents any tablet operations.
// However, we need a local instance of cas_shard on this shard
// to pass it to sp::cas, so we just create a new one.
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return cas_write(proxy, schema, std::move(cas_shard), dk, std::move(mb), client_state, std::move(trace_state), empty_service_permit());
});
}).finally([desired_shard = std::move(desired_shard)]{});
}
});
}
}
@@ -3455,7 +3319,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
_stats.api_operations.batch_write_item_batch_total += total_items;
_stats.api_operations.batch_write_item_histogram.add(total_items);
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
// FIXME: Issue #5650: If we failed writing some of the updates,
// need to return a list of these failed updates in UnprocessedItems
// rather than fail the whole write (issue #5650).
@@ -3500,7 +3364,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
}
rjson::value newv = rjson::empty_object();
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
std::string attr = rjson::to_string(it->name);
std::string attr = it->name.GetString();
auto x = members.find(attr);
if (x != members.end()) {
if (x->second) {
@@ -3548,7 +3412,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
return true;
}
// Add a path to an attribute_path_map. Throws a validation error if the path
// Add a path to a attribute_path_map. Throws a validation error if the path
// "overlaps" with one already in the filter (one is a sub-path of the other)
// or "conflicts" with it (both a member and index is requested).
template<typename T>
@@ -3720,7 +3584,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
const rjson::value& attributes_to_get = req["AttributesToGet"];
attrs_to_get ret;
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
attribute_path_map_add("AttributesToGet", ret, it->GetString());
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
}
if (ret.empty()) {
@@ -4386,12 +4250,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
// Note that it.key() is the name of the column, *it is the operation
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
const column_definition* cdef = _schema->get_column_definition(column_name);
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
}
std::string action = rjson::to_string((it->value)["Action"]);
std::string action = (it->value)["Action"].GetString();
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -5588,7 +5452,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
std::vector<query::clustering_range> ck_bounds;
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
sstring key = rjson::to_sstring(it->name);
std::string key = it->name.GetString();
const rjson::value& condition = it->value;
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
@@ -5596,13 +5460,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
const column_definition& pk_cdef = schema->partition_key_columns().front();
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
if (key == pk_cdef.name_as_text()) {
if (sstring(key) == pk_cdef.name_as_text()) {
if (!partition_ranges.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
}
if (ck_cdef && key == ck_cdef->name_as_text()) {
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
if (!ck_bounds.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
@@ -6001,14 +5865,9 @@ future<executor::request_return_type> executor::list_tables(client_state& client
_stats.api_operations.list_tables++;
elogger.trace("Listing tables {}", request);
co_await utils::get_local_injector().inject("alternator_list_tables", [] (auto& handler) -> future<> {
handler.set("waiting", true);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
});
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
int limit = limit_json ? limit_json->GetInt() : 100;
if (limit < 1 || limit > 100) {
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");
@@ -6197,10 +6056,9 @@ future<> executor::start() {
}
future<> executor::stop() {
co_await _describe_table_info_manager->stop();
// disconnect from the value source, but keep the value unchanged.
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
co_await _parsed_expression_cache->stop();
return _parsed_expression_cache->stop();
}
} // namespace alternator

View File

@@ -17,13 +17,11 @@
#include "service/client_state.hh"
#include "service_permit.hh"
#include "db/timeout_clock.hh"
#include "db/config.hh"
#include "alternator/error.hh"
#include "stats.hh"
#include "utils/rjson.hh"
#include "utils/updateable_value.hh"
#include "utils/simple_value_with_expiry.hh"
#include "tracing/trace_state.hh"
@@ -42,8 +40,6 @@ namespace cql3::selection {
namespace service {
class storage_proxy;
class cas_shard;
class storage_service;
}
namespace cdc {
@@ -60,9 +56,7 @@ class schema_builder;
namespace alternator {
enum class table_status;
class rmw_operation;
class put_or_delete_item;
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
@@ -140,7 +134,6 @@ class expression_cache;
class executor : public peering_sharded_service<executor> {
gms::gossiper& _gossiper;
service::storage_service& _ss;
service::storage_proxy& _proxy;
service::migration_manager& _mm;
db::system_distributed_keyspace& _sdks;
@@ -153,11 +146,6 @@ class executor : public peering_sharded_service<executor> {
std::unique_ptr<parsed::expression_cache> _parsed_expression_cache;
struct describe_table_info_manager;
std::unique_ptr<describe_table_info_manager> _describe_table_info_manager;
future<> cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl);
future<> fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting);
public:
using client_state = service::client_state;
// request_return_type is the return type of the executor methods, which
@@ -183,7 +171,6 @@ public:
executor(gms::gossiper& gossiper,
service::storage_proxy& proxy,
service::storage_service& ss,
service::migration_manager& mm,
db::system_distributed_keyspace& sdks,
cdc::metadata& cdc_metadata,
@@ -231,18 +218,6 @@ private:
friend class rmw_operation;
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode);
future<> do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit);
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit);
public:
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);

View File

@@ -50,7 +50,7 @@ public:
_operators.emplace_back(i);
check_depth_limit();
}
void add_dot(std::string name) {
void add_dot(std::string(name)) {
_operators.emplace_back(std::move(name));
check_depth_limit();
}
@@ -85,7 +85,7 @@ struct constant {
}
};
// "value" is a value used in the right hand side of an assignment
// "value" is is a value used in the right hand side of an assignment
// expression, "SET a = ...". It can be a constant (a reference to a value
// included in the request, e.g., ":val"), a path to an attribute from the
// existing item (e.g., "a.b[3].c"), or a function of other such values.
@@ -205,7 +205,7 @@ public:
// The supported primitive conditions are:
// 1. Binary operators - v1 OP v2, where OP is =, <>, <, <=, >, or >= and
// v1 and v2 are values - from the item (an attribute path), the query
// (a ":val" reference), or a function of the above (only the size()
// (a ":val" reference), or a function of the the above (only the size()
// function is supported).
// 2. Ternary operator - v1 BETWEEN v2 and v3 (means v1 >= v2 AND v1 <= v3).
// 3. N-ary operator - v1 IN ( v2, v3, ... )

View File

@@ -1,301 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "alternator/http_compression.hh"
#include "alternator/server.hh"
#include <seastar/coroutine/maybe_yield.hh>
#include <zlib.h>
static logging::logger slogger("alternator-http-compression");
namespace alternator {
static constexpr size_t compressed_buffer_size = 1024;
class zlib_compressor {
z_stream _zs;
temporary_buffer<char> _output_buf;
noncopyable_function<future<>(temporary_buffer<char>&&)> _write_func;
public:
zlib_compressor(bool gzip, int compression_level, noncopyable_function<future<>(temporary_buffer<char>&&)> write_func)
: _write_func(std::move(write_func)) {
memset(&_zs, 0, sizeof(_zs));
if (deflateInit2(&_zs, std::clamp(compression_level, Z_NO_COMPRESSION, Z_BEST_COMPRESSION), Z_DEFLATED,
(gzip ? 16 : 0) + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
}
~zlib_compressor() {
deflateEnd(&_zs);
}
future<> close() {
return compress(nullptr, 0, true);
}
future<> compress(const char* buf, size_t len, bool is_last_chunk = false) {
_zs.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(buf));
_zs.avail_in = (uInt) len;
int mode = is_last_chunk ? Z_FINISH : Z_NO_FLUSH;
while(_zs.avail_in > 0 || is_last_chunk) {
co_await coroutine::maybe_yield();
if (_output_buf.empty()) {
if (is_last_chunk) {
uint32_t max_buffer_size = 0;
deflatePending(&_zs, &max_buffer_size, nullptr);
max_buffer_size += deflateBound(&_zs, _zs.avail_in) + 1;
_output_buf = temporary_buffer<char>(std::min(compressed_buffer_size, (size_t) max_buffer_size));
} else {
_output_buf = temporary_buffer<char>(compressed_buffer_size);
}
_zs.next_out = reinterpret_cast<unsigned char*>(_output_buf.get_write());
_zs.avail_out = compressed_buffer_size;
}
int e = deflate(&_zs, mode);
if (e < Z_OK) {
throw api_error::internal("Error during compression of response body");
}
if (e == Z_STREAM_END || _zs.avail_out < compressed_buffer_size / 4) {
_output_buf.trim(compressed_buffer_size - _zs.avail_out);
co_await _write_func(std::move(_output_buf));
if (e == Z_STREAM_END) {
break;
}
}
}
}
};
// Helper string_view functions for parsing Accept-Encoding header
struct case_insensitive_cmp_sv {
bool operator()(std::string_view s1, std::string_view s2) const {
return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(),
[](char a, char b) { return ::tolower(a) == ::tolower(b); });
}
};
static inline std::string_view trim_left(std::string_view sv) {
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.front())))
sv.remove_prefix(1);
return sv;
}
static inline std::string_view trim_right(std::string_view sv) {
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.back())))
sv.remove_suffix(1);
return sv;
}
static inline std::string_view trim(std::string_view sv) {
return trim_left(trim_right(sv));
}
inline std::vector<std::string_view> split(std::string_view text, char separator) {
std::vector<std::string_view> tokens;
if (text == "") {
return tokens;
}
while (true) {
auto pos = text.find_first_of(separator);
if (pos != std::string_view::npos) {
tokens.emplace_back(text.data(), pos);
text.remove_prefix(pos + 1);
} else {
tokens.emplace_back(text);
break;
}
}
return tokens;
}
constexpr response_compressor::compression_type response_compressor::get_compression_type(std::string_view encoding) {
for (size_t i = 0; i < static_cast<size_t>(compression_type::count); ++i) {
if (case_insensitive_cmp_sv{}(encoding, compression_names[i])) {
return static_cast<compression_type>(i);
}
}
return compression_type::unknown;
}
response_compressor::compression_type response_compressor::find_compression(std::string_view accept_encoding, size_t response_size) {
std::optional<float> ct_q[static_cast<size_t>(compression_type::count)];
ct_q[static_cast<size_t>(compression_type::none)] = std::numeric_limits<float>::min(); // enabled, but lowest priority
compression_type selected_ct = compression_type::none;
std::vector<std::string_view> entries = split(accept_encoding, ',');
for (auto& e : entries) {
std::vector<std::string_view> params = split(e, ';');
if (params.size() == 0) {
continue;
}
compression_type ct = get_compression_type(trim(params[0]));
if (ct == compression_type::unknown) {
continue; // ignore unknown encoding types
}
if (ct_q[static_cast<size_t>(ct)].has_value() && ct_q[static_cast<size_t>(ct)] != 0.0f) {
continue; // already processed this encoding
}
if (response_size < _threshold[static_cast<size_t>(ct)]) {
continue; // below threshold treat as unknown
}
for (size_t i = 1; i < params.size(); ++i) { // find "q=" parameter
auto pos = params[i].find("q=");
if (pos == std::string_view::npos) {
continue;
}
std::string_view param = params[i].substr(pos + 2);
param = trim(param);
// parse quality value
float q_value = 1.0f;
auto [ptr, ec] = std::from_chars(param.data(), param.data() + param.size(), q_value);
if (ec != std::errc() || ptr != param.data() + param.size()) {
continue;
}
if (q_value < 0.0) {
q_value = 0.0;
} else if (q_value > 1.0) {
q_value = 1.0;
}
ct_q[static_cast<size_t>(ct)] = q_value;
break; // we parsed quality value
}
if (!ct_q[static_cast<size_t>(ct)].has_value()) {
ct_q[static_cast<size_t>(ct)] = 1.0f; // default quality value
}
// keep the highest encoding (in the order, unless 'any')
if (selected_ct == compression_type::any) {
if (ct_q[static_cast<size_t>(ct)] >= ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = ct;
}
} else {
if (ct_q[static_cast<size_t>(ct)] > ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = ct;
}
}
}
if (selected_ct == compression_type::any) {
// select any not mentioned or highest quality
selected_ct = compression_type::none;
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
if (!ct_q[i].has_value()) {
return static_cast<compression_type>(i);
}
if (ct_q[i] > ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = static_cast<compression_type>(i);
}
}
}
return selected_ct;
}
static future<chunked_content> compress(response_compressor::compression_type ct, const db::config& cfg, std::string str) {
chunked_content compressed;
auto write = [&compressed](temporary_buffer<char>&& buf) -> future<> {
compressed.push_back(std::move(buf));
return make_ready_future<>();
};
zlib_compressor compressor(ct != response_compressor::compression_type::deflate,
cfg.alternator_response_gzip_compression_level(), std::move(write));
co_await compressor.compress(str.data(), str.size(), true);
co_return compressed;
}
static sstring flatten(chunked_content&& cc) {
size_t total_size = 0;
for (const auto& chunk : cc) {
total_size += chunk.size();
}
sstring result = sstring{ sstring::initialized_later{}, total_size };
size_t offset = 0;
for (const auto& chunk : cc) {
std::copy(chunk.begin(), chunk.end(), result.begin() + offset);
offset += chunk.size();
}
return result;
}
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, std::string&& response_body) {
response_compressor::compression_type ct = find_compression(accept_encoding, response_body.size());
if (ct != response_compressor::compression_type::none) {
rep->add_header("Content-Encoding", get_encoding_name(ct));
rep->set_content_type(content_type);
return compress(ct, cfg, std::move(response_body)).then([rep = std::move(rep)] (chunked_content compressed) mutable {
rep->_content = flatten(std::move(compressed));
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
});
} else {
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(response_body);
rep->set_content_type(content_type);
}
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
template<typename Compressor>
class compressed_data_sink_impl : public data_sink_impl {
output_stream<char> _out;
Compressor _compressor;
public:
template<typename... Args>
compressed_data_sink_impl(output_stream<char>&& out, Args&&... args)
: _out(std::move(out)), _compressor(std::forward<Args>(args)..., [this](temporary_buffer<char>&& buf) {
return _out.write(std::move(buf));
}) { }
future<> put(std::span<temporary_buffer<char>> data) override {
return data_sink_impl::fallback_put(data, [this] (temporary_buffer<char>&& buf) {
return do_put(std::move(buf));
});
}
private:
future<> do_put(temporary_buffer<char> buf) {
co_return co_await _compressor.compress(buf.get(), buf.size());
}
future<> close() override {
return _compressor.close().then([this] {
return _out.close();
});
}
};
executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) {
return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream<char>&& out) mutable -> future<> {
output_stream_options opts;
opts.trim_to_size = true;
std::unique_ptr<data_sink_impl> data_sink_impl;
switch (ct) {
case response_compressor::compression_type::gzip:
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), true, level);
break;
case response_compressor::compression_type::deflate:
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), false, level);
break;
case response_compressor::compression_type::none:
case response_compressor::compression_type::any:
case response_compressor::compression_type::unknown:
on_internal_error(slogger,"Compression not selected");
default:
on_internal_error(slogger, "Unsupported compression type for data sink");
}
return bw(output_stream<char>(data_sink(std::move(data_sink_impl)), compressed_buffer_size, opts));
};
}
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) {
response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits<size_t>::max());
if (ct != response_compressor::compression_type::none) {
rep->add_header("Content-Encoding", get_encoding_name(ct));
rep->write_body(content_type, compress(ct, cfg, std::move(body_writer)));
} else {
rep->write_body(content_type, std::move(body_writer));
}
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
} // namespace alternator

View File

@@ -1,91 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "alternator/executor.hh"
#include <seastar/http/httpd.hh>
#include "db/config.hh"
namespace alternator {
class response_compressor {
public:
enum class compression_type {
gzip,
deflate,
compressions_count,
any = compressions_count,
none,
count,
unknown = count
};
static constexpr std::string_view compression_names[] = {
"gzip",
"deflate",
"*",
"identity"
};
static sstring get_encoding_name(compression_type ct) {
return sstring(compression_names[static_cast<size_t>(ct)]);
}
static constexpr compression_type get_compression_type(std::string_view encoding);
sstring get_accepted_encoding(const http::request& req) {
if (get_threshold() == 0) {
return "";
}
return req.get_header("Accept-Encoding");
}
compression_type find_compression(std::string_view accept_encoding, size_t response_size);
response_compressor(const db::config& cfg)
: cfg(cfg)
,_gzip_level_observer(
cfg.alternator_response_gzip_compression_level.observe([this](int v) {
update_threshold();
}))
,_gzip_threshold_observer(
cfg.alternator_response_compression_threshold_in_bytes.observe([this](uint32_t v) {
update_threshold();
}))
{
update_threshold();
}
response_compressor(const response_compressor& rhs) : response_compressor(rhs.cfg) {}
private:
const db::config& cfg;
utils::observable<int>::observer _gzip_level_observer;
utils::observable<uint32_t>::observer _gzip_threshold_observer;
uint32_t _threshold[static_cast<size_t>(compression_type::count)];
size_t get_threshold() { return _threshold[static_cast<size_t>(compression_type::any)]; }
void update_threshold() {
_threshold[static_cast<size_t>(compression_type::none)] = std::numeric_limits<uint32_t>::max();
_threshold[static_cast<size_t>(compression_type::any)] = std::numeric_limits<uint32_t>::max();
uint32_t gzip = cfg.alternator_response_gzip_compression_level() <= 0 ? std::numeric_limits<uint32_t>::max()
: cfg.alternator_response_compression_threshold_in_bytes();
_threshold[static_cast<size_t>(compression_type::gzip)] = gzip;
_threshold[static_cast<size_t>(compression_type::deflate)] = gzip;
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
if (_threshold[i] < _threshold[static_cast<size_t>(compression_type::any)]) {
_threshold[static_cast<size_t>(compression_type::any)] = _threshold[i];
}
}
}
public:
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
sstring accept_encoding, const char* content_type, std::string&& response_body);
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer);
};
}

View File

@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
return {"", nullptr};
}
auto it = v.MemberBegin();
const std::string it_key = rjson::to_string(it->name);
const std::string it_key = it->name.GetString();
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
return {std::move(it_key), nullptr};
}

View File

@@ -55,7 +55,7 @@ partition_key pk_from_json(const rjson::value& item, schema_ptr schema);
clustering_key ck_from_json(const rjson::value& item, schema_ptr schema);
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema);
// If v encodes a number (i.e., it is a {"N": [...]}), returns an object representing it. Otherwise,
// If v encodes a number (i.e., it is a {"N": [...]}, returns an object representing it. Otherwise,
// raises ValidationException with diagnostic.
big_decimal unwrap_number(const rjson::value& v, std::string_view diagnostic);

View File

@@ -34,7 +34,6 @@
#include "client_data.hh"
#include "utils/updateable_value.hh"
#include <zlib.h>
#include "alternator/http_compression.hh"
static logging::logger slogger("alternator-server");
@@ -112,12 +111,9 @@ class api_handler : public handler_base {
// type applies to all replies, both success and error.
static constexpr const char* REPLY_CONTENT_TYPE = "application/x-amz-json-1.0";
public:
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle,
const db::config& config) : _response_compressor(config), _f_handle(
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
[this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
sstring accept_encoding = _response_compressor.get_accepted_encoding(*req);
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped(
[this, rep = std::move(rep), accept_encoding=std::move(accept_encoding)](future<executor::request_return_type> resf) mutable {
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
if (resf.failed()) {
// Exceptions of type api_error are wrapped as JSON and
// returned to the client as expected. Other types of
@@ -137,20 +133,22 @@ public:
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
auto res = resf.get();
return std::visit(overloaded_functor {
std::visit(overloaded_functor {
[&] (std::string&& str) {
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
REPLY_CONTENT_TYPE, std::move(str));
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(str);
rep->set_content_type(REPLY_CONTENT_TYPE);
},
[&] (executor::body_writer&& body_writer) {
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
REPLY_CONTENT_TYPE, std::move(body_writer));
rep->write_body(REPLY_CONTENT_TYPE, std::move(body_writer));
},
[&] (const api_error& err) {
generate_error_reply(*rep, err);
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
}, std::move(res));
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}) { }
@@ -179,7 +177,6 @@ protected:
slogger.trace("api_handler error case: {}", rep._content);
}
response_compressor _response_compressor;
future_handler_function _f_handle;
};
@@ -374,40 +371,13 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
for (const auto& header : signed_headers) {
signed_headers_map.emplace(header, std::string_view());
}
std::vector<std::string> modified_values;
for (auto& header : req._headers) {
std::string header_str;
header_str.resize(header.first.size());
std::transform(header.first.begin(), header.first.end(), header_str.begin(), ::tolower);
auto it = signed_headers_map.find(header_str);
if (it != signed_headers_map.end()) {
// replace multiple spaces in the header value header.second with
// a single space, as required by AWS SigV4 header canonization.
// If we modify the value, we need to save it in modified_values
// to keep it alive.
std::string value;
value.reserve(header.second.size());
bool prev_space = false;
bool modified = false;
for (char ch : header.second) {
if (ch == ' ') {
if (!prev_space) {
value += ch;
prev_space = true;
} else {
modified = true; // skip a space
}
} else {
value += ch;
prev_space = false;
}
}
if (modified) {
modified_values.emplace_back(std::move(value));
it->second = std::string_view(modified_values.back());
} else {
it->second = std::string_view(header.second);
}
it->second = std::string_view(header.second);
}
}
@@ -420,7 +390,6 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
datestamp = std::move(datestamp),
signed_headers_str = std::move(signed_headers_str),
signed_headers_map = std::move(signed_headers_map),
modified_values = std::move(modified_values),
region = std::move(region),
service = std::move(service),
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
@@ -591,11 +560,11 @@ read_entire_stream(input_stream<char>& inp, size_t length_limit) {
class safe_gzip_zstream {
z_stream _zs;
public:
// If gzip is true, decode a gzip header (for "Content-Encoding: gzip").
// Otherwise, a zlib header (for "Content-Encoding: deflate").
safe_gzip_zstream(bool gzip = true) {
safe_gzip_zstream() {
memset(&_zs, 0, sizeof(_zs));
if (inflateInit2(&_zs, gzip ? 16 + MAX_WBITS : MAX_WBITS) != Z_OK) {
// The strange 16 + WMAX_BITS tells zlib to expect and decode
// a gzip header, not a zlib header.
if (inflateInit2(&_zs, 16 + MAX_WBITS) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
@@ -614,21 +583,19 @@ public:
}
};
// ungzip() takes a chunked_content of a compressed request body, and returns
// the uncompressed content as a chunked_content. If gzip is true, we expect
// gzip header (for "Content-Encoding: gzip"), if gzip is false, we expect a
// zlib header (for "Content-Encoding: deflate").
// ungzip() takes a chunked_content with a gzip-compressed request body,
// uncompresses it, and returns the uncompressed content as a chunked_content.
// If the uncompressed content exceeds length_limit, an error is thrown.
static future<chunked_content>
ungzip(chunked_content&& compressed_body, size_t length_limit, bool gzip = true) {
ungzip(chunked_content&& compressed_body, size_t length_limit) {
chunked_content ret;
// output_buf can be any size - when uncompressing input_buf, it doesn't
// need to fit in a single output_buf, we'll use multiple output_buf for
// a single input_buf if needed.
constexpr size_t OUTPUT_BUF_SIZE = 4096;
temporary_buffer<char> output_buf;
safe_gzip_zstream strm(gzip);
bool complete_stream = false; // empty input is not a valid gzip/deflate
safe_gzip_zstream strm;
bool complete_stream = false; // empty input is not a valid gzip
size_t total_out_bytes = 0;
for (const temporary_buffer<char>& input_buf : compressed_body) {
if (input_buf.empty()) {
@@ -731,8 +698,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
sstring content_encoding = req->get_header("Content-Encoding");
if (content_encoding == "gzip") {
content = co_await ungzip(std::move(content), request_content_length_limit);
} else if (content_encoding == "deflate") {
content = co_await ungzip(std::move(content), request_content_length_limit, false);
} else if (!content_encoding.empty()) {
// DynamoDB returns a 500 error for unsupported Content-Encoding.
// I'm not sure if this is the best error code, but let's do it too.
@@ -743,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
auto system_clients_entry = _ongoing_requests.emplace(
req->get_client_address(), std::move(user_agent_header),
req->get_client_address(), req->get_header("User-Agent"),
username, current_scheduling_group(),
req->get_protocol_name() == "https");
@@ -793,7 +754,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
void server::set_routes(routes& r) {
api_handler* req_handler = new api_handler([this] (std::unique_ptr<request> req) mutable {
return handle_api_request(std::move(req));
}, _proxy.data_dictionary().get_config());
});
r.put(operation_type::POST, "/", req_handler);
r.put(operation_type::GET, "/", new health_handler(_pending_requests));
@@ -904,9 +865,7 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
} {
}
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
std::optional<tls::credentials_builder> creds,
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
@@ -914,28 +873,20 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
_warn_authorization = std::move(warn_authorization);
_max_concurrent_requests = std::move(max_concurrent_requests);
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
if (!port && !https_port && !port_proxy_protocol && !https_port_proxy_protocol) {
if (!port && !https_port) {
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
" must be specified in order to init an alternator HTTP server instance"));
}
return seastar::async([this, addr, port, https_port, port_proxy_protocol, https_port_proxy_protocol, creds] {
return seastar::async([this, addr, port, https_port, creds] {
_executor.start().get();
if (port || port_proxy_protocol) {
if (port) {
set_routes(_http_server._routes);
_http_server.set_content_streaming(true);
if (port) {
_http_server.listen(socket_address{addr, *port}).get();
}
if (port_proxy_protocol) {
listen_options lo;
lo.reuse_address = true;
lo.proxy_protocol = true;
_http_server.listen(socket_address{addr, *port_proxy_protocol}, lo).get();
}
_http_server.listen(socket_address{addr, *port}).get();
_enabled_servers.push_back(std::ref(_http_server));
}
if (https_port || https_port_proxy_protocol) {
if (https_port) {
set_routes(_https_server._routes);
_https_server.set_content_streaming(true);
@@ -955,15 +906,7 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
} else {
_credentials = creds->build_server_credentials();
}
if (https_port) {
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
}
if (https_port_proxy_protocol) {
listen_options lo;
lo.reuse_address = true;
lo.proxy_protocol = true;
_https_server.listen(socket_address{addr, *https_port_proxy_protocol}, lo, _credentials).get();
}
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
_enabled_servers.push_back(std::ref(_https_server));
}
});
@@ -1036,15 +979,16 @@ client_data server::ongoing_request::make_client_data() const {
// and keep "driver_version" unset.
cd.driver_name = _user_agent;
// Leave "protocol_version" unset, it has no meaning in Alternator.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset for Alternator.
// Note: CQL sets ssl_protocol and ssl_cipher_suite via generic_server::connection base class.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
// As reported in issue #9216, we never set these fields in CQL
// either (see cql_server::connection::make_client_data()).
return cd;
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
future<utils::chunked_vector<client_data>> server::get_client_data() {
utils::chunked_vector<client_data> ret;
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
ret.emplace_back(r.make_client_data());
});
co_return ret;
}

View File

@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
// though it isn't really relevant for Alternator which defines its own
// timeouts separately. We can create this object only once.
updateable_timeout_config _timeout_config;
client_options_cache_type _connection_options_keys_and_values;
alternator_callbacks_map _callbacks;
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
// is called when reading the "system.clients" virtual table.
struct ongoing_request {
socket_address _client_address;
client_options_cache_entry_type _user_agent;
sstring _user_agent;
sstring _username;
scheduling_group _scheduling_group;
bool _is_https;
@@ -100,9 +99,7 @@ class server : public peering_sharded_service<server> {
public:
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
std::optional<tls::credentials_builder> creds,
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();
@@ -110,7 +107,7 @@ public:
// table "system.clients" is read. It is expected to generate a list of
// clients connected to this server (on this shard). This function is
// called by alternator::controller::get_client_data().
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
future<utils::chunked_vector<client_data>> get_client_data();
private:
void set_routes(seastar::httpd::routes& r);
// If verification succeeds, returns the authenticated user's username

View File

@@ -491,7 +491,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
if (!opts.enabled()) {
rjson::add(ret, "StreamDescription", std::move(stream_desc));
co_return rjson::print(std::move(ret));
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
}
// TODO: label
@@ -502,121 +502,123 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
std::map<db_clock::time_point, cdc::streams_version> topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
auto e = topologies.end();
auto prev = e;
auto shards = rjson::empty_array();
return _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners }).then([db, shard_start, limit, ret = std::move(ret), stream_desc = std::move(stream_desc)] (std::map<db_clock::time_point, cdc::streams_version> topologies) mutable {
std::optional<shard_id> last;
auto e = topologies.end();
auto prev = e;
auto shards = rjson::empty_array();
auto i = topologies.begin();
// if we're a paged query, skip to the generation where we left of.
if (shard_start) {
i = topologies.find(shard_start->time);
}
std::optional<shard_id> last;
// for parent-child stuff we need id:s to be sorted by token
// (see explanation above) since we want to find closest
// token boundary when determining parent.
// #7346 - we processed and searched children/parents in
// stored order, which is not necessarily token order,
// so the finding of "closest" token boundary (using upper bound)
// could give somewhat weird results.
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return id1.token() < id2.token();
};
auto i = topologies.begin();
// if we're a paged query, skip to the generation where we left of.
if (shard_start) {
i = topologies.find(shard_start->time);
}
// #7409 - shards must be returned in lexicographical order,
// normal bytes compare is string_traits<int8_t>::compare.
// thus bytes 0x8000 is less than 0x0000. By doing unsigned
// compare instead we inadvertently will sort in string lexical.
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0;
};
// need a prev even if we are skipping stuff
if (i != topologies.begin()) {
prev = std::prev(i);
}
for (; limit > 0 && i != e; prev = i, ++i) {
auto& [ts, sv] = *i;
last = std::nullopt;
auto lo = sv.streams.begin();
auto end = sv.streams.end();
// for parent-child stuff we need id:s to be sorted by token
// (see explanation above) since we want to find closest
// token boundary when determining parent.
// #7346 - we processed and searched children/parents in
// stored order, which is not necessarily token order,
// so the finding of "closest" token boundary (using upper bound)
// could give somewhat weird results.
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return id1.token() < id2.token();
};
// #7409 - shards must be returned in lexicographical order,
std::sort(lo, end, id_cmp);
// normal bytes compare is string_traits<int8_t>::compare.
// thus bytes 0x8000 is less than 0x0000. By doing unsigned
// compare instead we inadvertently will sort in string lexical.
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0;
};
if (shard_start) {
// find next shard position
lo = std::upper_bound(lo, end, shard_start->id, id_cmp);
shard_start = std::nullopt;
// need a prev even if we are skipping stuff
if (i != topologies.begin()) {
prev = std::prev(i);
}
if (lo != end && prev != e) {
// We want older stuff sorted in token order so we can find matching
// token range when determining parent shard.
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
}
auto expired = [&]() -> std::optional<db_clock::time_point> {
auto j = std::next(i);
if (j == e) {
return std::nullopt;
}
// add this so we sort of match potential
// sequence numbers in get_records result.
return j->first + confidence_interval(db);
}();
while (lo != end) {
auto& id = *lo++;
auto shard = rjson::empty_object();
if (prev != e) {
auto& pids = prev->second.streams;
auto pid = std::upper_bound(pids.begin(), pids.end(), id.token(), [](const dht::token& t, const cdc::stream_id& id) {
return t < id.token();
});
if (pid != pids.begin()) {
pid = std::prev(pid);
}
if (pid != pids.end()) {
rjson::add(shard, "ParentShardId", shard_id(prev->first, *pid));
}
}
last.emplace(ts, id);
rjson::add(shard, "ShardId", *last);
auto range = rjson::empty_object();
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
if (expired) {
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
}
rjson::add(shard, "SequenceNumberRange", std::move(range));
rjson::push_back(shards, std::move(shard));
if (--limit == 0) {
break;
}
for (; limit > 0 && i != e; prev = i, ++i) {
auto& [ts, sv] = *i;
last = std::nullopt;
auto lo = sv.streams.begin();
auto end = sv.streams.end();
// #7409 - shards must be returned in lexicographical order,
std::sort(lo, end, id_cmp);
if (shard_start) {
// find next shard position
lo = std::upper_bound(lo, end, shard_start->id, id_cmp);
shard_start = std::nullopt;
}
if (lo != end && prev != e) {
// We want older stuff sorted in token order so we can find matching
// token range when determining parent shard.
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
}
auto expired = [&]() -> std::optional<db_clock::time_point> {
auto j = std::next(i);
if (j == e) {
return std::nullopt;
}
// add this so we sort of match potential
// sequence numbers in get_records result.
return j->first + confidence_interval(db);
}();
while (lo != end) {
auto& id = *lo++;
auto shard = rjson::empty_object();
if (prev != e) {
auto& pids = prev->second.streams;
auto pid = std::upper_bound(pids.begin(), pids.end(), id.token(), [](const dht::token& t, const cdc::stream_id& id) {
return t < id.token();
});
if (pid != pids.begin()) {
pid = std::prev(pid);
}
if (pid != pids.end()) {
rjson::add(shard, "ParentShardId", shard_id(prev->first, *pid));
}
}
last.emplace(ts, id);
rjson::add(shard, "ShardId", *last);
auto range = rjson::empty_object();
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
if (expired) {
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
}
rjson::add(shard, "SequenceNumberRange", std::move(range));
rjson::push_back(shards, std::move(shard));
if (--limit == 0) {
break;
}
last = std::nullopt;
}
}
}
if (last) {
rjson::add(stream_desc, "LastEvaluatedShardId", *last);
}
if (last) {
rjson::add(stream_desc, "LastEvaluatedShardId", *last);
}
rjson::add(stream_desc, "Shards", std::move(shards));
rjson::add(ret, "StreamDescription", std::move(stream_desc));
co_return rjson::print(std::move(ret));
rjson::add(stream_desc, "Shards", std::move(shards));
rjson::add(ret, "StreamDescription", std::move(stream_desc));
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
});
}
enum class shard_iterator_type {
@@ -896,169 +898,172 @@ future<executor::request_return_type> executor::get_records(client_state& client
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
query::tombstone_limit(_proxy.get_tombstone_limit()), query::row_limit(limit * mul));
service::storage_proxy::coordinator_query_result qr = co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state));
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
co_return co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state)).then(
[this, schema, partition_slice = std::move(partition_slice), selection = std::move(selection), start_time = std::move(start_time), limit, key_names = std::move(key_names), attr_names = std::move(attr_names), type, iter, high_ts] (service::storage_proxy::coordinator_query_result qr) mutable {
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
auto result_set = builder.build();
auto records = rjson::empty_array();
auto result_set = builder.build();
auto records = rjson::empty_array();
auto& metadata = result_set->get_metadata();
auto& metadata = result_set->get_metadata();
auto op_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == op_column_name;
})
);
auto ts_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == timestamp_column_name;
})
);
auto eor_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == eor_column_name;
})
);
auto op_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == op_column_name;
})
);
auto ts_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == timestamp_column_name;
})
);
auto eor_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == eor_column_name;
})
);
std::optional<utils::UUID> timestamp;
auto dynamodb = rjson::empty_object();
auto record = rjson::empty_object();
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
std::optional<utils::UUID> timestamp;
auto dynamodb = rjson::empty_object();
auto record = rjson::empty_object();
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
using op_utype = std::underlying_type_t<cdc::operation>;
using op_utype = std::underlying_type_t<cdc::operation>;
auto maybe_add_record = [&] {
if (!dynamodb.ObjectEmpty()) {
rjson::add(record, "dynamodb", std::move(dynamodb));
dynamodb = rjson::empty_object();
}
if (!record.ObjectEmpty()) {
rjson::add(record, "awsRegion", rjson::from_string(dc_name));
rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp));
rjson::add(record, "eventSource", "scylladb:alternator");
rjson::add(record, "eventVersion", "1.1");
rjson::push_back(records, std::move(record));
record = rjson::empty_object();
--limit;
}
};
auto maybe_add_record = [&] {
if (!dynamodb.ObjectEmpty()) {
rjson::add(record, "dynamodb", std::move(dynamodb));
dynamodb = rjson::empty_object();
}
if (!record.ObjectEmpty()) {
rjson::add(record, "awsRegion", rjson::from_string(dc_name));
rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp));
rjson::add(record, "eventSource", "scylladb:alternator");
rjson::add(record, "eventVersion", "1.1");
rjson::push_back(records, std::move(record));
record = rjson::empty_object();
--limit;
}
};
for (auto& row : result_set->rows()) {
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
for (auto& row : result_set->rows()) {
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
if (!dynamodb.HasMember("Keys")) {
auto keys = rjson::empty_object();
describe_single_item(*selection, row, key_names, keys);
rjson::add(dynamodb, "Keys", std::move(keys));
rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
rjson::add(dynamodb, "SequenceNumber", sequence_number(ts));
rjson::add(dynamodb, "StreamViewType", type);
// TODO: SizeBytes
}
if (!dynamodb.HasMember("Keys")) {
auto keys = rjson::empty_object();
describe_single_item(*selection, row, key_names, keys);
rjson::add(dynamodb, "Keys", std::move(keys));
rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
rjson::add(dynamodb, "SequenceNumber", sequence_number(ts));
rjson::add(dynamodb, "StreamViewType", type);
// TODO: SizeBytes
}
/**
* We merge rows with same timestamp into a single event.
* This is pretty much needed, because a CDC row typically
* encodes ~half the info of an alternator write.
*
* A big, big downside to how alternator records are written
* (i.e. CQL), is that the distinction between INSERT and UPDATE
* is somewhat lost/unmappable to actual eventName.
* A write (currently) always looks like an insert+modify
* regardless whether we wrote existing record or not.
*
* Maybe RMW ops could be done slightly differently so
* we can distinguish them here...
*
* For now, all writes will become MODIFY.
*
* Note: we do not check the current pre/post
* flags on CDC log, instead we use data to
* drive what is returned. This is (afaict)
* consistent with dynamo streams
*/
switch (op) {
case cdc::operation::pre_image:
case cdc::operation::post_image:
{
auto item = rjson::empty_object();
describe_single_item(*selection, row, attr_names, item, nullptr, true);
describe_single_item(*selection, row, key_names, item);
rjson::add(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
break;
}
case cdc::operation::update:
rjson::add(record, "eventName", "MODIFY");
break;
case cdc::operation::insert:
rjson::add(record, "eventName", "INSERT");
break;
case cdc::operation::service_row_delete:
case cdc::operation::service_partition_delete:
{
auto user_identity = rjson::empty_object();
rjson::add(user_identity, "Type", "Service");
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
rjson::add(record, "userIdentity", std::move(user_identity));
rjson::add(record, "eventName", "REMOVE");
break;
}
default:
rjson::add(record, "eventName", "REMOVE");
break;
}
if (eor) {
maybe_add_record();
timestamp = ts;
if (limit == 0) {
/**
* We merge rows with same timestamp into a single event.
* This is pretty much needed, because a CDC row typically
* encodes ~half the info of an alternator write.
*
* A big, big downside to how alternator records are written
* (i.e. CQL), is that the distinction between INSERT and UPDATE
* is somewhat lost/unmappable to actual eventName.
* A write (currently) always looks like an insert+modify
* regardless whether we wrote existing record or not.
*
* Maybe RMW ops could be done slightly differently so
* we can distinguish them here...
*
* For now, all writes will become MODIFY.
*
* Note: we do not check the current pre/post
* flags on CDC log, instead we use data to
* drive what is returned. This is (afaict)
* consistent with dynamo streams
*/
switch (op) {
case cdc::operation::pre_image:
case cdc::operation::post_image:
{
auto item = rjson::empty_object();
describe_single_item(*selection, row, attr_names, item, nullptr, true);
describe_single_item(*selection, row, key_names, item);
rjson::add(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
break;
}
case cdc::operation::update:
rjson::add(record, "eventName", "MODIFY");
break;
case cdc::operation::insert:
rjson::add(record, "eventName", "INSERT");
break;
case cdc::operation::service_row_delete:
case cdc::operation::service_partition_delete:
{
auto user_identity = rjson::empty_object();
rjson::add(user_identity, "Type", "Service");
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
rjson::add(record, "userIdentity", std::move(user_identity));
rjson::add(record, "eventName", "REMOVE");
break;
}
default:
rjson::add(record, "eventName", "REMOVE");
break;
}
if (eor) {
maybe_add_record();
timestamp = ts;
if (limit == 0) {
break;
}
}
}
}
auto ret = rjson::empty_object();
auto nrecords = records.Size();
rjson::add(ret, "Records", std::move(records));
auto ret = rjson::empty_object();
auto nrecords = records.Size();
rjson::add(ret, "Records", std::move(records));
if (nrecords != 0) {
// #9642. Set next iterators threshold to > last
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
// Note that here we unconditionally return NextShardIterator,
// without checking if maybe we reached the end-of-shard. If the
// shard did end, then the next read will have nrecords == 0 and
// will notice end end of shard and not return NextShardIterator.
rjson::add(ret, "NextShardIterator", next_iter);
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
co_return rjson::print(std::move(ret));
}
if (nrecords != 0) {
// #9642. Set next iterators threshold to > last
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
// Note that here we unconditionally return NextShardIterator,
// without checking if maybe we reached the end-of-shard. If the
// shard did end, then the next read will have nrecords == 0 and
// will notice end end of shard and not return NextShardIterator.
rjson::add(ret, "NextShardIterator", next_iter);
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
}
// ugh. figure out if we are and end-of-shard
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
// ugh. figure out if we are and end-of-shard
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
auto& shard = iter.shard;
return _sdks.cdc_current_generation_timestamp({ normal_token_owners }).then([this, iter, high_ts, start_time, ret = std::move(ret)](db_clock::time_point ts) mutable {
auto& shard = iter.shard;
if (shard.time < ts && ts < high_ts) {
// The DynamoDB documentation states that when a shard is
// closed, reading it until the end has NextShardIterator
// "set to null". Our test test_streams_closed_read
// confirms that by "null" they meant not set at all.
} else {
// We could have return the same iterator again, but we did
// a search from it until high_ts and found nothing, so we
// can also start the next search from high_ts.
// TODO: but why? It's simpler just to leave the iterator be.
shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true);
rjson::add(ret, "NextShardIterator", iter);
}
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
if (is_big(ret)) {
co_return make_streamed(std::move(ret));
}
co_return rjson::print(std::move(ret));
if (shard.time < ts && ts < high_ts) {
// The DynamoDB documentation states that when a shard is
// closed, reading it until the end has NextShardIterator
// "set to null". Our test test_streams_closed_read
// confirms that by "null" they meant not set at all.
} else {
// We could have return the same iterator again, but we did
// a search from it until high_ts and found nothing, so we
// can also start the next search from high_ts.
// TODO: but why? It's simpler just to leave the iterator be.
shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true);
rjson::add(ret, "NextShardIterator", iter);
}
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
if (is_big(ret)) {
return make_ready_future<executor::request_return_type>(make_streamed(std::move(ret)));
}
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
});
});
}
bool executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) {

View File

@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
}
sstring attribute_name = rjson::to_sstring(*v);
sstring attribute_name(v->GetString(), v->GetStringLength());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
@@ -141,7 +141,7 @@ future<executor::request_return_type> executor::describe_time_to_live(client_sta
// expiration_service is a sharded service responsible for cleaning up expired
// items in all tables with per-item expiration enabled. Currently, this means
// Alternator tables with TTL configured via an UpdateTimeToLive request.
// Alternator tables with TTL configured via a UpdateTimeToLive request.
//
// Here is a brief overview of how the expiration service works:
//
@@ -593,7 +593,7 @@ static future<> scan_table_ranges(
if (retries >= 10) {
// Don't get stuck forever asking the same page, maybe there's
// a bug or a real problem in several replicas. Give up on
// this scan and retry the scan from a random position later,
// this scan an retry the scan from a random position later,
// in the next scan period.
throw runtime_exception("scanner thread failed after too many timeouts for the same page");
}

View File

@@ -30,7 +30,7 @@ namespace alternator {
// expiration_service is a sharded service responsible for cleaning up expired
// items in all tables with per-item expiration enabled. Currently, this means
// Alternator tables with TTL configured via an UpdateTimeToLive request.
// Alternator tables with TTL configured via a UpdateTimeToLeave request.
class expiration_service final : public seastar::peering_sharded_service<expiration_service> {
public:
// Object holding per-shard statistics related to the expiration service.
@@ -52,7 +52,7 @@ private:
data_dictionary::database _db;
service::storage_proxy& _proxy;
gms::gossiper& _gossiper;
// _end is set by start(), and resolves when the background service
// _end is set by start(), and resolves when the the background service
// started by it ends. To ask the background service to end, _abort_source
// should be triggered. stop() below uses both _abort_source and _end.
std::optional<future<>> _end;

View File

@@ -31,7 +31,6 @@ set(swagger_files
api-doc/column_family.json
api-doc/commitlog.json
api-doc/compaction_manager.json
api-doc/client_routes.json
api-doc/config.json
api-doc/cql_server_test.json
api-doc/endpoint_snitch_info.json
@@ -69,7 +68,6 @@ target_sources(api
PRIVATE
api.cc
cache_service.cc
client_routes.cc
collectd.cc
column_family.cc
commitlog.cc

View File

@@ -1,23 +0,0 @@
, "client_routes_entry": {
"id": "client_routes_entry",
"summary": "An entry storing client routes",
"properties": {
"connection_id": {"type": "string"},
"host_id": {"type": "string", "format": "uuid"},
"address": {"type": "string"},
"port": {"type": "integer"},
"tls_port": {"type": "integer"},
"alternator_port": {"type": "integer"},
"alternator_https_port": {"type": "integer"}
},
"required": ["connection_id", "host_id", "address"]
}
, "client_routes_key": {
"id": "client_routes_key",
"summary": "A key of client_routes_entry",
"properties": {
"connection_id": {"type": "string"},
"host_id": {"type": "string", "format": "uuid"}
}
}

View File

@@ -1,74 +0,0 @@
, "/v2/client-routes":{
"get": {
"description":"List all client route entries",
"operationId":"get_client_routes",
"tags":["client_routes"],
"produces":[
"application/json"
],
"parameters":[],
"responses":{
"200":{
"schema":{
"type":"array",
"items":{ "$ref":"#/definitions/client_routes_entry" }
}
},
"default":{
"description":"unexpected error",
"schema":{"$ref":"#/definitions/ErrorModel"}
}
}
},
"post": {
"description":"Upsert one or more client route entries",
"operationId":"set_client_routes",
"tags":["client_routes"],
"parameters":[
{
"name":"body",
"in":"body",
"required":true,
"schema":{
"type":"array",
"items":{ "$ref":"#/definitions/client_routes_entry" }
}
}
],
"responses":{
"200":{ "description": "OK" },
"default":{
"description":"unexpected error",
"schema":{ "$ref":"#/definitions/ErrorModel" }
}
}
},
"delete": {
"description":"Delete one or more client route entries",
"operationId":"delete_client_routes",
"tags":["client_routes"],
"parameters":[
{
"name":"body",
"in":"body",
"required":true,
"schema":{
"type":"array",
"items":{ "$ref":"#/definitions/client_routes_key" }
}
}
],
"responses":{
"200":{
"description": "OK"
},
"default":{
"description":"unexpected error",
"schema":{
"$ref":"#/definitions/ErrorModel"
}
}
}
}
}

View File

@@ -112,21 +112,6 @@
}
]
},
{
"path":"/v2/error_injection/events",
"operations":[
{
"method":"GET",
"summary":"Subscribe to Server-Sent Events stream of error injection events",
"type":"void",
"nickname":"injection_events",
"produces":[
"text/event-stream"
],
"parameters":[]
}
]
},
{
"path":"/v2/error_injection/disconnect/{ip}",
"operations":[

View File

@@ -37,7 +37,6 @@
#include "raft.hh"
#include "gms/gossip_address_map.hh"
#include "service_levels.hh"
#include "client_routes.hh"
logging::logger apilog("api");
@@ -68,11 +67,9 @@ future<> set_server_init(http_context& ctx) {
rb02->set_api_doc(r);
rb02->register_api_file(r, "swagger20_header");
rb02->register_api_file(r, "metrics");
rb02->register_api_file(r, "client_routes");
rb->register_function(r, "system",
"The system related API");
rb02->add_definitions_file(r, "metrics");
rb02->add_definitions_file(r, "client_routes");
set_system(ctx, r);
rb->register_function(r, "error_injection",
"The error injection API");
@@ -132,16 +129,6 @@ future<> unset_server_storage_service(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_storage_service(ctx, r); });
}
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr) {
return ctx.http_server.set_routes([&ctx, &cr] (routes& r) {
set_client_routes(ctx, r, cr);
});
}
future<> unset_server_client_routes(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_client_routes(ctx, r); });
}
future<> set_load_meter(http_context& ctx, service::load_meter& lm) {
return ctx.http_server.set_routes([&ctx, &lm] (routes& r) { set_load_meter(ctx, r, lm); });
}

View File

@@ -29,7 +29,6 @@ class storage_proxy;
class storage_service;
class raft_group0_client;
class raft_group_registry;
class client_routes_service;
} // namespace service
@@ -100,8 +99,6 @@ future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snit
future<> unset_server_snitch(http_context& ctx);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
future<> unset_server_storage_service(http_context& ctx);
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
future<> unset_server_client_routes(http_context& ctx);
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader);
future<> unset_server_sstables_loader(http_context& ctx);
future<> set_server_view_builder(http_context& ctx, sharded<db::view::view_builder>& vb, sharded<gms::gossiper>& g);

View File

@@ -1,176 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/http/short_streams.hh>
#include "client_routes.hh"
#include "api/api.hh"
#include "service/storage_service.hh"
#include "service/client_routes.hh"
#include "utils/rjson.hh"
#include "api/api-doc/client_routes.json.hh"
using namespace seastar::httpd;
using namespace std::chrono_literals;
using namespace json;
extern logging::logger apilog;
namespace api {
static void validate_client_routes_endpoint(sharded<service::client_routes_service>& cr, sstring endpoint_name) {
if (!cr.local().get_feature_service().client_routes) {
apilog.warn("{}: called before the cluster feature was enabled", endpoint_name);
throw std::runtime_error(fmt::format("{} requires all nodes to support the CLIENT_ROUTES cluster feature", endpoint_name));
}
}
static sstring parse_string(const char* name, rapidjson::Value const& v) {
const auto it = v.FindMember(name);
if (it == v.MemberEnd()) {
throw bad_param_exception(fmt::format("Missing '{}'", name));
}
if (!it->value.IsString()) {
throw bad_param_exception(fmt::format("'{}' must be a string", name));
}
return {it->value.GetString(), it->value.GetStringLength()};
}
static std::optional<uint32_t> parse_port(const char* name, rapidjson::Value const& v) {
const auto it = v.FindMember(name);
if (it == v.MemberEnd()) {
return std::nullopt;
}
if (!it->value.IsInt()) {
throw bad_param_exception(fmt::format("'{}' must be an integer", name));
}
auto port = it->value.GetInt();
if (port < 1 || port > 65535) {
throw bad_param_exception(fmt::format("'{}' value={} is outside the allowed port range", name, port));
}
return port;
}
static std::vector<service::client_routes_service::client_route_entry> parse_set_client_array(const rapidjson::Document& root) {
if (!root.IsArray()) {
throw bad_param_exception("Body must be a JSON array");
}
std::vector<service::client_routes_service::client_route_entry> v;
v.reserve(root.GetArray().Size());
for (const auto& element : root.GetArray()) {
if (!element.IsObject()) { throw bad_param_exception("Each element must be object"); }
const auto port = parse_port("port", element);
const auto tls_port = parse_port("tls_port", element);
const auto alternator_port = parse_port("alternator_port", element);
const auto alternator_https_port = parse_port("alternator_https_port", element);
if (!port.has_value() && !tls_port.has_value() && !alternator_port.has_value() && !alternator_https_port.has_value()) {
throw bad_param_exception("At least one port field ('port', 'tls_port', 'alternator_port', 'alternator_https_port') must be specified");
}
v.emplace_back(
parse_string("connection_id", element),
utils::UUID{parse_string("host_id", element)},
parse_string("address", element),
port,
tls_port,
alternator_port,
alternator_https_port
);
}
return v;
}
static
future<json::json_return_type>
rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
validate_client_routes_endpoint(cr, "rest_set_client_routes");
rapidjson::Document root;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
co_await cr.local().set_client_routes(parse_set_client_array(root));
co_return seastar::json::json_void();
}
static std::vector<service::client_routes_service::client_route_key> parse_delete_client_array(const rapidjson::Document& root) {
if (!root.IsArray()) {
throw bad_param_exception("Body must be a JSON array");
}
std::vector<service::client_routes_service::client_route_key> v;
v.reserve(root.GetArray().Size());
for (const auto& element : root.GetArray()) {
v.emplace_back(
parse_string("connection_id", element),
utils::UUID{parse_string("host_id", element)}
);
}
return v;
}
static
future<json::json_return_type>
rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
validate_client_routes_endpoint(cr, "delete_client_routes");
rapidjson::Document root;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
co_return seastar::json::json_void();
}
static
future<json::json_return_type>
rest_get_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
validate_client_routes_endpoint(cr, "get_client_routes");
co_return co_await cr.invoke_on(0, [] (service::client_routes_service& cr) -> future<json::json_return_type> {
co_return json::json_return_type(stream_range_as_array(co_await cr.get_client_routes(), [](const service::client_routes_service::client_route_entry & entry) {
seastar::httpd::client_routes_json::client_routes_entry obj;
obj.connection_id = entry.connection_id;
obj.host_id = fmt::to_string(entry.host_id);
obj.address = entry.address;
if (entry.port.has_value()) { obj.port = entry.port.value(); }
if (entry.tls_port.has_value()) { obj.tls_port = entry.tls_port.value(); }
if (entry.alternator_port.has_value()) { obj.alternator_port = entry.alternator_port.value(); }
if (entry.alternator_https_port.has_value()) { obj.alternator_https_port = entry.alternator_https_port.value(); }
return obj;
}));
});
}
void set_client_routes(http_context& ctx, routes& r, sharded<service::client_routes_service>& cr) {
seastar::httpd::client_routes_json::set_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
return rest_set_client_routes(ctx, cr, std::move(req));
});
seastar::httpd::client_routes_json::delete_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
return rest_delete_client_routes(ctx, cr, std::move(req));
});
seastar::httpd::client_routes_json::get_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
return rest_get_client_routes(ctx, cr, std::move(req));
});
}
void unset_client_routes(http_context& ctx, routes& r) {
seastar::httpd::client_routes_json::set_client_routes.unset(r);
seastar::httpd::client_routes_json::delete_client_routes.unset(r);
seastar::httpd::client_routes_json::get_client_routes.unset(r);
}
}

View File

@@ -1,20 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/sharded.hh>
#include <seastar/json/json_elements.hh>
#include "api/api_init.hh"
namespace api {
void set_client_routes(http_context& ctx, httpd::routes& r, sharded<service::client_routes_service>& cr);
void unset_client_routes(http_context& ctx, httpd::routes& r);
}

View File

@@ -13,22 +13,12 @@
#include "utils/rjson.hh"
#include <seastar/core/future-util.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/core/queue.hh>
#include <seastar/core/when_all.hh>
#include <seastar/core/sharded.hh>
namespace api {
using namespace seastar::httpd;
namespace hf = httpd::error_injection_json;
// Structure to hold error injection event data
struct injection_event {
sstring injection_name;
sstring injection_type;
unsigned shard_id;
};
void set_error_injection(http_context& ctx, routes& r) {
hf::enable_injection.set(r, [](std::unique_ptr<request> req) -> future<json::json_return_type> {
@@ -111,79 +101,6 @@ void set_error_injection(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(json::json_void());
});
});
// Server-Sent Events endpoint for injection events
// This allows clients to subscribe to real-time injection events instead of log parsing
r.add(operation_type::GET, url("/v2/error_injection/events"), [](std::unique_ptr<request> req) -> future<json::json_return_type> {
// Create a shared foreign_ptr to a queue that will receive events from all shards
// Using a queue on the current shard to collect events
using event_queue_t = seastar::queue<injection_event>;
auto event_queue = make_lw_shared<event_queue_t>();
auto queue_ptr = make_foreign(event_queue);
// Register callback on all shards to send events to our queue
auto& errinj = utils::get_local_injector();
// Capture the current shard ID for event delivery
auto target_shard = this_shard_id();
// Setup event callback that forwards events to the queue on the target shard
// Note: We use shared_ptr wrapper for foreign_ptr to make it copyable
auto callback = [queue_ptr = queue_ptr.copy(), target_shard] (std::string_view name, std::string_view type) {
injection_event evt{
.injection_name = sstring(name),
.injection_type = sstring(type),
.shard_id = this_shard_id()
};
// Send event to the target shard's queue (discard future, fire-and-forget)
(void)smp::submit_to(target_shard, [queue_ptr = queue_ptr.copy(), evt = std::move(evt)] () mutable {
return queue_ptr->push_eventually(std::move(evt));
});
};
// Register the callback on all shards
co_await errinj.register_event_callback_on_all(callback);
// Return a streaming function that sends SSE events
noncopyable_function<future<>(output_stream<char>&&)> stream_func =
[event_queue](output_stream<char>&& os) -> future<> {
auto s = std::move(os);
std::exception_ptr ex;
try {
// Send initial SSE comment to establish connection
co_await s.write(": connected\n\n");
co_await s.flush();
// Stream events as they arrive from any shard
while (true) {
auto evt = co_await event_queue->pop_eventually();
// Format as SSE event
// data: {"injection":"name","type":"handler","shard":0}
auto json_data = format("{{\"injection\":\"{}\",\"type\":\"{}\",\"shard\":{}}}",
evt.injection_name, evt.injection_type, evt.shard_id);
co_await s.write(format("data: {}\n\n", json_data));
co_await s.flush();
}
} catch (...) {
ex = std::current_exception();
}
// Cleanup: clear callbacks on all shards
co_await utils::get_local_injector().clear_event_callbacks_on_all();
co_await s.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
};
co_return json::json_return_type(std::move(stream_func));
});
}
} // namespace api

View File

@@ -515,15 +515,6 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
auto sstables = parsed.GetArray() |
std::views::transform([] (const auto& s) { return sstring(rjson::to_string_view(s)); }) |
std::ranges::to<std::vector>();
apilog.info("Restore invoked with following parameters: keyspace={}, table={}, endpoint={}, bucket={}, prefix={}, sstables_count={}, scope={}, primary_replica_only={}",
keyspace,
table,
endpoint,
bucket,
prefix,
sstables.size(),
scope,
primary_replica_only);
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket, scope, primary_replica_only);
co_return json::json_return_type(fmt::to_string(task_id));
});
@@ -556,13 +547,17 @@ void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_build
vp.insert(b.second);
}
}
std::vector<sstring> res;
replica::database& db = vb.local().get_db();
auto uuid = validate_table(db, ks, cf_name);
replica::column_family& cf = db.find_column_family(uuid);
co_return cf.get_index_manager().list_indexes()
| std::views::transform([] (const auto& i) { return i.metadata().name(); })
| std::views::filter([&vp] (const auto& n) { return vp.contains(secondary_index::index_table_name(n)); })
| std::ranges::to<std::vector>();
res.reserve(cf.get_index_manager().list_indexes().size());
for (auto&& i : cf.get_index_manager().list_indexes()) {
if (vp.contains(secondary_index::index_table_name(i.metadata().name()))) {
res.emplace_back(i.metadata().name());
}
}
co_return res;
});
}
@@ -2025,14 +2020,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
};
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
try {
if (column_families.empty()) {
co_await snap_ctl.local().take_snapshot(tag, keynames, opts);
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
} else {
if (keynames.empty()) {
throw httpd::bad_param_exception("The keyspace of column families must be specified");
@@ -2040,7 +2033,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
}
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
}
co_return json_void();
} catch (...) {
@@ -2075,8 +2068,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
db::snapshot_options opts = {.skip_flush = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
}
compaction::compaction_stats stats;

View File

@@ -9,7 +9,6 @@
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/http/exception.hh>
#include "task_manager.hh"
@@ -265,7 +264,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
if (id) {
module->unregister_task(id);
}
co_await coroutine::maybe_yield();
co_await maybe_yield();
}
});
co_return json_void();

View File

@@ -146,8 +146,7 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
db::snapshot_options opts = {.skip_flush = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();

View File

@@ -209,11 +209,15 @@ future<> audit::stop_audit() {
});
}
audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch) {
audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table) {
if (!audit_instance().local_is_initialized()) {
return nullptr;
}
return std::make_unique<audit_info>(cat, keyspace, table, batch);
return std::make_unique<audit_info>(cat, keyspace, table);
}
audit_info_ptr audit::create_no_audit_info() {
return audit_info_ptr();
}
future<> audit::start(const db::config& cfg) {
@@ -263,21 +267,18 @@ future<> audit::log_login(const sstring& username, socket_address client_ip, boo
}
future<> inspect(shared_ptr<cql3::cql_statement> statement, service::query_state& query_state, const cql3::query_options& options, bool error) {
auto audit_info = statement->get_audit_info();
if (!audit_info) {
return make_ready_future<>();
}
if (audit_info->batch()) {
cql3::statements::batch_statement* batch = static_cast<cql3::statements::batch_statement*>(statement.get());
cql3::statements::batch_statement* batch = dynamic_cast<cql3::statements::batch_statement*>(statement.get());
if (batch != nullptr) {
return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) {
return inspect(m.statement, query_state, options, error);
});
} else {
if (audit::local_audit_instance().should_log(audit_info)) {
auto audit_info = statement->get_audit_info();
if (bool(audit_info) && audit::local_audit_instance().should_log(audit_info)) {
return audit::local_audit_instance().log(audit_info, query_state, options, error);
}
return make_ready_future<>();
}
return make_ready_future<>();
}
future<> inspect_login(const sstring& username, socket_address client_ip, bool error) {

View File

@@ -75,13 +75,11 @@ class audit_info final {
sstring _keyspace;
sstring _table;
sstring _query;
bool _batch;
public:
audit_info(statement_category cat, sstring keyspace, sstring table, bool batch)
audit_info(statement_category cat, sstring keyspace, sstring table)
: _category(cat)
, _keyspace(std::move(keyspace))
, _table(std::move(table))
, _batch(batch)
{ }
void set_query_string(const std::string_view& query_string) {
_query = sstring(query_string);
@@ -91,7 +89,6 @@ public:
const sstring& query() const { return _query; }
sstring category_string() const;
statement_category category() const { return _category; }
bool batch() const { return _batch; }
};
using audit_info_ptr = std::unique_ptr<audit_info>;
@@ -129,7 +126,8 @@ public:
}
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false);
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table);
static audit_info_ptr create_no_audit_info();
audit(locator::shared_token_metadata& stm,
cql3::query_processor& qp,
service::migration_manager& mm,

View File

@@ -53,10 +53,10 @@ static std::string json_escape(std::string_view str) {
}
future<> audit_syslog_storage_helper::syslog_send_helper(temporary_buffer<char> msg) {
future<> audit_syslog_storage_helper::syslog_send_helper(const sstring& msg) {
try {
auto lock = co_await get_units(_semaphore, 1, std::chrono::hours(1));
co_await _sender.send(_syslog_address, std::span(&msg, 1));
co_await _sender.send(_syslog_address, net::packet{msg.data(), msg.size()});
}
catch (const std::exception& e) {
auto error_msg = seastar::format(
@@ -90,7 +90,7 @@ future<> audit_syslog_storage_helper::start(const db::config& cfg) {
co_return;
}
co_await syslog_send_helper(temporary_buffer<char>::copy_of("Initializing syslog audit backend."));
co_await syslog_send_helper("Initializing syslog audit backend.");
}
future<> audit_syslog_storage_helper::stop() {
@@ -120,7 +120,7 @@ future<> audit_syslog_storage_helper::write(const audit_info* audit_info,
audit_info->table(),
username);
co_await syslog_send_helper(std::move(msg).release());
co_await syslog_send_helper(msg);
}
future<> audit_syslog_storage_helper::write_login(const sstring& username,
@@ -139,7 +139,7 @@ future<> audit_syslog_storage_helper::write_login(const sstring& username,
client_ip,
username);
co_await syslog_send_helper(std::move(msg).release());
co_await syslog_send_helper(msg.c_str());
}
}

View File

@@ -26,7 +26,7 @@ class audit_syslog_storage_helper : public storage_helper {
net::datagram_channel _sender;
seastar::semaphore _semaphore;
future<> syslog_send_helper(seastar::temporary_buffer<char> msg);
future<> syslog_send_helper(const sstring& msg);
public:
explicit audit_syslog_storage_helper(cql3::query_processor&, service::migration_manager&);
virtual ~audit_syslog_storage_helper();

View File

@@ -9,6 +9,7 @@
#include "auth/allow_all_authenticator.hh"
#include "service/migration_manager.hh"
#include "utils/alien_worker.hh"
#include "utils/class_registrator.hh"
namespace auth {
@@ -22,6 +23,7 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
cache&,
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -14,6 +14,7 @@
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
class query_processor;
@@ -29,7 +30,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
}
virtual future<> start() override {

View File

@@ -15,7 +15,6 @@
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/core/abort_source.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
@@ -23,11 +22,9 @@ namespace auth {
logging::logger logger("auth-cache");
cache::cache(cql3::query_processor& qp, abort_source& as) noexcept
cache::cache(cql3::query_processor& qp) noexcept
: _current_version(0)
, _qp(qp)
, _loading_sem(1)
, _as(as) {
, _qp(qp) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
@@ -119,8 +116,6 @@ future<> cache::load_all() {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
auto units = co_await get_units(_loading_sem, 1, _as);
++_current_version;
logger.info("Loading all roles");
@@ -151,9 +146,6 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
auto units = co_await get_units(_loading_sem, 1, _as);
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);

View File

@@ -8,7 +8,6 @@
#pragma once
#include <seastar/core/abort_source.hh>
#include <unordered_set>
#include <unordered_map>
@@ -16,7 +15,6 @@
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/semaphore.hh>
#include <absl/container/flat_hash_map.h>
@@ -43,7 +41,7 @@ public:
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp, abort_source& as) noexcept;
explicit cache(cql3::query_processor& qp) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
@@ -54,8 +52,6 @@ private:
roles_map _roles;
version_tag_t _current_version;
cql3::query_processor& _qp;
semaphore _loading_sem;
abort_source& _as;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;

View File

@@ -8,7 +8,6 @@
*/
#include "auth/certificate_authenticator.hh"
#include "auth/cache.hh"
#include <boost/regex.hpp>
#include <fmt/ranges.h>
@@ -35,13 +34,13 @@ static const class_registrator<auth::authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&)
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();
@@ -76,9 +75,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
}
continue;
} catch (const std::out_of_range&) {
} catch (std::out_of_range&) {
// just fallthrough
} catch (const boost::regex_error&) {
} catch (boost::regex_error&) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
}
}

View File

@@ -10,6 +10,7 @@
#pragma once
#include "auth/authenticator.hh"
#include "utils/alien_worker.hh"
#include <boost/regex_fwd.hpp> // IWYU pragma: keep
namespace cql3 {
@@ -25,15 +26,13 @@ class raft_group0_client;
namespace auth {
class cache;
extern const std::string_view certificate_authenticator_name;
class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
~certificate_authenticator();
future<> start() override;

View File

@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
try {
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
} catch (const exceptions::already_exists_exception&) {}
} catch (exceptions::already_exists_exception&) {}
}
}

View File

@@ -256,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
} else {
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
}
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
}
}
@@ -293,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
[resource](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
}
});
});
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
return make_ready_future();
}

View File

@@ -49,7 +49,8 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
cache&,
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
@@ -63,13 +64,14 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
password_authenticator::~password_authenticator() {
}
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
, _hashing_worker(hashing_worker)
{}
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
@@ -328,18 +330,20 @@ future<authenticated_user> password_authenticator::authenticate(
}
salted_hash = role->salted_hash;
}
const bool password_match = co_await passwords::check(password, *salted_hash);
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
return passwords::check(password, *salted_hash);
});
if (!password_match) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
co_return username;
} catch (const std::system_error &) {
} catch (std::system_error &) {
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.what()));
} catch (const exceptions::authentication_exception& e) {
} catch (exceptions::authentication_exception& e) {
std::throw_with_nested(e);
} catch (const exceptions::unavailable_exception& e) {
} catch (exceptions::unavailable_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
} catch (...) {
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));

View File

@@ -18,6 +18,7 @@
#include "auth/passwords.hh"
#include "auth/cache.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
namespace db {
class config;
@@ -48,12 +49,13 @@ class password_authenticator : public authenticator {
shared_promise<> _superuser_created_promise;
// We used to also support bcrypt, SHA-256, and MD5 (ref. scylladb#24524).
constexpr static auth::passwords::scheme _scheme = passwords::scheme::sha_512;
utils::alien_worker& _hashing_worker;
public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
~password_authenticator();

View File

@@ -7,8 +7,6 @@
*/
#include "auth/passwords.hh"
#include "utils/crypt_sha512.hh"
#include <seastar/core/coroutine.hh>
#include <cerrno>
@@ -23,46 +21,25 @@ static thread_local crypt_data tlcrypt = {};
namespace detail {
void verify_hashing_output(const char * res) {
if (!res || (res[0] == '*')) {
throw std::system_error(errno, std::system_category());
}
}
void verify_scheme(scheme scheme) {
const sstring random_part_of_salt = "aaaabbbbccccdddd";
const sstring salt = sstring(prefix_for_scheme(scheme)) + random_part_of_salt;
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
try {
verify_hashing_output(e);
} catch (const std::system_error& ex) {
throw no_supported_schemes();
if (e && (e[0] != '*')) {
return;
}
throw no_supported_schemes();
}
sstring hash_with_salt(const sstring& pass, const sstring& salt) {
auto res = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
verify_hashing_output(res);
return res;
}
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt) {
sstring res;
// Only SHA-512 hashes for passphrases shorter than 256 bytes can be computed using
// the __crypt_sha512 method. For other computations, we fall back to the
// crypt_r implementation from `<crypt.h>`, which can stall.
if (salt.starts_with(prefix_for_scheme(scheme::sha_512)) && pass.size() <= 255) {
char buf[128];
const char * output_ptr = co_await __crypt_sha512(pass.c_str(), salt.c_str(), buf);
verify_hashing_output(output_ptr);
res = output_ptr;
} else {
const char * output_ptr = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
verify_hashing_output(output_ptr);
res = output_ptr;
if (!res || (res[0] == '*')) {
throw std::system_error(errno, std::system_category());
}
co_return res;
return res;
}
std::string_view prefix_for_scheme(scheme c) noexcept {
@@ -81,9 +58,8 @@ no_supported_schemes::no_supported_schemes()
: std::runtime_error("No allowed hashing schemes are supported on this system") {
}
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash) {
const auto pwd_hash = co_await detail::hash_with_salt_async(pass, salted_hash);
co_return pwd_hash == salted_hash;
bool check(const sstring& pass, const sstring& salted_hash) {
return detail::hash_with_salt(pass, salted_hash) == salted_hash;
}
} // namespace auth::passwords

View File

@@ -11,7 +11,6 @@
#include <random>
#include <stdexcept>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
@@ -76,23 +75,11 @@ sstring generate_salt(RandomNumberEngine& g, scheme scheme) {
///
/// Hash a password combined with an implementation-specific salt string.
/// Deprecated in favor of `hash_with_salt_async`. This function is still used
/// when generating password hashes for storage to ensure that
/// `hash_with_salt` and `hash_with_salt_async` produce identical results,
/// preserving backward compatibility.
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
sstring hash_with_salt(const sstring& pass, const sstring& salt);
///
/// Async version of `hash_with_salt` that returns a future.
/// If possible, hashing uses `coroutine::maybe_yield` to prevent reactor stalls.
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt);
} // namespace detail
///
@@ -120,6 +107,6 @@ sstring hash(const sstring& pass, RandomNumberEngine& g, scheme scheme) {
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash);
bool check(const sstring& pass, const sstring& salted_hash);
} // namespace auth::passwords

View File

@@ -35,9 +35,10 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
cache&,
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -12,6 +12,7 @@
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
class query_processor;
@@ -29,7 +30,7 @@ namespace auth {
class saslauthd_authenticator : public authenticator {
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
public:
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
future<> start() override;

View File

@@ -191,7 +191,8 @@ service::service(
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache)
cache& cache,
utils::alien_worker& hashing_worker)
: service(
std::move(c),
cache,
@@ -199,7 +200,7 @@ service::service(
g0,
mn,
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
used_by_maintenance_socket) {
}
@@ -225,7 +226,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
try {
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
} catch (const ::service::group0_concurrent_modification&) {
} catch (::service::group0_concurrent_modification&) {
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
}
}
@@ -876,6 +877,22 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
continue; // some tables might not have been created if they were not used
}
// use longer than usual timeout as we scan the whole table
// but not infinite or very long as we want to fail reasonably fast
const auto t = 5min;
const timeout_config tc{t, t, t, t, t, t, t};
::service::client_state cs(::service::client_state::internal_tag{}, tc);
::service::query_state qs(cs, empty_service_permit());
auto rows = co_await qp.execute_internal(
seastar::format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
db::consistency_level::ALL,
qs,
{},
cql3::query_processor::cache_internal::no);
if (rows->empty()) {
continue;
}
std::vector<sstring> col_names;
for (const auto& col : schema->all_columns()) {
col_names.push_back(col.name_as_cql_string());
@@ -884,51 +901,30 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
for (size_t i = 1; i < col_names.size(); ++i) {
val_binders_str += ", ?";
}
std::vector<mutation> collected;
// use longer than usual timeout as we scan the whole table
// but not infinite or very long as we want to fail reasonably fast
const auto t = 5min;
const timeout_config tc{t, t, t, t, t, t, t};
::service::client_state cs(::service::client_state::internal_tag{}, tc);
::service::query_state qs(cs, empty_service_permit());
co_await qp.query_internal(
seastar::format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
db::consistency_level::ALL,
{},
1000,
[&qp, &cf_name, &col_names, &val_binders_str, &schema, ts, &collected] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
std::vector<data_value_or_unset> values;
for (const auto& col : schema->all_columns()) {
if (row.has(col.name_as_text())) {
values.push_back(
col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
} else {
values.push_back(unset_value{});
}
for (const auto& row : *rows) {
std::vector<data_value_or_unset> values;
for (const auto& col : schema->all_columns()) {
if (row.has(col.name_as_text())) {
values.push_back(
col.type->deserialize(row.get_blob_unfragmented(col.name_as_text())));
} else {
values.push_back(unset_value{});
}
auto muts = co_await qp.get_mutations_internal(
seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
db::system_keyspace::NAME,
cf_name,
fmt::join(col_names, ", "),
val_binders_str),
internal_distributed_query_state(),
ts,
std::move(values));
if (muts.size() != 1) {
on_internal_error(log,
format("expecting single insert mutation, got {}", muts.size()));
}
collected.push_back(std::move(muts[0]));
co_return stop_iteration::no;
},
std::move(qs));
for (auto& m : collected) {
co_yield std::move(m);
}
auto muts = co_await qp.get_mutations_internal(
seastar::format("INSERT INTO {}.{} ({}) VALUES ({})",
db::system_keyspace::NAME,
cf_name,
fmt::join(col_names, ", "),
val_binders_str),
internal_distributed_query_state(),
ts,
std::move(values));
if (muts.size() != 1) {
on_internal_error(log,
format("expecting single insert mutation, got {}", muts.size()));
}
co_yield std::move(muts[0]);
}
}
co_yield co_await sys_ks.make_auth_version_mutation(ts,

View File

@@ -27,6 +27,7 @@
#include "cql3/description.hh"
#include "seastarx.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
#include "utils/observable.hh"
#include "utils/serialized_action.hh"
#include "service/maintenance_mode.hh"
@@ -130,7 +131,8 @@ public:
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&);
cache&,
utils::alien_worker&);
future<> start(::service::migration_manager&, db::system_keyspace&);

View File

@@ -52,6 +52,13 @@ static const class_registrator<
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
struct record final {
sstring name;
bool is_superuser;
bool can_login;
role_set member_of;
};
static db::consistency_level consistency_for_role(std::string_view role_name) noexcept {
if (role_name == meta::DEFAULT_SUPERUSER_NAME) {
return db::consistency_level::QUORUM;
@@ -60,13 +67,13 @@ static db::consistency_level consistency_for_role(std::string_view role_name) no
return db::consistency_level::LOCAL_ONE;
}
future<std::optional<standard_role_manager::record>> standard_role_manager::legacy_find_record(std::string_view role_name) {
static future<std::optional<record>> find_record(cql3::query_processor& qp, std::string_view role_name) {
const sstring query = seastar::format("SELECT * FROM {}.{} WHERE {} = ?",
get_auth_ks_name(_qp),
get_auth_ks_name(qp),
meta::roles_table::name,
meta::roles_table::role_col_name);
const auto results = co_await _qp.execute_internal(
const auto results = co_await qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_query_state(),
@@ -86,25 +93,8 @@ future<std::optional<standard_role_manager::record>> standard_role_manager::lega
: role_set())});
}
future<std::optional<standard_role_manager::record>> standard_role_manager::find_record(std::string_view role_name) {
if (legacy_mode(_qp)) {
return legacy_find_record(role_name);
}
auto name = sstring(role_name);
auto role = _cache.get(name);
if (!role) {
return make_ready_future<std::optional<record>>(std::nullopt);
}
return make_ready_future<std::optional<record>>(std::make_optional(record{
.name = std::move(name),
.is_superuser = role->is_superuser,
.can_login = role->can_login,
.member_of = role->member_of
}));
}
future<standard_role_manager::record> standard_role_manager::require_record(std::string_view role_name) {
return find_record(role_name).then([role_name](std::optional<record> mr) {
static future<record> require_record(cql3::query_processor& qp, std::string_view role_name) {
return find_record(qp, role_name).then([role_name](std::optional<record> mr) {
if (!mr) {
throw nonexistant_role(role_name);
}
@@ -202,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
log.info("Created default superuser role '{}'.", _superuser);
} catch (const exceptions::unavailable_exception& e) {
} catch(const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were not ready; will retry");
throw e;
}
@@ -396,7 +386,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
return fmt::to_string(fmt::join(assignments, ", "));
};
return require_record(role_name).then([this, role_name, &u, &mc](record) {
return require_record(_qp, role_name).then([this, role_name, &u, &mc](record) {
if (!u.is_superuser && !u.can_login) {
return make_ready_future<>();
}
@@ -630,17 +620,18 @@ standard_role_manager::revoke(std::string_view revokee_name, std::string_view ro
});
}
future<> standard_role_manager::collect_roles(
static future<> collect_roles(
cql3::query_processor& qp,
std::string_view grantee_name,
bool recurse,
role_set& roles) {
return require_record(grantee_name).then([this, &roles, recurse](standard_role_manager::record r) {
return do_with(std::move(r.member_of), [this, &roles, recurse](const role_set& memberships) {
return do_for_each(memberships.begin(), memberships.end(), [this, &roles, recurse](const sstring& role_name) {
return require_record(qp, grantee_name).then([&qp, &roles, recurse](record r) {
return do_with(std::move(r.member_of), [&qp, &roles, recurse](const role_set& memberships) {
return do_for_each(memberships.begin(), memberships.end(), [&qp, &roles, recurse](const sstring& role_name) {
roles.insert(role_name);
if (recurse) {
return collect_roles(role_name, true, roles);
return collect_roles(qp, role_name, true, roles);
}
return make_ready_future<>();
@@ -655,7 +646,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
return do_with(
role_set{sstring(grantee_name)},
[this, grantee_name, recurse](role_set& roles) {
return collect_roles(grantee_name, recurse, roles).then([&roles] { return roles; });
return collect_roles(_qp, grantee_name, recurse, roles).then([&roles] { return roles; });
});
}
@@ -715,21 +706,27 @@ future<role_set> standard_role_manager::query_all(::service::query_state& qs) {
}
future<bool> standard_role_manager::exists(std::string_view role_name) {
return find_record(role_name).then([](std::optional<record> mr) {
return find_record(_qp, role_name).then([](std::optional<record> mr) {
return static_cast<bool>(mr);
});
}
future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
return require_record(role_name).then([](record r) {
return require_record(_qp, role_name).then([](record r) {
return r.is_superuser;
});
}
future<bool> standard_role_manager::can_login(std::string_view role_name) {
return require_record(role_name).then([](record r) {
return r.can_login;
});
if (legacy_mode(_qp)) {
const auto r = co_await require_record(_qp, role_name);
co_return r.can_login;
}
auto role = _cache.get(sstring(role_name));
if (!role) {
throw nonexistant_role(role_name);
}
co_return role->can_login;
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {

View File

@@ -90,12 +90,6 @@ public:
private:
enum class membership_change { add, remove };
struct record final {
sstring name;
bool is_superuser;
bool can_login;
role_set member_of;
};
future<> create_legacy_metadata_tables_if_missing() const;
@@ -113,14 +107,6 @@ private:
future<> legacy_modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change);
future<> modify_membership(std::string_view role_name, std::string_view grantee_name, membership_change, ::service::group0_batch& mc);
future<std::optional<record>> legacy_find_record(std::string_view role_name);
future<std::optional<record>> find_record(std::string_view role_name);
future<record> require_record(std::string_view role_name);
future<> collect_roles(
std::string_view grantee_name,
bool recurse,
role_set& roles);
};
} // namespace auth

View File

@@ -38,8 +38,8 @@ class transitional_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
@@ -81,7 +81,7 @@ public:
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -126,7 +126,7 @@ public:
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
_complete = true;
return {};
}
@@ -141,7 +141,7 @@ public:
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -241,7 +241,8 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
auth::cache&,
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<
auth::authorizer,

View File

@@ -204,7 +204,7 @@ future<topology_description> topology_description::clone_async() const {
for (const auto& entry : _entries) {
vec.push_back(entry);
co_await coroutine::maybe_yield();
co_await seastar::maybe_yield();
}
co_return topology_description{std::move(vec)};
@@ -814,7 +814,8 @@ generation_service::generation_service(
config cfg, gms::gossiper& g, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
abort_source& abort_src, const locator::shared_token_metadata& stm, gms::feature_service& f,
replica::database& db)
replica::database& db,
std::function<bool()> raft_topology_change_enabled)
: _cfg(std::move(cfg))
, _gossiper(g)
, _sys_dist_ks(sys_dist_ks)
@@ -823,6 +824,7 @@ generation_service::generation_service(
, _token_metadata(stm)
, _feature_service(f)
, _db(db)
, _raft_topology_change_enabled(std::move(raft_topology_change_enabled))
{
}
@@ -876,7 +878,16 @@ future<> generation_service::on_join(gms::inet_address ep, locator::host_id id,
future<> generation_service::on_change(gms::inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
assert_shard_zero(__PRETTY_FUNCTION__);
return make_ready_future<>();
if (_raft_topology_change_enabled()) {
return make_ready_future<>();
}
return on_application_state_change(ep, id, states, gms::application_state::CDC_GENERATION_ID, pid, [this] (gms::inet_address ep, locator::host_id id, const gms::versioned_value& v, gms::permit_id) {
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value());
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
return legacy_handle_cdc_generation(gen_id);
});
}
future<> generation_service::check_and_repair_cdc_streams() {

View File

@@ -79,12 +79,17 @@ private:
std::optional<cdc::generation_id> _gen_id;
future<> _cdc_streams_rewrite_complete = make_ready_future<>();
/* Returns true if raft topology changes are enabled.
* Can only be called from shard 0.
*/
std::function<bool()> _raft_topology_change_enabled;
public:
generation_service(config cfg, gms::gossiper&,
sharded<db::system_distributed_keyspace>&,
sharded<db::system_keyspace>& sys_ks,
abort_source&, const locator::shared_token_metadata&,
gms::feature_service&, replica::database& db);
gms::feature_service&, replica::database& db,
std::function<bool()> raft_topology_change_enabled);
future<> stop();
~generation_service();

View File

@@ -15,7 +15,7 @@
#include "mutation/tombstone.hh"
#include "schema/schema.hh"
#include <seastar/core/sstring.hh>
#include "seastar/core/sstring.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "types/user.hh"

View File

@@ -10,9 +10,7 @@
#include <seastar/net/inet_address.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
#include "utils/loading_shared_values.hh"
#include <list>
#include <optional>
enum class client_type {
@@ -29,20 +27,6 @@ enum class client_connection_stage {
ready,
};
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
struct options_cache_value_type {};
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
using client_options_cache_key_type = client_options_cache_type::key_type;
// This struct represents a single OPTION key-value pair from the client's connection options.
// Both key and value are represented by corresponding "references" to their cached values.
// Each "reference" is effectively a lw_shared_ptr value.
struct client_option_key_value_cached_entry {
client_options_cache_entry_type key;
client_options_cache_entry_type value;
};
sstring to_string(client_connection_stage ct);
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
@@ -53,8 +37,8 @@ struct client_data {
client_connection_stage connection_stage = client_connection_stage::established;
int32_t shard_id; /// ID of server-side shard which is processing the connection.
std::optional<client_options_cache_entry_type> driver_name;
std::optional<client_options_cache_entry_type> driver_version;
std::optional<sstring> driver_name;
std::optional<sstring> driver_version;
std::optional<sstring> hostname;
std::optional<int32_t> protocol_version;
std::optional<sstring> ssl_cipher_suite;
@@ -62,7 +46,6 @@ struct client_data {
std::optional<sstring> ssl_protocol;
std::optional<sstring> username;
std::optional<sstring> scheduling_group_name;
std::list<client_option_key_value_cached_entry> client_options;
sstring stage_str() const { return to_string(connection_stage); }
sstring client_type_str() const { return to_string(ct); }

View File

@@ -125,6 +125,10 @@ if(target_arch)
add_compile_options("-march=${target_arch}")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
endif()
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")

View File

@@ -12,7 +12,6 @@
#include <seastar/core/condition-variable.hh>
#include "schema/schema_fwd.hh"
#include "sstables/open_info.hh"
#include "compaction_descriptor.hh"
class reader_permit;
@@ -45,7 +44,7 @@ public:
virtual compaction_strategy_state& get_compaction_strategy_state() noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const = 0;
virtual sstables::shared_sstable make_sstable() const = 0;
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
virtual api::timestamp_type min_memtable_timestamp() const = 0;
virtual api::timestamp_type min_memtable_live_timestamp() const = 0;

View File

@@ -416,9 +416,7 @@ future<compaction_result> compaction_task_executor::compact_sstables(compaction_
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
}
descriptor.creator = [&t] (shard_id) {
// All compaction types going through this path will work on normal input sstables only.
// Off-strategy, for example, waits until the sstables move out of staging state.
return t.make_sstable(sstables::sstable_state::normal);
return t.make_sstable();
};
descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) {
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
@@ -1849,10 +1847,6 @@ protected:
throw make_compaction_stopped_exception();
}
}, false);
if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) {
throw make_compaction_stopped_exception();
}
co_return co_await do_rewrite_sstable(std::move(sst));
}
};
@@ -2290,16 +2284,12 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
}
future<std::vector<sstables::shared_sstable>>
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
// Throw an error if split cannot be performed due to e.g. out of space prevention.
// We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization,
// which is unneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
if (is_disabled()) {
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
"reason might be out of space prevention", sst->get_filename()))));
if (!can_proceed(&t)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
std::vector<sstables::shared_sstable> ret;
@@ -2307,11 +2297,8 @@ compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compac
compaction_progress_monitor monitor;
compaction_data info = create_compaction_data();
compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
desc.creator = [&t, sst] (shard_id _) {
// NOTE: preserves the sstable state, since we want the output to be on the same state as the original.
// For example, if base table has views, it's important that sstable produced by repair will be
// in the staging state.
return t.make_sstable(sst->state());
desc.creator = [&t] (shard_id _) {
return t.make_sstable();
};
desc.replacer = [&] (compaction_completion_desc d) {
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));

View File

@@ -376,8 +376,7 @@ public:
// Splits a single SSTable by segregating all its data according to the classifier.
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
// Exception is thrown if the input sstable cannot be split due to e.g. out of space prevention.
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
// Run a custom job for a given table, defined by a function
// it completes when future returned by job is ready or returns immediately

View File

@@ -571,10 +571,10 @@ commitlog_total_space_in_mb: -1
# - "none": auditing is disabled (default)
# - "table": save audited events in audit.audit_log column family
# - "syslog": send audited events via syslog (depends on OS, but usually to /dev/log)
audit: "table"
# audit: "none"
#
# List of statement categories that should be audited.
audit_categories: "DCL,DDL,AUTH,ADMIN"
# audit_categories: "DCL,DDL,AUTH"
#
# List of tables that should be audited.
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"

View File

@@ -368,87 +368,6 @@ def find_ninja():
sys.exit(1)
def find_compiler(name):
"""
Find a compiler by name, skipping ccache wrapper directories.
This is useful when using sccache to avoid double-caching through ccache.
Args:
name: The compiler name (e.g., 'clang++', 'clang', 'gcc')
Returns:
Path to the compiler, skipping ccache directories, or None if not found.
"""
ccache_dirs = {'/usr/lib/ccache', '/usr/lib64/ccache'}
for path_dir in os.environ.get('PATH', '').split(os.pathsep):
# Skip ccache wrapper directories
if os.path.realpath(path_dir) in ccache_dirs or path_dir in ccache_dirs:
continue
candidate = os.path.join(path_dir, name)
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
return candidate
return None
def resolve_compilers_for_compiler_cache(args, compiler_cache):
"""
When using a compiler cache, resolve compiler paths to avoid ccache directories.
This prevents double-caching when ccache symlinks are in PATH.
Args:
args: The argument namespace with cc and cxx attributes.
compiler_cache: Path to the compiler cache binary, or None.
"""
if not compiler_cache:
return
if not os.path.isabs(args.cxx):
real_cxx = find_compiler(args.cxx)
if real_cxx:
args.cxx = real_cxx
if not os.path.isabs(args.cc):
real_cc = find_compiler(args.cc)
if real_cc:
args.cc = real_cc
def find_compiler_cache(preference):
"""
Find a compiler cache based on the preference.
Args:
preference: One of 'auto', 'sccache', 'ccache', 'none', or a path to a binary.
Returns:
Path to the compiler cache binary, or None if not found/disabled.
"""
if preference == 'none':
return None
if preference == 'auto':
# Prefer sccache over ccache
for cache in ['sccache', 'ccache']:
path = which(cache)
if path:
return path
return None
if preference in ('sccache', 'ccache'):
path = which(preference)
if path:
return path
print(f"Warning: {preference} not found on PATH, disabling compiler cache")
return None
# Assume it's a path to a binary
if os.path.isfile(preference) and os.access(preference, os.X_OK):
return preference
print(f"Warning: compiler cache '{preference}' not found or not executable, disabling compiler cache")
return None
modes = {
'debug': {
'cxxflags': '-DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
@@ -725,9 +644,29 @@ raft_tests = set([
vector_search_tests = set([
'test/vector_search/vector_store_client_test',
'test/vector_search/load_balancer_test',
'test/vector_search/client_test',
'test/vector_search/filter_test',
'test/vector_search/rescoring_test'
'test/vector_search/client_test'
])
vector_search_validator_bin = 'vector-search-validator/bin/vector-search-validator'
vector_search_validator_deps = set([
'test/vector_search_validator/build-validator',
'test/vector_search_validator/Cargo.toml',
'test/vector_search_validator/crates/validator/Cargo.toml',
'test/vector_search_validator/crates/validator/src/main.rs',
'test/vector_search_validator/crates/validator-scylla/Cargo.toml',
'test/vector_search_validator/crates/validator-scylla/src/lib.rs',
'test/vector_search_validator/crates/validator-scylla/src/cql.rs',
])
vector_store_bin = 'vector-search-validator/bin/vector-store'
vector_store_deps = set([
'test/vector_search_validator/build-env',
'test/vector_search_validator/build-vector-store',
])
vector_search_validator_bins = set([
vector_search_validator_bin,
vector_store_bin,
])
wasms = set([
@@ -763,7 +702,7 @@ other = set([
'iotune',
])
all_artifacts = apps | cpp_apps | tests | other | wasms
all_artifacts = apps | cpp_apps | tests | other | wasms | vector_search_validator_bins
arg_parser = argparse.ArgumentParser('Configure scylla', add_help=False, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('--out', dest='buildfile', action='store', default='build.ninja',
@@ -793,11 +732,6 @@ arg_parser.add_argument('--compiler', action='store', dest='cxx', default='clang
help='C++ compiler path')
arg_parser.add_argument('--c-compiler', action='store', dest='cc', default='clang',
help='C compiler path')
arg_parser.add_argument('--compiler-cache', action='store', dest='compiler_cache', default='auto',
help='Compiler cache to use: auto (default, prefers sccache), sccache, ccache, none, or a path to a binary')
# Workaround for https://github.com/mozilla/sccache/issues/2575
arg_parser.add_argument('--sccache-rust', action=argparse.BooleanOptionalAction, default=False,
help='Use sccache for rust code (if sccache is selected as compiler cache). Doesn\'t work with distributed builds.')
add_tristate(arg_parser, name='dpdk', dest='dpdk', default=False,
help='Use dpdk (from seastar dpdk sources)')
arg_parser.add_argument('--dpdk-target', action='store', dest='dpdk_target', default='',
@@ -925,10 +859,10 @@ scylla_core = (['message/messaging_service.cc',
'utils/alien_worker.cc',
'utils/array-search.cc',
'utils/base64.cc',
'utils/crypt_sha512.cc',
'utils/logalloc.cc',
'utils/large_bitset.cc',
'test/lib/limiting_data_source.cc',
'utils/buffer_input_stream.cc',
'utils/limiting_data_source.cc',
'utils/updateable_value.cc',
'message/dictionary_service.cc',
'utils/directories.cc',
@@ -1016,10 +950,6 @@ scylla_core = (['message/messaging_service.cc',
'cql3/functions/aggregate_fcts.cc',
'cql3/functions/castas_fcts.cc',
'cql3/functions/error_injection_fcts.cc',
'cql3/statements/strong_consistency/modification_statement.cc',
'cql3/statements/strong_consistency/select_statement.cc',
'cql3/statements/strong_consistency/statement_helpers.cc',
'cql3/functions/vector_similarity_fcts.cc',
'cql3/statements/cf_prop_defs.cc',
'cql3/statements/cf_statement.cc',
'cql3/statements/authentication_statement.cc',
@@ -1044,8 +974,8 @@ scylla_core = (['message/messaging_service.cc',
'cql3/statements/raw/parsed_statement.cc',
'cql3/statements/property_definitions.cc',
'cql3/statements/update_statement.cc',
'cql3/statements/broadcast_modification_statement.cc',
'cql3/statements/broadcast_select_statement.cc',
'cql3/statements/strongly_consistent_modification_statement.cc',
'cql3/statements/strongly_consistent_select_statement.cc',
'cql3/statements/delete_statement.cc',
'cql3/statements/prune_materialized_view_statement.cc',
'cql3/statements/batch_statement.cc',
@@ -1077,7 +1007,6 @@ scylla_core = (['message/messaging_service.cc',
'cql3/statements/list_service_level_attachments_statement.cc',
'cql3/statements/list_effective_service_level_statement.cc',
'cql3/statements/describe_statement.cc',
'cql3/statements/view_prop_defs.cc',
'cql3/update_parameters.cc',
'cql3/util.cc',
'cql3/ut_name.cc',
@@ -1133,6 +1062,7 @@ scylla_core = (['message/messaging_service.cc',
'db/hints/resource_manager.cc',
'db/hints/sync_point.cc',
'db/large_data_handler.cc',
'db/legacy_schema_migrator.cc',
'db/marshal/type_parser.cc',
'db/per_partition_rate_limit_options.cc',
'db/rate_limiter.cc',
@@ -1174,7 +1104,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/gz/crc_combine.cc',
'utils/gz/crc_combine_table.cc',
'utils/http.cc',
'utils/http_client_error_processing.cc',
'utils/rest/client.cc',
'utils/s3/aws_error.cc',
'utils/s3/client.cc',
@@ -1229,7 +1158,6 @@ scylla_core = (['message/messaging_service.cc',
'locator/topology.cc',
'locator/util.cc',
'service/client_state.cc',
'service/client_routes.cc',
'service/storage_service.cc',
'service/session.cc',
'service/task_manager_module.cc',
@@ -1337,9 +1265,6 @@ scylla_core = (['message/messaging_service.cc',
'lang/wasm.cc',
'lang/wasm_alien_thread_runner.cc',
'lang/wasm_instance_cache.cc',
'service/strong_consistency/groups_manager.cc',
'service/strong_consistency/coordinator.cc',
'service/strong_consistency/state_machine.cc',
'service/raft/group0_state_id_handler.cc',
'service/raft/group0_state_machine.cc',
'service/raft/group0_state_machine_merger.cc',
@@ -1369,7 +1294,6 @@ scylla_core = (['message/messaging_service.cc',
'vector_search/dns.cc',
'vector_search/client.cc',
'vector_search/clients.cc',
'vector_search/filter.cc',
'vector_search/truststore.cc'
] + [Antlr3Grammar('cql3/Cql.g')] \
+ scylla_raft_core
@@ -1394,8 +1318,6 @@ api = ['api/api.cc',
'api/storage_proxy.cc',
Json2Code('api/api-doc/cache_service.json'),
'api/cache_service.cc',
Json2Code('api/api-doc/client_routes.json'),
'api/client_routes.cc',
Json2Code('api/api-doc/collectd.json'),
'api/collectd.cc',
Json2Code('api/api-doc/endpoint_snitch_info.json'),
@@ -1445,7 +1367,6 @@ alternator = [
'alternator/auth.cc',
'alternator/streams.cc',
'alternator/ttl.cc',
'alternator/http_compression.cc'
]
idls = ['idl/gossip_digest.idl.hh',
@@ -1479,7 +1400,6 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/hinted_handoff.idl.hh',
'idl/storage_proxy.idl.hh',
'idl/sstables.idl.hh',
'idl/strong_consistency/state_machine.idl.hh',
'idl/group0_state_machine.idl.hh',
'idl/mapreduce_request.idl.hh',
'idl/replica_exception.idl.hh',
@@ -1538,7 +1458,6 @@ scylla_perfs = ['test/perf/perf_alternator.cc',
'test/perf/perf_fast_forward.cc',
'test/perf/perf_row_cache_update.cc',
'test/perf/perf_simple_query.cc',
'test/perf/perf_cql_raw.cc',
'test/perf/perf_sstable.cc',
'test/perf/perf_tablets.cc',
'test/perf/tablet_load_balancing.cc',
@@ -1561,6 +1480,7 @@ deps = {
pure_boost_tests = set([
'test/boost/anchorless_list_test',
'test/boost/auth_passwords_test',
'test/boost/auth_resource_test',
'test/boost/big_decimal_test',
'test/boost/caching_options_test',
@@ -1693,7 +1613,6 @@ deps['test/boost/combined_tests'] += [
'test/boost/schema_registry_test.cc',
'test/boost/secondary_index_test.cc',
'test/boost/sessions_test.cc',
'test/boost/simple_value_with_expiry_test.cc',
'test/boost/sstable_compaction_test.cc',
'test/boost/sstable_compressor_factory_test.cc',
'test/boost/sstable_compression_config_test.cc',
@@ -1776,20 +1695,6 @@ deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vector_store_client_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/filter_test'] = ['test/vector_search/filter_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/rescoring_test'] = ['test/vector_search/rescoring_test.cc'] + scylla_tests_dependencies
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
# We need to link these files to all Boost tests to make sure that
# we can execute `--list_json_content` on them. That will produce
# a similar result as calling `--list_content={HRF,DOT}`.
# Unfortunately, to be able to do that, we're forced to link the
# relevant code by hand.
for key in deps.keys():
for prefix in boost_tests_prefixes:
if key.startswith(prefix):
deps[key] += ["test/lib/boost_tree_lister_injector.cc", "test/lib/boost_test_tree_lister.cc"]
wasm_deps = {}
@@ -2095,7 +2000,7 @@ def semicolon_separated(*flags):
def real_relpath(path, start):
return os.path.relpath(os.path.realpath(path), os.path.realpath(start))
def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
def configure_seastar(build_dir, mode, mode_config):
seastar_cxx_ld_flags = mode_config['cxx_ld_flags']
# We want to "undo" coverage for seastar if we have it enabled.
if args.coverage:
@@ -2142,10 +2047,6 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
'-DSeastar_IO_URING=ON',
]
if compiler_cache:
seastar_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
if args.stack_guards is not None:
stack_guards = 'ON' if args.stack_guards else 'OFF'
seastar_cmake_args += ['-DSeastar_STACK_GUARDS={}'.format(stack_guards)]
@@ -2177,7 +2078,7 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
subprocess.check_call(seastar_cmd, shell=False, cwd=cmake_dir)
def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
def configure_abseil(build_dir, mode, mode_config):
abseil_cflags = mode_config['lib_cflags']
cxx_flags = mode_config['cxxflags']
if '-DSANITIZE' in cxx_flags:
@@ -2203,10 +2104,6 @@ def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
'-DABSL_PROPAGATE_CXX_STD=ON',
]
if compiler_cache:
abseil_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
cmake_args = abseil_cmake_args[:]
abseil_build_dir = os.path.join(build_dir, mode, 'abseil')
abseil_cmd = ['cmake', '-G', 'Ninja', real_relpath('abseil', abseil_build_dir)] + cmake_args
@@ -2352,6 +2249,15 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
if debuginfo and mode_config['can_have_debug_info']:
cxxflags += ['-g', '-gz']
if 'clang' in cxx:
# Since AssignmentTracking was enabled by default in clang
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
# coroutine frame debugging info (`coro_frame_ty`) is broken.
#
# It seems that we aren't losing much by disabling AssigmentTracking,
# so for now we choose to disable it to get `coro_frame_ty` back.
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
return cxxflags
@@ -2379,15 +2285,10 @@ def write_build_file(f,
scylla_product,
scylla_version,
scylla_release,
compiler_cache,
args):
use_precompiled_header = not args.disable_precompiled_header
warnings = get_warning_options(args.cxx)
rustc_target = pick_rustc_target('wasm32-wasi', 'wasm32-wasip1')
# If compiler cache is available, prefix the compiler with it
cxx_with_cache = f'{compiler_cache} {args.cxx}' if compiler_cache else args.cxx
# For Rust, sccache is used via RUSTC_WRAPPER environment variable
rustc_wrapper = f'RUSTC_WRAPPER={compiler_cache} ' if compiler_cache and 'sccache' in compiler_cache and args.sccache_rust else ''
f.write(textwrap.dedent('''\
configure_args = {configure_args}
builddir = {outdir}
@@ -2450,7 +2351,7 @@ def write_build_file(f,
command = clang --target=wasm32 --no-standard-libraries -Wl,--export-all -Wl,--no-entry $in -o $out
description = C2WASM $out
rule rust2wasm
command = {rustc_wrapper}cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
command = cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
&& wasm-opt -Oz $builddir/wasm/{rustc_target}/debug/examples/$example.wasm -o $builddir/wasm/$example.wasm $
&& wasm-strip $builddir/wasm/$example.wasm
description = RUST2WASM $out
@@ -2466,7 +2367,7 @@ def write_build_file(f,
command = llvm-profdata merge $in -output=$out
''').format(configure_args=configure_args,
outdir=outdir,
cxx=cxx_with_cache,
cxx=args.cxx,
user_cflags=user_cflags,
warnings=warnings,
defines=defines,
@@ -2474,7 +2375,6 @@ def write_build_file(f,
user_ldflags=user_ldflags,
libs=libs,
rustc_target=rustc_target,
rustc_wrapper=rustc_wrapper,
link_pool_depth=link_pool_depth,
seastar_path=args.seastar_path,
ninja=ninja,
@@ -2559,15 +2459,16 @@ def write_build_file(f,
description = TEST {mode}
# This rule is unused for PGO stages. They use the rust lib from the parent mode.
rule rust_lib.{mode}
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' {rustc_wrapper}cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
&& touch $out
description = RUST_LIB $out
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, rustc_wrapper=rustc_wrapper, **modeval))
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, **modeval))
f.write(
'build {mode}-build: phony {artifacts} {wasms}\n'.format(
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
mode=mode,
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms)]),
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms - vector_search_validator_bins)]),
wasms = str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & wasms)]),
vector_search_validator_bins=str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & vector_search_validator_bins)]),
)
)
if profile_recipe := modes[mode].get('profile_recipe'):
@@ -2597,7 +2498,7 @@ def write_build_file(f,
continue
profile_dep = modes[mode].get('profile_target', "")
if binary in other or binary in wasms:
if binary in other or binary in wasms or binary in vector_search_validator_bins:
continue
srcs = deps[binary]
# 'scylla'
@@ -2625,7 +2526,7 @@ def write_build_file(f,
# In debug/sanitize modes, we compile with fsanitizers,
# so must use the same options during the link:
if '-DSANITIZE' in modes[mode]['cxxflags']:
f.write(' libs = -fsanitize=address -fsanitize=undefined -lubsan\n')
f.write(' libs = -fsanitize=address -fsanitize=undefined\n')
else:
f.write(' libs =\n')
f.write(f'build $builddir/{mode}/{binary}.stripped: strip $builddir/{mode}/{binary}\n')
@@ -2708,10 +2609,11 @@ def write_build_file(f,
)
f.write(
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms}\n'.format(
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms} {vector_search_validator_bins} \n'.format(
mode=mode,
test_executables=' '.join(['$builddir/{}/{}'.format(mode, binary) for binary in sorted(tests)]),
wasms=' '.join([f'$builddir/{binary}' for binary in sorted(wasms)]),
vector_search_validator_bins=' '.join([f'$builddir/{binary}' for binary in sorted(vector_search_validator_bins)]),
)
)
f.write(
@@ -2798,35 +2700,38 @@ def write_build_file(f,
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
seastar_testing_dep = f'$builddir/{mode}/seastar/libseastar_testing.{seastar_lib_ext}'
f.write(f'build {seastar_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n')
f.write('build {seastar_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = seastar\n')
f.write(f'build {seastar_testing_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n')
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = seastar\n'.format(**locals()))
f.write('build {seastar_testing_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = seastar_testing\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = seastar_testing\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
for lib in abseil_libs:
f.write(f'build $builddir/{mode}/abseil/{lib}: ninja $builddir/{mode}/abseil/build.ninja | always {profile_dep}\n')
f.write(f' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/abseil\n')
f.write(f' target = {lib}\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write('build $builddir/{mode}/abseil/{lib}: ninja $builddir/{mode}/abseil/build.ninja | always {profile_dep}\n'.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(' subdir = $builddir/{mode}/abseil\n'.format(**locals()))
f.write(' target = {lib}\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
f.write(f'build $builddir/{mode}/stdafx.hh.pch: cxx_build_precompiled_header.{mode} stdafx.hh | {profile_dep} {seastar_dep} {abseil_dep} {gen_headers_dep} {pch_dep}\n')
f.write(f'build $builddir/{mode}/seastar/apps/iotune/iotune: ninja $builddir/{mode}/seastar/build.ninja | $builddir/{mode}/seastar/libseastar.{seastar_lib_ext}\n')
f.write('build $builddir/{mode}/seastar/apps/iotune/iotune: ninja $builddir/{mode}/seastar/build.ninja | $builddir/{mode}/seastar/libseastar.{seastar_lib_ext}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = iotune\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write(textwrap.dedent(f'''\
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = iotune\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
f.write(textwrap.dedent('''\
build $builddir/{mode}/iotune: copy $builddir/{mode}/seastar/apps/iotune/iotune
build $builddir/{mode}/iotune.stripped: strip $builddir/{mode}/iotune
build $builddir/{mode}/iotune.debug: phony $builddir/{mode}/iotune.stripped
'''))
''').format(**locals()))
if args.dist_only:
include_scylla_and_iotune = ''
include_scylla_and_iotune_stripped = ''
@@ -2835,16 +2740,16 @@ def write_build_file(f,
include_scylla_and_iotune = f'$builddir/{mode}/scylla $builddir/{mode}/iotune $builddir/{mode}/patchelf'
include_scylla_and_iotune_stripped = f'$builddir/{mode}/scylla.stripped $builddir/{mode}/iotune.stripped $builddir/{mode}/patchelf.stripped'
include_scylla_and_iotune_debug = f'$builddir/{mode}/scylla.debug $builddir/{mode}/iotune.debug'
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz: package {include_scylla_and_iotune} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz: stripped_package {include_scylla_and_iotune_stripped} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.stripped | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-debuginfo-{scylla_version}-{scylla_release}.{arch}.tar.gz: debuginfo_package {include_scylla_and_iotune_debug} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.debug | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-{arch}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz: package {include_scylla_and_iotune} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz: stripped_package {include_scylla_and_iotune_stripped} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.stripped | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-debuginfo-{scylla_version}-{scylla_release}.{arch}.tar.gz: debuginfo_package {include_scylla_and_iotune_debug} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.debug | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-{arch}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write(f'build $builddir/dist/{mode}/redhat: rpmbuild $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
@@ -2879,6 +2784,19 @@ def write_build_file(f,
'build compiler-training: phony {}\n'.format(' '.join(['{mode}-compiler-training'.format(mode=mode) for mode in default_modes]))
)
f.write(textwrap.dedent(f'''\
rule build-vector-search-validator
command = test/vector_search_validator/build-validator $builddir
rule build-vector-store
command = test/vector_search_validator/build-vector-store $builddir
'''))
f.write(
'build $builddir/{vector_search_validator_bin}: build-vector-search-validator {}\n'.format(' '.join([dep for dep in sorted(vector_search_validator_deps)]), vector_search_validator_bin=vector_search_validator_bin)
)
f.write(
'build $builddir/{vector_store_bin}: build-vector-store {}\n'.format(' '.join([dep for dep in sorted(vector_store_deps)]), vector_store_bin=vector_store_bin)
)
f.write(textwrap.dedent(f'''\
build dist-unified-tar: phony {' '.join([f'$builddir/{mode}/dist/tar/{scylla_product}-unified-{scylla_version}-{scylla_release}.{arch}.tar.gz' for mode in default_modes])}
build dist-unified: phony dist-unified-tar
@@ -3004,9 +2922,6 @@ def create_build_system(args):
os.makedirs(outdir, exist_ok=True)
compiler_cache = find_compiler_cache(args.compiler_cache)
resolve_compilers_for_compiler_cache(args, compiler_cache)
scylla_product, scylla_version, scylla_release = generate_version(args.date_stamp)
for mode, mode_config in build_modes.items():
@@ -3023,8 +2938,8 @@ def create_build_system(args):
# {outdir}/{mode}/seastar/build.ninja, and
# {outdir}/{mode}/seastar/seastar.pc is queried for building flags
for mode, mode_config in build_modes.items():
configure_seastar(outdir, mode, mode_config, compiler_cache)
configure_abseil(outdir, mode, mode_config, compiler_cache)
configure_seastar(outdir, mode, mode_config)
configure_abseil(outdir, mode, mode_config)
user_cflags += ' -isystem abseil'
for mode, mode_config in build_modes.items():
@@ -3047,7 +2962,6 @@ def create_build_system(args):
scylla_product,
scylla_version,
scylla_release,
compiler_cache,
args)
generate_compdb('compile_commands.json', ninja, args.buildfile, selected_modes)
@@ -3090,10 +3004,6 @@ def configure_using_cmake(args):
selected_modes = args.selected_modes or default_modes
selected_configs = ';'.join(build_modes[mode].cmake_build_type for mode
in selected_modes)
compiler_cache = find_compiler_cache(args.compiler_cache)
resolve_compilers_for_compiler_cache(args, compiler_cache)
settings = {
'CMAKE_CONFIGURATION_TYPES': selected_configs,
'CMAKE_CROSS_CONFIGS': selected_configs,
@@ -3111,14 +3021,6 @@ def configure_using_cmake(args):
'Scylla_WITH_DEBUG_INFO' : 'ON' if args.debuginfo else 'OFF',
'Scylla_USE_PRECOMPILED_HEADER': 'OFF' if args.disable_precompiled_header else 'ON',
}
if compiler_cache:
settings['CMAKE_CXX_COMPILER_LAUNCHER'] = compiler_cache
settings['CMAKE_C_COMPILER_LAUNCHER'] = compiler_cache
# For Rust, sccache is used via RUSTC_WRAPPER
if 'sccache' in compiler_cache and args.sccache_rust:
settings['Scylla_RUSTC_WRAPPER'] = compiler_cache
if args.date_stamp:
settings['Scylla_DATE_STAMP'] = args.date_stamp
if args.staticboost:
@@ -3150,7 +3052,7 @@ def configure_using_cmake(args):
if not args.dist_only:
for mode in selected_modes:
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode], compiler_cache)
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode])
cmake_command = ['cmake']
cmake_command += [f'-D{var}={value}' for var, value in settings.items()]

View File

@@ -47,10 +47,6 @@ target_sources(cql3
functions/aggregate_fcts.cc
functions/castas_fcts.cc
functions/error_injection_fcts.cc
statements/strong_consistency/select_statement.cc
statements/strong_consistency/modification_statement.cc
statements/strong_consistency/statement_helpers.cc
functions/vector_similarity_fcts.cc
statements/cf_prop_defs.cc
statements/cf_statement.cc
statements/authentication_statement.cc
@@ -75,8 +71,8 @@ target_sources(cql3
statements/raw/parsed_statement.cc
statements/property_definitions.cc
statements/update_statement.cc
statements/broadcast_modification_statement.cc
statements/broadcast_select_statement.cc
statements/strongly_consistent_modification_statement.cc
statements/strongly_consistent_select_statement.cc
statements/delete_statement.cc
statements/prune_materialized_view_statement.cc
statements/batch_statement.cc
@@ -108,7 +104,6 @@ target_sources(cql3
statements/list_service_level_attachments_statement.cc
statements/list_effective_service_level_statement.cc
statements/describe_statement.cc
statements/view_prop_defs.cc
update_parameters.cc
util.cc
ut_name.cc

View File

@@ -389,10 +389,8 @@ selectStatement returns [std::unique_ptr<raw::select_statement> expr]
bool is_ann_ordering = false;
}
: K_SELECT (
( (K_JSON K_DISTINCT)=> K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; }
| (K_JSON selectClause K_FROM)=> K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; }
)?
( (K_DISTINCT selectClause K_FROM)=> K_DISTINCT { is_distinct = true; } )?
( K_JSON { statement_subtype = raw::select_statement::parameters::statement_subtype::JSON; } )?
( K_DISTINCT { is_distinct = true; } )?
sclause=selectClause
)
K_FROM (
@@ -427,7 +425,6 @@ selector returns [shared_ptr<raw_selector> s]
unaliasedSelector returns [uexpression tmp]
: ( c=cident { tmp = unresolved_identifier{std::move(c)}; }
| v=value { tmp = std::move(v); }
| K_COUNT '(' countArgument ')' { tmp = make_count_rows_function_expression(); }
| K_WRITETIME '(' c=cident ')' { tmp = column_mutation_attribute{column_mutation_attribute::attribute_kind::writetime,
unresolved_identifier{std::move(c)}}; }
@@ -450,7 +447,9 @@ selectionFunctionArgs returns [std::vector<expression> a]
countArgument
: '*'
/* COUNT(1) is also allowed, it is recognized via the general function(args) path */
| i=INTEGER { if (i->getText() != "1") {
add_recognition_error("Only COUNT(1) is supported, got COUNT(" + i->getText() + ")");
} }
;
whereClause returns [uexpression clause]
@@ -886,10 +885,6 @@ pkDef[cql3::statements::create_table_statement::raw_statement& expr]
| '(' k1=ident { l.push_back(k1); } ( ',' kn=ident { l.push_back(kn); } )* ')' { $expr.add_key_aliases(l); }
;
cfamProperties[cql3::statements::cf_properties& expr]
: cfamProperty[expr] (K_AND cfamProperty[expr])*
;
cfamProperty[cql3::statements::cf_properties& expr]
: property[*$expr.properties()]
| K_COMPACT K_STORAGE { $expr.set_compact_storage(); }
@@ -927,22 +922,16 @@ typeColumns[create_type_statement& expr]
*/
createIndexStatement returns [std::unique_ptr<create_index_statement> expr]
@init {
auto idx_props = make_shared<index_specific_prop_defs>();
auto props = index_prop_defs();
auto props = make_shared<index_prop_defs>();
bool if_not_exists = false;
auto name = ::make_shared<cql3::index_name>();
std::vector<::shared_ptr<index_target::raw>> targets;
}
: K_CREATE (K_CUSTOM { idx_props->is_custom = true; })? K_INDEX (K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
: K_CREATE (K_CUSTOM { props->is_custom = true; })? K_INDEX (K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
(idxName[*name])? K_ON cf=columnFamilyName '(' (target1=indexIdent { targets.emplace_back(target1); } (',' target2=indexIdent { targets.emplace_back(target2); } )*)? ')'
(K_USING cls=STRING_LITERAL { idx_props->custom_class = sstring{$cls.text}; })?
(K_WITH cfamProperties[props])?
{
props.extract_index_specific_properties_to(*idx_props);
view_prop_defs view_props = std::move(props).into_view_prop_defs();
$expr = std::make_unique<create_index_statement>(cf, name, targets, std::move(idx_props), std::move(view_props), if_not_exists);
}
(K_USING cls=STRING_LITERAL { props->custom_class = sstring{$cls.text}; })?
(K_WITH properties[*props])?
{ $expr = std::make_unique<create_index_statement>(cf, name, targets, props, if_not_exists); }
;
indexIdent returns [::shared_ptr<index_target::raw> id]
@@ -1090,9 +1079,9 @@ alterTypeStatement returns [std::unique_ptr<alter_type_statement> expr]
*/
alterViewStatement returns [std::unique_ptr<alter_view_statement> expr]
@init {
auto props = cql3::statements::view_prop_defs();
auto props = cql3::statements::cf_prop_defs();
}
: K_ALTER K_MATERIALIZED K_VIEW cf=columnFamilyName K_WITH properties[*props.properties()]
: K_ALTER K_MATERIALIZED K_VIEW cf=columnFamilyName K_WITH properties[props]
{
$expr = std::make_unique<alter_view_statement>(std::move(cf), std::move(props));
}

View File

@@ -25,11 +25,6 @@ public:
NOT_ASSIGNABLE,
};
struct vector_test_result {
test_result result;
std::optional<size_t> dimension_opt;
};
static bool is_assignable(test_result tr) {
return tr != test_result::NOT_ASSIGNABLE;
}
@@ -49,8 +44,6 @@ public:
*/
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const = 0;
virtual vector_test_result test_assignment_any_size_float_vector() const = 0;
virtual std::optional<data_type> assignment_testable_type_opt() const = 0;
// for error reporting

View File

@@ -10,7 +10,6 @@
#include "expr-utils.hh"
#include "evaluate.hh"
#include "cql3/functions/functions.hh"
#include "cql3/functions/aggregate_fcts.hh"
#include "cql3/functions/castas_fcts.hh"
#include "cql3/functions/scalar_function.hh"
#include "cql3/column_identifier.hh"
@@ -1048,47 +1047,8 @@ prepare_function_args_for_type_inference(std::span<const expression> args, data_
return partially_prepared_args;
}
// Special case for count(1) - recognize it as the countRows() function. Note it is quite
// artificial and we might relax it to the more general count(expression) later.
static
std::optional<expression>
try_prepare_count_rows(const expr::function_call& fc, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
return std::visit(overloaded_functor{
[&] (const functions::function_name& name) -> std::optional<expression> {
auto native_name = name;
if (!native_name.has_keyspace()) {
native_name = name.as_native_function();
}
// Collapse count(1) into countRows()
if (native_name == functions::function_name::native_function("count")) {
if (fc.args.size() == 1) {
if (auto uc_arg = expr::as_if<expr::untyped_constant>(&fc.args[0])) {
if (uc_arg->partial_type == expr::untyped_constant::type_class::integer
&& uc_arg->raw_text == "1") {
return expr::function_call{
.func = functions::aggregate_fcts::make_count_rows_function(),
.args = {},
};
} else {
throw exceptions::invalid_request_exception(format("count() expects a column or the literal 1 as an argument", fc.args[0]));
}
}
}
}
return std::nullopt;
},
[] (const shared_ptr<functions::function>&) -> std::optional<expression> {
// Already prepared, nothing to do
return std::nullopt;
},
}, fc.func);
}
std::optional<expression>
prepare_function_call(const expr::function_call& fc, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
if (auto prepared = try_prepare_count_rows(fc, db, keyspace, schema_opt, receiver)) {
return prepared;
}
// Try to extract a column family name from the available information.
// Most functions can be prepared without information about the column family, usually just the keyspace is enough.
// One exception is the token() function - in order to prepare system.token() we have to know the partition key of the table,
@@ -1474,112 +1434,6 @@ test_assignment(const expression& expr, data_dictionary::database db, const sstr
}, expr);
}
template <cql3_type::kind... Kinds>
assignment_testable::vector_test_result
test_assignment_any_size_float_vector(const expression& expr) {
using test_result = assignment_testable::vector_test_result;
const test_result NOT_ASSIGNABLE = {assignment_testable::test_result::NOT_ASSIGNABLE, std::nullopt};
const test_result WEAKLY_ASSIGNABLE = {assignment_testable::test_result::WEAKLY_ASSIGNABLE, std::nullopt};
auto is_float_or_bind = [] (const expression& e) {
return expr::visit(overloaded_functor{
[] (const bind_variable&) {
return true;
},
[] (const untyped_constant& uc) {
return uc.partial_type == untyped_constant::type_class::floating_point
|| uc.partial_type == untyped_constant::type_class::integer;
},
[] (const constant& value) {
auto kind = value.type->as_cql3_type().get_kind();
return cql3_type::kind_enum_set::frozen<Kinds...>().contains(kind);
},
[] (const auto&) {
return false;
},
}, e);
};
auto validate_assignment = [&] (const data_type& dt) -> test_result {
auto vt = dynamic_pointer_cast<const vector_type_impl>(dt->underlying_type());
if (!vt) {
return NOT_ASSIGNABLE;
}
auto elem_kind = vt->get_elements_type()->as_cql3_type().get_kind();
if (cql3_type::kind_enum_set::frozen<Kinds...>().contains(elem_kind)) {
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, vt->get_dimension()};
}
return NOT_ASSIGNABLE;
};
return expr::visit(overloaded_functor{
[&] (const constant& value) -> test_result {
return validate_assignment(value.type);
},
[&] (const binary_operator&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const conjunction&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const column_value& col_val) -> test_result {
return validate_assignment(col_val.col->type);
},
[&] (const subscript&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const unresolved_identifier& ui) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const column_mutation_attribute& cma) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const function_call& fc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const cast& c) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const field_selection& fs) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const bind_variable& bv) -> test_result {
return WEAKLY_ASSIGNABLE;
},
[&] (const untyped_constant& uc) -> test_result {
return uc.partial_type == untyped_constant::type_class::null
? WEAKLY_ASSIGNABLE
: NOT_ASSIGNABLE;
},
[&] (const tuple_constructor& tc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const collection_constructor& c) -> test_result {
switch (c.style) {
case collection_constructor::style_type::list_or_vector: {
if(std::ranges::all_of(c.elements, is_float_or_bind)) {
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, c.elements.size()};
}
return NOT_ASSIGNABLE;
}
case collection_constructor::style_type::set: return NOT_ASSIGNABLE;
case collection_constructor::style_type::map: return NOT_ASSIGNABLE;
case collection_constructor::style_type::vector:
on_internal_error(expr_logger, "vector style type found in test_assignment, should have been introduced post-prepare");
}
on_internal_error(expr_logger, fmt::format("unexpected collection_constructor style {}", static_cast<unsigned>(c.style)));
},
[&] (const usertype_constructor& uc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const temporary& t) -> test_result {
return NOT_ASSIGNABLE;
},
}, expr);
}
assignment_testable::vector_test_result
test_assignment_any_size_float_vector(const expression& expr) {
return test_assignment_any_size_float_vector<cql3_type::kind::FLOAT, cql3_type::kind::DOUBLE>(expr);
}
expression
prepare_expression(const expression& expr, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
auto e_opt = try_prepare_expression(expr, db, keyspace, schema_opt, std::move(receiver));
@@ -1613,9 +1467,6 @@ public:
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const override {
return expr::test_assignment(_e, db, keyspace, schema_opt, receiver);
}
virtual vector_test_result test_assignment_any_size_float_vector() const override {
return expr::test_assignment_any_size_float_vector(_e);
}
virtual sstring assignment_testable_source_context() const override {
return fmt::format("{}", _e);
}

View File

@@ -16,7 +16,6 @@
#include "cql3/functions/user_function.hh"
#include "cql3/functions/user_aggregate.hh"
#include "cql3/functions/uuid_fcts.hh"
#include "cql3/functions/vector_similarity_fcts.hh"
#include "data_dictionary/data_dictionary.hh"
#include "as_json_function.hh"
#include "cql3/prepare_context.hh"
@@ -399,14 +398,6 @@ functions::get(data_dictionary::database db,
}
});
const auto func_name = name.has_keyspace() ? name : name.as_native_function();
if (SIMILARITY_FUNCTIONS.contains(func_name)) {
auto arg_types = retrieve_vector_arg_types(func_name, provided_args);
auto fun = ::make_shared<vector_similarity_fct>(func_name.name, arg_types);
validate_types(db, keyspace, schema.get(), fun, provided_args, receiver_ks, receiver_cf);
return fun;
}
if (name.has_keyspace()
? name == TOKEN_FUNCTION_NAME
: name.name == TOKEN_FUNCTION_NAME.name) {

View File

@@ -1,184 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "vector_similarity_fcts.hh"
#include "types/types.hh"
#include "types/vector.hh"
#include "exceptions/exceptions.hh"
#include <span>
#include <bit>
namespace cql3 {
namespace functions {
namespace detail {
std::vector<float> extract_float_vector(const bytes_opt& param, size_t dimension) {
if (!param) {
throw exceptions::invalid_request_exception("Cannot extract float vector from null parameter");
}
const size_t expected_size = dimension * sizeof(float);
if (param->size() != expected_size) {
throw exceptions::invalid_request_exception(
fmt::format("Invalid vector size: expected {} bytes for {} floats, got {} bytes",
expected_size, dimension, param->size()));
}
std::vector<float> result;
result.reserve(dimension);
bytes_view view(*param);
for (size_t i = 0; i < dimension; ++i) {
// read_simple handles network byte order (big-endian) conversion
uint32_t raw = read_simple<uint32_t>(view);
result.push_back(std::bit_cast<float>(raw));
}
return result;
}
} // namespace detail
namespace {
// The computations of similarity scores match the exact formulas of Cassandra's (jVector's) implementation to ensure compatibility.
// There exist tests checking the compliance of the results.
// Reference:
// https://github.com/datastax/jvector/blob/f967f1c9249035b63b55a566fac7d4dc38380349/jvector-base/src/main/java/io/github/jbellis/jvector/vector/VectorSimilarityFunction.java#L36-L69
// You should only use this function if you need to preserve the original vectors and cannot normalize
// them in advance.
float compute_cosine_similarity(std::span<const float> v1, std::span<const float> v2) {
double dot_product = 0.0;
double squared_norm_a = 0.0;
double squared_norm_b = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = v1[i];
double b = v2[i];
dot_product += a * b;
squared_norm_a += a * a;
squared_norm_b += b * b;
}
if (squared_norm_a == 0 || squared_norm_b == 0) {
throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
}
// The cosine similarity is in the range [-1, 1].
// It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
// for consistency with other similarity functions.
return (1 + (dot_product / (std::sqrt(squared_norm_a * squared_norm_b)))) / 2;
}
float compute_euclidean_similarity(std::span<const float> v1, std::span<const float> v2) {
double sum = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = v1[i];
double b = v2[i];
double diff = a - b;
sum += diff * diff;
}
// The squared Euclidean (L2) distance is of range [0, inf).
// It is mapped to a similarity score in the range (0, 1] (0 -> 1, inf -> 0)
// for consistency with other similarity functions.
return (1 / (1 + sum));
}
// Assumes that both vectors are L2-normalized.
// This similarity is intended as an optimized way to perform cosine similarity calculation.
float compute_dot_product_similarity(std::span<const float> v1, std::span<const float> v2) {
double dot_product = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = v1[i];
double b = v2[i];
dot_product += a * b;
}
// The dot product is in the range [-1, 1] for L2-normalized vectors.
// It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
// for consistency with other similarity functions.
return ((1 + dot_product) / 2);
}
} // namespace
thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS = {
{SIMILARITY_COSINE_FUNCTION_NAME, compute_cosine_similarity},
{SIMILARITY_EUCLIDEAN_FUNCTION_NAME, compute_euclidean_similarity},
{SIMILARITY_DOT_PRODUCT_FUNCTION_NAME, compute_dot_product_similarity},
};
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args) {
if (provided_args.size() != 2) {
throw exceptions::invalid_request_exception(fmt::format("Invalid number of arguments for function {}(vector<float, n>, vector<float, n>)", name));
}
auto [first_result, first_dim_opt] = provided_args[0]->test_assignment_any_size_float_vector();
auto [second_result, second_dim_opt] = provided_args[1]->test_assignment_any_size_float_vector();
auto invalid_type_error_message = [&name](const shared_ptr<assignment_testable>& arg) {
auto type = arg->assignment_testable_type_opt();
const auto& source_context = arg->assignment_testable_source_context();
if (type) {
return fmt::format("Function {} requires a float vector argument, but found {} of type {}", name, source_context, type.value()->cql3_type_name());
} else {
return fmt::format("Function {} requires a float vector argument, but found {}", name, source_context);
}
};
if (!is_assignable(first_result)) {
throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[0]));
}
if (!is_assignable(second_result)) {
throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[1]));
}
if (!first_dim_opt && !second_dim_opt) {
throw exceptions::invalid_request_exception(fmt::format("Cannot infer type of argument {} for function {}(vector<float, n>, vector<float, n>)",
provided_args[0]->assignment_testable_source_context(), name));
}
if (first_dim_opt && second_dim_opt) {
if (*first_dim_opt != *second_dim_opt) {
throw exceptions::invalid_request_exception(fmt::format(
"All arguments must have the same vector dimensions, but found vector<float, {}> and vector<float, {}>", *first_dim_opt, *second_dim_opt));
}
}
size_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
auto type = vector_type_impl::get_instance(float_type, dimension);
return {type, type};
}
bytes_opt vector_similarity_fct::execute(std::span<const bytes_opt> parameters) {
if (std::any_of(parameters.begin(), parameters.end(), [](const auto& param) {
return !param;
})) {
return std::nullopt;
}
// Extract dimension from the vector type
const auto& type = static_cast<const vector_type_impl&>(*arg_types()[0]);
size_t dimension = type.get_dimension();
// Optimized path: extract floats directly from bytes, bypassing data_value overhead
std::vector<float> v1 = detail::extract_float_vector(parameters[0], dimension);
std::vector<float> v2 = detail::extract_float_vector(parameters[1], dimension);
float result = SIMILARITY_FUNCTIONS.at(_name)(v1, v2);
return float_type->decompose(result);
}
} // namespace functions
} // namespace cql3

View File

@@ -1,47 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "native_scalar_function.hh"
#include "cql3/assignment_testable.hh"
#include "cql3/functions/function_name.hh"
#include <span>
namespace cql3 {
namespace functions {
static const function_name SIMILARITY_COSINE_FUNCTION_NAME = function_name::native_function("similarity_cosine");
static const function_name SIMILARITY_EUCLIDEAN_FUNCTION_NAME = function_name::native_function("similarity_euclidean");
static const function_name SIMILARITY_DOT_PRODUCT_FUNCTION_NAME = function_name::native_function("similarity_dot_product");
using similarity_function_t = float (*)(std::span<const float>, std::span<const float>);
extern thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS;
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args);
class vector_similarity_fct : public native_scalar_function {
public:
vector_similarity_fct(const sstring& name, const std::vector<data_type>& arg_types)
: native_scalar_function(name, float_type, arg_types) {
}
virtual bytes_opt execute(std::span<const bytes_opt> parameters) override;
};
namespace detail {
// Extract float vector directly from serialized bytes, bypassing data_value overhead.
// This is an internal API exposed for testing purposes.
// Vector<float, N> wire format: N floats as big-endian uint32_t values, 4 bytes each.
std::vector<float> extract_float_vector(const bytes_opt& param, size_t dimension);
} // namespace detail
} // namespace functions
} // namespace cql3

View File

@@ -14,7 +14,6 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/try_future.hh>
#include "service/storage_proxy.hh"
#include "service/migration_manager.hh"
@@ -48,10 +47,8 @@ const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono
struct query_processor::remote {
remote(service::migration_manager& mm, service::mapreduce_service& fwd,
service::storage_service& ss, service::raft_group0_client& group0_client,
service::strong_consistency::coordinator& _sc_coordinator)
service::storage_service& ss, service::raft_group0_client& group0_client)
: mm(mm), mapreducer(fwd), ss(ss), group0_client(group0_client)
, sc_coordinator(_sc_coordinator)
, gate("query_processor::remote")
{}
@@ -59,7 +56,6 @@ struct query_processor::remote {
service::mapreduce_service& mapreducer;
service::storage_service& ss;
service::raft_group0_client& group0_client;
service::strong_consistency::coordinator& sc_coordinator;
seastar::named_gate gate;
};
@@ -68,10 +64,6 @@ bool query_processor::topology_global_queue_empty() {
return remote().first.get().ss.topology_global_queue_empty();
}
future<bool> query_processor::ongoing_rf_change(const service::group0_guard& guard, sstring ks) {
return remote().first.get().ss.ongoing_rf_change(guard, std::move(ks));
}
static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}
@@ -517,16 +509,9 @@ query_processor::~query_processor() {
}
}
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
query_processor::acquire_strongly_consistent_coordinator() {
auto [remote_, holder] = remote();
return {remote_.get().sc_coordinator, std::move(holder)};
}
void query_processor::start_remote(service::migration_manager& mm, service::mapreduce_service& mapreducer,
service::storage_service& ss, service::raft_group0_client& group0_client,
service::strong_consistency::coordinator& sc_coordinator) {
_remote = std::make_unique<struct remote>(mm, mapreducer, ss, group0_client, sc_coordinator);
service::storage_service& ss, service::raft_group0_client& group0_client) {
_remote = std::make_unique<struct remote>(mm, mapreducer, ss, group0_client);
}
future<> query_processor::stop_remote() {
@@ -870,7 +855,6 @@ struct internal_query_state {
sstring query_string;
std::unique_ptr<query_options> opts;
statements::prepared_statement::checked_weak_ptr p;
std::optional<service::query_state> qs;
bool more_results = true;
};
@@ -878,14 +862,10 @@ internal_query_state query_processor::create_paged_state(
const sstring& query_string,
db::consistency_level cl,
const data_value_list& values,
int32_t page_size,
std::optional<service::query_state> qs) {
int32_t page_size) {
auto p = prepare_internal(query_string);
auto opts = make_internal_options(p, values, cl, page_size);
if (!qs) {
qs.emplace(query_state_for_internal_call());
}
return internal_query_state{query_string, std::make_unique<cql3::query_options>(std::move(opts)), std::move(p), std::move(qs), true};
return internal_query_state{query_string, std::make_unique<cql3::query_options>(std::move(opts)), std::move(p), true};
}
bool query_processor::has_more_results(cql3::internal_query_state& state) const {
@@ -908,8 +888,9 @@ future<> query_processor::for_each_cql_result(
future<::shared_ptr<untyped_result_set>>
query_processor::execute_paged_internal(internal_query_state& state) {
state.p->statement->validate(*this, service::client_state::for_internal_calls());
auto qs = query_state_for_internal_call();
::shared_ptr<cql_transport::messages::result_message> msg =
co_await state.p->statement->execute(*this, *state.qs, *state.opts, std::nullopt);
co_await state.p->statement->execute(*this, qs, *state.opts, std::nullopt);
class visitor : public result_message::visitor_base {
internal_query_state& _state;
@@ -1008,7 +989,7 @@ query_processor::execute_with_params(
auto opts = make_internal_options(p, values, cl);
auto statement = p->statement;
auto msg = co_await coroutine::try_future(execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params));
auto msg = co_await execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params);
co_return ::make_shared<untyped_result_set>(msg);
}
@@ -1018,7 +999,7 @@ query_processor::do_execute_with_params(
shared_ptr<cql_statement> statement,
const query_options& options, std::optional<service::group0_guard> guard) {
statement->validate(*this, service::client_state::for_internal_calls());
co_return co_await coroutine::try_future(statement->execute(*this, query_state, options, std::move(guard)));
co_return co_await statement->execute(*this, query_state, options, std::move(guard));
}
@@ -1216,9 +1197,8 @@ future<> query_processor::query_internal(
db::consistency_level cl,
const data_value_list& values,
int32_t page_size,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f,
std::optional<service::query_state> qs) {
auto query_state = create_paged_state(query_string, cl, values, page_size, std::move(qs));
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f) {
auto query_state = create_paged_state(query_string, cl, values, page_size);
co_return co_await for_each_cql_result(query_state, std::move(f));
}

View File

@@ -44,10 +44,6 @@ class query_state;
class mapreduce_service;
class raft_group0_client;
namespace strong_consistency {
class coordinator;
}
namespace broadcast_tables {
struct query;
}
@@ -159,8 +155,7 @@ public:
~query_processor();
void start_remote(service::migration_manager&, service::mapreduce_service&,
service::storage_service& ss, service::raft_group0_client&,
service::strong_consistency::coordinator&);
service::storage_service& ss, service::raft_group0_client&);
future<> stop_remote();
data_dictionary::database db() {
@@ -179,9 +174,6 @@ public:
return _proxy;
}
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
acquire_strongly_consistent_coordinator();
cql_stats& get_cql_stats() {
return _cql_stats;
}
@@ -330,7 +322,6 @@ public:
* page_size - maximum page size
* f - a function to be run on each row of the query result,
* if the function returns stop_iteration::yes the iteration will stop
* qs - optional query state (default: std::nullopt)
*
* \note This function is optimized for convenience, not performance.
*/
@@ -339,8 +330,7 @@ public:
db::consistency_level cl,
const data_value_list& values,
int32_t page_size,
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f,
std::optional<service::query_state> qs = std::nullopt);
noncopyable_function<future<stop_iteration>(const cql3::untyped_result_set_row&)> f);
/*
* \brief iterate over all cql results using paging
@@ -484,7 +474,6 @@ public:
void reset_cache();
bool topology_global_queue_empty();
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
query_options make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,
@@ -509,8 +498,7 @@ private:
const sstring& query_string,
db::consistency_level,
const data_value_list& values,
int32_t page_size,
std::optional<service::query_state> qs = std::nullopt);
int32_t page_size);
/*!
* \brief run a query using paging

View File

@@ -1322,10 +1322,6 @@ const std::vector<expr::expression>& statement_restrictions::index_restrictions(
return _index_restrictions;
}
bool statement_restrictions::is_empty() const {
return !_where.has_value();
}
// Current score table:
// local and restrictions include full partition key: 2
// global: 1

View File

@@ -408,8 +408,6 @@ public:
/// Checks that the primary key restrictions don't contain null values, throws invalid_request_exception otherwise.
void validate_primary_key(const query_options& options) const;
bool is_empty() const;
};
statement_restrictions analyze_statement_restrictions(

View File

@@ -46,13 +46,6 @@ void metadata::add_non_serialized_column(lw_shared_ptr<column_specification> nam
_column_info->_names.emplace_back(std::move(name));
}
void metadata::hide_last_column() {
if (_column_info->_column_count == 0) {
utils::on_internal_error("Trying to hide a column when there are no columns visible.");
}
_column_info->_column_count--;
}
void metadata::set_paging_state(lw_shared_ptr<const service::pager::paging_state> paging_state) {
_flags.set<flag::HAS_MORE_PAGES>();
_paging_state = std::move(paging_state);

View File

@@ -73,7 +73,6 @@ public:
uint32_t value_count() const;
void add_non_serialized_column(lw_shared_ptr<column_specification> name);
void hide_last_column();
public:
void set_paging_state(lw_shared_ptr<const service::pager::paging_state> paging_state);

View File

@@ -32,7 +32,7 @@ bool
selectable_processes_selection(const expr::expression& selectable) {
return expr::visit(overloaded_functor{
[&] (const expr::constant&) -> bool {
return true;
on_internal_error(slogger, "no way to express SELECT constant in the grammar yet");
},
[&] (const expr::conjunction& conj) -> bool {
on_internal_error(slogger, "no way to express 'SELECT a AND b' in the grammar yet");

View File

@@ -19,7 +19,6 @@
#include "locator/abstract_replication_strategy.hh"
#include "mutation/canonical_mutation.hh"
#include "prepared_statement.hh"
#include <seastar/coroutine/exception.hh>
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
@@ -139,7 +138,6 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
using namespace cql_transport;
bool unknown_keyspace = false;
try {
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
auto ks = qp.db().find_keyspace(_name);
@@ -160,12 +158,8 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
// when in reality nothing or only schema is being changed
if (changes_tablets(qp)) {
if (!qp.proxy().features().topology_global_request_queue && !qp.topology_global_queue_empty()) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
}
if (qp.proxy().features().rack_list_rf && co_await qp.ongoing_rf_change(mc.guard(),_name)) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception(format("Another RF change for this keyspace {} ongoing, please retry.", _name)));
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
}
qp.db().real_database().validate_keyspace_update(*ks_md_update);
@@ -206,9 +200,8 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
locator::replication_strategy_params(ks_md_update->strategy_options(), ks_md_update->initial_tablets(), ks_md_update->consistency_option()),
topo);
// If RF-rack-validity must be enforced for the keyspace according to `enforce_rf_rack_validity_for_keyspace`,
// it's forbidden to perform a schema change that would lead to an RF-rack-invalid keyspace.
// Verify that this change does not.
// If `rf_rack_valid_keyspaces` is enabled, it's forbidden to perform a schema change that
// would lead to an RF-rack-valid keyspace. Verify that this change does not.
// For more context, see: scylladb/scylladb#23071.
try {
// There are two things to note here:
@@ -225,13 +218,14 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
// The second hyphen is not really true because currently topological changes can
// disturb it (see scylladb/scylladb#23345), but we ignore that.
locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
} catch (const std::invalid_argument& e) {
if (replica::database::enforce_rf_rack_validity_for_keyspace(qp.db().get_config(), *ks_md)) {
// wrap the exception manually here in a type that can be passed to the user.
} catch (const std::exception& e) {
if (qp.db().get_config().rf_rack_valid_keyspaces()) {
// There's no guarantee what the type of the exception will be, so we need to
// wrap it manually here in a type that can be passed to the user.
throw exceptions::invalid_request_exception(e.what());
} else {
// Even when RF-rack-validity is not enforced for the keyspace, we'd
// like to inform the user that the keyspace they're altering will not
// Even when the configuration option `rf_rack_valid_keyspaces` is set to false,
// we'd like to inform the user that the keyspace they're altering will not
// satisfy the restriction after the change--but just as a warning.
// For more context, see issue: scylladb/scylladb#23330.
warnings.push_back(seastar::format(
@@ -248,15 +242,10 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
target_type,
keyspace());
mc.add_mutations(std::move(muts), "CQL alter keyspace");
co_return std::make_tuple(std::move(ret), warnings);
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), warnings));
} catch (data_dictionary::no_such_keyspace& e) {
unknown_keyspace = true;
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
}
if (unknown_keyspace) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception("Unknown keyspace " + _name));
}
std::unreachable();
}
std::unique_ptr<cql3::statements::prepared_statement>

Some files were not shown because too many files have changed in this diff Show More