mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-19 16:15:07 +00:00
Compare commits
4 Commits
ykaul/comp
...
copilot/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bb1ab98fc9 | ||
|
|
bdbc47a333 | ||
|
|
5407b6b43d | ||
|
|
8582156257 |
9
.github/CODEOWNERS
vendored
9
.github/CODEOWNERS
vendored
@@ -1,5 +1,5 @@
|
||||
# AUTH
|
||||
auth/* @nuivall
|
||||
auth/* @nuivall @ptrsmrn
|
||||
|
||||
# CACHE
|
||||
row_cache* @tgrabiec
|
||||
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
|
||||
transport/*
|
||||
|
||||
# CQL QUERY LANGUAGE
|
||||
cql3/* @tgrabiec @nuivall
|
||||
cql3/* @tgrabiec @nuivall @ptrsmrn
|
||||
|
||||
# COUNTERS
|
||||
counters* @nuivall
|
||||
tests/counter_test* @nuivall
|
||||
counters* @nuivall @ptrsmrn
|
||||
tests/counter_test* @nuivall @ptrsmrn
|
||||
|
||||
# DOCS
|
||||
docs/* @annastuchlik @tzach
|
||||
@@ -57,6 +57,7 @@ repair/* @tgrabiec @asias
|
||||
|
||||
# SCHEMA MANAGEMENT
|
||||
db/schema_tables* @tgrabiec
|
||||
db/legacy_schema_migrator* @tgrabiec
|
||||
service/migration* @tgrabiec
|
||||
schema* @tgrabiec
|
||||
|
||||
|
||||
29
.github/copilot-instructions.md
vendored
29
.github/copilot-instructions.md
vendored
@@ -55,26 +55,22 @@ ninja build/<mode>/test/boost/<test_name>
|
||||
ninja build/<mode>/scylla
|
||||
|
||||
# Run all tests in a file
|
||||
./test.py --mode=<mode> test/<suite>/<test_name>.py
|
||||
./test.py --mode=<mode> <test_path>
|
||||
|
||||
# Run a single test case from a file
|
||||
./test.py --mode=<mode> test/<suite>/<test_name>.py::<test_function_name>
|
||||
|
||||
# Run all tests in a directory
|
||||
./test.py --mode=<mode> test/<suite>/
|
||||
./test.py --mode=<mode> <test_path>::<test_function_name>
|
||||
|
||||
# Examples
|
||||
./test.py --mode=dev test/alternator/
|
||||
./test.py --mode=dev test/cluster/test_raft_voters.py::test_raft_limited_voters_retain_coordinator
|
||||
./test.py --mode=dev test/cqlpy/test_json.py
|
||||
./test.py --mode=dev alternator/
|
||||
./test.py --mode=dev cluster/test_raft_voters::test_raft_limited_voters_retain_coordinator
|
||||
|
||||
# Optional flags
|
||||
./test.py --mode=dev test/cluster/test_raft_no_quorum.py -v # Verbose output
|
||||
./test.py --mode=dev test/cluster/test_raft_no_quorum.py --repeat 5 # Repeat test 5 times
|
||||
./test.py --mode=dev cluster/test_raft_no_quorum -v # Verbose output
|
||||
./test.py --mode=dev cluster/test_raft_no_quorum --repeat 5 # Repeat test 5 times
|
||||
```
|
||||
|
||||
**Important:**
|
||||
- Use full path with `.py` extension (e.g., `test/cluster/test_raft_no_quorum.py`, not `cluster/test_raft_no_quorum`)
|
||||
- Use path without `.py` extension (e.g., `cluster/test_raft_no_quorum`, not `cluster/test_raft_no_quorum.py`)
|
||||
- To run a single test case, append `::<test_function_name>` to the file path
|
||||
- Add `-v` for verbose output
|
||||
- Add `--repeat <num>` to repeat a test multiple times
|
||||
@@ -88,14 +84,3 @@ ninja build/<mode>/scylla
|
||||
- Strive for simplicity and clarity, add complexity only when clearly justified
|
||||
- Question requests: don't blindly implement requests - evaluate trade-offs, identify issues, and suggest better alternatives when appropriate
|
||||
- Consider different approaches, weigh pros and cons, and recommend the best fit for the specific context
|
||||
|
||||
## Test Philosophy
|
||||
- Performance matters. Tests should run as quickly as possible. Sleeps in the code are highly discouraged and should be avoided, to reduce run time and flakiness.
|
||||
- Stability matters. Tests should be stable. New tests should be executed 100 times at least to ensure they pass 100 out of 100 times. (use --repeat 100 --max-failures 1 when running it)
|
||||
- Unit tests should ideally test one thing and one thing only.
|
||||
- Tests for bug fixes should run before the fix - and show the failure and after the fix - and show they now pass.
|
||||
- Tests for bug fixes should have in their comments which bug fixes (GitHub or JIRA issue) they test.
|
||||
- Tests in debug are always slower, so if needed, reduce number of iterations, rows, data used, cycles, etc. in debug mode.
|
||||
- Tests should strive to be repeatable, and not use random input that will make their results unpredictable.
|
||||
- Tests should consume as little resources as possible. Prefer running tests on a single node if it is sufficient, for example.
|
||||
|
||||
|
||||
2
.github/dependabot.yml
vendored
2
.github/dependabot.yml
vendored
@@ -1,6 +1,6 @@
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "uv"
|
||||
- package-ecosystem: "pip"
|
||||
directory: "/docs"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
|
||||
2
.github/scripts/auto-backport.py
vendored
2
.github/scripts/auto-backport.py
vendored
@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
|
||||
if is_draft:
|
||||
labels_to_add.append("conflicts")
|
||||
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
|
||||
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
|
||||
pr_comment += "Please resolve them and mark this PR as ready for review"
|
||||
backport_pr.create_issue_comment(pr_comment)
|
||||
|
||||
# Apply all labels at once if we have any
|
||||
|
||||
2
.github/scripts/check-license.py
vendored
2
.github/scripts/check-license.py
vendored
@@ -4,7 +4,7 @@
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import argparse
|
||||
|
||||
@@ -8,9 +8,6 @@ on:
|
||||
jobs:
|
||||
check-fixes-prefix:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
steps:
|
||||
- name: Check PR body for "Fixes" prefix patterns
|
||||
uses: actions/github-script@v7
|
||||
@@ -21,7 +18,7 @@ jobs:
|
||||
|
||||
// Regular expression pattern to check for "Fixes" prefix
|
||||
// Adjusted to dynamically insert the repository full name
|
||||
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|(?:https://scylladb\\.atlassian\\.net/browse/)?([A-Z]+-\\d+))`;
|
||||
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
|
||||
const regex = new RegExp(pattern);
|
||||
|
||||
if (!regex.test(body)) {
|
||||
|
||||
53
.github/workflows/call_backport_with_jira.yaml
vendored
53
.github/workflows/call_backport_with_jira.yaml
vendored
@@ -1,53 +0,0 @@
|
||||
name: Backport with Jira Integration
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- next-*.*
|
||||
- branch-*.*
|
||||
pull_request_target:
|
||||
types: [labeled, closed]
|
||||
branches:
|
||||
- master
|
||||
- next
|
||||
- next-*.*
|
||||
- branch-*.*
|
||||
|
||||
jobs:
|
||||
backport-on-push:
|
||||
if: github.event_name == 'push'
|
||||
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
|
||||
with:
|
||||
event_type: 'push'
|
||||
base_branch: ${{ github.ref }}
|
||||
commits: ${{ github.event.before }}..${{ github.sha }}
|
||||
secrets:
|
||||
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
backport-on-label:
|
||||
if: github.event_name == 'pull_request_target' && github.event.action == 'labeled'
|
||||
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
|
||||
with:
|
||||
event_type: 'labeled'
|
||||
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
|
||||
pull_request_number: ${{ github.event.pull_request.number }}
|
||||
head_commit: ${{ github.event.pull_request.base.sha }}
|
||||
label_name: ${{ github.event.label.name }}
|
||||
pr_state: ${{ github.event.pull_request.state }}
|
||||
secrets:
|
||||
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
backport-chain:
|
||||
if: github.event_name == 'pull_request_target' && github.event.action == 'closed' && github.event.pull_request.merged == true
|
||||
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
|
||||
with:
|
||||
event_type: 'chain'
|
||||
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
|
||||
pull_request_number: ${{ github.event.pull_request.number }}
|
||||
pr_body: ${{ github.event.pull_request.body }}
|
||||
secrets:
|
||||
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
12
.github/workflows/call_jira_status_in_progress.yml
vendored
Normal file
12
.github/workflows/call_jira_status_in_progress.yml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
name: Call Jira Status In Progress
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [opened]
|
||||
|
||||
jobs:
|
||||
call-jira-status-in-progress:
|
||||
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_progress.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
12
.github/workflows/call_jira_status_in_review.yml
vendored
Normal file
12
.github/workflows/call_jira_status_in_review.yml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
name: Call Jira Status In Review
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [ready_for_review, review_requested]
|
||||
|
||||
jobs:
|
||||
call-jira-status-in-review:
|
||||
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
12
.github/workflows/call_jira_status_ready_for_merge.yml
vendored
Normal file
12
.github/workflows/call_jira_status_ready_for_merge.yml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
name: Call Jira Status Ready For Merge
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [labeled]
|
||||
|
||||
jobs:
|
||||
call-jira-status-update:
|
||||
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_ready_for_merge.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
18
.github/workflows/call_jira_sync.yml
vendored
18
.github/workflows/call_jira_sync.yml
vendored
@@ -1,18 +0,0 @@
|
||||
name: Sync Jira Based on PR Events
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [opened, edited, ready_for_review, review_requested, labeled, unlabeled, closed]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
pull-requests: write
|
||||
issues: write
|
||||
|
||||
jobs:
|
||||
jira-sync:
|
||||
uses: scylladb/github-automation/.github/workflows/main_pr_events_jira_sync.yml@main
|
||||
with:
|
||||
caller_action: ${{ github.event.action }}
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
@@ -1,22 +0,0 @@
|
||||
name: Sync Jira Based on PR Milestone Events
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types: [milestoned, demilestoned]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
pull-requests: read
|
||||
|
||||
jobs:
|
||||
jira-sync-milestone-set:
|
||||
if: github.event.action == 'milestoned'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_milestone_set.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
|
||||
jira-sync-milestone-removed:
|
||||
if: github.event.action == 'demilestoned'
|
||||
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_milestone_removed.yml@main
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
@@ -1,14 +0,0 @@
|
||||
name: Call Jira release creation for new milestone
|
||||
|
||||
on:
|
||||
milestone:
|
||||
types: [created, closed]
|
||||
|
||||
jobs:
|
||||
sync-milestone-to-jira:
|
||||
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
|
||||
with:
|
||||
# Comma-separated list of Jira project keys
|
||||
jira_project_keys: "SCYLLADB,CUSTOMER,SMI,RELENG,VECTOR"
|
||||
secrets:
|
||||
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
|
||||
@@ -1,18 +0,0 @@
|
||||
name: validate_pr_author_email
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types:
|
||||
- opened
|
||||
- synchronize
|
||||
- reopened
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
pull-requests: write
|
||||
statuses: write
|
||||
|
||||
jobs:
|
||||
validate_pr_author_email:
|
||||
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main
|
||||
|
||||
2
.github/workflows/check-license-header.yaml
vendored
2
.github/workflows/check-license-header.yaml
vendored
@@ -7,7 +7,7 @@ on:
|
||||
|
||||
env:
|
||||
HEADER_CHECK_LINES: 10
|
||||
LICENSE: "LicenseRef-ScyllaDB-Source-Available-1.1"
|
||||
LICENSE: "LicenseRef-ScyllaDB-Source-Available-1.0"
|
||||
CHECKED_EXTENSIONS: ".cc .hh .py"
|
||||
|
||||
jobs:
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
name: Close issues created by Scylla associates
|
||||
|
||||
on:
|
||||
issues:
|
||||
types: [opened, reopened]
|
||||
|
||||
permissions:
|
||||
issues: write
|
||||
|
||||
jobs:
|
||||
comment-and-close:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Comment and close if author email is scylladb.com
|
||||
uses: actions/github-script@v7
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
script: |
|
||||
const issue = context.payload.issue;
|
||||
const actor = context.actor;
|
||||
|
||||
// Get user data (only public email is available)
|
||||
const { data: user } = await github.rest.users.getByUsername({
|
||||
username: actor,
|
||||
});
|
||||
|
||||
const email = user.email || "";
|
||||
console.log(`Actor: ${actor}, public email: ${email || "<none>"}`);
|
||||
|
||||
// Only continue if email exists and ends with @scylladb.com
|
||||
if (!email || !email.toLowerCase().endsWith("@scylladb.com")) {
|
||||
console.log("User is not a scylladb.com email (or email not public); skipping.");
|
||||
return;
|
||||
}
|
||||
|
||||
const owner = context.repo.owner;
|
||||
const repo = context.repo.repo;
|
||||
const issue_number = issue.number;
|
||||
|
||||
const body = "Issues in this repository are closed automatically. Scylla associates should use Jira to manage issues.\nPlease move this issue to Jira https://scylladb.atlassian.net/jira/software/c/projects/SCYLLADB/list";
|
||||
|
||||
// Add the comment
|
||||
await github.rest.issues.createComment({
|
||||
owner,
|
||||
repo,
|
||||
issue_number,
|
||||
body,
|
||||
});
|
||||
|
||||
console.log(`Comment added to #${issue_number}`);
|
||||
|
||||
// Close the issue
|
||||
await github.rest.issues.update({
|
||||
owner,
|
||||
repo,
|
||||
issue_number,
|
||||
state: "closed",
|
||||
state_reason: "not_planned"
|
||||
});
|
||||
|
||||
console.log(`Issue #${issue_number} closed.`);
|
||||
2
.github/workflows/codespell.yaml
vendored
2
.github/workflows/codespell.yaml
vendored
@@ -13,5 +13,5 @@ jobs:
|
||||
- uses: codespell-project/actions-codespell@master
|
||||
with:
|
||||
only_warn: 1
|
||||
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison,iif,tread"
|
||||
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison"
|
||||
skip: "./.git,./build,./tools,*.js,*.lock,./test,./licenses,./redis/lolwut.cc,*.svg"
|
||||
|
||||
8
.github/workflows/docs-pages.yaml
vendored
8
.github/workflows/docs-pages.yaml
vendored
@@ -18,10 +18,6 @@ on:
|
||||
|
||||
jobs:
|
||||
release:
|
||||
permissions:
|
||||
pages: write
|
||||
id-token: write
|
||||
contents: write
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -33,9 +29,7 @@ jobs:
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v6
|
||||
python-version: "3.10"
|
||||
- name: Set up env
|
||||
run: make -C docs FLAG="${{ env.FLAG }}" setupenv
|
||||
- name: Build docs
|
||||
|
||||
7
.github/workflows/docs-pr.yaml
vendored
7
.github/workflows/docs-pr.yaml
vendored
@@ -2,9 +2,6 @@ name: "Docs / Build PR"
|
||||
# For more information,
|
||||
# see https://sphinx-theme.scylladb.com/stable/deployment/production.html#available-workflows
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
env:
|
||||
FLAG: ${{ github.repository == 'scylladb/scylla-enterprise' && 'enterprise' || 'opensource' }}
|
||||
|
||||
@@ -29,9 +26,7 @@ jobs:
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v6
|
||||
python-version: "3.10"
|
||||
- name: Set up env
|
||||
run: make -C docs FLAG="${{ env.FLAG }}" setupenv
|
||||
- name: Build docs
|
||||
|
||||
13
.github/workflows/docs-validate-metrics.yml
vendored
13
.github/workflows/docs-validate-metrics.yml
vendored
@@ -1,8 +1,5 @@
|
||||
name: Docs / Validate metrics
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches:
|
||||
@@ -10,7 +7,7 @@ on:
|
||||
- enterprise
|
||||
paths:
|
||||
- '**/*.cc'
|
||||
- 'scripts/metrics-config.yml'
|
||||
- 'scripts/metrics-config.yml'
|
||||
- 'scripts/get_description.py'
|
||||
- 'docs/_ext/scylladb_metrics.py'
|
||||
|
||||
@@ -18,20 +15,20 @@ jobs:
|
||||
validate-metrics:
|
||||
runs-on: ubuntu-latest
|
||||
name: Check metrics documentation coverage
|
||||
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: '3.10'
|
||||
|
||||
|
||||
- name: Install dependencies
|
||||
run: pip install PyYAML
|
||||
|
||||
|
||||
- name: Validate metrics
|
||||
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml
|
||||
|
||||
5
.github/workflows/iwyu.yaml
vendored
5
.github/workflows/iwyu.yaml
vendored
@@ -14,8 +14,7 @@ env:
|
||||
CLEANER_DIRS: test/unit exceptions alternator api auth cdc compaction db dht gms index lang message mutation mutation_writer node_ops raft redis replica service
|
||||
SEASTAR_BAD_INCLUDE_OUTPUT_PATH: build/seastar-bad-include.log
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
permissions: {}
|
||||
|
||||
# cancel the in-progress run upon a repush
|
||||
concurrency:
|
||||
@@ -35,6 +34,8 @@ jobs:
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
- run: |
|
||||
sudo dnf -y install clang-tools-extra
|
||||
- name: Generate compilation database
|
||||
run: |
|
||||
cmake \
|
||||
|
||||
2
.github/workflows/read-toolchain.yaml
vendored
2
.github/workflows/read-toolchain.yaml
vendored
@@ -10,8 +10,6 @@ on:
|
||||
jobs:
|
||||
read-toolchain:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
outputs:
|
||||
image: ${{ steps.read.outputs.image }}
|
||||
steps:
|
||||
|
||||
53
.github/workflows/trigger-scylla-ci.yaml
vendored
53
.github/workflows/trigger-scylla-ci.yaml
vendored
@@ -1,66 +1,21 @@
|
||||
name: Trigger Scylla CI Route
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
on:
|
||||
issue_comment:
|
||||
types: [created]
|
||||
pull_request_target:
|
||||
types:
|
||||
- unlabeled
|
||||
|
||||
jobs:
|
||||
trigger-jenkins:
|
||||
if: (github.event_name == 'issue_comment' && github.event.comment.user.login != 'scylladbbot') || github.event.label.name == 'conflicts'
|
||||
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Verify Org Membership
|
||||
id: verify_author
|
||||
env:
|
||||
EVENT_NAME: ${{ github.event_name }}
|
||||
PR_AUTHOR: ${{ github.event.pull_request.user.login }}
|
||||
PR_ASSOCIATION: ${{ github.event.pull_request.author_association }}
|
||||
COMMENT_AUTHOR: ${{ github.event.comment.user.login }}
|
||||
COMMENT_ASSOCIATION: ${{ github.event.comment.author_association }}
|
||||
shell: bash
|
||||
run: |
|
||||
if [[ "$EVENT_NAME" == "pull_request_target" ]]; then
|
||||
AUTHOR="$PR_AUTHOR"
|
||||
ASSOCIATION="$PR_ASSOCIATION"
|
||||
else
|
||||
AUTHOR="$COMMENT_AUTHOR"
|
||||
ASSOCIATION="$COMMENT_ASSOCIATION"
|
||||
fi
|
||||
if [[ "$ASSOCIATION" == "MEMBER" || "$ASSOCIATION" == "OWNER" ]]; then
|
||||
echo "member=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "::warning::${AUTHOR} is not a member of scylladb (association: ${ASSOCIATION}); skipping CI trigger."
|
||||
echo "member=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: Validate Comment Trigger
|
||||
if: github.event_name == 'issue_comment'
|
||||
id: verify_comment
|
||||
env:
|
||||
COMMENT_BODY: ${{ github.event.comment.body }}
|
||||
shell: bash
|
||||
run: |
|
||||
CLEAN_BODY=$(echo "$COMMENT_BODY" | grep -v '^[[:space:]]*>')
|
||||
|
||||
if echo "$CLEAN_BODY" | grep -qi '@scylladbbot' && echo "$CLEAN_BODY" | grep -qi 'trigger-ci'; then
|
||||
echo "trigger=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "trigger=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: Trigger Scylla-CI-Route Jenkins Job
|
||||
if: steps.verify_author.outputs.member == 'true' && (github.event_name == 'pull_request_target' || steps.verify_comment.outputs.trigger == 'true')
|
||||
env:
|
||||
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
|
||||
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
|
||||
JENKINS_URL: "https://jenkins.scylladb.com"
|
||||
PR_NUMBER: "${{ github.event.issue.number || github.event.pull_request.number }}"
|
||||
PR_REPO_NAME: "${{ github.event.repository.full_name }}"
|
||||
run: |
|
||||
PR_NUMBER=${{ github.event.issue.number }}
|
||||
PR_REPO_NAME=${{ github.event.repository.full_name }}
|
||||
curl -X POST "$JENKINS_URL/job/releng/job/Scylla-CI-Route/buildWithParameters?PR_NUMBER=$PR_NUMBER&PR_REPO_NAME=$PR_REPO_NAME" \
|
||||
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail
|
||||
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v
|
||||
|
||||
3
.github/workflows/trigger_jenkins.yaml
vendored
3
.github/workflows/trigger_jenkins.yaml
vendored
@@ -1,8 +1,5 @@
|
||||
name: Trigger next gating
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
|
||||
@@ -2,12 +2,6 @@ cmake_minimum_required(VERSION 3.27)
|
||||
|
||||
project(scylla)
|
||||
|
||||
# Disable CMake's automatic -fcolor-diagnostics injection (CMake 3.24+ adds
|
||||
# it for Clang+Ninja). configure.py does not add any color diagnostics flags,
|
||||
# so we clear the internal CMake variable to prevent injection.
|
||||
set(CMAKE_CXX_COMPILE_OPTIONS_COLOR_DIAGNOSTICS "")
|
||||
set(CMAKE_C_COMPILE_OPTIONS_COLOR_DIAGNOSTICS "")
|
||||
|
||||
list(APPEND CMAKE_MODULE_PATH
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/cmake
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/seastar/cmake)
|
||||
@@ -57,16 +51,6 @@ set(CMAKE_CXX_EXTENSIONS ON CACHE INTERNAL "")
|
||||
set(CMAKE_CXX_SCAN_FOR_MODULES OFF CACHE INTERNAL "")
|
||||
set(CMAKE_VISIBILITY_INLINES_HIDDEN ON)
|
||||
|
||||
# Global defines matching configure.py
|
||||
# Since gcc 13, libgcc doesn't need the exception workaround
|
||||
add_compile_definitions(SEASTAR_NO_EXCEPTION_HACK)
|
||||
# Hacks needed to expose internal APIs for xxhash dependencies
|
||||
add_compile_definitions(XXH_PRIVATE_API)
|
||||
# SEASTAR_TESTING_MAIN is added later (after add_subdirectory(seastar) and
|
||||
# add_subdirectory(abseil)) to avoid leaking into the seastar subdirectory.
|
||||
# If SEASTAR_TESTING_MAIN is defined globally before seastar, it causes a
|
||||
# duplicate 'main' symbol in seastar_testing.
|
||||
|
||||
if(is_multi_config)
|
||||
find_package(Seastar)
|
||||
# this is atypical compared to standard ExternalProject usage:
|
||||
@@ -112,33 +96,12 @@ else()
|
||||
set(Seastar_EXCLUDE_APPS_FROM_ALL ON CACHE BOOL "" FORCE)
|
||||
set(Seastar_EXCLUDE_TESTS_FROM_ALL ON CACHE BOOL "" FORCE)
|
||||
set(Seastar_IO_URING ON CACHE BOOL "" FORCE)
|
||||
set(Seastar_SCHEDULING_GROUPS_COUNT 24 CACHE STRING "" FORCE)
|
||||
set(Seastar_SCHEDULING_GROUPS_COUNT 21 CACHE STRING "" FORCE)
|
||||
set(Seastar_UNUSED_RESULT_ERROR ON CACHE BOOL "" FORCE)
|
||||
# Match configure.py's build_seastar_shared_libs: Debug and Dev
|
||||
# build Seastar as a shared library, others build it static.
|
||||
if(CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_BUILD_TYPE STREQUAL "Dev")
|
||||
set(BUILD_SHARED_LIBS ON CACHE BOOL "" FORCE)
|
||||
else()
|
||||
set(BUILD_SHARED_LIBS OFF CACHE BOOL "" FORCE)
|
||||
endif()
|
||||
add_subdirectory(seastar)
|
||||
|
||||
# Coverage mode sets cmake_build_type='Debug' for Seastar
|
||||
# (configure.py:515), so Seastar's pkg-config output includes sanitizer
|
||||
# link flags in seastar_libs_coverage (configure.py:2514,2649).
|
||||
# Seastar's own CMake only activates sanitizer targets for Debug/Sanitize
|
||||
# configs, so we inject link options on the seastar target for Coverage.
|
||||
# Using PUBLIC ensures they propagate to all targets linking Seastar
|
||||
# (but not standalone tools like patchelf), matching configure.py's
|
||||
# behavior. Compile-time flags and defines are handled globally in
|
||||
# cmake/mode.Coverage.cmake.
|
||||
if(CMAKE_BUILD_TYPE STREQUAL "Coverage")
|
||||
target_link_options(seastar
|
||||
PUBLIC
|
||||
-fsanitize=address
|
||||
-fsanitize=undefined
|
||||
-fsanitize=vptr)
|
||||
endif()
|
||||
target_compile_definitions (seastar
|
||||
PRIVATE
|
||||
SEASTAR_NO_EXCEPTION_HACK)
|
||||
endif()
|
||||
|
||||
set(ABSL_PROPAGATE_CXX_STD ON CACHE BOOL "" FORCE)
|
||||
@@ -148,10 +111,8 @@ if(Scylla_ENABLE_LTO)
|
||||
endif()
|
||||
|
||||
find_package(Sanitizers QUIET)
|
||||
# Match configure.py:2192 — abseil gets sanitizer flags with -fno-sanitize=vptr
|
||||
# to exclude vptr checks which are incompatible with abseil's usage.
|
||||
list(APPEND absl_cxx_flags
|
||||
$<$<CONFIG:Debug,Sanitize>:$<TARGET_PROPERTY:Sanitizers::address,INTERFACE_COMPILE_OPTIONS>;$<TARGET_PROPERTY:Sanitizers::undefined_behavior,INTERFACE_COMPILE_OPTIONS>;-fno-sanitize=vptr>)
|
||||
$<$<CONFIG:Debug,Sanitize>:$<TARGET_PROPERTY:Sanitizers::address,INTERFACE_COMPILE_OPTIONS>;$<TARGET_PROPERTY:Sanitizers::undefined_behavior,INTERFACE_COMPILE_OPTIONS>>)
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
|
||||
list(APPEND ABSL_GCC_FLAGS ${absl_cxx_flags})
|
||||
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||
@@ -176,38 +137,9 @@ add_library(absl::headers ALIAS absl-headers)
|
||||
# unfortunately.
|
||||
set_target_properties(absl_strerror PROPERTIES EXCLUDE_FROM_ALL TRUE)
|
||||
|
||||
# Now that seastar and abseil subdirectories are fully processed, add
|
||||
# SEASTAR_TESTING_MAIN globally. This matches configure.py's global define
|
||||
# without leaking into seastar (which would cause duplicate main symbols).
|
||||
add_compile_definitions(SEASTAR_TESTING_MAIN)
|
||||
|
||||
# System libraries dependencies
|
||||
find_package(Boost REQUIRED
|
||||
COMPONENTS filesystem program_options system thread regex unit_test_framework)
|
||||
# When using shared Boost libraries, define BOOST_ALL_DYN_LINK (matching configure.py)
|
||||
if(NOT Boost_USE_STATIC_LIBS)
|
||||
add_compile_definitions(BOOST_ALL_DYN_LINK)
|
||||
endif()
|
||||
|
||||
# CMake's Boost package config adds per-component defines like
|
||||
# BOOST_UNIT_TEST_FRAMEWORK_DYN_LINK, BOOST_REGEX_DYN_LINK, etc. on the
|
||||
# imported targets. configure.py only uses BOOST_ALL_DYN_LINK (which covers
|
||||
# all components), so strip the per-component defines to align the two build
|
||||
# systems.
|
||||
foreach(_boost_target
|
||||
Boost::unit_test_framework
|
||||
Boost::regex
|
||||
Boost::filesystem
|
||||
Boost::program_options
|
||||
Boost::system
|
||||
Boost::thread)
|
||||
if(TARGET ${_boost_target})
|
||||
# Completely remove all INTERFACE_COMPILE_DEFINITIONS from the Boost target.
|
||||
# This prevents per-component *_DYN_LINK and *_NO_LIB defines from
|
||||
# propagating. BOOST_ALL_DYN_LINK (set globally) covers all components.
|
||||
set_property(TARGET ${_boost_target} PROPERTY INTERFACE_COMPILE_DEFINITIONS)
|
||||
endif()
|
||||
endforeach()
|
||||
target_link_libraries(Boost::regex
|
||||
INTERFACE
|
||||
ICU::i18n
|
||||
@@ -234,9 +166,6 @@ generate_scylla_version()
|
||||
|
||||
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
|
||||
add_library(scylla-precompiled-header STATIC exported_templates.cc)
|
||||
target_include_directories(scylla-precompiled-header PRIVATE
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}"
|
||||
"${scylla_gen_build_dir}")
|
||||
target_link_libraries(scylla-precompiled-header PRIVATE
|
||||
absl::headers
|
||||
absl::btree
|
||||
@@ -267,10 +196,6 @@ if (Scylla_USE_PRECOMPILED_HEADER)
|
||||
message(STATUS "Using precompiled header for Scylla - remember to add `sloppiness = pch_defines,time_macros` to ccache.conf, if you're using ccache.")
|
||||
target_precompile_headers(scylla-precompiled-header PRIVATE "stdafx.hh")
|
||||
target_compile_definitions(scylla-precompiled-header PRIVATE SCYLLA_USE_PRECOMPILED_HEADER)
|
||||
# Match configure.py: -fpch-validate-input-files-content tells the compiler
|
||||
# to check content of stdafx.hh if timestamps don't match (important for
|
||||
# ccache/git workflows where timestamps may not be preserved).
|
||||
add_compile_options(-fpch-validate-input-files-content)
|
||||
endif()
|
||||
else()
|
||||
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
|
||||
@@ -375,6 +300,7 @@ add_subdirectory(locator)
|
||||
add_subdirectory(message)
|
||||
add_subdirectory(mutation)
|
||||
add_subdirectory(mutation_writer)
|
||||
add_subdirectory(node_ops)
|
||||
add_subdirectory(readers)
|
||||
add_subdirectory(replica)
|
||||
add_subdirectory(raft)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
## **SCYLLADB SOFTWARE LICENSE AGREEMENT**
|
||||
|
||||
| Version: | 1.1 |
|
||||
| Version: | 1.0 |
|
||||
| :---- | :---- |
|
||||
| Last updated: | April 12, 2026 |
|
||||
| Last updated: | December 18, 2024 |
|
||||
|
||||
**Your Acceptance**
|
||||
|
||||
@@ -12,48 +12,20 @@ The terms "**You**" or "**Licensee**" refer to any individual accessing or using
|
||||
|
||||
**Grant of License**
|
||||
|
||||
* **Definitions:**
|
||||
1. **Software:** Software means the ScyllaDB software provided by Licensor, including the source code, object code, and any accompanying documentation or tools, or any part thereof, as made available under this Agreement.
|
||||
2. **Commercial Customer**: means any legal entity (including its Affiliates) that has entered into a transaction with Licensor, or an authorized reseller/distributor, for the provision of any ScyllaDB products or services. This includes, without limitation: (a) Scope of Service: Any paid subscription, enterprise license, "BYOA" or Database-as-a-Service (DBaaS) offering, technical support, professional services, consulting, or training. (b) Scale and Volume: Any deployment regardless of size, capacity, or performance metrics (c) Payment Method: Any compensation model, including but not limited to, fixed-fee, consumption-based (On-Demand), committed spend, third-party marketplace credits (e.g., AWS, GCP, Azure), or promotional credits and discounts.
|
||||
* **Grant of License:** Subject to the terms and conditions of this Agreement, including the Eligibility and Exclusive Use Restrictions clause, Licensor grants You a limited, non-exclusive, revocable, non-sublicensable, non-transferable, royalty free license to Use the Software, in each case solely for the purposes of:
|
||||
* **Software Definitions:** Software means the ScyllaDB software provided by Licensor, including the source code, object code, and any accompanying documentation or tools, or any part thereof, as made available under this Agreement.
|
||||
* **Grant of License:** Subject to the terms and conditions of this Agreement, Licensor grants You a limited, non-exclusive, revocable, non-sublicensable, non-transferable, royalty free license to Use the Software, in each case solely for the purposes of:
|
||||
1) Copying, distributing, evaluating (including performing benchmarking or comparative tests or evaluations , subject to the limitations below) and improving the Software and ScyllaDB; and
|
||||
2) create a modified version of the Software (each, a "**Licensed Work**"); provided however, that each such Licensed Work keeps all or substantially all of the functions and features of the Software, and/or using all or substantially all of the source code of the Software. You hereby agree that all the Licensed Work are, upon creation, considered Licensed Work of the Licensor, shall be the sole property of the Licensor and its assignees, and the Licensor and its assignees shall be the sole owner of all rights of any kind or nature, in connection with such Licensed Work. You hereby irrevocably and unconditionally assign to the Licensor all the Licensed Work and any part thereof. This License applies separately for each version of the Licensed Work, which shall be considered "Software" for the purpose of this Agreement.
|
||||
|
||||
* **Eligibility and Exclusive Use Restrictions**
|
||||
|
||||
i. Restricted to "Never Customers" Only. The license granted under this Agreement is strictly limited to Never Customers. For purposes of this Agreement, a "Never Customer" is an entity (including its Affiliates) that does not have, and has not had within the previous twelve (12) months, a paid commercial subscription, professional services agreement, or any other commercial relationship with Licensor. Satisfaction of the Never Customer criteria is a strict condition precedent to the effectiveness of this License.
|
||||
|
||||
ii. Total Prohibition for Existing Commercial Customers. If You (or any of Your Affiliates) are an existing Commercial Customer of Licensor within the last twelve (12) months, no license is deemed to have been offered or extended to You, and any download or installation of the Software by You is unauthorized. This prohibition applies to all deployments, including but not limited to:
|
||||
|
||||
(a) existing commercial workloads;
|
||||
|
||||
(b) any new use cases, new applications, or new workloads
|
||||
|
||||
iii. **No "Hybrid" Usage**. Licensee is expressly prohibited from combining free tier usage under this Agreement with paid commercial units.
|
||||
|
||||
If You are a Commercial Customer, all use of the Software across Your entire organization (and any of your Affiliates) must be governed by a valid, paid commercial agreement. Use of the Software under this license by a Commercial Customer (which is not a "Never Customer") shall:
|
||||
|
||||
(a) Void this license *ab initio*;
|
||||
|
||||
(b) Be deemed a material breach of both this Agreement and any existing commercial terms; and
|
||||
|
||||
(c) Entitle Licensor to invoice Licensee for such unauthorized usage at Licensor's standard list prices, retroactive to the date of first use.
|
||||
|
||||
Notwithstanding anything to the contrary in the Eligibility or License Limitations sections above a Commercial Customer may use the Software exclusively for non-production purposes, including Continuous Integration (CI), automated testing, and quality assurance environments, provided that such use at all times remains compliant with the Usage Limit.
|
||||
|
||||
iv. **Verification**. Licensor reserves the right to audit Licensee's environment and corporate identity to ensure compliance with these eligibility criteria.
|
||||
|
||||
For the purposes of this Agreement an "**Affiliate**" means any entity that directly or indirectly controls, is controlled by, or is under common control with a party, where "control" means ownership of more than 50% of the voting stock or decision-making authority
|
||||
|
||||
|
||||
**License Limitations, Restrictions and Obligations:** The license grant above is subject to the following limitations, restrictions, and obligations. If Licensee’s Use of the Software does not comply with the above license grant or the terms of this section (including exceeding the Usage Limit set forth below), Licensee must: (i) refrain from any Use of the Software; and (ii) unless Licensee is a Never Customer, purchase a [commercial paid license](https://www.scylladb.com/scylladb-proprietary-software-license-agreement/) from the Licensor.
|
||||
**License Limitations, Restrictions and Obligations:** The license grant above is subject to the following limitations, restrictions, and obligations. If Licensee’s Use of the Software does not comply with the above license grant or the terms of this section (including exceeding the Usage Limit set forth below), Licensee must: (i) refrain from any Use of the Software; and (ii) purchase a [commercial paid license](https://www.scylladb.com/scylladb-proprietary-software-license-agreement/) from the Licensor.
|
||||
|
||||
* **Updates:** You shall be solely responsible for providing all equipment, systems, assets, access, and ancillary goods and services needed to access and Use the Software. Licensor may modify or update the Software at any time, without notification, in its sole and absolute discretion. After the effective date of each such update, Licensor shall bear no obligation to run, provide or support legacy versions of the Software.
|
||||
* **"Usage Limit":** Licensee's total overall available storage across all deployments and clusters of the Software and the Licensed Work under this License shall not exceed 10TB and/or an upper limit of 50 VCPUs (hyper threads).
|
||||
* **IP Markings:** Licensee must retain all copyright, trademark, and other proprietary notices contained in the Software. You will not modify, delete, alter, remove, or obscure any intellectual property, including without limitations licensing, copyright, trademark, or any other notices of Licensor in the Software.
|
||||
* **License Reproduction:** You must conspicuously display this Agreement on each copy of the Software. If You receive the Software from a third party, this Agreement still applies to Your Use of the Software. You will be responsible for any breach of this Agreement by any such third-party.
|
||||
* Distribution of any Licensed Works is permitted, provided that: (i) You must include in any Licensed Work prominent notices stating that You have modified the Software, (ii) You include a copy of this Agreement with the Licensed Work, and (iii) You clearly identify all modifications made in the Licensed Work and provides attribution to the Licensor as the original author(s) of the Software.
|
||||
* **Commercial Use Restrictions:** Licensee may not offer the Software as a software-as-a-service (SaaS) or commercial database-as-as-service (dBaaS) offering. Licensee may not use the Software to compete with Licensor's existing or future products or services. If your Use of the Software does not comply with the requirements currently in effect as described in this License, you must purchase a commercial license from the Licensor, its Affiliated entities, or you must refrain from using the Software and all Licensed Work. Furthermore, if You make any written claim of patent infringement relating to the Software, Your patent license for the Software granted under this Agreement terminates immediately.
|
||||
* **Commercial Use Restrictions:** Licensee may not offer the Software as a software-as-a-service (SaaS) or commercial database-as-as-service (dBaaS) offering. Licensee may not use the Software to compete with Licensor's existing or future products or services. If your Use of the Software does not comply with the requirements currently in effect as described in this License, you must purchase a commercial license from the Licensor, its affiliated entities, or you must refrain from using the Software and all Licensed Work. Furthermore, if You make any written claim of patent infringement relating to the Software, Your patent license for the Software granted under this Agreement terminates immediately.
|
||||
* Notwithstanding anything to the contrary, under the License granted hereunder, You shall not and shall not permit others to: (i) transfer the Software or any portions thereof to any other party except as expressly permitted herein; (ii) attempt to circumvent or overcome any technological protection measures incorporated into the Software; (iii) incorporate the Software into the structure, machinery or controls of any aircraft, other aerial device, military vehicle, hovercraft, waterborne craft or any medical equipment of any kind; or (iv) use the Software or any part thereof in any unlawful, harmful or illegal manner, or in a manner which infringes third parties’ rights in any way, including intellectual property rights.
|
||||
|
||||
**Monitoring; Audit**
|
||||
@@ -69,14 +41,14 @@ For the purposes of this Agreement an "**Affiliate**" means any entity that dire
|
||||
|
||||
**Indemnity; Disclaimer; Limitation of Liability**
|
||||
|
||||
* **Indemnity:** Licensee hereby agrees to indemnify, defend and hold harmless Licensor and its Affiliates from any losses or damages incurred due to a third party claim arising out of: (i) Licensee’s breach of this Agreement; (ii) Licensee’s negligence, willful misconduct or violation of law, or (iii) Licensee’s products or services.
|
||||
* **Indemnity:** Licensee hereby agrees to indemnify, defend and hold harmless Licensor and its affiliates from any losses or damages incurred due to a third party claim arising out of: (i) Licensee’s breach of this Agreement; (ii) Licensee’s negligence, willful misconduct or violation of law, or (iii) Licensee’s products or services.
|
||||
* DISCLAIMER OF WARRANTIES: LICENSEE AGREES THAT LICENSOR HAS MADE NO EXPRESS WARRANTIES REGARDING THE SOFTWARE AND THAT THE SOFTWARE IS BEING PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. LICENSOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THE SOFTWARE, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF FITNESS FOR A PARTICULAR PURPOSE; TITLE; MERCHANTABILITY; OR NON-INFRINGEMENT OF THIRD PARTY RIGHTS. LICENSOR DOES NOT WARRANT THAT THE SOFTWARE WILL OPERATE UNINTERRUPTED OR ERROR FREE, OR THAT ALL ERRORS WILL BE CORRECTED. LICENSOR DOES NOT GUARANTEE ANY PARTICULAR RESULTS FROM THE USE OF THE SOFTWARE, AND DOES NOT WARRANT THAT THE SOFTWARE IS FIT FOR ANY PARTICULAR PURPOSE.
|
||||
* LIMITATION OF LIABILITY: TO THE FULLEST EXTENT PERMISSIBLE UNDER APPLICABLE LAW, IN NO EVENT WILL LICENSOR AND/OR ITS AFFILIATES, EMPLOYEES, OFFICERS AND DIRECTORS BE LIABLE TO LICENSEE FOR (I) ANY LOSS OF USE OR DATA; INTERRUPTION OF BUSINESS; OR ANY INDIRECT; SPECIAL; INCIDENTAL; OR CONSEQUENTIAL DAMAGES OF ANY KIND (INCLUDING LOST PROFITS); AND (II) ANY DIRECT DAMAGES EXCEEDING THE TOTAL AMOUNT OF ONE THOUSAND US DOLLARS ($1,000). THE FOREGOING PROVISIONS LIMITING THE LIABILITY OF LICENSOR SHALL APPLY REGARDLESS OF THE FORM OR CAUSE OF ACTION, WHETHER IN STRICT LIABILITY, CONTRACT OR TORT.
|
||||
|
||||
**Proprietary Rights; No Other Rights**
|
||||
|
||||
* **Ownership:** Licensor retains sole and exclusive ownership of all rights, interests and title in the Software and any scripts, processes, techniques, methodologies, inventions, know-how, concepts, formatting, arrangements, visual attributes, ideas, database rights, copyrights, patents, trade secrets, and other intellectual property related thereto, and all derivatives, enhancements, modifications and improvements thereof. Except for the limited license rights granted herein, Licensee has no rights in or to the Software and/ or Licensor’s trademarks, logo, or branding and You acknowledge that such Software, trademarks, logo, or branding is the sole property of Licensor.
|
||||
* **Feedback:** Licensee is not required to provide any suggestions, enhancement requests, recommendations or other feedback regarding the Software ("Feedback"). If, notwithstanding this policy, Licensee submits Feedback, Licensee understands and acknowledges that such Feedback is not submitted in confidence and Licensor assumes no obligation, expressed or implied, by considering it. All right in any trademark or logo of Licensor or its Affiliates and You shall make no claim of right to the Software or any part thereof to be supplied by Licensor hereunder and acknowledges that as between Licensor and You, such Software is the sole proprietary, title and interest in and to Licensor.such Feedback shall be assigned to, and shall become the sole and exclusive property of, Licensor upon its creation.
|
||||
* **Feedback:** Licensee is not required to provide any suggestions, enhancement requests, recommendations or other feedback regarding the Software ("Feedback"). If, notwithstanding this policy, Licensee submits Feedback, Licensee understands and acknowledges that such Feedback is not submitted in confidence and Licensor assumes no obligation, expressed or implied, by considering it. All right in any trademark or logo of Licensor or its affiliates and You shall make no claim of right to the Software or any part thereof to be supplied by Licensor hereunder and acknowledges that as between Licensor and You, such Software is the sole proprietary, title and interest in and to Licensor.such Feedback shall be assigned to, and shall become the sole and exclusive property of, Licensor upon its creation.
|
||||
* Except for the rights expressly granted to You under this Agreement, You are not granted any other licenses or rights in the Software or otherwise. This Agreement constitutes the entire agreement between You and the Licensor with respect to the subject matter hereof and supersedes all prior or contemporaneous communications, representations, or agreements, whether oral or written.
|
||||
* **Third-Party Software:** Customer acknowledges that the Software may contain open and closed source components (“OSS Components”) that are governed separately by certain licenses, in each case as further provided by Company upon request. Any applicable OSS Component license is solely between Licensee and the applicable licensor of the OSS Component and Licensee shall comply with the applicable OSS Component license.
|
||||
* If any provision of this Agreement is held to be invalid or unenforceable, such provision shall be struck and the remaining provisions shall remain in full force and effect.
|
||||
@@ -84,7 +56,7 @@ For the purposes of this Agreement an "**Affiliate**" means any entity that dire
|
||||
**Miscellaneous**
|
||||
|
||||
* **Miscellaneous:** This Agreement may be modified at any time by Licensor, and constitutes the entire agreement between the parties with respect to the subject matter hereof. Licensee may not assign or subcontract its rights or obligations under this Agreement. This Agreement does not, and shall not be construed to create any relationship, partnership, joint venture, employer-employee, agency, or franchisor-franchisee relationship between the parties.
|
||||
* **Modifications**: Licensor reserves the right to modify this Agreement at any time. Changes will be effective upon posting to the Website or within the Software repository. Continued use of the Software after such changes constitutes acceptance.
|
||||
* **Governing Law & Jurisdiction:** This Agreement shall be governed and construed in accordance with the laws of Israel, without giving effect to their respective conflicts of laws provisions, and the competent courts situated in Tel Aviv, Israel, shall have sole and exclusive jurisdiction over the parties and any conflict and/or dispute arising out of, or in connection to, this Agreement
|
||||
|
||||
\[*End of ScyllaDB Software License Agreement*\]
|
||||
|
||||
|
||||
@@ -43,7 +43,7 @@ For further information, please see:
|
||||
|
||||
[developer documentation]: HACKING.md
|
||||
[build documentation]: docs/dev/building.md
|
||||
[docker image build documentation]: dist/docker/redhat/README.md
|
||||
[docker image build documentation]: dist/docker/debian/README.md
|
||||
|
||||
## Running Scylla
|
||||
|
||||
|
||||
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2026.2.0-dev
|
||||
VERSION=2026.1.0-dev
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
109
UNIMPLEMENTED_ENUM_ANALYSIS.md
Normal file
109
UNIMPLEMENTED_ENUM_ANALYSIS.md
Normal file
@@ -0,0 +1,109 @@
|
||||
# Analysis of unimplemented::cause Enum Values
|
||||
|
||||
This document provides an analysis of the `unimplemented::cause` enum values after cleanup.
|
||||
|
||||
## Removed Unused Enum Values (20 values removed)
|
||||
|
||||
The following enum values had **zero usages** in the codebase and have been removed:
|
||||
|
||||
- `LWT` - Lightweight transactions
|
||||
- `PAGING` - Query result paging
|
||||
- `AUTH` - Authentication
|
||||
- `PERMISSIONS` - Permission checking
|
||||
- `COUNTERS` - Counter columns
|
||||
- `MIGRATIONS` - Schema migrations
|
||||
- `GOSSIP` - Gossip protocol
|
||||
- `TOKEN_RESTRICTION` - Token-based restrictions
|
||||
- `LEGACY_COMPOSITE_KEYS` - Legacy composite key handling
|
||||
- `COLLECTION_RANGE_TOMBSTONES` - Collection range tombstones
|
||||
- `RANGE_DELETES` - Range deletion operations
|
||||
- `COMPRESSION` - Compression features
|
||||
- `NONATOMIC` - Non-atomic operations
|
||||
- `CONSISTENCY` - Consistency level handling
|
||||
- `WRAP_AROUND` - Token wrap-around handling
|
||||
- `STORAGE_SERVICE` - Storage service operations
|
||||
- `SCHEMA_CHANGE` - Schema change operations
|
||||
- `MIXED_CF` - Mixed column family operations
|
||||
- `SSTABLE_FORMAT_M` - SSTable format M
|
||||
|
||||
## Remaining Enum Values (8 values kept)
|
||||
|
||||
### 1. `API` (4 usages)
|
||||
**Impact**: REST API features that are not fully implemented.
|
||||
|
||||
**Usages**:
|
||||
- `api/column_family.cc:1052` - Fails when `split_output` parameter is used in major compaction
|
||||
- `api/compaction_manager.cc:100,146,216` - Warns when force_user_defined_compaction or related operations are called
|
||||
|
||||
**User Impact**: Some REST API endpoints for compaction management are stubs and will warn or fail.
|
||||
|
||||
### 2. `INDEXES` (6 usages)
|
||||
**Impact**: Secondary index features not fully supported.
|
||||
|
||||
**Usages**:
|
||||
- `api/column_family.cc:433,440,449,456` - Warns about index-related operations
|
||||
- `cql3/restrictions/statement_restrictions.cc:1158` - Fails when attempting filtering on collection columns without proper indexing
|
||||
- `cql3/statements/update_statement.cc:149` - Warns about index operations
|
||||
|
||||
**User Impact**: Some advanced secondary index features (especially filtering on collections) are not available.
|
||||
|
||||
### 3. `TRIGGERS` (2 usages)
|
||||
**Impact**: Trigger support is not implemented.
|
||||
|
||||
**Usages**:
|
||||
- `db/schema_tables.cc:2017` - Warns when loading trigger metadata from schema tables
|
||||
- `service/storage_proxy.cc:4166` - Warns when processing trigger-related operations
|
||||
|
||||
**User Impact**: Cassandra triggers (stored procedures that execute on data changes) are not supported.
|
||||
|
||||
### 4. `METRICS` (1 usage)
|
||||
**Impact**: Some query processor metrics are not collected.
|
||||
|
||||
**Usages**:
|
||||
- `cql3/query_processor.cc:585` - Warns about missing metrics implementation
|
||||
|
||||
**User Impact**: Minor - some internal metrics may not be available.
|
||||
|
||||
### 5. `VALIDATION` (4 usages)
|
||||
**Impact**: Schema validation checks are partially implemented.
|
||||
|
||||
**Usages**:
|
||||
- `cql3/functions/token_fct.hh:38` - Warns about validation in token functions
|
||||
- `cql3/statements/drop_keyspace_statement.cc:40` - Warns when dropping keyspace
|
||||
- `cql3/statements/truncate_statement.cc:87` - Warns when truncating table
|
||||
- `service/migration_manager.cc:750` - Warns during schema migrations
|
||||
|
||||
**User Impact**: Some schema validation checks are skipped (with warnings logged).
|
||||
|
||||
### 6. `REVERSED` (1 usage)
|
||||
**Impact**: Reversed type support in CQL protocol.
|
||||
|
||||
**Usages**:
|
||||
- `transport/server.cc:2085` - Fails when trying to use reversed types in CQL protocol
|
||||
|
||||
**User Impact**: Reversed types are not supported in the CQL protocol implementation.
|
||||
|
||||
### 7. `HINT` (1 usage)
|
||||
**Impact**: Hint replaying is not implemented.
|
||||
|
||||
**Usages**:
|
||||
- `db/batchlog_manager.cc:251` - Warns when attempting to replay hints
|
||||
|
||||
**User Impact**: Cassandra hints (temporary storage of writes when nodes are down) are not supported.
|
||||
|
||||
### 8. `SUPER` (2 usages)
|
||||
**Impact**: Super column families are not supported.
|
||||
|
||||
**Usages**:
|
||||
- `db/legacy_schema_migrator.cc:157` - Fails when encountering super column family in legacy schema
|
||||
- `db/schema_tables.cc:2288` - Fails when encountering super column family in schema tables
|
||||
|
||||
**User Impact**: Super column families (legacy Cassandra feature) will cause errors if encountered in legacy data or schema migrations.
|
||||
|
||||
## Summary
|
||||
|
||||
- **Removed**: 20 unused enum values (76% reduction)
|
||||
- **Kept**: 8 actively used enum values (24% remaining)
|
||||
- **Total lines removed**: ~40 lines from enum definition and switch statement
|
||||
|
||||
The remaining enum values represent actual unimplemented features that users may encounter, with varying impacts ranging from warnings (TRIGGERS, METRICS, VALIDATION, HINT) to failures (API split_output, INDEXES on collections, REVERSED types, SUPER tables).
|
||||
2
abseil
2
abseil
Submodule abseil updated: 255c84dadd...d7aaad83b4
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "absl-flat_hash_map.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -18,7 +18,6 @@ target_sources(alternator
|
||||
consumed_capacity.cc
|
||||
ttl.cc
|
||||
parsed_expression_cache.cc
|
||||
http_compression.cc
|
||||
${cql_grammar_srcs})
|
||||
target_include_directories(alternator
|
||||
PUBLIC
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "alternator/error.hh"
|
||||
@@ -13,8 +13,7 @@
|
||||
#include <string_view>
|
||||
#include "alternator/auth.hh"
|
||||
#include <fmt/format.h>
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "auth/password_authenticator.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "alternator/executor.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
@@ -26,8 +25,8 @@ namespace alternator {
|
||||
|
||||
static logging::logger alogger("alternator-auth");
|
||||
|
||||
future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::string username) {
|
||||
schema_ptr schema = proxy.data_dictionary().find_schema(db::system_keyspace::NAME, "roles");
|
||||
future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::service& as, std::string username) {
|
||||
schema_ptr schema = proxy.data_dictionary().find_schema(auth::get_auth_ks_name(as.query_processor()), "roles");
|
||||
partition_key pk = partition_key::from_single_value(*schema, utf8_type->decompose(username));
|
||||
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
|
||||
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
|
||||
@@ -40,7 +39,7 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::strin
|
||||
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id, can_login_col->id}, selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
|
||||
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
|
||||
auto cl = db::consistency_level::LOCAL_ONE;
|
||||
auto cl = auth::password_authenticator::consistency_for_user(username);
|
||||
|
||||
service::client_state client_state{service::client_state::internal_tag()};
|
||||
service::storage_proxy::coordinator_query_result qr = co_await proxy.query(schema, std::move(command), std::move(partition_ranges), cl,
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -20,6 +20,6 @@ namespace alternator {
|
||||
|
||||
using key_cache = utils::loading_cache<std::string, std::string, 1>;
|
||||
|
||||
future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::string username);
|
||||
future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::service& as, std::string username);
|
||||
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <string_view>
|
||||
@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
|
||||
if (!comparison_operator.IsString()) {
|
||||
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
|
||||
}
|
||||
std::string op = rjson::to_string(comparison_operator);
|
||||
std::string op = comparison_operator.GetString();
|
||||
auto it = ops.find(op);
|
||||
if (it == ops.end()) {
|
||||
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
|
||||
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
|
||||
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
|
||||
}
|
||||
if (kv1.name == "S") {
|
||||
return cmp(rjson::to_string_view(kv1.value),
|
||||
rjson::to_string_view(kv2.value));
|
||||
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
|
||||
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
|
||||
}
|
||||
if (kv1.name == "B") {
|
||||
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
|
||||
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
|
||||
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "S") {
|
||||
return check_BETWEEN(rjson::to_string_view(kv_v.value),
|
||||
rjson::to_string_view(kv_lb.value),
|
||||
rjson::to_string_view(kv_ub.value),
|
||||
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
|
||||
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
|
||||
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
|
||||
bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "B") {
|
||||
@@ -618,7 +618,7 @@ conditional_operator_type get_conditional_operator(const rjson::value& req) {
|
||||
// Check if the existing values of the item (previous_item) match the
|
||||
// conditions given by the Expected and ConditionalOperator parameters
|
||||
// (if they exist) in the request (an UpdateItem, PutItem or DeleteItem).
|
||||
// This function can throw a ValidationException API error if there
|
||||
// This function can throw an ValidationException API error if there
|
||||
// are errors in the format of the condition itself.
|
||||
bool verify_expected(const rjson::value& req, const rjson::value* previous_item) {
|
||||
const rjson::value* expected = rjson::find(req, "Expected");
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
/*
|
||||
|
||||
@@ -3,13 +3,11 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "consumed_capacity.hh"
|
||||
#include "error.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace alternator {
|
||||
|
||||
@@ -34,18 +32,18 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
|
||||
if (!return_consumed->IsString()) {
|
||||
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
|
||||
}
|
||||
std::string_view consumed = rjson::to_string_view(*return_consumed);
|
||||
std::string consumed = return_consumed->GetString();
|
||||
if (consumed == "INDEXES") {
|
||||
throw api_error::validation("INDEXES consumed capacity is not supported");
|
||||
}
|
||||
if (consumed != "TOTAL") {
|
||||
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
|
||||
throw api_error::validation("Unknown consumed capacity "+ consumed);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept {
|
||||
if (_should_add_to_response) {
|
||||
if (_should_add_to_reponse) {
|
||||
auto consumption = rjson::empty_object();
|
||||
rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units());
|
||||
rjson::add(response, "ConsumedCapacity", std::move(consumption));
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -28,9 +28,9 @@ namespace alternator {
|
||||
class consumed_capacity_counter {
|
||||
public:
|
||||
consumed_capacity_counter() = default;
|
||||
consumed_capacity_counter(bool should_add_to_response) : _should_add_to_response(should_add_to_response){}
|
||||
consumed_capacity_counter(bool should_add_to_reponse) : _should_add_to_reponse(should_add_to_reponse){}
|
||||
bool operator()() const noexcept {
|
||||
return _should_add_to_response;
|
||||
return _should_add_to_reponse;
|
||||
}
|
||||
|
||||
consumed_capacity_counter& operator +=(uint64_t bytes);
|
||||
@@ -44,7 +44,7 @@ public:
|
||||
uint64_t _total_bytes = 0;
|
||||
static bool should_add_capacity(const rjson::value& request);
|
||||
protected:
|
||||
bool _should_add_to_response = false;
|
||||
bool _should_add_to_reponse = false;
|
||||
};
|
||||
|
||||
class rcu_consumed_capacity_counter : public consumed_capacity_counter {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/core/with_scheduling_group.hh>
|
||||
@@ -28,7 +28,6 @@ static logging::logger logger("alternator_controller");
|
||||
controller::controller(
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<service::storage_service>& ss,
|
||||
sharded<service::migration_manager>& mm,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<cdc::generation_service>& cdc_gen_svc,
|
||||
@@ -40,7 +39,6 @@ controller::controller(
|
||||
: protocol_server(sg)
|
||||
, _gossiper(gossiper)
|
||||
, _proxy(proxy)
|
||||
, _ss(ss)
|
||||
, _mm(mm)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _cdc_gen_svc(cdc_gen_svc)
|
||||
@@ -91,7 +89,7 @@ future<> controller::start_server() {
|
||||
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
|
||||
return cfg.alternator_timeout_in_ms;
|
||||
};
|
||||
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
|
||||
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
|
||||
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
|
||||
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
|
||||
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
|
||||
@@ -105,23 +103,11 @@ future<> controller::start_server() {
|
||||
alternator_port = _config.alternator_port();
|
||||
_listen_addresses.push_back({addr, *alternator_port});
|
||||
}
|
||||
std::optional<uint16_t> alternator_port_proxy_protocol;
|
||||
if (_config.alternator_port_proxy_protocol()) {
|
||||
alternator_port_proxy_protocol = _config.alternator_port_proxy_protocol();
|
||||
_listen_addresses.push_back({addr, *alternator_port_proxy_protocol});
|
||||
}
|
||||
std::optional<uint16_t> alternator_https_port;
|
||||
std::optional<uint16_t> alternator_https_port_proxy_protocol;
|
||||
std::optional<tls::credentials_builder> creds;
|
||||
if (_config.alternator_https_port() || _config.alternator_https_port_proxy_protocol()) {
|
||||
if (_config.alternator_https_port()) {
|
||||
alternator_https_port = _config.alternator_https_port();
|
||||
_listen_addresses.push_back({addr, *alternator_https_port});
|
||||
}
|
||||
if (_config.alternator_https_port_proxy_protocol()) {
|
||||
alternator_https_port_proxy_protocol = _config.alternator_https_port_proxy_protocol();
|
||||
_listen_addresses.push_back({addr, *alternator_https_port_proxy_protocol});
|
||||
}
|
||||
if (_config.alternator_https_port()) {
|
||||
alternator_https_port = _config.alternator_https_port();
|
||||
_listen_addresses.push_back({addr, *alternator_https_port});
|
||||
creds.emplace();
|
||||
auto opts = _config.alternator_encryption_options();
|
||||
if (opts.empty()) {
|
||||
@@ -147,29 +133,20 @@ future<> controller::start_server() {
|
||||
}
|
||||
}
|
||||
_server.invoke_on_all(
|
||||
[this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds = std::move(creds)] (server& server) mutable {
|
||||
return server.init(addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds,
|
||||
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
|
||||
return server.init(addr, alternator_port, alternator_https_port, creds,
|
||||
_config.alternator_enforce_authorization,
|
||||
_config.alternator_warn_authorization,
|
||||
_config.alternator_max_users_query_size_in_trace_output,
|
||||
&_memory_limiter.local().get_semaphore(),
|
||||
_config.max_concurrent_requests_per_shard);
|
||||
}).handle_exception([this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] (std::exception_ptr ep) {
|
||||
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}, proxy-protocol port {}, TLS proxy-protocol port {}: {}",
|
||||
addr,
|
||||
alternator_port ? std::to_string(*alternator_port) : "OFF",
|
||||
alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
|
||||
alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
|
||||
alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF",
|
||||
ep);
|
||||
}).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {
|
||||
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}: {}",
|
||||
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF", ep);
|
||||
return stop_server().then([ep = std::move(ep)] { return make_exception_future<>(ep); });
|
||||
}).then([addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] {
|
||||
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}, proxy-protocol port {}, TLS proxy-protocol port {}",
|
||||
addr,
|
||||
alternator_port ? std::to_string(*alternator_port) : "OFF",
|
||||
alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
|
||||
alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
|
||||
alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF");
|
||||
}).then([addr, alternator_port, alternator_https_port] {
|
||||
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
|
||||
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
|
||||
}).get();
|
||||
});
|
||||
}
|
||||
@@ -192,7 +169,7 @@ future<> controller::request_stop_server() {
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
|
||||
future<utils::chunked_vector<client_data>> controller::get_client_data() {
|
||||
return _server.local().get_client_data();
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -15,7 +15,6 @@
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class storage_service;
|
||||
class migration_manager;
|
||||
class memory_limiter;
|
||||
}
|
||||
@@ -58,7 +57,6 @@ class server;
|
||||
class controller : public protocol_server {
|
||||
sharded<gms::gossiper>& _gossiper;
|
||||
sharded<service::storage_proxy>& _proxy;
|
||||
sharded<service::storage_service>& _ss;
|
||||
sharded<service::migration_manager>& _mm;
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
sharded<cdc::generation_service>& _cdc_gen_svc;
|
||||
@@ -76,7 +74,6 @@ public:
|
||||
controller(
|
||||
sharded<gms::gossiper>& gossiper,
|
||||
sharded<service::storage_proxy>& proxy,
|
||||
sharded<service::storage_service>& ss,
|
||||
sharded<service::migration_manager>& mm,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<cdc::generation_service>& cdc_gen_svc,
|
||||
@@ -96,7 +93,7 @@ public:
|
||||
// This virtual function is called (on each shard separately) when the
|
||||
// virtual table "system.clients" is read. It is expected to generate a
|
||||
// list of clients connected to this server (on this shard).
|
||||
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
|
||||
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <fmt/ranges.h>
|
||||
@@ -17,7 +17,6 @@
|
||||
#include "auth/service.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "mutation/tombstone.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "utils/log.hh"
|
||||
@@ -63,20 +62,11 @@
|
||||
#include "types/types.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "cql3/statements/ks_prop_defs.hh"
|
||||
#include "alternator/ttl_tag.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
logging::logger elogger("alternator-executor");
|
||||
|
||||
namespace std {
|
||||
template <> struct hash<std::pair<sstring, sstring>> {
|
||||
size_t operator () (const std::pair<sstring, sstring>& p) const {
|
||||
return std::hash<sstring>()(p.first) * 1009 + std::hash<sstring>()(p.second) * 3;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
namespace alternator {
|
||||
|
||||
// Alternator-specific table properties stored as hidden table tags:
|
||||
@@ -165,7 +155,7 @@ static map_type attrs_type() {
|
||||
|
||||
static const column_definition& attrs_column(const schema& schema) {
|
||||
const column_definition* cdef = schema.get_column_definition(bytes(executor::ATTRS_COLUMN_NAME));
|
||||
throwing_assert(cdef);
|
||||
SCYLLA_ASSERT(cdef);
|
||||
return *cdef;
|
||||
}
|
||||
|
||||
@@ -238,7 +228,7 @@ static void validate_is_object(const rjson::value& value, const char* caller) {
|
||||
}
|
||||
|
||||
// This function assumes the given value is an object and returns requested member value.
|
||||
// If it is not possible, an api_error::validation is thrown.
|
||||
// If it is not possible an api_error::validation is thrown.
|
||||
static const rjson::value& get_member(const rjson::value& obj, const char* member_name, const char* caller) {
|
||||
validate_is_object(obj, caller);
|
||||
const rjson::value* ret = rjson::find(obj, member_name);
|
||||
@@ -250,7 +240,7 @@ static const rjson::value& get_member(const rjson::value& obj, const char* membe
|
||||
|
||||
|
||||
// This function assumes the given value is an object with a single member, and returns this member.
|
||||
// In case the requirements are not met, an api_error::validation is thrown.
|
||||
// In case the requirements are not met an api_error::validation is thrown.
|
||||
static const rjson::value::Member& get_single_member(const rjson::value& v, const char* caller) {
|
||||
if (!v.IsObject() || v.MemberCount() != 1) {
|
||||
throw api_error::validation(format("{}: expected an object with a single member.", caller));
|
||||
@@ -258,66 +248,14 @@ static const rjson::value::Member& get_single_member(const rjson::value& v, cons
|
||||
return *(v.MemberBegin());
|
||||
}
|
||||
|
||||
class executor::describe_table_info_manager : public service::migration_listener::empty_listener {
|
||||
executor &_executor;
|
||||
|
||||
struct table_info {
|
||||
utils::simple_value_with_expiry<std::uint64_t> size_in_bytes;
|
||||
};
|
||||
std::unordered_map<std::pair<sstring, sstring>, table_info> info_for_tables;
|
||||
bool active = false;
|
||||
|
||||
public:
|
||||
describe_table_info_manager(executor& executor) : _executor(executor) {
|
||||
_executor._proxy.data_dictionary().real_database_ptr()->get_notifier().register_listener(this);
|
||||
active = true;
|
||||
}
|
||||
describe_table_info_manager(const describe_table_info_manager &) = delete;
|
||||
describe_table_info_manager(describe_table_info_manager&&) = delete;
|
||||
~describe_table_info_manager() {
|
||||
if (active) {
|
||||
on_fatal_internal_error(elogger, "describe_table_info_manager was not stopped before destruction");
|
||||
}
|
||||
}
|
||||
|
||||
describe_table_info_manager &operator = (const describe_table_info_manager &) = delete;
|
||||
describe_table_info_manager &operator = (describe_table_info_manager&&) = delete;
|
||||
|
||||
static std::chrono::high_resolution_clock::time_point now() {
|
||||
return std::chrono::high_resolution_clock::now();
|
||||
}
|
||||
|
||||
std::optional<std::uint64_t> get_cached_size_in_bytes(const sstring &ks_name, const sstring &cf_name) const {
|
||||
auto it = info_for_tables.find({ks_name, cf_name});
|
||||
if (it != info_for_tables.end()) {
|
||||
return it->second.size_in_bytes.get();
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
void cache_size_in_bytes(sstring ks_name, sstring cf_name, std::uint64_t size_in_bytes, std::chrono::high_resolution_clock::time_point expiry) {
|
||||
info_for_tables[{std::move(ks_name), std::move(cf_name)}].size_in_bytes.set_if_longer_expiry(size_in_bytes, expiry);
|
||||
}
|
||||
future<> stop() {
|
||||
co_await _executor._proxy.data_dictionary().real_database_ptr()->get_notifier().unregister_listener(this);
|
||||
active = false;
|
||||
co_return;
|
||||
}
|
||||
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
if (!ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX)) return;
|
||||
info_for_tables.erase({ks_name, cf_name});
|
||||
}
|
||||
};
|
||||
|
||||
executor::executor(gms::gossiper& gossiper,
|
||||
service::storage_proxy& proxy,
|
||||
service::storage_service& ss,
|
||||
service::migration_manager& mm,
|
||||
db::system_distributed_keyspace& sdks,
|
||||
cdc::metadata& cdc_metadata,
|
||||
smp_service_group ssg,
|
||||
utils::updateable_value<uint32_t> default_timeout_in_ms)
|
||||
: _gossiper(gossiper),
|
||||
_ss(ss),
|
||||
_proxy(proxy),
|
||||
_mm(mm),
|
||||
_sdks(sdks),
|
||||
@@ -330,7 +268,6 @@ executor::executor(gms::gossiper& gossiper,
|
||||
_stats))
|
||||
{
|
||||
s_default_timeout_in_ms = std::move(default_timeout_in_ms);
|
||||
_describe_table_info_manager = std::make_unique<describe_table_info_manager>(*this);
|
||||
register_metrics(_metrics, _stats);
|
||||
}
|
||||
|
||||
@@ -482,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
|
||||
if (!table_name_value->IsString()) {
|
||||
throw api_error::validation("Non-string TableName field in request");
|
||||
}
|
||||
std::string table_name = rjson::to_string(*table_name_value);
|
||||
std::string table_name = table_name_value->GetString();
|
||||
return table_name;
|
||||
}
|
||||
|
||||
@@ -609,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
|
||||
// does exist but the index does not (ValidationException).
|
||||
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
|
||||
throw api_error::validation(
|
||||
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
|
||||
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
|
||||
} else {
|
||||
throw api_error::resource_not_found(
|
||||
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
|
||||
@@ -650,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
|
||||
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
|
||||
attribute_name, value));
|
||||
}
|
||||
return rjson::to_string(*attribute_value);
|
||||
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
|
||||
}
|
||||
|
||||
// Convenience function for getting the value of a boolean attribute, or a
|
||||
@@ -683,7 +620,7 @@ static std::optional<int> get_int_attribute(const rjson::value& value, std::stri
|
||||
}
|
||||
|
||||
// Sets a KeySchema object inside the given JSON parent describing the key
|
||||
// attributes of the given schema as being either HASH or RANGE keys.
|
||||
// attributes of the the given schema as being either HASH or RANGE keys.
|
||||
// Additionally, adds to a given map mappings between the key attribute
|
||||
// names and their type (as a DynamoDB type string).
|
||||
void executor::describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>* attribute_types, const std::map<sstring, sstring> *tags) {
|
||||
@@ -815,44 +752,12 @@ static future<bool> is_view_built(
|
||||
|
||||
}
|
||||
|
||||
future<> executor::cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl) {
|
||||
auto expiry = describe_table_info_manager::now() + ttl;
|
||||
return container().invoke_on_all(
|
||||
[schema, size_in_bytes, expiry] (executor& exec) {
|
||||
exec._describe_table_info_manager->cache_size_in_bytes(schema->ks_name(), schema->cf_name(), size_in_bytes, expiry);
|
||||
});
|
||||
}
|
||||
|
||||
future<> executor::fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting) {
|
||||
auto cached_size = _describe_table_info_manager->get_cached_size_in_bytes(schema->ks_name(), schema->cf_name());
|
||||
std::uint64_t total_size = 0;
|
||||
if (cached_size) {
|
||||
total_size = *cached_size;
|
||||
} else {
|
||||
// there's no point in trying to estimate value of table that is being deleted, as other nodes more often than not might
|
||||
// move forward with deletion faster than we calculate the size
|
||||
if (!deleting) {
|
||||
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
|
||||
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
|
||||
// Note: we don't care when the notification of other shards will finish, as long as it will be done
|
||||
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
|
||||
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
|
||||
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
|
||||
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
|
||||
// which is also fine, as the specification doesn't give precision guarantees of any kind.
|
||||
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
|
||||
}
|
||||
}
|
||||
rjson::add(table_description, "TableSizeBytes", total_size);
|
||||
}
|
||||
|
||||
future<rjson::value> executor::fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
static future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
{
|
||||
rjson::value table_description = rjson::empty_object();
|
||||
auto tags_ptr = db::get_tags_of_table(schema);
|
||||
|
||||
rjson::add(table_description, "TableName", rjson::from_string(schema->cf_name()));
|
||||
co_await fill_table_size(table_description, schema, tbl_status == table_status::deleting);
|
||||
|
||||
auto creation_timestamp = get_table_creation_time(*schema);
|
||||
|
||||
@@ -896,7 +801,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", wcu);
|
||||
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
|
||||
|
||||
data_dictionary::table t = _proxy.data_dictionary().find_column_family(schema);
|
||||
|
||||
|
||||
data_dictionary::table t = proxy.data_dictionary().find_column_family(schema);
|
||||
|
||||
if (tbl_status != table_status::deleting) {
|
||||
rjson::add(table_description, "CreationDateTime", rjson::value(creation_timestamp));
|
||||
@@ -917,7 +824,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
sstring index_name = cf_name.substr(delim_it + 1);
|
||||
rjson::add(view_entry, "IndexName", rjson::from_string(index_name));
|
||||
rjson::add(view_entry, "IndexArn", generate_arn_for_index(*schema, index_name));
|
||||
// Add index's KeySchema and collect types for AttributeDefinitions:
|
||||
// Add indexes's KeySchema and collect types for AttributeDefinitions:
|
||||
executor::describe_key_schema(view_entry, *vptr, key_attribute_types, db::get_tags_of_table(vptr));
|
||||
// Add projection type
|
||||
rjson::value projection = rjson::empty_object();
|
||||
@@ -933,7 +840,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
// (for a built view) or CREATING+Backfilling (if view building
|
||||
// is in progress).
|
||||
if (!is_lsi) {
|
||||
if (co_await is_view_built(vptr, _proxy, client_state, trace_state, permit)) {
|
||||
if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) {
|
||||
rjson::add(view_entry, "IndexStatus", "ACTIVE");
|
||||
} else {
|
||||
rjson::add(view_entry, "IndexStatus", "CREATING");
|
||||
@@ -961,8 +868,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
|
||||
}
|
||||
rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions));
|
||||
}
|
||||
executor::supplement_table_stream_info(table_description, *schema, _proxy);
|
||||
executor::supplement_table_stream_info(table_description, *schema, proxy);
|
||||
|
||||
// FIXME: still missing some response fields (issue #5026)
|
||||
co_return table_description;
|
||||
}
|
||||
|
||||
@@ -982,7 +890,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
|
||||
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
|
||||
tracing::add_alternator_table_name(trace_state, schema->cf_name());
|
||||
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::active, client_state, trace_state, permit);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
|
||||
rjson::value response = rjson::empty_object();
|
||||
rjson::add(response, "Table", std::move(table_description));
|
||||
elogger.trace("returning {}", response);
|
||||
@@ -1085,7 +993,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
auto& p = _proxy.container();
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, client_state, trace_state, permit);
|
||||
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
|
||||
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
@@ -1172,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
|
||||
}
|
||||
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
|
||||
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
|
||||
if (attribute_info["AttributeName"].GetString() == name) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
data_type dt = parse_key_type(type);
|
||||
if (computed_column) {
|
||||
// Computed column for GSI (doesn't choose a real column as-is
|
||||
@@ -1208,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
throw api_error::validation("First element of KeySchema must be an object");
|
||||
}
|
||||
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
|
||||
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
|
||||
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
|
||||
throw api_error::validation("First key in KeySchema must be a HASH key");
|
||||
}
|
||||
v = rjson::find((*key_schema)[0], "AttributeName");
|
||||
@@ -1216,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
throw api_error::validation("First key in KeySchema must have string AttributeName");
|
||||
}
|
||||
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
|
||||
std::string hash_key = rjson::to_string(*v);
|
||||
std::string hash_key = v->GetString();
|
||||
std::string range_key;
|
||||
if (key_schema->Size() == 2) {
|
||||
if (!(*key_schema)[1].IsObject()) {
|
||||
throw api_error::validation("Second element of KeySchema must be an object");
|
||||
}
|
||||
v = rjson::find((*key_schema)[1], "KeyType");
|
||||
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
|
||||
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
|
||||
throw api_error::validation("Second key in KeySchema must be a RANGE key");
|
||||
}
|
||||
v = rjson::find((*key_schema)[1], "AttributeName");
|
||||
@@ -1649,8 +1557,9 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
|
||||
}
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
|
||||
throwing_assert(this_shard_id() == 0);
|
||||
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
|
||||
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
|
||||
// We begin by parsing and validating the content of the CreateTable
|
||||
// command. We can't inspect the current database schema at this point
|
||||
@@ -1836,7 +1745,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
|
||||
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
if (executor::add_stream_options(*stream_specification, builder, _proxy)) {
|
||||
if (executor::add_stream_options(*stream_specification, builder, sp)) {
|
||||
validate_cdc_log_name_length(builder.cf_name());
|
||||
}
|
||||
}
|
||||
@@ -1855,7 +1764,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
set_table_creation_time(tags_map, db_clock::now());
|
||||
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
|
||||
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, _stats);
|
||||
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
|
||||
|
||||
schema_ptr schema = builder.build();
|
||||
for (auto& view_builder : view_builders) {
|
||||
@@ -1871,49 +1780,33 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
|
||||
}
|
||||
|
||||
size_t retries = _mm.get_concurrent_ddl_retries();
|
||||
size_t retries = mm.get_concurrent_ddl_retries();
|
||||
for (;;) {
|
||||
auto group0_guard = co_await _mm.start_group0_operation();
|
||||
auto group0_guard = co_await mm.start_group0_operation();
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
utils::chunked_vector<mutation> schema_mutations;
|
||||
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
|
||||
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
|
||||
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
|
||||
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
|
||||
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
|
||||
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
|
||||
if (stream_specification && stream_specification->IsObject()) {
|
||||
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
|
||||
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
|
||||
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
|
||||
const auto& topo = sp.local_db().get_token_metadata().get_topology();
|
||||
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
|
||||
if (rs->uses_tablets()) {
|
||||
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
|
||||
"If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
|
||||
}
|
||||
}
|
||||
}
|
||||
// Creating an index in tablets mode requires the keyspace to be RF-rack-valid.
|
||||
// GSI and LSI indexes are based on materialized views which require RF-rack-validity to avoid consistency issues.
|
||||
if (!view_builders.empty() || _proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
|
||||
try {
|
||||
locator::assert_rf_rack_valid_keyspace(keyspace_name, _proxy.local_db().get_token_metadata_ptr(), *rs);
|
||||
} catch (const std::invalid_argument& ex) {
|
||||
if (!view_builders.empty()) {
|
||||
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes and LocalSecondaryIndexes on a table "
|
||||
"using tablets require the number of racks in the cluster to be either 1 or 3"));
|
||||
} else {
|
||||
co_return api_error::validation(fmt::format("Cannot create table '{}' with tablets: the configuration "
|
||||
"option 'rf_rack_valid_keyspaces' is enabled, which enforces that tables using tablets can only be created in clusters "
|
||||
"that have either 1 or 3 racks", table_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
|
||||
schema_mutations = service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
|
||||
} catch (exceptions::already_exists_exception&) {
|
||||
if (_proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
co_return api_error::resource_in_use(fmt::format("Table {} already exists", table_name));
|
||||
}
|
||||
}
|
||||
if (_proxy.data_dictionary().try_find_table(schema->id())) {
|
||||
if (sp.data_dictionary().try_find_table(schema->id())) {
|
||||
// This should never happen, the ID is supposed to be unique
|
||||
co_return api_error::internal(format("Table with ID {} already exists", schema->id()));
|
||||
}
|
||||
@@ -1922,9 +1815,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
for (schema_builder& view_builder : view_builders) {
|
||||
schemas.push_back(view_builder.build());
|
||||
}
|
||||
co_await service::prepare_new_column_families_announcement(schema_mutations, _proxy, *ksm, schemas, ts);
|
||||
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
|
||||
if (ksm->uses_tablets()) {
|
||||
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, _proxy);
|
||||
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
|
||||
}
|
||||
|
||||
// If a role is allowed to create a table, we must give it permissions to
|
||||
@@ -1949,7 +1842,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
}
|
||||
std::tie(schema_mutations, group0_guard) = co_await std::move(mc).extract();
|
||||
try {
|
||||
co_await _mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
|
||||
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
|
||||
break;
|
||||
} catch (const service::group0_concurrent_modification& ex) {
|
||||
elogger.info("Failed to execute CreateTable {} due to concurrent schema modifications. {}.",
|
||||
@@ -1961,9 +1854,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
|
||||
}
|
||||
}
|
||||
|
||||
co_await _mm.wait_for_schema_agreement(_proxy.local_db(), db::timeout_clock::now() + 10s, nullptr);
|
||||
co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
|
||||
rjson::value status = rjson::empty_object();
|
||||
executor::supplement_table_info(request, *schema, _proxy);
|
||||
executor::supplement_table_info(request, *schema, sp);
|
||||
rjson::add(status, "TableDescription", std::move(request));
|
||||
co_return rjson::print(std::move(status));
|
||||
}
|
||||
@@ -1972,11 +1865,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
_stats.api_operations.create_table++;
|
||||
elogger.trace("Creating table {}", request);
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
|
||||
// `invoke_on` hopped us to shard 0, but `this` points to `executor` is from 'old' shard, we need to hop it too.
|
||||
co_return co_await e.local().create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), enforce_authorization, warn_authorization, std::move(tablets_mode));
|
||||
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1995,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
std::string def_type = type_to_string(def.type);
|
||||
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
|
||||
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
|
||||
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
if (type != def_type) {
|
||||
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
|
||||
}
|
||||
@@ -2127,13 +2019,6 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
co_return api_error::validation(fmt::format(
|
||||
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
|
||||
}
|
||||
try {
|
||||
locator::assert_rf_rack_valid_keyspace(keyspace_name, p.local().local_db().get_token_metadata_ptr(),
|
||||
p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy());
|
||||
} catch (const std::invalid_argument& ex) {
|
||||
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes on a table "
|
||||
"using tablets require the number of racks in the cluster to be either 1 or 3"));
|
||||
}
|
||||
|
||||
elogger.trace("Adding GSI {}", index_name);
|
||||
// FIXME: read and handle "Projection" parameter. This will
|
||||
@@ -2338,12 +2223,12 @@ void validate_value(const rjson::value& v, const char* caller) {
|
||||
|
||||
// The put_or_delete_item class builds the mutations needed by the PutItem and
|
||||
// DeleteItem operations - either as stand-alone commands or part of a list
|
||||
// of commands in BatchWriteItem.
|
||||
// of commands in BatchWriteItems.
|
||||
// put_or_delete_item splits each operation into two stages: Constructing the
|
||||
// object parses and validates the user input (throwing exceptions if there
|
||||
// are input errors). Later, build() generates the actual mutation, with a
|
||||
// specified timestamp. This split is needed because of the peculiar needs of
|
||||
// BatchWriteItem and LWT. BatchWriteItem needs all parsing to happen before
|
||||
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
|
||||
// any writing happens (if one of the commands has an error, none of the
|
||||
// writes should be done). LWT makes it impossible for the parse step to
|
||||
// generate "mutation" objects, because the timestamp still isn't known.
|
||||
@@ -2436,7 +2321,7 @@ std::unordered_map<bytes, std::string> si_key_attributes(data_dictionary::table
|
||||
// case, this function simply won't be called for this attribute.)
|
||||
//
|
||||
// This function checks if the given attribute update is an update to some
|
||||
// GSI's key, and if the value is unsuitable, an api_error::validation is
|
||||
// GSI's key, and if the value is unsuitable, a api_error::validation is
|
||||
// thrown. The checking here is similar to the checking done in
|
||||
// get_key_from_typed_value() for the base table's key columns.
|
||||
//
|
||||
@@ -2477,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
_cells = std::vector<cell>();
|
||||
_cells->reserve(item.MemberCount());
|
||||
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
|
||||
bytes column_name = to_bytes(rjson::to_string_view(it->name));
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
validate_value(it->value, "PutItem");
|
||||
const column_definition* cdef = find_attribute(*schema, column_name);
|
||||
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
|
||||
@@ -2838,12 +2723,14 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
}
|
||||
} else if (_write_isolation != write_isolation::LWT_ALWAYS) {
|
||||
std::optional<mutation> m = apply(nullptr, api::new_timestamp(), cdc_opts);
|
||||
throwing_assert(m); // !needs_read_before_write, so apply() did not check a condition
|
||||
SCYLLA_ASSERT(m); // !needs_read_before_write, so apply() did not check a condition
|
||||
return proxy.mutate(utils::chunked_vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes, false, std::move(cdc_opts)).then([this, &wcu_total] () mutable {
|
||||
return rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total);
|
||||
});
|
||||
}
|
||||
throwing_assert(cas_shard);
|
||||
if (!cas_shard) {
|
||||
on_internal_error(elogger, "cas_shard is not set");
|
||||
}
|
||||
// If we're still here, we need to do this write using LWT:
|
||||
global_stats.write_using_lwt++;
|
||||
per_table_stats.write_using_lwt++;
|
||||
@@ -2852,7 +2739,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
auto read_command = needs_read_before_write ?
|
||||
previous_item_read_command(proxy, schema(), _ck, selection) :
|
||||
nullptr;
|
||||
return proxy.cas(schema(), std::move(*cas_shard), *this, read_command, to_partition_ranges(*schema(), _pk),
|
||||
return proxy.cas(schema(), std::move(*cas_shard), shared_from_this(), read_command, to_partition_ranges(*schema(), _pk),
|
||||
{timeout, std::move(permit), client_state, trace_state},
|
||||
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM, timeout, timeout, true, std::move(cdc_opts)).then([this, read_command, &wcu_total] (bool is_applied) mutable {
|
||||
if (!is_applied) {
|
||||
@@ -2896,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
|
||||
return;
|
||||
}
|
||||
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
|
||||
if (!used.contains(rjson::to_string(it->name))) {
|
||||
if (!used.contains(it->name.GetString())) {
|
||||
throw api_error::validation(
|
||||
format("{} has spurious '{}', not used in {}",
|
||||
field_name, rjson::to_string_view(it->name), operation));
|
||||
field_name, it->name.GetString(), operation));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3113,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
|
||||
}
|
||||
|
||||
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
|
||||
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
|
||||
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
|
||||
try {
|
||||
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
|
||||
} catch(data_dictionary::no_such_column_family&) {
|
||||
@@ -3139,20 +3026,17 @@ struct primary_key_equal {
|
||||
};
|
||||
|
||||
// This is a cas_request subclass for applying given put_or_delete_items to
|
||||
// one partition using LWT as part as BatchWriteItem. This is a write-only
|
||||
// one partition using LWT as part as BatchWriteItems. This is a write-only
|
||||
// operation, not needing the previous value of the item (the mutation to be
|
||||
// done is known prior to starting the operation). Nevertheless, we want to
|
||||
// do this mutation via LWT to ensure that it is serialized with other LWT
|
||||
// mutations to the same partition.
|
||||
//
|
||||
// The std::vector<put_or_delete_item> must remain alive until the
|
||||
// storage_proxy::cas() future is resolved.
|
||||
class put_or_delete_item_cas_request : public service::cas_request {
|
||||
schema_ptr schema;
|
||||
const std::vector<put_or_delete_item>& _mutation_builders;
|
||||
std::vector<put_or_delete_item> _mutation_builders;
|
||||
public:
|
||||
put_or_delete_item_cas_request(schema_ptr s, const std::vector<put_or_delete_item>& b) :
|
||||
schema(std::move(s)), _mutation_builders(b) { }
|
||||
put_or_delete_item_cas_request(schema_ptr s, std::vector<put_or_delete_item>&& b) :
|
||||
schema(std::move(s)), _mutation_builders(std::move(b)) { }
|
||||
virtual ~put_or_delete_item_cas_request() = default;
|
||||
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override {
|
||||
std::optional<mutation> ret;
|
||||
@@ -3168,48 +3052,20 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
|
||||
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit)
|
||||
{
|
||||
if (!cas_shard.this_shard()) {
|
||||
_stats.shard_bounce_for_lwt++;
|
||||
return container().invoke_on(cas_shard.shard(), _ssg,
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
&mb = mutation_builders,
|
||||
&dk,
|
||||
ks = schema->ks_name(),
|
||||
cf = schema->cf_name(),
|
||||
gt = tracing::global_trace_state_ptr(trace_state),
|
||||
permit = std::move(permit)]
|
||||
(executor& self) mutable {
|
||||
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt), &self]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
|
||||
service::cas_shard cas_shard(*schema, dk.token());
|
||||
|
||||
//FIXME: Instead of passing empty_service_permit() to the background operation,
|
||||
// the current permit's lifetime should be prolonged, so that it's destructed
|
||||
// only after all background operations are finished as well.
|
||||
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, dht::decorated_key dk, std::vector<put_or_delete_item>&& mutation_builders,
|
||||
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
|
||||
auto timeout = executor::default_timeout();
|
||||
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
|
||||
auto* op_ptr = op.get();
|
||||
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
|
||||
auto cdc_opts = cdc::per_request_options{
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility =
|
||||
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
};
|
||||
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
|
||||
return proxy.cas(schema, std::move(cas_shard), op, nullptr, to_partition_ranges(dk),
|
||||
{timeout, std::move(permit), client_state, trace_state},
|
||||
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
|
||||
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
|
||||
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
|
||||
timeout, timeout, true, std::move(cdc_opts)).discard_result();
|
||||
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
|
||||
// does not need to support conditional updates.
|
||||
}
|
||||
|
||||
@@ -3231,11 +3087,13 @@ struct schema_decorated_key_equal {
|
||||
|
||||
// FIXME: if we failed writing some of the mutations, need to return a list
|
||||
// of these failed mutations rather than fail the whole write (issue #5650).
|
||||
future<> executor::do_batch_write(
|
||||
static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
smp_service_group ssg,
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit) {
|
||||
service_permit permit,
|
||||
stats& stats) {
|
||||
if (mutation_builders.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -3257,7 +3115,7 @@ future<> executor::do_batch_write(
|
||||
mutations.push_back(b.second.build(b.first, now));
|
||||
any_cdc_enabled |= b.first->cdc_options().enabled();
|
||||
}
|
||||
return _proxy.mutate(std::move(mutations),
|
||||
return proxy.mutate(std::move(mutations),
|
||||
db::consistency_level::LOCAL_QUORUM,
|
||||
executor::default_timeout(),
|
||||
trace_state,
|
||||
@@ -3266,48 +3124,55 @@ future<> executor::do_batch_write(
|
||||
false,
|
||||
cdc::per_request_options{
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
});
|
||||
} else {
|
||||
// Do the write via LWT:
|
||||
// Multiple mutations may be destined for the same partition, adding
|
||||
// or deleting different items of one partition. Join them together
|
||||
// because we can do them in one cas() call.
|
||||
using map_type = std::unordered_map<schema_decorated_key,
|
||||
std::vector<put_or_delete_item>,
|
||||
schema_decorated_key_hash,
|
||||
schema_decorated_key_equal>;
|
||||
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
|
||||
for (auto&& b : std::move(mutation_builders)) {
|
||||
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
|
||||
.schema = b.first,
|
||||
.dk = dht::decorate_key(*b.first, b.second.pk())
|
||||
});
|
||||
std::unordered_map<schema_decorated_key, std::vector<put_or_delete_item>, schema_decorated_key_hash, schema_decorated_key_equal>
|
||||
key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
|
||||
for (auto& b : mutation_builders) {
|
||||
auto dk = dht::decorate_key(*b.first, b.second.pk());
|
||||
auto [it, added] = key_builders.try_emplace(schema_decorated_key{b.first, dk});
|
||||
it->second.push_back(std::move(b.second));
|
||||
}
|
||||
auto* key_builders_ptr = key_builders.get();
|
||||
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
|
||||
_stats.write_using_lwt++;
|
||||
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
|
||||
stats.write_using_lwt++;
|
||||
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
|
||||
auto s = e.first.schema;
|
||||
if (desired_shard.this_shard()) {
|
||||
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, std::move(e.second), client_state, trace_state, permit);
|
||||
} else {
|
||||
stats.shard_bounce_for_lwt++;
|
||||
return proxy.container().invoke_on(desired_shard.shard(), ssg,
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
mb = e.second,
|
||||
dk = e.first.dk,
|
||||
ks = e.first.schema->ks_name(),
|
||||
cf = e.first.schema->cf_name(),
|
||||
gt = tracing::global_trace_state_ptr(trace_state),
|
||||
permit = std::move(permit)]
|
||||
(service::storage_proxy& proxy) mutable {
|
||||
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf),
|
||||
trace_state = tracing::trace_state_ptr(gt)]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = proxy.data_dictionary().find_schema(ks, cf);
|
||||
|
||||
static const auto* injection_name = "alternator_executor_batch_write_wait";
|
||||
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
|
||||
const auto ks = handler.get("keyspace");
|
||||
const auto cf = handler.get("table");
|
||||
const auto shard = std::atoll(handler.get("shard")->data());
|
||||
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
|
||||
elogger.info("{}: hit", injection_name);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
||||
elogger.info("{}: continue", injection_name);
|
||||
}
|
||||
}).then([&e, desired_shard = std::move(desired_shard),
|
||||
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
|
||||
{
|
||||
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
|
||||
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
|
||||
});
|
||||
}).finally([key_builders = std::move(key_builders)]{});
|
||||
// The desired_shard on the original shard remains alive for the duration
|
||||
// of cas_write on this shard and prevents any tablet operations.
|
||||
// However, we need a local instance of cas_shard on this shard
|
||||
// to pass it to sp::cas, so we just create a new one.
|
||||
service::cas_shard cas_shard(*schema, dk.token());
|
||||
|
||||
//FIXME: Instead of passing empty_service_permit() to the background operation,
|
||||
// the current permit's lifetime should be prolonged, so that it's destructed
|
||||
// only after all background operations are finished as well.
|
||||
return cas_write(proxy, schema, std::move(cas_shard), dk, std::move(mb), client_state, std::move(trace_state), empty_service_permit());
|
||||
});
|
||||
}).finally([desired_shard = std::move(desired_shard)]{});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3454,7 +3319,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
|
||||
_stats.api_operations.batch_write_item_batch_total += total_items;
|
||||
_stats.api_operations.batch_write_item_histogram.add(total_items);
|
||||
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
|
||||
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
|
||||
// FIXME: Issue #5650: If we failed writing some of the updates,
|
||||
// need to return a list of these failed updates in UnprocessedItems
|
||||
// rather than fail the whole write (issue #5650).
|
||||
@@ -3463,11 +3328,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
if (should_add_wcu) {
|
||||
rjson::add(ret, "ConsumedCapacity", std::move(consumed_capacity));
|
||||
}
|
||||
auto duration = std::chrono::steady_clock::now() - start_time;
|
||||
_stats.api_operations.batch_write_item_latency.mark(duration);
|
||||
for (const auto& w : per_table_wcu) {
|
||||
w.first->api_operations.batch_write_item_latency.mark(duration);
|
||||
}
|
||||
_stats.api_operations.batch_write_item_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
co_return rjson::print(std::move(ret));
|
||||
}
|
||||
|
||||
@@ -3503,7 +3364,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
|
||||
}
|
||||
rjson::value newv = rjson::empty_object();
|
||||
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
|
||||
std::string attr = rjson::to_string(it->name);
|
||||
std::string attr = it->name.GetString();
|
||||
auto x = members.find(attr);
|
||||
if (x != members.end()) {
|
||||
if (x->second) {
|
||||
@@ -3551,7 +3412,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
|
||||
return true;
|
||||
}
|
||||
|
||||
// Add a path to an attribute_path_map. Throws a validation error if the path
|
||||
// Add a path to a attribute_path_map. Throws a validation error if the path
|
||||
// "overlaps" with one already in the filter (one is a sub-path of the other)
|
||||
// or "conflicts" with it (both a member and index is requested).
|
||||
template<typename T>
|
||||
@@ -3723,7 +3584,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
|
||||
const rjson::value& attributes_to_get = req["AttributesToGet"];
|
||||
attrs_to_get ret;
|
||||
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
|
||||
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
|
||||
attribute_path_map_add("AttributesToGet", ret, it->GetString());
|
||||
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
|
||||
}
|
||||
if (ret.empty()) {
|
||||
@@ -4389,12 +4250,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
|
||||
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
|
||||
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
|
||||
// Note that it.key() is the name of the column, *it is the operation
|
||||
bytes column_name = to_bytes(rjson::to_string_view(it->name));
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
const column_definition* cdef = _schema->get_column_definition(column_name);
|
||||
if (cdef && cdef->is_primary_key()) {
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
|
||||
}
|
||||
std::string action = rjson::to_string((it->value)["Action"]);
|
||||
std::string action = (it->value)["Action"].GetString();
|
||||
if (action == "DELETE") {
|
||||
// The DELETE operation can do two unrelated tasks. Without a
|
||||
// "Value" option, it is used to delete an attribute. With a
|
||||
@@ -4978,12 +4839,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
|
||||
if (!some_succeeded && eptr) {
|
||||
co_await coroutine::return_exception_ptr(std::move(eptr));
|
||||
}
|
||||
auto duration = std::chrono::steady_clock::now() - start_time;
|
||||
_stats.api_operations.batch_get_item_latency.mark(duration);
|
||||
for (const table_requests& rs : requests) {
|
||||
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
|
||||
per_table_stats->api_operations.batch_get_item_latency.mark(duration);
|
||||
}
|
||||
_stats.api_operations.batch_get_item_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
if (is_big(response)) {
|
||||
co_return make_streamed(std::move(response));
|
||||
} else {
|
||||
@@ -5421,7 +5277,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
|
||||
}
|
||||
|
||||
static dht::token token_for_segment(int segment, int total_segments) {
|
||||
throwing_assert(total_segments > 1 && segment >= 0 && segment < total_segments);
|
||||
SCYLLA_ASSERT(total_segments > 1 && segment >= 0 && segment < total_segments);
|
||||
uint64_t delta = std::numeric_limits<uint64_t>::max() / total_segments;
|
||||
return dht::token::from_int64(std::numeric_limits<int64_t>::min() + delta * segment);
|
||||
}
|
||||
@@ -5596,7 +5452,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
|
||||
std::vector<query::clustering_range> ck_bounds;
|
||||
|
||||
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
|
||||
sstring key = rjson::to_sstring(it->name);
|
||||
std::string key = it->name.GetString();
|
||||
const rjson::value& condition = it->value;
|
||||
|
||||
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
|
||||
@@ -5604,13 +5460,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
|
||||
|
||||
const column_definition& pk_cdef = schema->partition_key_columns().front();
|
||||
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
|
||||
if (key == pk_cdef.name_as_text()) {
|
||||
if (sstring(key) == pk_cdef.name_as_text()) {
|
||||
if (!partition_ranges.empty()) {
|
||||
throw api_error::validation("Currently only a single restriction per key is allowed");
|
||||
}
|
||||
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
|
||||
}
|
||||
if (ck_cdef && key == ck_cdef->name_as_text()) {
|
||||
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
|
||||
if (!ck_bounds.empty()) {
|
||||
throw api_error::validation("Currently only a single restriction per key is allowed");
|
||||
}
|
||||
@@ -6009,14 +5865,9 @@ future<executor::request_return_type> executor::list_tables(client_state& client
|
||||
_stats.api_operations.list_tables++;
|
||||
elogger.trace("Listing tables {}", request);
|
||||
|
||||
co_await utils::get_local_injector().inject("alternator_list_tables", [] (auto& handler) -> future<> {
|
||||
handler.set("waiting", true);
|
||||
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
|
||||
});
|
||||
|
||||
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
|
||||
rjson::value* limit_json = rjson::find(request, "Limit");
|
||||
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
|
||||
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
|
||||
int limit = limit_json ? limit_json->GetInt() : 100;
|
||||
if (limit < 1 || limit > 100) {
|
||||
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");
|
||||
@@ -6205,10 +6056,9 @@ future<> executor::start() {
|
||||
}
|
||||
|
||||
future<> executor::stop() {
|
||||
co_await _describe_table_info_manager->stop();
|
||||
// disconnect from the value source, but keep the value unchanged.
|
||||
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
|
||||
co_await _parsed_expression_cache->stop();
|
||||
return _parsed_expression_cache->stop();
|
||||
}
|
||||
|
||||
} // namespace alternator
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -17,13 +17,11 @@
|
||||
#include "service/client_state.hh"
|
||||
#include "service_permit.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
#include "alternator/error.hh"
|
||||
#include "stats.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "utils/simple_value_with_expiry.hh"
|
||||
|
||||
#include "tracing/trace_state.hh"
|
||||
|
||||
@@ -42,8 +40,6 @@ namespace cql3::selection {
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class cas_shard;
|
||||
class storage_service;
|
||||
}
|
||||
|
||||
namespace cdc {
|
||||
@@ -60,9 +56,7 @@ class schema_builder;
|
||||
|
||||
namespace alternator {
|
||||
|
||||
enum class table_status;
|
||||
class rmw_operation;
|
||||
class put_or_delete_item;
|
||||
|
||||
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
|
||||
bool is_alternator_keyspace(const sstring& ks_name);
|
||||
@@ -140,7 +134,6 @@ class expression_cache;
|
||||
|
||||
class executor : public peering_sharded_service<executor> {
|
||||
gms::gossiper& _gossiper;
|
||||
service::storage_service& _ss;
|
||||
service::storage_proxy& _proxy;
|
||||
service::migration_manager& _mm;
|
||||
db::system_distributed_keyspace& _sdks;
|
||||
@@ -153,11 +146,6 @@ class executor : public peering_sharded_service<executor> {
|
||||
|
||||
std::unique_ptr<parsed::expression_cache> _parsed_expression_cache;
|
||||
|
||||
struct describe_table_info_manager;
|
||||
std::unique_ptr<describe_table_info_manager> _describe_table_info_manager;
|
||||
|
||||
future<> cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl);
|
||||
future<> fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting);
|
||||
public:
|
||||
using client_state = service::client_state;
|
||||
// request_return_type is the return type of the executor methods, which
|
||||
@@ -183,7 +171,6 @@ public:
|
||||
|
||||
executor(gms::gossiper& gossiper,
|
||||
service::storage_proxy& proxy,
|
||||
service::storage_service& ss,
|
||||
service::migration_manager& mm,
|
||||
db::system_distributed_keyspace& sdks,
|
||||
cdc::metadata& cdc_metadata,
|
||||
@@ -231,18 +218,6 @@ private:
|
||||
friend class rmw_operation;
|
||||
|
||||
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
|
||||
future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode);
|
||||
|
||||
future<> do_batch_write(
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
service_permit permit);
|
||||
|
||||
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
|
||||
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
|
||||
tracing::trace_state_ptr trace_state, service_permit permit);
|
||||
|
||||
public:
|
||||
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "expressions.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
/*
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -50,7 +50,7 @@ public:
|
||||
_operators.emplace_back(i);
|
||||
check_depth_limit();
|
||||
}
|
||||
void add_dot(std::string name) {
|
||||
void add_dot(std::string(name)) {
|
||||
_operators.emplace_back(std::move(name));
|
||||
check_depth_limit();
|
||||
}
|
||||
@@ -85,7 +85,7 @@ struct constant {
|
||||
}
|
||||
};
|
||||
|
||||
// "value" is a value used in the right hand side of an assignment
|
||||
// "value" is is a value used in the right hand side of an assignment
|
||||
// expression, "SET a = ...". It can be a constant (a reference to a value
|
||||
// included in the request, e.g., ":val"), a path to an attribute from the
|
||||
// existing item (e.g., "a.b[3].c"), or a function of other such values.
|
||||
@@ -205,7 +205,7 @@ public:
|
||||
// The supported primitive conditions are:
|
||||
// 1. Binary operators - v1 OP v2, where OP is =, <>, <, <=, >, or >= and
|
||||
// v1 and v2 are values - from the item (an attribute path), the query
|
||||
// (a ":val" reference), or a function of the above (only the size()
|
||||
// (a ":val" reference), or a function of the the above (only the size()
|
||||
// function is supported).
|
||||
// 2. Ternary operator - v1 BETWEEN v2 and v3 (means v1 >= v2 AND v1 <= v3).
|
||||
// 3. N-ary operator - v1 IN ( v2, v3, ... )
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -1,301 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#include "alternator/http_compression.hh"
|
||||
#include "alternator/server.hh"
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <zlib.h>
|
||||
|
||||
static logging::logger slogger("alternator-http-compression");
|
||||
|
||||
namespace alternator {
|
||||
|
||||
|
||||
static constexpr size_t compressed_buffer_size = 1024;
|
||||
class zlib_compressor {
|
||||
z_stream _zs;
|
||||
temporary_buffer<char> _output_buf;
|
||||
noncopyable_function<future<>(temporary_buffer<char>&&)> _write_func;
|
||||
public:
|
||||
zlib_compressor(bool gzip, int compression_level, noncopyable_function<future<>(temporary_buffer<char>&&)> write_func)
|
||||
: _write_func(std::move(write_func)) {
|
||||
memset(&_zs, 0, sizeof(_zs));
|
||||
if (deflateInit2(&_zs, std::clamp(compression_level, Z_NO_COMPRESSION, Z_BEST_COMPRESSION), Z_DEFLATED,
|
||||
(gzip ? 16 : 0) + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
|
||||
// Should only happen if memory allocation fails
|
||||
throw std::bad_alloc();
|
||||
}
|
||||
}
|
||||
~zlib_compressor() {
|
||||
deflateEnd(&_zs);
|
||||
}
|
||||
future<> close() {
|
||||
return compress(nullptr, 0, true);
|
||||
}
|
||||
|
||||
future<> compress(const char* buf, size_t len, bool is_last_chunk = false) {
|
||||
_zs.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(buf));
|
||||
_zs.avail_in = (uInt) len;
|
||||
int mode = is_last_chunk ? Z_FINISH : Z_NO_FLUSH;
|
||||
while(_zs.avail_in > 0 || is_last_chunk) {
|
||||
co_await coroutine::maybe_yield();
|
||||
if (_output_buf.empty()) {
|
||||
if (is_last_chunk) {
|
||||
uint32_t max_buffer_size = 0;
|
||||
deflatePending(&_zs, &max_buffer_size, nullptr);
|
||||
max_buffer_size += deflateBound(&_zs, _zs.avail_in) + 1;
|
||||
_output_buf = temporary_buffer<char>(std::min(compressed_buffer_size, (size_t) max_buffer_size));
|
||||
} else {
|
||||
_output_buf = temporary_buffer<char>(compressed_buffer_size);
|
||||
}
|
||||
_zs.next_out = reinterpret_cast<unsigned char*>(_output_buf.get_write());
|
||||
_zs.avail_out = compressed_buffer_size;
|
||||
}
|
||||
int e = deflate(&_zs, mode);
|
||||
if (e < Z_OK) {
|
||||
throw api_error::internal("Error during compression of response body");
|
||||
}
|
||||
if (e == Z_STREAM_END || _zs.avail_out < compressed_buffer_size / 4) {
|
||||
_output_buf.trim(compressed_buffer_size - _zs.avail_out);
|
||||
co_await _write_func(std::move(_output_buf));
|
||||
if (e == Z_STREAM_END) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Helper string_view functions for parsing Accept-Encoding header
|
||||
struct case_insensitive_cmp_sv {
|
||||
bool operator()(std::string_view s1, std::string_view s2) const {
|
||||
return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(),
|
||||
[](char a, char b) { return ::tolower(a) == ::tolower(b); });
|
||||
}
|
||||
};
|
||||
static inline std::string_view trim_left(std::string_view sv) {
|
||||
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.front())))
|
||||
sv.remove_prefix(1);
|
||||
return sv;
|
||||
}
|
||||
static inline std::string_view trim_right(std::string_view sv) {
|
||||
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.back())))
|
||||
sv.remove_suffix(1);
|
||||
return sv;
|
||||
}
|
||||
static inline std::string_view trim(std::string_view sv) {
|
||||
return trim_left(trim_right(sv));
|
||||
}
|
||||
|
||||
inline std::vector<std::string_view> split(std::string_view text, char separator) {
|
||||
std::vector<std::string_view> tokens;
|
||||
if (text == "") {
|
||||
return tokens;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
auto pos = text.find_first_of(separator);
|
||||
if (pos != std::string_view::npos) {
|
||||
tokens.emplace_back(text.data(), pos);
|
||||
text.remove_prefix(pos + 1);
|
||||
} else {
|
||||
tokens.emplace_back(text);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
constexpr response_compressor::compression_type response_compressor::get_compression_type(std::string_view encoding) {
|
||||
for (size_t i = 0; i < static_cast<size_t>(compression_type::count); ++i) {
|
||||
if (case_insensitive_cmp_sv{}(encoding, compression_names[i])) {
|
||||
return static_cast<compression_type>(i);
|
||||
}
|
||||
}
|
||||
return compression_type::unknown;
|
||||
}
|
||||
|
||||
response_compressor::compression_type response_compressor::find_compression(std::string_view accept_encoding, size_t response_size) {
|
||||
std::optional<float> ct_q[static_cast<size_t>(compression_type::count)];
|
||||
ct_q[static_cast<size_t>(compression_type::none)] = std::numeric_limits<float>::min(); // enabled, but lowest priority
|
||||
compression_type selected_ct = compression_type::none;
|
||||
|
||||
std::vector<std::string_view> entries = split(accept_encoding, ',');
|
||||
for (auto& e : entries) {
|
||||
std::vector<std::string_view> params = split(e, ';');
|
||||
if (params.size() == 0) {
|
||||
continue;
|
||||
}
|
||||
compression_type ct = get_compression_type(trim(params[0]));
|
||||
if (ct == compression_type::unknown) {
|
||||
continue; // ignore unknown encoding types
|
||||
}
|
||||
if (ct_q[static_cast<size_t>(ct)].has_value() && ct_q[static_cast<size_t>(ct)] != 0.0f) {
|
||||
continue; // already processed this encoding
|
||||
}
|
||||
if (response_size < _threshold[static_cast<size_t>(ct)]) {
|
||||
continue; // below threshold treat as unknown
|
||||
}
|
||||
for (size_t i = 1; i < params.size(); ++i) { // find "q=" parameter
|
||||
auto pos = params[i].find("q=");
|
||||
if (pos == std::string_view::npos) {
|
||||
continue;
|
||||
}
|
||||
std::string_view param = params[i].substr(pos + 2);
|
||||
param = trim(param);
|
||||
// parse quality value
|
||||
float q_value = 1.0f;
|
||||
auto [ptr, ec] = std::from_chars(param.data(), param.data() + param.size(), q_value);
|
||||
if (ec != std::errc() || ptr != param.data() + param.size()) {
|
||||
continue;
|
||||
}
|
||||
if (q_value < 0.0) {
|
||||
q_value = 0.0;
|
||||
} else if (q_value > 1.0) {
|
||||
q_value = 1.0;
|
||||
}
|
||||
ct_q[static_cast<size_t>(ct)] = q_value;
|
||||
break; // we parsed quality value
|
||||
}
|
||||
if (!ct_q[static_cast<size_t>(ct)].has_value()) {
|
||||
ct_q[static_cast<size_t>(ct)] = 1.0f; // default quality value
|
||||
}
|
||||
// keep the highest encoding (in the order, unless 'any')
|
||||
if (selected_ct == compression_type::any) {
|
||||
if (ct_q[static_cast<size_t>(ct)] >= ct_q[static_cast<size_t>(selected_ct)]) {
|
||||
selected_ct = ct;
|
||||
}
|
||||
} else {
|
||||
if (ct_q[static_cast<size_t>(ct)] > ct_q[static_cast<size_t>(selected_ct)]) {
|
||||
selected_ct = ct;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (selected_ct == compression_type::any) {
|
||||
// select any not mentioned or highest quality
|
||||
selected_ct = compression_type::none;
|
||||
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
|
||||
if (!ct_q[i].has_value()) {
|
||||
return static_cast<compression_type>(i);
|
||||
}
|
||||
if (ct_q[i] > ct_q[static_cast<size_t>(selected_ct)]) {
|
||||
selected_ct = static_cast<compression_type>(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
return selected_ct;
|
||||
}
|
||||
|
||||
static future<chunked_content> compress(response_compressor::compression_type ct, const db::config& cfg, std::string str) {
|
||||
chunked_content compressed;
|
||||
auto write = [&compressed](temporary_buffer<char>&& buf) -> future<> {
|
||||
compressed.push_back(std::move(buf));
|
||||
return make_ready_future<>();
|
||||
};
|
||||
zlib_compressor compressor(ct != response_compressor::compression_type::deflate,
|
||||
cfg.alternator_response_gzip_compression_level(), std::move(write));
|
||||
co_await compressor.compress(str.data(), str.size(), true);
|
||||
co_return compressed;
|
||||
}
|
||||
|
||||
static sstring flatten(chunked_content&& cc) {
|
||||
size_t total_size = 0;
|
||||
for (const auto& chunk : cc) {
|
||||
total_size += chunk.size();
|
||||
}
|
||||
sstring result = sstring{ sstring::initialized_later{}, total_size };
|
||||
size_t offset = 0;
|
||||
for (const auto& chunk : cc) {
|
||||
std::copy(chunk.begin(), chunk.end(), result.begin() + offset);
|
||||
offset += chunk.size();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, std::string&& response_body) {
|
||||
response_compressor::compression_type ct = find_compression(accept_encoding, response_body.size());
|
||||
if (ct != response_compressor::compression_type::none) {
|
||||
rep->add_header("Content-Encoding", get_encoding_name(ct));
|
||||
rep->set_content_type(content_type);
|
||||
return compress(ct, cfg, std::move(response_body)).then([rep = std::move(rep)] (chunked_content compressed) mutable {
|
||||
rep->_content = flatten(std::move(compressed));
|
||||
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
|
||||
});
|
||||
} else {
|
||||
// Note that despite the move, there is a copy here -
|
||||
// as str is std::string and rep->_content is sstring.
|
||||
rep->_content = std::move(response_body);
|
||||
rep->set_content_type(content_type);
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
|
||||
}
|
||||
|
||||
template<typename Compressor>
|
||||
class compressed_data_sink_impl : public data_sink_impl {
|
||||
output_stream<char> _out;
|
||||
Compressor _compressor;
|
||||
public:
|
||||
template<typename... Args>
|
||||
compressed_data_sink_impl(output_stream<char>&& out, Args&&... args)
|
||||
: _out(std::move(out)), _compressor(std::forward<Args>(args)..., [this](temporary_buffer<char>&& buf) {
|
||||
return _out.write(std::move(buf));
|
||||
}) { }
|
||||
|
||||
future<> put(std::span<temporary_buffer<char>> data) override {
|
||||
return data_sink_impl::fallback_put(data, [this] (temporary_buffer<char>&& buf) {
|
||||
return do_put(std::move(buf));
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
future<> do_put(temporary_buffer<char> buf) {
|
||||
co_return co_await _compressor.compress(buf.get(), buf.size());
|
||||
|
||||
}
|
||||
future<> close() override {
|
||||
return _compressor.close().then([this] {
|
||||
return _out.close();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) {
|
||||
return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream<char>&& out) mutable -> future<> {
|
||||
output_stream_options opts;
|
||||
opts.trim_to_size = true;
|
||||
std::unique_ptr<data_sink_impl> data_sink_impl;
|
||||
switch (ct) {
|
||||
case response_compressor::compression_type::gzip:
|
||||
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), true, level);
|
||||
break;
|
||||
case response_compressor::compression_type::deflate:
|
||||
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), false, level);
|
||||
break;
|
||||
case response_compressor::compression_type::none:
|
||||
case response_compressor::compression_type::any:
|
||||
case response_compressor::compression_type::unknown:
|
||||
on_internal_error(slogger,"Compression not selected");
|
||||
default:
|
||||
on_internal_error(slogger, "Unsupported compression type for data sink");
|
||||
}
|
||||
return bw(output_stream<char>(data_sink(std::move(data_sink_impl)), compressed_buffer_size, opts));
|
||||
};
|
||||
}
|
||||
|
||||
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) {
|
||||
response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits<size_t>::max());
|
||||
if (ct != response_compressor::compression_type::none) {
|
||||
rep->add_header("Content-Encoding", get_encoding_name(ct));
|
||||
rep->write_body(content_type, compress(ct, cfg, std::move(body_writer)));
|
||||
} else {
|
||||
rep->write_body(content_type, std::move(body_writer));
|
||||
}
|
||||
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
|
||||
}
|
||||
|
||||
} // namespace alternator
|
||||
@@ -1,91 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "alternator/executor.hh"
|
||||
#include <seastar/http/httpd.hh>
|
||||
#include "db/config.hh"
|
||||
|
||||
namespace alternator {
|
||||
|
||||
class response_compressor {
|
||||
public:
|
||||
enum class compression_type {
|
||||
gzip,
|
||||
deflate,
|
||||
compressions_count,
|
||||
any = compressions_count,
|
||||
none,
|
||||
count,
|
||||
unknown = count
|
||||
};
|
||||
static constexpr std::string_view compression_names[] = {
|
||||
"gzip",
|
||||
"deflate",
|
||||
"*",
|
||||
"identity"
|
||||
};
|
||||
|
||||
static sstring get_encoding_name(compression_type ct) {
|
||||
return sstring(compression_names[static_cast<size_t>(ct)]);
|
||||
}
|
||||
static constexpr compression_type get_compression_type(std::string_view encoding);
|
||||
|
||||
sstring get_accepted_encoding(const http::request& req) {
|
||||
if (get_threshold() == 0) {
|
||||
return "";
|
||||
}
|
||||
return req.get_header("Accept-Encoding");
|
||||
}
|
||||
compression_type find_compression(std::string_view accept_encoding, size_t response_size);
|
||||
|
||||
response_compressor(const db::config& cfg)
|
||||
: cfg(cfg)
|
||||
,_gzip_level_observer(
|
||||
cfg.alternator_response_gzip_compression_level.observe([this](int v) {
|
||||
update_threshold();
|
||||
}))
|
||||
,_gzip_threshold_observer(
|
||||
cfg.alternator_response_compression_threshold_in_bytes.observe([this](uint32_t v) {
|
||||
update_threshold();
|
||||
}))
|
||||
{
|
||||
update_threshold();
|
||||
}
|
||||
response_compressor(const response_compressor& rhs) : response_compressor(rhs.cfg) {}
|
||||
|
||||
private:
|
||||
const db::config& cfg;
|
||||
utils::observable<int>::observer _gzip_level_observer;
|
||||
utils::observable<uint32_t>::observer _gzip_threshold_observer;
|
||||
uint32_t _threshold[static_cast<size_t>(compression_type::count)];
|
||||
|
||||
size_t get_threshold() { return _threshold[static_cast<size_t>(compression_type::any)]; }
|
||||
void update_threshold() {
|
||||
_threshold[static_cast<size_t>(compression_type::none)] = std::numeric_limits<uint32_t>::max();
|
||||
_threshold[static_cast<size_t>(compression_type::any)] = std::numeric_limits<uint32_t>::max();
|
||||
uint32_t gzip = cfg.alternator_response_gzip_compression_level() <= 0 ? std::numeric_limits<uint32_t>::max()
|
||||
: cfg.alternator_response_compression_threshold_in_bytes();
|
||||
_threshold[static_cast<size_t>(compression_type::gzip)] = gzip;
|
||||
_threshold[static_cast<size_t>(compression_type::deflate)] = gzip;
|
||||
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
|
||||
if (_threshold[i] < _threshold[static_cast<size_t>(compression_type::any)]) {
|
||||
_threshold[static_cast<size_t>(compression_type::any)] = _threshold[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
|
||||
sstring accept_encoding, const char* content_type, std::string&& response_body);
|
||||
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
|
||||
sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer);
|
||||
};
|
||||
|
||||
}
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "expressions.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "utils/base64.hh"
|
||||
@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
|
||||
return {"", nullptr};
|
||||
}
|
||||
auto it = v.MemberBegin();
|
||||
const std::string it_key = rjson::to_string(it->name);
|
||||
const std::string it_key = it->name.GetString();
|
||||
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
|
||||
return {std::move(it_key), nullptr};
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -55,7 +55,7 @@ partition_key pk_from_json(const rjson::value& item, schema_ptr schema);
|
||||
clustering_key ck_from_json(const rjson::value& item, schema_ptr schema);
|
||||
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema);
|
||||
|
||||
// If v encodes a number (i.e., it is a {"N": [...]}), returns an object representing it. Otherwise,
|
||||
// If v encodes a number (i.e., it is a {"N": [...]}, returns an object representing it. Otherwise,
|
||||
// raises ValidationException with diagnostic.
|
||||
big_decimal unwrap_number(const rjson::value& v, std::string_view diagnostic);
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "alternator/server.hh"
|
||||
@@ -34,7 +34,6 @@
|
||||
#include "client_data.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include <zlib.h>
|
||||
#include "alternator/http_compression.hh"
|
||||
|
||||
static logging::logger slogger("alternator-server");
|
||||
|
||||
@@ -112,12 +111,9 @@ class api_handler : public handler_base {
|
||||
// type applies to all replies, both success and error.
|
||||
static constexpr const char* REPLY_CONTENT_TYPE = "application/x-amz-json-1.0";
|
||||
public:
|
||||
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle,
|
||||
const db::config& config) : _response_compressor(config), _f_handle(
|
||||
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
|
||||
[this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
|
||||
sstring accept_encoding = _response_compressor.get_accepted_encoding(*req);
|
||||
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped(
|
||||
[this, rep = std::move(rep), accept_encoding=std::move(accept_encoding)](future<executor::request_return_type> resf) mutable {
|
||||
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
|
||||
if (resf.failed()) {
|
||||
// Exceptions of type api_error are wrapped as JSON and
|
||||
// returned to the client as expected. Other types of
|
||||
@@ -137,20 +133,22 @@ public:
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
}
|
||||
auto res = resf.get();
|
||||
return std::visit(overloaded_functor {
|
||||
std::visit(overloaded_functor {
|
||||
[&] (std::string&& str) {
|
||||
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
|
||||
REPLY_CONTENT_TYPE, std::move(str));
|
||||
// Note that despite the move, there is a copy here -
|
||||
// as str is std::string and rep->_content is sstring.
|
||||
rep->_content = std::move(str);
|
||||
rep->set_content_type(REPLY_CONTENT_TYPE);
|
||||
},
|
||||
[&] (executor::body_writer&& body_writer) {
|
||||
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
|
||||
REPLY_CONTENT_TYPE, std::move(body_writer));
|
||||
rep->write_body(REPLY_CONTENT_TYPE, std::move(body_writer));
|
||||
},
|
||||
[&] (const api_error& err) {
|
||||
generate_error_reply(*rep, err);
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
}
|
||||
}, std::move(res));
|
||||
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
});
|
||||
}) { }
|
||||
|
||||
@@ -179,7 +177,6 @@ protected:
|
||||
slogger.trace("api_handler error case: {}", rep._content);
|
||||
}
|
||||
|
||||
response_compressor _response_compressor;
|
||||
future_handler_function _f_handle;
|
||||
};
|
||||
|
||||
@@ -374,45 +371,18 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
for (const auto& header : signed_headers) {
|
||||
signed_headers_map.emplace(header, std::string_view());
|
||||
}
|
||||
std::vector<std::string> modified_values;
|
||||
for (auto& header : req._headers) {
|
||||
std::string header_str;
|
||||
header_str.resize(header.first.size());
|
||||
std::transform(header.first.begin(), header.first.end(), header_str.begin(), ::tolower);
|
||||
auto it = signed_headers_map.find(header_str);
|
||||
if (it != signed_headers_map.end()) {
|
||||
// replace multiple spaces in the header value header.second with
|
||||
// a single space, as required by AWS SigV4 header canonization.
|
||||
// If we modify the value, we need to save it in modified_values
|
||||
// to keep it alive.
|
||||
std::string value;
|
||||
value.reserve(header.second.size());
|
||||
bool prev_space = false;
|
||||
bool modified = false;
|
||||
for (char ch : header.second) {
|
||||
if (ch == ' ') {
|
||||
if (!prev_space) {
|
||||
value += ch;
|
||||
prev_space = true;
|
||||
} else {
|
||||
modified = true; // skip a space
|
||||
}
|
||||
} else {
|
||||
value += ch;
|
||||
prev_space = false;
|
||||
}
|
||||
}
|
||||
if (modified) {
|
||||
modified_values.emplace_back(std::move(value));
|
||||
it->second = std::string_view(modified_values.back());
|
||||
} else {
|
||||
it->second = std::string_view(header.second);
|
||||
}
|
||||
it->second = std::string_view(header.second);
|
||||
}
|
||||
}
|
||||
|
||||
auto cache_getter = [&proxy = _proxy] (std::string username) {
|
||||
return get_key_from_roles(proxy, std::move(username));
|
||||
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
|
||||
return get_key_from_roles(proxy, as, std::move(username));
|
||||
};
|
||||
return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
|
||||
user = std::move(user),
|
||||
@@ -420,7 +390,6 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
|
||||
datestamp = std::move(datestamp),
|
||||
signed_headers_str = std::move(signed_headers_str),
|
||||
signed_headers_map = std::move(signed_headers_map),
|
||||
modified_values = std::move(modified_values),
|
||||
region = std::move(region),
|
||||
service = std::move(service),
|
||||
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
|
||||
@@ -591,11 +560,11 @@ read_entire_stream(input_stream<char>& inp, size_t length_limit) {
|
||||
class safe_gzip_zstream {
|
||||
z_stream _zs;
|
||||
public:
|
||||
// If gzip is true, decode a gzip header (for "Content-Encoding: gzip").
|
||||
// Otherwise, a zlib header (for "Content-Encoding: deflate").
|
||||
safe_gzip_zstream(bool gzip = true) {
|
||||
safe_gzip_zstream() {
|
||||
memset(&_zs, 0, sizeof(_zs));
|
||||
if (inflateInit2(&_zs, gzip ? 16 + MAX_WBITS : MAX_WBITS) != Z_OK) {
|
||||
// The strange 16 + WMAX_BITS tells zlib to expect and decode
|
||||
// a gzip header, not a zlib header.
|
||||
if (inflateInit2(&_zs, 16 + MAX_WBITS) != Z_OK) {
|
||||
// Should only happen if memory allocation fails
|
||||
throw std::bad_alloc();
|
||||
}
|
||||
@@ -614,21 +583,19 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// ungzip() takes a chunked_content of a compressed request body, and returns
|
||||
// the uncompressed content as a chunked_content. If gzip is true, we expect
|
||||
// gzip header (for "Content-Encoding: gzip"), if gzip is false, we expect a
|
||||
// zlib header (for "Content-Encoding: deflate").
|
||||
// ungzip() takes a chunked_content with a gzip-compressed request body,
|
||||
// uncompresses it, and returns the uncompressed content as a chunked_content.
|
||||
// If the uncompressed content exceeds length_limit, an error is thrown.
|
||||
static future<chunked_content>
|
||||
ungzip(chunked_content&& compressed_body, size_t length_limit, bool gzip = true) {
|
||||
ungzip(chunked_content&& compressed_body, size_t length_limit) {
|
||||
chunked_content ret;
|
||||
// output_buf can be any size - when uncompressing input_buf, it doesn't
|
||||
// need to fit in a single output_buf, we'll use multiple output_buf for
|
||||
// a single input_buf if needed.
|
||||
constexpr size_t OUTPUT_BUF_SIZE = 4096;
|
||||
temporary_buffer<char> output_buf;
|
||||
safe_gzip_zstream strm(gzip);
|
||||
bool complete_stream = false; // empty input is not a valid gzip/deflate
|
||||
safe_gzip_zstream strm;
|
||||
bool complete_stream = false; // empty input is not a valid gzip
|
||||
size_t total_out_bytes = 0;
|
||||
for (const temporary_buffer<char>& input_buf : compressed_body) {
|
||||
if (input_buf.empty()) {
|
||||
@@ -699,17 +666,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// for such a size.
|
||||
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
|
||||
}
|
||||
// Check the concurrency limit early, before acquiring memory and
|
||||
// reading the request body, to avoid piling up memory from excess
|
||||
// requests that will be rejected anyway. This mirrors the CQL
|
||||
// transport which also checks concurrency before memory acquisition
|
||||
// (transport/server.cc).
|
||||
if (_pending_requests.get_count() >= _max_concurrent_requests) {
|
||||
_executor._stats.requests_shed++;
|
||||
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
|
||||
}
|
||||
_pending_requests.enter();
|
||||
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
|
||||
// JSON parsing can allocate up to roughly 2x the size of the raw
|
||||
// document, + a couple of bytes for maintenance.
|
||||
// If the Content-Length of the request is not available, we assume
|
||||
@@ -721,7 +677,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
++_executor._stats.requests_blocked_memory;
|
||||
}
|
||||
auto units = co_await std::move(units_fut);
|
||||
throwing_assert(req->content_stream);
|
||||
SCYLLA_ASSERT(req->content_stream);
|
||||
chunked_content content = co_await read_entire_stream(*req->content_stream, request_content_length_limit);
|
||||
// If the request had no Content-Length, we reserved too many units
|
||||
// so need to return some
|
||||
@@ -742,8 +698,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
sstring content_encoding = req->get_header("Content-Encoding");
|
||||
if (content_encoding == "gzip") {
|
||||
content = co_await ungzip(std::move(content), request_content_length_limit);
|
||||
} else if (content_encoding == "deflate") {
|
||||
content = co_await ungzip(std::move(content), request_content_length_limit, false);
|
||||
} else if (!content_encoding.empty()) {
|
||||
// DynamoDB returns a 500 error for unsupported Content-Encoding.
|
||||
// I'm not sure if this is the best error code, but let's do it too.
|
||||
@@ -754,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// As long as the system_clients_entry object is alive, this request will
|
||||
// be visible in the "system.clients" virtual table. When requested, this
|
||||
// entry will be formatted by server::ongoing_request::make_client_data().
|
||||
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
|
||||
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
|
||||
});
|
||||
|
||||
auto system_clients_entry = _ongoing_requests.emplace(
|
||||
req->get_client_address(), std::move(user_agent_header),
|
||||
req->get_client_address(), req->get_header("User-Agent"),
|
||||
username, current_scheduling_group(),
|
||||
req->get_protocol_name() == "https");
|
||||
|
||||
@@ -771,12 +721,18 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
_executor._stats.unsupported_operations++;
|
||||
co_return api_error::unknown_operation(fmt::format("Unsupported operation {}", op));
|
||||
}
|
||||
if (_pending_requests.get_count() >= _max_concurrent_requests) {
|
||||
_executor._stats.requests_shed++;
|
||||
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
|
||||
}
|
||||
_pending_requests.enter();
|
||||
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
|
||||
executor::client_state client_state(service::client_state::external_tag(),
|
||||
_auth_service, &_sl_controller, _timeout_config.current_values(), req->get_client_address());
|
||||
if (!username.empty()) {
|
||||
client_state.set_login(auth::authenticated_user(username));
|
||||
}
|
||||
client_state.maybe_update_per_service_level_params();
|
||||
co_await client_state.maybe_update_per_service_level_params();
|
||||
|
||||
tracing::trace_state_ptr trace_state = maybe_trace_query(client_state, username, op, content, _max_users_query_size_in_trace_output.get());
|
||||
tracing::trace(trace_state, "{}", op);
|
||||
@@ -798,7 +754,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
void server::set_routes(routes& r) {
|
||||
api_handler* req_handler = new api_handler([this] (std::unique_ptr<request> req) mutable {
|
||||
return handle_api_request(std::move(req));
|
||||
}, _proxy.data_dictionary().get_config());
|
||||
});
|
||||
|
||||
r.put(operation_type::POST, "/", req_handler);
|
||||
r.put(operation_type::GET, "/", new health_handler(_pending_requests));
|
||||
@@ -909,9 +865,7 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
|
||||
} {
|
||||
}
|
||||
|
||||
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
|
||||
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
|
||||
std::optional<tls::credentials_builder> creds,
|
||||
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
|
||||
_memory_limiter = memory_limiter;
|
||||
@@ -919,28 +873,20 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
|
||||
_warn_authorization = std::move(warn_authorization);
|
||||
_max_concurrent_requests = std::move(max_concurrent_requests);
|
||||
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
|
||||
if (!port && !https_port && !port_proxy_protocol && !https_port_proxy_protocol) {
|
||||
if (!port && !https_port) {
|
||||
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
|
||||
" must be specified in order to init an alternator HTTP server instance"));
|
||||
}
|
||||
return seastar::async([this, addr, port, https_port, port_proxy_protocol, https_port_proxy_protocol, creds] {
|
||||
return seastar::async([this, addr, port, https_port, creds] {
|
||||
_executor.start().get();
|
||||
|
||||
if (port || port_proxy_protocol) {
|
||||
if (port) {
|
||||
set_routes(_http_server._routes);
|
||||
_http_server.set_content_streaming(true);
|
||||
if (port) {
|
||||
_http_server.listen(socket_address{addr, *port}).get();
|
||||
}
|
||||
if (port_proxy_protocol) {
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
lo.proxy_protocol = true;
|
||||
_http_server.listen(socket_address{addr, *port_proxy_protocol}, lo).get();
|
||||
}
|
||||
_http_server.listen(socket_address{addr, *port}).get();
|
||||
_enabled_servers.push_back(std::ref(_http_server));
|
||||
}
|
||||
if (https_port || https_port_proxy_protocol) {
|
||||
if (https_port) {
|
||||
set_routes(_https_server._routes);
|
||||
_https_server.set_content_streaming(true);
|
||||
|
||||
@@ -960,15 +906,7 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
|
||||
} else {
|
||||
_credentials = creds->build_server_credentials();
|
||||
}
|
||||
if (https_port) {
|
||||
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
|
||||
}
|
||||
if (https_port_proxy_protocol) {
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
lo.proxy_protocol = true;
|
||||
_https_server.listen(socket_address{addr, *https_port_proxy_protocol}, lo, _credentials).get();
|
||||
}
|
||||
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
|
||||
_enabled_servers.push_back(std::ref(_https_server));
|
||||
}
|
||||
});
|
||||
@@ -1041,15 +979,16 @@ client_data server::ongoing_request::make_client_data() const {
|
||||
// and keep "driver_version" unset.
|
||||
cd.driver_name = _user_agent;
|
||||
// Leave "protocol_version" unset, it has no meaning in Alternator.
|
||||
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset for Alternator.
|
||||
// Note: CQL sets ssl_protocol and ssl_cipher_suite via generic_server::connection base class.
|
||||
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
|
||||
// As reported in issue #9216, we never set these fields in CQL
|
||||
// either (see cql_server::connection::make_client_data()).
|
||||
return cd;
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
|
||||
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
|
||||
future<utils::chunked_vector<client_data>> server::get_client_data() {
|
||||
utils::chunked_vector<client_data> ret;
|
||||
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
|
||||
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
|
||||
ret.emplace_back(r.make_client_data());
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
|
||||
// though it isn't really relevant for Alternator which defines its own
|
||||
// timeouts separately. We can create this object only once.
|
||||
updateable_timeout_config _timeout_config;
|
||||
client_options_cache_type _connection_options_keys_and_values;
|
||||
|
||||
alternator_callbacks_map _callbacks;
|
||||
|
||||
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
|
||||
// is called when reading the "system.clients" virtual table.
|
||||
struct ongoing_request {
|
||||
socket_address _client_address;
|
||||
client_options_cache_entry_type _user_agent;
|
||||
sstring _user_agent;
|
||||
sstring _username;
|
||||
scheduling_group _scheduling_group;
|
||||
bool _is_https;
|
||||
@@ -100,9 +99,7 @@ class server : public peering_sharded_service<server> {
|
||||
public:
|
||||
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
|
||||
|
||||
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
|
||||
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
|
||||
std::optional<tls::credentials_builder> creds,
|
||||
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
|
||||
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
|
||||
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
|
||||
future<> stop();
|
||||
@@ -110,7 +107,7 @@ public:
|
||||
// table "system.clients" is read. It is expected to generate a list of
|
||||
// clients connected to this server (on this shard). This function is
|
||||
// called by alternator::controller::get_client_data().
|
||||
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
|
||||
future<utils::chunked_vector<client_data>> get_client_data();
|
||||
private:
|
||||
void set_routes(seastar::httpd::routes& r);
|
||||
// If verification succeeds, returns the authenticated user's username
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "stats.hh"
|
||||
@@ -14,6 +14,20 @@
|
||||
namespace alternator {
|
||||
|
||||
const char* ALTERNATOR_METRICS = "alternator";
|
||||
static seastar::metrics::histogram estimated_histogram_to_metrics(const utils::estimated_histogram& histogram) {
|
||||
seastar::metrics::histogram res;
|
||||
res.buckets.resize(histogram.bucket_offsets.size());
|
||||
uint64_t cumulative_count = 0;
|
||||
res.sample_count = histogram._count;
|
||||
res.sample_sum = histogram._sample_sum;
|
||||
for (size_t i = 0; i < res.buckets.size(); i++) {
|
||||
auto& v = res.buckets[i];
|
||||
v.upper_bound = histogram.bucket_offsets[i];
|
||||
cumulative_count += histogram.buckets[i];
|
||||
v.count = cumulative_count;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static seastar::metrics::label column_family_label("cf");
|
||||
static seastar::metrics::label keyspace_label("ks");
|
||||
@@ -137,21 +151,21 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
|
||||
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"), labels,
|
||||
stats.api_operations.batch_get_item_batch_total)(op("BatchGetItem")).aggregate(aggregate_labels).set_skip_when_empty(),
|
||||
seastar::metrics::make_histogram("batch_item_count_histogram", seastar::metrics::description("Histogram of the number of items in a batch request"), labels,
|
||||
[&stats]{ return to_metrics_histogram(stats.api_operations.batch_get_item_histogram);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
[&stats]{ return estimated_histogram_to_metrics(stats.api_operations.batch_get_item_histogram);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_histogram("batch_item_count_histogram", seastar::metrics::description("Histogram of the number of items in a batch request"), labels,
|
||||
[&stats]{ return to_metrics_histogram(stats.api_operations.batch_write_item_histogram);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
[&stats]{ return estimated_histogram_to_metrics(stats.api_operations.batch_write_item_histogram);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
|
||||
[&stats]{ return to_metrics_histogram(stats.operation_sizes.get_item_op_size_kb);})(op("GetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.get_item_op_size_kb);})(op("GetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
|
||||
[&stats]{ return to_metrics_histogram(stats.operation_sizes.put_item_op_size_kb);})(op("PutItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.put_item_op_size_kb);})(op("PutItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
|
||||
[&stats]{ return to_metrics_histogram(stats.operation_sizes.delete_item_op_size_kb);})(op("DeleteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.delete_item_op_size_kb);})(op("DeleteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
|
||||
[&stats]{ return to_metrics_histogram(stats.operation_sizes.update_item_op_size_kb);})(op("UpdateItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.update_item_op_size_kb);})(op("UpdateItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
|
||||
[&stats]{ return to_metrics_histogram(stats.operation_sizes.batch_get_item_op_size_kb);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.batch_get_item_op_size_kb);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
|
||||
[&stats]{ return to_metrics_histogram(stats.operation_sizes.batch_write_item_op_size_kb);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.batch_write_item_op_size_kb);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
|
||||
});
|
||||
|
||||
seastar::metrics::label expression_label("expression");
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -16,8 +16,6 @@
|
||||
#include "cql3/stats.hh"
|
||||
|
||||
namespace alternator {
|
||||
using batch_histogram = utils::estimated_histogram_with_max<128>;
|
||||
using op_size_histogram = utils::estimated_histogram_with_max<512>;
|
||||
|
||||
// Object holding per-shard statistics related to Alternator.
|
||||
// While this object is alive, these metrics are also registered to be
|
||||
@@ -78,34 +76,34 @@ public:
|
||||
utils::timed_rate_moving_average_summary_and_histogram batch_get_item_latency;
|
||||
utils::timed_rate_moving_average_summary_and_histogram get_records_latency;
|
||||
|
||||
batch_histogram batch_get_item_histogram;
|
||||
batch_histogram batch_write_item_histogram;
|
||||
utils::estimated_histogram batch_get_item_histogram{22}; // a histogram that covers the range 1 - 100
|
||||
utils::estimated_histogram batch_write_item_histogram{22}; // a histogram that covers the range 1 - 100
|
||||
} api_operations;
|
||||
// Operation size metrics
|
||||
struct {
|
||||
// Item size statistics collected per table and aggregated per node.
|
||||
// Each histogram covers the range 0 - 512. Resolves #25143.
|
||||
// Each histogram covers the range 0 - 446. Resolves #25143.
|
||||
// A size is the retrieved item's size.
|
||||
op_size_histogram get_item_op_size_kb;
|
||||
utils::estimated_histogram get_item_op_size_kb{30};
|
||||
// A size is the maximum of the new item's size and the old item's size.
|
||||
op_size_histogram put_item_op_size_kb;
|
||||
utils::estimated_histogram put_item_op_size_kb{30};
|
||||
// A size is the deleted item's size. If the deleted item's size is
|
||||
// unknown (i.e. read-before-write wasn't necessary and it wasn't
|
||||
// forced by a configuration option), it won't be recorded on the
|
||||
// histogram.
|
||||
op_size_histogram delete_item_op_size_kb;
|
||||
utils::estimated_histogram delete_item_op_size_kb{30};
|
||||
// A size is the maximum of existing item's size and the estimated size
|
||||
// of the update. This will be changed to the maximum of the existing item's
|
||||
// size and the new item's size in a subsequent PR.
|
||||
op_size_histogram update_item_op_size_kb;
|
||||
utils::estimated_histogram update_item_op_size_kb{30};
|
||||
|
||||
// A size is the sum of the sizes of all items per table. This means
|
||||
// that a single BatchGetItem / BatchWriteItem updates the histogram
|
||||
// for each table that it has items in.
|
||||
// The sizes are the retrieved items' sizes grouped per table.
|
||||
op_size_histogram batch_get_item_op_size_kb;
|
||||
utils::estimated_histogram batch_get_item_op_size_kb{30};
|
||||
// The sizes are the the written items' sizes grouped per table.
|
||||
op_size_histogram batch_write_item_op_size_kb;
|
||||
utils::estimated_histogram batch_write_item_op_size_kb{30};
|
||||
} operation_sizes;
|
||||
// Count of authentication and authorization failures, counted if either
|
||||
// alternator_enforce_authorization or alternator_warn_authorization are
|
||||
@@ -142,7 +140,7 @@ public:
|
||||
cql3::cql_stats cql_stats;
|
||||
|
||||
// Enumeration of expression types only for stats
|
||||
// if needed it can be extended e.g. per operation
|
||||
// if needed it can be extended e.g. per operation
|
||||
enum expression_types {
|
||||
UPDATE_EXPRESSION,
|
||||
CONDITION_EXPRESSION,
|
||||
@@ -166,7 +164,7 @@ struct table_stats {
|
||||
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats);
|
||||
|
||||
inline uint64_t bytes_to_kb_ceil(uint64_t bytes) {
|
||||
return (bytes) / 1024;
|
||||
return (bytes + 1023) / 1024;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <type_traits>
|
||||
@@ -33,8 +33,6 @@
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "utils/rjson.hh"
|
||||
|
||||
static logging::logger slogger("alternator-streams");
|
||||
|
||||
/**
|
||||
* Base template type to implement rapidjson::internal::TypeHelper<...>:s
|
||||
* for types that are ostreamable/string constructible/castable.
|
||||
@@ -430,25 +428,6 @@ using namespace std::chrono_literals;
|
||||
// Dynamo docs says no data shall live longer than 24h.
|
||||
static constexpr auto dynamodb_streams_max_window = 24h;
|
||||
|
||||
// find the parent shard in previous generation for the given child shard
|
||||
// takes care of wrap-around case in vnodes
|
||||
// prev_streams must be sorted by token
|
||||
const cdc::stream_id& find_parent_shard_in_previous_generation(db_clock::time_point prev_timestamp, const utils::chunked_vector<cdc::stream_id> &prev_streams, const cdc::stream_id &child) {
|
||||
if (prev_streams.empty()) {
|
||||
// something is really wrong - streams are empty
|
||||
// let's try internal_error in hope it will be notified and fixed
|
||||
on_internal_error(slogger, fmt::format("streams are empty for cdc generation at {} ({})", prev_timestamp, prev_timestamp.time_since_epoch().count()));
|
||||
}
|
||||
auto it = std::lower_bound(prev_streams.begin(), prev_streams.end(), child.token(), [](const cdc::stream_id& id, const dht::token& t) {
|
||||
return id.token() < t;
|
||||
});
|
||||
if (it == prev_streams.end()) {
|
||||
// wrap around case - take first
|
||||
it = prev_streams.begin();
|
||||
}
|
||||
return *it;
|
||||
}
|
||||
|
||||
future<executor::request_return_type> executor::describe_stream(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.describe_stream++;
|
||||
|
||||
@@ -512,7 +491,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
|
||||
if (!opts.enabled()) {
|
||||
rjson::add(ret, "StreamDescription", std::move(stream_desc));
|
||||
co_return rjson::print(std::move(ret));
|
||||
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
|
||||
}
|
||||
|
||||
// TODO: label
|
||||
@@ -523,113 +502,123 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
|
||||
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
|
||||
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
|
||||
|
||||
std::map<db_clock::time_point, cdc::streams_version> topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
|
||||
auto e = topologies.end();
|
||||
auto prev = e;
|
||||
auto shards = rjson::empty_array();
|
||||
return _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners }).then([db, shard_start, limit, ret = std::move(ret), stream_desc = std::move(stream_desc)] (std::map<db_clock::time_point, cdc::streams_version> topologies) mutable {
|
||||
|
||||
std::optional<shard_id> last;
|
||||
auto e = topologies.end();
|
||||
auto prev = e;
|
||||
auto shards = rjson::empty_array();
|
||||
|
||||
auto i = topologies.begin();
|
||||
// if we're a paged query, skip to the generation where we left of.
|
||||
if (shard_start) {
|
||||
i = topologies.find(shard_start->time);
|
||||
}
|
||||
std::optional<shard_id> last;
|
||||
|
||||
// for parent-child stuff we need id:s to be sorted by token
|
||||
// (see explanation above) since we want to find closest
|
||||
// token boundary when determining parent.
|
||||
// #7346 - we processed and searched children/parents in
|
||||
// stored order, which is not necessarily token order,
|
||||
// so the finding of "closest" token boundary (using upper bound)
|
||||
// could give somewhat weird results.
|
||||
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
|
||||
return id1.token() < id2.token();
|
||||
};
|
||||
auto i = topologies.begin();
|
||||
// if we're a paged query, skip to the generation where we left of.
|
||||
if (shard_start) {
|
||||
i = topologies.find(shard_start->time);
|
||||
}
|
||||
|
||||
// #7409 - shards must be returned in lexicographical order,
|
||||
// normal bytes compare is string_traits<int8_t>::compare.
|
||||
// thus bytes 0x8000 is less than 0x0000. By doing unsigned
|
||||
// compare instead we inadvertently will sort in string lexical.
|
||||
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
|
||||
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0;
|
||||
};
|
||||
|
||||
// need a prev even if we are skipping stuff
|
||||
if (i != topologies.begin()) {
|
||||
prev = std::prev(i);
|
||||
}
|
||||
|
||||
for (; limit > 0 && i != e; prev = i, ++i) {
|
||||
auto& [ts, sv] = *i;
|
||||
|
||||
last = std::nullopt;
|
||||
|
||||
auto lo = sv.streams.begin();
|
||||
auto end = sv.streams.end();
|
||||
// for parent-child stuff we need id:s to be sorted by token
|
||||
// (see explanation above) since we want to find closest
|
||||
// token boundary when determining parent.
|
||||
// #7346 - we processed and searched children/parents in
|
||||
// stored order, which is not necessarily token order,
|
||||
// so the finding of "closest" token boundary (using upper bound)
|
||||
// could give somewhat weird results.
|
||||
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
|
||||
return id1.token() < id2.token();
|
||||
};
|
||||
|
||||
// #7409 - shards must be returned in lexicographical order,
|
||||
std::sort(lo, end, id_cmp);
|
||||
// normal bytes compare is string_traits<int8_t>::compare.
|
||||
// thus bytes 0x8000 is less than 0x0000. By doing unsigned
|
||||
// compare instead we inadvertently will sort in string lexical.
|
||||
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
|
||||
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0;
|
||||
};
|
||||
|
||||
if (shard_start) {
|
||||
// find next shard position
|
||||
lo = std::upper_bound(lo, end, shard_start->id, id_cmp);
|
||||
shard_start = std::nullopt;
|
||||
// need a prev even if we are skipping stuff
|
||||
if (i != topologies.begin()) {
|
||||
prev = std::prev(i);
|
||||
}
|
||||
|
||||
if (lo != end && prev != e) {
|
||||
// We want older stuff sorted in token order so we can find matching
|
||||
// token range when determining parent shard.
|
||||
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
|
||||
}
|
||||
|
||||
auto expired = [&]() -> std::optional<db_clock::time_point> {
|
||||
auto j = std::next(i);
|
||||
if (j == e) {
|
||||
return std::nullopt;
|
||||
}
|
||||
// add this so we sort of match potential
|
||||
// sequence numbers in get_records result.
|
||||
return j->first + confidence_interval(db);
|
||||
}();
|
||||
|
||||
while (lo != end) {
|
||||
auto& id = *lo++;
|
||||
|
||||
auto shard = rjson::empty_object();
|
||||
|
||||
if (prev != e) {
|
||||
auto &pid = find_parent_shard_in_previous_generation(prev->first, prev->second.streams, id);
|
||||
rjson::add(shard, "ParentShardId", shard_id(prev->first, pid));
|
||||
}
|
||||
|
||||
last.emplace(ts, id);
|
||||
rjson::add(shard, "ShardId", *last);
|
||||
auto range = rjson::empty_object();
|
||||
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
|
||||
if (expired) {
|
||||
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
|
||||
}
|
||||
|
||||
rjson::add(shard, "SequenceNumberRange", std::move(range));
|
||||
rjson::push_back(shards, std::move(shard));
|
||||
|
||||
if (--limit == 0) {
|
||||
break;
|
||||
}
|
||||
for (; limit > 0 && i != e; prev = i, ++i) {
|
||||
auto& [ts, sv] = *i;
|
||||
|
||||
last = std::nullopt;
|
||||
|
||||
auto lo = sv.streams.begin();
|
||||
auto end = sv.streams.end();
|
||||
|
||||
// #7409 - shards must be returned in lexicographical order,
|
||||
std::sort(lo, end, id_cmp);
|
||||
|
||||
if (shard_start) {
|
||||
// find next shard position
|
||||
lo = std::upper_bound(lo, end, shard_start->id, id_cmp);
|
||||
shard_start = std::nullopt;
|
||||
}
|
||||
|
||||
if (lo != end && prev != e) {
|
||||
// We want older stuff sorted in token order so we can find matching
|
||||
// token range when determining parent shard.
|
||||
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
|
||||
}
|
||||
|
||||
auto expired = [&]() -> std::optional<db_clock::time_point> {
|
||||
auto j = std::next(i);
|
||||
if (j == e) {
|
||||
return std::nullopt;
|
||||
}
|
||||
// add this so we sort of match potential
|
||||
// sequence numbers in get_records result.
|
||||
return j->first + confidence_interval(db);
|
||||
}();
|
||||
|
||||
while (lo != end) {
|
||||
auto& id = *lo++;
|
||||
|
||||
auto shard = rjson::empty_object();
|
||||
|
||||
if (prev != e) {
|
||||
auto& pids = prev->second.streams;
|
||||
auto pid = std::upper_bound(pids.begin(), pids.end(), id.token(), [](const dht::token& t, const cdc::stream_id& id) {
|
||||
return t < id.token();
|
||||
});
|
||||
if (pid != pids.begin()) {
|
||||
pid = std::prev(pid);
|
||||
}
|
||||
if (pid != pids.end()) {
|
||||
rjson::add(shard, "ParentShardId", shard_id(prev->first, *pid));
|
||||
}
|
||||
}
|
||||
|
||||
last.emplace(ts, id);
|
||||
rjson::add(shard, "ShardId", *last);
|
||||
auto range = rjson::empty_object();
|
||||
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
|
||||
if (expired) {
|
||||
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
|
||||
}
|
||||
|
||||
rjson::add(shard, "SequenceNumberRange", std::move(range));
|
||||
rjson::push_back(shards, std::move(shard));
|
||||
|
||||
if (--limit == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
last = std::nullopt;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (last) {
|
||||
rjson::add(stream_desc, "LastEvaluatedShardId", *last);
|
||||
}
|
||||
if (last) {
|
||||
rjson::add(stream_desc, "LastEvaluatedShardId", *last);
|
||||
}
|
||||
|
||||
rjson::add(stream_desc, "Shards", std::move(shards));
|
||||
rjson::add(ret, "StreamDescription", std::move(stream_desc));
|
||||
|
||||
co_return rjson::print(std::move(ret));
|
||||
rjson::add(stream_desc, "Shards", std::move(shards));
|
||||
rjson::add(ret, "StreamDescription", std::move(stream_desc));
|
||||
|
||||
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
|
||||
});
|
||||
}
|
||||
|
||||
enum class shard_iterator_type {
|
||||
@@ -787,18 +776,16 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
|
||||
struct event_id {
|
||||
cdc::stream_id stream;
|
||||
utils::UUID timestamp;
|
||||
size_t index = 0;
|
||||
|
||||
static constexpr auto marker = 'E';
|
||||
|
||||
event_id(cdc::stream_id s, utils::UUID ts, size_t index)
|
||||
event_id(cdc::stream_id s, utils::UUID ts)
|
||||
: stream(s)
|
||||
, timestamp(ts)
|
||||
, index(index)
|
||||
{}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const event_id& id) {
|
||||
fmt::print(os, "{}{}:{}:{}", marker, id.stream.to_bytes(), id.timestamp, id.index);
|
||||
fmt::print(os, "{}{}:{}", marker, id.stream.to_bytes(), id.timestamp);
|
||||
return os;
|
||||
}
|
||||
};
|
||||
@@ -810,19 +797,7 @@ struct rapidjson::internal::TypeHelper<ValueType, alternator::event_id>
|
||||
{};
|
||||
|
||||
namespace alternator {
|
||||
namespace {
|
||||
struct managed_bytes_ptr_hash {
|
||||
size_t operator()(const managed_bytes *k) const noexcept {
|
||||
return std::hash<managed_bytes>{}(*k);
|
||||
}
|
||||
};
|
||||
struct managed_bytes_ptr_equal {
|
||||
bool operator()(const managed_bytes *a, const managed_bytes *b) const noexcept {
|
||||
return *a == *b;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
future<executor::request_return_type> executor::get_records(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.get_records++;
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
@@ -893,12 +868,6 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
|
||||
auto pks = schema->partition_key_columns();
|
||||
auto cks = schema->clustering_key_columns();
|
||||
|
||||
auto base_cks = base->clustering_key_columns();
|
||||
if (base_cks.size() > 1) {
|
||||
throw api_error::internal(fmt::format("invalid alternator table, clustering key count ({}) is bigger than one", base_cks.size()));
|
||||
}
|
||||
const bytes *clustering_key_column_name = !base_cks.empty() ? &base_cks.front().name() : nullptr;
|
||||
|
||||
std::transform(pks.begin(), pks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
|
||||
std::transform(cks.begin(), cks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
|
||||
@@ -929,184 +898,172 @@ future<executor::request_return_type> executor::get_records(client_state& client
|
||||
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
|
||||
query::tombstone_limit(_proxy.get_tombstone_limit()), query::row_limit(limit * mul));
|
||||
|
||||
service::storage_proxy::coordinator_query_result qr = co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state));
|
||||
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
|
||||
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
|
||||
co_return co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state)).then(
|
||||
[this, schema, partition_slice = std::move(partition_slice), selection = std::move(selection), start_time = std::move(start_time), limit, key_names = std::move(key_names), attr_names = std::move(attr_names), type, iter, high_ts] (service::storage_proxy::coordinator_query_result qr) mutable {
|
||||
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
|
||||
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
|
||||
|
||||
auto result_set = builder.build();
|
||||
auto records = rjson::empty_array();
|
||||
auto result_set = builder.build();
|
||||
auto records = rjson::empty_array();
|
||||
|
||||
auto& metadata = result_set->get_metadata();
|
||||
auto& metadata = result_set->get_metadata();
|
||||
|
||||
auto op_index = std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == op_column_name;
|
||||
})
|
||||
);
|
||||
auto ts_index = std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == timestamp_column_name;
|
||||
})
|
||||
);
|
||||
auto eor_index = std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == eor_column_name;
|
||||
})
|
||||
);
|
||||
auto clustering_key_index = clustering_key_column_name ? std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [&](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == *clustering_key_column_name;
|
||||
})
|
||||
) : 0;
|
||||
auto op_index = std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == op_column_name;
|
||||
})
|
||||
);
|
||||
auto ts_index = std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == timestamp_column_name;
|
||||
})
|
||||
);
|
||||
auto eor_index = std::distance(metadata.get_names().begin(),
|
||||
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
|
||||
return cdef->name->name() == eor_column_name;
|
||||
})
|
||||
);
|
||||
|
||||
std::optional<utils::UUID> timestamp;
|
||||
struct Record {
|
||||
rjson::value record;
|
||||
rjson::value dynamodb;
|
||||
};
|
||||
const managed_bytes empty_managed_bytes;
|
||||
std::unordered_map<const managed_bytes*, Record, managed_bytes_ptr_hash, managed_bytes_ptr_equal> records_map;
|
||||
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
|
||||
std::optional<utils::UUID> timestamp;
|
||||
auto dynamodb = rjson::empty_object();
|
||||
auto record = rjson::empty_object();
|
||||
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
|
||||
|
||||
using op_utype = std::underlying_type_t<cdc::operation>;
|
||||
using op_utype = std::underlying_type_t<cdc::operation>;
|
||||
|
||||
for (auto& row : result_set->rows()) {
|
||||
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
|
||||
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
|
||||
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
|
||||
const managed_bytes* cs_ptr = clustering_key_column_name ? &*row[clustering_key_index] : &empty_managed_bytes;
|
||||
auto records_it = records_map.emplace(cs_ptr, Record{});
|
||||
auto &record = records_it.first->second;
|
||||
auto maybe_add_record = [&] {
|
||||
if (!dynamodb.ObjectEmpty()) {
|
||||
rjson::add(record, "dynamodb", std::move(dynamodb));
|
||||
dynamodb = rjson::empty_object();
|
||||
}
|
||||
if (!record.ObjectEmpty()) {
|
||||
rjson::add(record, "awsRegion", rjson::from_string(dc_name));
|
||||
rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp));
|
||||
rjson::add(record, "eventSource", "scylladb:alternator");
|
||||
rjson::add(record, "eventVersion", "1.1");
|
||||
rjson::push_back(records, std::move(record));
|
||||
record = rjson::empty_object();
|
||||
--limit;
|
||||
}
|
||||
};
|
||||
|
||||
if (records_it.second) {
|
||||
record.dynamodb = rjson::empty_object();
|
||||
record.record = rjson::empty_object();
|
||||
auto keys = rjson::empty_object();
|
||||
describe_single_item(*selection, row, key_names, keys);
|
||||
rjson::add(record.dynamodb, "Keys", std::move(keys));
|
||||
rjson::add(record.dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
|
||||
rjson::add(record.dynamodb, "SequenceNumber", sequence_number(ts));
|
||||
rjson::add(record.dynamodb, "StreamViewType", type);
|
||||
// TODO: SizeBytes
|
||||
}
|
||||
for (auto& row : result_set->rows()) {
|
||||
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
|
||||
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
|
||||
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
|
||||
|
||||
/**
|
||||
* We merge rows with same timestamp into a single event.
|
||||
* This is pretty much needed, because a CDC row typically
|
||||
* encodes ~half the info of an alternator write.
|
||||
*
|
||||
* A big, big downside to how alternator records are written
|
||||
* (i.e. CQL), is that the distinction between INSERT and UPDATE
|
||||
* is somewhat lost/unmappable to actual eventName.
|
||||
* A write (currently) always looks like an insert+modify
|
||||
* regardless whether we wrote existing record or not.
|
||||
*
|
||||
* Maybe RMW ops could be done slightly differently so
|
||||
* we can distinguish them here...
|
||||
*
|
||||
* For now, all writes will become MODIFY.
|
||||
*
|
||||
* Note: we do not check the current pre/post
|
||||
* flags on CDC log, instead we use data to
|
||||
* drive what is returned. This is (afaict)
|
||||
* consistent with dynamo streams
|
||||
*
|
||||
* Note: BatchWriteItem will generate multiple records with
|
||||
* the same timestamp, when write isolation is set to always
|
||||
* (which triggers lwt), so we need to unpack them based on clustering key.
|
||||
*/
|
||||
switch (op) {
|
||||
case cdc::operation::pre_image:
|
||||
case cdc::operation::post_image:
|
||||
{
|
||||
auto item = rjson::empty_object();
|
||||
describe_single_item(*selection, row, attr_names, item, nullptr, true);
|
||||
describe_single_item(*selection, row, key_names, item);
|
||||
rjson::add(record.dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
|
||||
break;
|
||||
}
|
||||
case cdc::operation::update:
|
||||
rjson::add(record.record, "eventName", "MODIFY");
|
||||
break;
|
||||
case cdc::operation::insert:
|
||||
rjson::add(record.record, "eventName", "INSERT");
|
||||
break;
|
||||
case cdc::operation::service_row_delete:
|
||||
case cdc::operation::service_partition_delete:
|
||||
{
|
||||
auto user_identity = rjson::empty_object();
|
||||
rjson::add(user_identity, "Type", "Service");
|
||||
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
|
||||
rjson::add(record.record, "userIdentity", std::move(user_identity));
|
||||
rjson::add(record.record, "eventName", "REMOVE");
|
||||
break;
|
||||
}
|
||||
default:
|
||||
rjson::add(record.record, "eventName", "REMOVE");
|
||||
break;
|
||||
}
|
||||
if (eor) {
|
||||
size_t index = 0;
|
||||
for (auto& [_, rec] : records_map) {
|
||||
rjson::add(rec.record, "awsRegion", rjson::from_string(dc_name));
|
||||
rjson::add(rec.record, "eventID", event_id(iter.shard.id, *timestamp, index++));
|
||||
rjson::add(rec.record, "eventSource", "scylladb:alternator");
|
||||
rjson::add(rec.record, "eventVersion", "1.1");
|
||||
|
||||
rjson::add(rec.record, "dynamodb", std::move(rec.dynamodb));
|
||||
rjson::push_back(records, std::move(rec.record));
|
||||
if (!dynamodb.HasMember("Keys")) {
|
||||
auto keys = rjson::empty_object();
|
||||
describe_single_item(*selection, row, key_names, keys);
|
||||
rjson::add(dynamodb, "Keys", std::move(keys));
|
||||
rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
|
||||
rjson::add(dynamodb, "SequenceNumber", sequence_number(ts));
|
||||
rjson::add(dynamodb, "StreamViewType", type);
|
||||
// TODO: SizeBytes
|
||||
}
|
||||
|
||||
records_map.clear();
|
||||
timestamp = ts;
|
||||
if (records.Size() >= limit) {
|
||||
// Note: we might have more than limit rows here - BatchWriteItem will emit multiple items
|
||||
// with the same timestamp and we have no way of resume iteration midway through those,
|
||||
// so we return all of them here.
|
||||
/**
|
||||
* We merge rows with same timestamp into a single event.
|
||||
* This is pretty much needed, because a CDC row typically
|
||||
* encodes ~half the info of an alternator write.
|
||||
*
|
||||
* A big, big downside to how alternator records are written
|
||||
* (i.e. CQL), is that the distinction between INSERT and UPDATE
|
||||
* is somewhat lost/unmappable to actual eventName.
|
||||
* A write (currently) always looks like an insert+modify
|
||||
* regardless whether we wrote existing record or not.
|
||||
*
|
||||
* Maybe RMW ops could be done slightly differently so
|
||||
* we can distinguish them here...
|
||||
*
|
||||
* For now, all writes will become MODIFY.
|
||||
*
|
||||
* Note: we do not check the current pre/post
|
||||
* flags on CDC log, instead we use data to
|
||||
* drive what is returned. This is (afaict)
|
||||
* consistent with dynamo streams
|
||||
*/
|
||||
switch (op) {
|
||||
case cdc::operation::pre_image:
|
||||
case cdc::operation::post_image:
|
||||
{
|
||||
auto item = rjson::empty_object();
|
||||
describe_single_item(*selection, row, attr_names, item, nullptr, true);
|
||||
describe_single_item(*selection, row, key_names, item);
|
||||
rjson::add(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
|
||||
break;
|
||||
}
|
||||
case cdc::operation::update:
|
||||
rjson::add(record, "eventName", "MODIFY");
|
||||
break;
|
||||
case cdc::operation::insert:
|
||||
rjson::add(record, "eventName", "INSERT");
|
||||
break;
|
||||
case cdc::operation::service_row_delete:
|
||||
case cdc::operation::service_partition_delete:
|
||||
{
|
||||
auto user_identity = rjson::empty_object();
|
||||
rjson::add(user_identity, "Type", "Service");
|
||||
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
|
||||
rjson::add(record, "userIdentity", std::move(user_identity));
|
||||
rjson::add(record, "eventName", "REMOVE");
|
||||
break;
|
||||
}
|
||||
default:
|
||||
rjson::add(record, "eventName", "REMOVE");
|
||||
break;
|
||||
}
|
||||
if (eor) {
|
||||
maybe_add_record();
|
||||
timestamp = ts;
|
||||
if (limit == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto ret = rjson::empty_object();
|
||||
rjson::add(ret, "Records", std::move(records));
|
||||
auto ret = rjson::empty_object();
|
||||
auto nrecords = records.Size();
|
||||
rjson::add(ret, "Records", std::move(records));
|
||||
|
||||
if (timestamp) {
|
||||
// #9642. Set next iterators threshold to > last
|
||||
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
|
||||
// Note that here we unconditionally return NextShardIterator,
|
||||
// without checking if maybe we reached the end-of-shard. If the
|
||||
// shard did end, then the next read will have nrecords == 0 and
|
||||
// will notice end end of shard and not return NextShardIterator.
|
||||
rjson::add(ret, "NextShardIterator", next_iter);
|
||||
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
co_return rjson::print(std::move(ret));
|
||||
}
|
||||
if (nrecords != 0) {
|
||||
// #9642. Set next iterators threshold to > last
|
||||
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
|
||||
// Note that here we unconditionally return NextShardIterator,
|
||||
// without checking if maybe we reached the end-of-shard. If the
|
||||
// shard did end, then the next read will have nrecords == 0 and
|
||||
// will notice end end of shard and not return NextShardIterator.
|
||||
rjson::add(ret, "NextShardIterator", next_iter);
|
||||
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
|
||||
}
|
||||
|
||||
// ugh. figure out if we are and end-of-shard
|
||||
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
|
||||
// ugh. figure out if we are and end-of-shard
|
||||
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
|
||||
|
||||
db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
|
||||
auto& shard = iter.shard;
|
||||
return _sdks.cdc_current_generation_timestamp({ normal_token_owners }).then([this, iter, high_ts, start_time, ret = std::move(ret)](db_clock::time_point ts) mutable {
|
||||
auto& shard = iter.shard;
|
||||
|
||||
if (shard.time < ts && ts < high_ts) {
|
||||
// The DynamoDB documentation states that when a shard is
|
||||
// closed, reading it until the end has NextShardIterator
|
||||
// "set to null". Our test test_streams_closed_read
|
||||
// confirms that by "null" they meant not set at all.
|
||||
} else {
|
||||
// We could have return the same iterator again, but we did
|
||||
// a search from it until high_ts and found nothing, so we
|
||||
// can also start the next search from high_ts.
|
||||
// TODO: but why? It's simpler just to leave the iterator be.
|
||||
shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true);
|
||||
rjson::add(ret, "NextShardIterator", iter);
|
||||
}
|
||||
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
if (is_big(ret)) {
|
||||
co_return make_streamed(std::move(ret));
|
||||
}
|
||||
co_return rjson::print(std::move(ret));
|
||||
if (shard.time < ts && ts < high_ts) {
|
||||
// The DynamoDB documentation states that when a shard is
|
||||
// closed, reading it until the end has NextShardIterator
|
||||
// "set to null". Our test test_streams_closed_read
|
||||
// confirms that by "null" they meant not set at all.
|
||||
} else {
|
||||
// We could have return the same iterator again, but we did
|
||||
// a search from it until high_ts and found nothing, so we
|
||||
// can also start the next search from high_ts.
|
||||
// TODO: but why? It's simpler just to leave the iterator be.
|
||||
shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true);
|
||||
rjson::add(ret, "NextShardIterator", iter);
|
||||
}
|
||||
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
|
||||
if (is_big(ret)) {
|
||||
return make_ready_future<executor::request_return_type>(make_streamed(std::move(ret)));
|
||||
}
|
||||
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
bool executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) {
|
||||
@@ -1122,7 +1079,6 @@ bool executor::add_stream_options(const rjson::value& stream_specification, sche
|
||||
|
||||
cdc::options opts;
|
||||
opts.enabled(true);
|
||||
// cdc::delta_mode is ignored by Alternator, so aim for the least overhead.
|
||||
opts.set_delta_mode(cdc::delta_mode::keys);
|
||||
opts.ttl(std::chrono::duration_cast<std::chrono::seconds>(dynamodb_streams_max_window).count());
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <chrono>
|
||||
@@ -46,7 +46,6 @@
|
||||
#include "alternator/executor.hh"
|
||||
#include "alternator/controller.hh"
|
||||
#include "alternator/serialization.hh"
|
||||
#include "alternator/ttl_tag.hh"
|
||||
#include "dht/sharder.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/tags/utils.hh"
|
||||
@@ -58,10 +57,19 @@ static logging::logger tlogger("alternator_ttl");
|
||||
|
||||
namespace alternator {
|
||||
|
||||
// We write the expiration-time attribute enabled on a table in a
|
||||
// tag TTL_TAG_KEY.
|
||||
// Currently, the *value* of this tag is simply the name of the attribute,
|
||||
// and the expiration scanner interprets it as an Alternator attribute name -
|
||||
// It can refer to a real column or if that doesn't exist, to a member of
|
||||
// the ":attrs" map column. Although this is designed for Alternator, it may
|
||||
// be good enough for CQL as well (there, the ":attrs" column won't exist).
|
||||
extern const sstring TTL_TAG_KEY;
|
||||
|
||||
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
|
||||
_stats.api_operations.update_time_to_live++;
|
||||
if (!_proxy.features().alternator_ttl) {
|
||||
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Upgrade all nodes to a version that supports it.");
|
||||
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
|
||||
}
|
||||
|
||||
schema_ptr schema = get_table(_proxy, request);
|
||||
@@ -85,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
|
||||
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
|
||||
}
|
||||
sstring attribute_name = rjson::to_sstring(*v);
|
||||
sstring attribute_name(v->GetString(), v->GetStringLength());
|
||||
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
|
||||
@@ -133,7 +141,7 @@ future<executor::request_return_type> executor::describe_time_to_live(client_sta
|
||||
|
||||
// expiration_service is a sharded service responsible for cleaning up expired
|
||||
// items in all tables with per-item expiration enabled. Currently, this means
|
||||
// Alternator tables with TTL configured via an UpdateTimeToLive request.
|
||||
// Alternator tables with TTL configured via a UpdateTimeToLive request.
|
||||
//
|
||||
// Here is a brief overview of how the expiration service works:
|
||||
//
|
||||
@@ -316,7 +324,9 @@ static future<std::vector<std::pair<dht::token_range, locator::host_id>>> get_se
|
||||
const auto& tm = *erm->get_token_metadata_ptr();
|
||||
const auto& sorted_tokens = tm.sorted_tokens();
|
||||
std::vector<std::pair<dht::token_range, locator::host_id>> ret;
|
||||
throwing_assert(!sorted_tokens.empty());
|
||||
if (sorted_tokens.empty()) {
|
||||
on_internal_error(tlogger, "Token metadata is empty");
|
||||
}
|
||||
auto prev_tok = sorted_tokens.back();
|
||||
for (const auto& tok : sorted_tokens) {
|
||||
co_await coroutine::maybe_yield();
|
||||
@@ -553,7 +563,7 @@ static future<> scan_table_ranges(
|
||||
expiration_service::stats& expiration_stats)
|
||||
{
|
||||
const schema_ptr& s = scan_ctx.s;
|
||||
throwing_assert(partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
|
||||
SCYLLA_ASSERT (partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
|
||||
auto p = service::pager::query_pagers::pager(proxy, s, scan_ctx.selection, *scan_ctx.query_state_ptr,
|
||||
*scan_ctx.query_options, scan_ctx.command, std::move(partition_ranges), nullptr);
|
||||
while (!p->is_exhausted()) {
|
||||
@@ -583,7 +593,7 @@ static future<> scan_table_ranges(
|
||||
if (retries >= 10) {
|
||||
// Don't get stuck forever asking the same page, maybe there's
|
||||
// a bug or a real problem in several replicas. Give up on
|
||||
// this scan and retry the scan from a random position later,
|
||||
// this scan an retry the scan from a random position later,
|
||||
// in the next scan period.
|
||||
throw runtime_exception("scanner thread failed after too many timeouts for the same page");
|
||||
}
|
||||
@@ -630,38 +640,13 @@ static future<> scan_table_ranges(
|
||||
}
|
||||
} else {
|
||||
// For a real column to contain an expiration time, it
|
||||
// must be a numeric type. We currently support decimal
|
||||
// (used by Alternator TTL) as well as bigint, int and
|
||||
// timestamp (used by CQL per-row TTL).
|
||||
switch (meta[*expiration_column]->type->get_kind()) {
|
||||
case abstract_type::kind::decimal:
|
||||
// Used by Alternator TTL for key columns not stored
|
||||
// in the map. The value is in seconds, fractional
|
||||
// part is ignored.
|
||||
expired = is_expired(value_cast<big_decimal>(v), now);
|
||||
break;
|
||||
case abstract_type::kind::long_kind:
|
||||
// Used by CQL per-row TTL. The value is in seconds.
|
||||
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int64_t>(v))), now);
|
||||
break;
|
||||
case abstract_type::kind::int32:
|
||||
// Used by CQL per-row TTL. The value is in seconds.
|
||||
// Using int type is not recommended because it will
|
||||
// overflow in 2038, but we support it to allow users
|
||||
// to use existing int columns for expiration.
|
||||
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int32_t>(v))), now);
|
||||
break;
|
||||
case abstract_type::kind::timestamp:
|
||||
// Used by CQL per-row TTL. The value is in milliseconds
|
||||
// but we truncate it to gc_clock's precision (whole seconds).
|
||||
expired = is_expired(gc_clock::time_point(std::chrono::duration_cast<gc_clock::duration>(value_cast<db_clock::time_point>(v).time_since_epoch())), now);
|
||||
break;
|
||||
default:
|
||||
// Should never happen - we verified the column's type
|
||||
// before starting the scan.
|
||||
[[unlikely]]
|
||||
on_internal_error(tlogger, format("expiration scanner value of unsupported type {} in column {}", meta[*expiration_column]->type->cql3_type_name(), scan_ctx.column_name) );
|
||||
}
|
||||
// must be a numeric type.
|
||||
// FIXME: Currently we only support decimal_type (which is
|
||||
// what Alternator uses), but other numeric types can be
|
||||
// supported as well to make this feature more useful in CQL.
|
||||
// Note that kind::decimal is also checked above.
|
||||
big_decimal n = value_cast<big_decimal>(v);
|
||||
expired = is_expired(n, now);
|
||||
}
|
||||
if (expired) {
|
||||
expiration_stats.items_deleted++;
|
||||
@@ -723,12 +708,16 @@ static future<bool> scan_table(
|
||||
co_return false;
|
||||
}
|
||||
// attribute_name may be one of the schema's columns (in Alternator, this
|
||||
// means a key column, in CQL it's a regular column), or an element in
|
||||
// Alternator's attrs map encoded in Alternator's JSON encoding (which we
|
||||
// decode). If attribute_name is a real column, in Alternator it will have
|
||||
// the type decimal, counting seconds since the UNIX epoch, while in CQL
|
||||
// it will one of the types bigint or int (counting seconds) or timestamp
|
||||
// (counting milliseconds).
|
||||
// means it's a key column), or an element in Alternator's attrs map
|
||||
// encoded in Alternator's JSON encoding.
|
||||
// FIXME: To make this less Alternators-specific, we should encode in the
|
||||
// single key's value three things:
|
||||
// 1. The name of a column
|
||||
// 2. Optionally if column is a map, a member in the map
|
||||
// 3. The deserializer for the value: CQL or Alternator (JSON).
|
||||
// The deserializer can be guessed: If the given column or map item is
|
||||
// numeric, it can be used directly. If it is a "bytes" type, it needs to
|
||||
// be deserialized using Alternator's deserializer.
|
||||
bytes column_name = to_bytes(*attribute_name);
|
||||
const column_definition *cd = s->get_column_definition(column_name);
|
||||
std::optional<std::string> member;
|
||||
@@ -747,14 +736,11 @@ static future<bool> scan_table(
|
||||
data_type column_type = cd->type;
|
||||
// Verify that the column has the right type: If "member" exists
|
||||
// the column must be a map, and if it doesn't, the column must
|
||||
// be decimal_type (Alternator), bigint, int or timestamp (CQL).
|
||||
// If the column has the wrong type nothing can get expired in
|
||||
// this table, and it's pointless to scan it.
|
||||
// (currently) be a decimal_type. If the column has the wrong type
|
||||
// nothing can get expired in this table, and it's pointless to
|
||||
// scan it.
|
||||
if ((member && column_type->get_kind() != abstract_type::kind::map) ||
|
||||
(!member && column_type->get_kind() != abstract_type::kind::decimal &&
|
||||
column_type->get_kind() != abstract_type::kind::long_kind &&
|
||||
column_type->get_kind() != abstract_type::kind::int32 &&
|
||||
column_type->get_kind() != abstract_type::kind::timestamp)) {
|
||||
(!member && column_type->get_kind() != abstract_type::kind::decimal)) {
|
||||
tlogger.info("table {} TTL column has unsupported type, not scanning", s->cf_name());
|
||||
co_return false;
|
||||
}
|
||||
@@ -781,7 +767,7 @@ static future<bool> scan_table(
|
||||
// by tasking another node to take over scanning of the dead node's primary
|
||||
// ranges. What we do here is that this node will also check expiration
|
||||
// on its *secondary* ranges - but only those whose primary owner is down.
|
||||
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet, erm->get_topology()); // throws if no secondary replica
|
||||
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet); // throws if no secondary replica
|
||||
if (tablet_secondary_replica.host == my_host_id && tablet_secondary_replica.shard == this_shard_id()) {
|
||||
if (!gossiper.is_alive(tablet_primary_replica.host)) {
|
||||
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);
|
||||
@@ -892,10 +878,12 @@ future<> expiration_service::run() {
|
||||
future<> expiration_service::start() {
|
||||
// Called by main() on each shard to start the expiration-service
|
||||
// thread. Just runs run() in the background and allows stop().
|
||||
if (!shutting_down()) {
|
||||
_end = run().handle_exception([] (std::exception_ptr ep) {
|
||||
tlogger.error("expiration_service failed: {}", ep);
|
||||
});
|
||||
if (_db.features().alternator_ttl) {
|
||||
if (!shutting_down()) {
|
||||
_end = run().handle_exception([] (std::exception_ptr ep) {
|
||||
tlogger.error("expiration_service failed: {}", ep);
|
||||
});
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -30,7 +30,7 @@ namespace alternator {
|
||||
|
||||
// expiration_service is a sharded service responsible for cleaning up expired
|
||||
// items in all tables with per-item expiration enabled. Currently, this means
|
||||
// Alternator tables with TTL configured via an UpdateTimeToLive request.
|
||||
// Alternator tables with TTL configured via a UpdateTimeToLeave request.
|
||||
class expiration_service final : public seastar::peering_sharded_service<expiration_service> {
|
||||
public:
|
||||
// Object holding per-shard statistics related to the expiration service.
|
||||
@@ -52,7 +52,7 @@ private:
|
||||
data_dictionary::database _db;
|
||||
service::storage_proxy& _proxy;
|
||||
gms::gossiper& _gossiper;
|
||||
// _end is set by start(), and resolves when the background service
|
||||
// _end is set by start(), and resolves when the the background service
|
||||
// started by it ends. To ask the background service to end, _abort_source
|
||||
// should be triggered. stop() below uses both _abort_source and _end.
|
||||
std::optional<future<>> _end;
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
/*
|
||||
* Copyright 2026-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "seastarx.hh"
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
namespace alternator {
|
||||
// We use the table tag TTL_TAG_KEY ("system:ttl_attribute") to remember
|
||||
// which attribute was chosen as the expiration-time attribute for
|
||||
// Alternator's TTL and CQL's per-row TTL features.
|
||||
// Currently, the *value* of this tag is simply the name of the attribute:
|
||||
// It can refer to a real column or if that doesn't exist, to a member of
|
||||
// the ":attrs" map column (which Alternator uses).
|
||||
extern const sstring TTL_TAG_KEY;
|
||||
} // namespace alternator
|
||||
|
||||
// let users use TTL_TAG_KEY without the "alternator::" prefix,
|
||||
// to make it easier to move it to a different namespace later.
|
||||
using alternator::TTL_TAG_KEY;
|
||||
@@ -31,7 +31,6 @@ set(swagger_files
|
||||
api-doc/column_family.json
|
||||
api-doc/commitlog.json
|
||||
api-doc/compaction_manager.json
|
||||
api-doc/client_routes.json
|
||||
api-doc/config.json
|
||||
api-doc/cql_server_test.json
|
||||
api-doc/endpoint_snitch_info.json
|
||||
@@ -69,7 +68,6 @@ target_sources(api
|
||||
PRIVATE
|
||||
api.cc
|
||||
cache_service.cc
|
||||
client_routes.cc
|
||||
collectd.cc
|
||||
column_family.cc
|
||||
commitlog.cc
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Resets authorized prepared statements cache",
|
||||
"summary":"Reset cache",
|
||||
"type":"void",
|
||||
"nickname":"authorization_cache_reset",
|
||||
"produces":[
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
, "client_routes_entry": {
|
||||
"id": "client_routes_entry",
|
||||
"summary": "An entry storing client routes",
|
||||
"properties": {
|
||||
"connection_id": {"type": "string"},
|
||||
"host_id": {"type": "string", "format": "uuid"},
|
||||
"address": {"type": "string"},
|
||||
"port": {"type": "integer"},
|
||||
"tls_port": {"type": "integer"},
|
||||
"alternator_port": {"type": "integer"},
|
||||
"alternator_https_port": {"type": "integer"}
|
||||
},
|
||||
"required": ["connection_id", "host_id", "address"]
|
||||
}
|
||||
, "client_routes_key": {
|
||||
"id": "client_routes_key",
|
||||
"summary": "A key of client_routes_entry",
|
||||
"properties": {
|
||||
"connection_id": {"type": "string"},
|
||||
"host_id": {"type": "string", "format": "uuid"}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
, "/v2/client-routes":{
|
||||
"get": {
|
||||
"description":"List all client route entries",
|
||||
"operationId":"get_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[],
|
||||
"responses":{
|
||||
"200":{
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_entry" }
|
||||
}
|
||||
},
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{"$ref":"#/definitions/ErrorModel"}
|
||||
}
|
||||
}
|
||||
},
|
||||
"post": {
|
||||
"description":"Upsert one or more client route entries",
|
||||
"operationId":"set_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"body",
|
||||
"in":"body",
|
||||
"required":true,
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_entry" }
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses":{
|
||||
"200":{ "description": "OK" },
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{ "$ref":"#/definitions/ErrorModel" }
|
||||
}
|
||||
}
|
||||
},
|
||||
"delete": {
|
||||
"description":"Delete one or more client route entries",
|
||||
"operationId":"delete_client_routes",
|
||||
"tags":["client_routes"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"body",
|
||||
"in":"body",
|
||||
"required":true,
|
||||
"schema":{
|
||||
"type":"array",
|
||||
"items":{ "$ref":"#/definitions/client_routes_key" }
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses":{
|
||||
"200":{
|
||||
"description": "OK"
|
||||
},
|
||||
"default":{
|
||||
"description":"unexpected error",
|
||||
"schema":{
|
||||
"$ref":"#/definitions/ErrorModel"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,7 +243,7 @@
|
||||
"GOSSIP_DIGEST_SYN",
|
||||
"GOSSIP_DIGEST_ACK2",
|
||||
"GOSSIP_SHUTDOWN",
|
||||
"UNUSED__DEFINITIONS_UPDATE",
|
||||
"DEFINITIONS_UPDATE",
|
||||
"TRUNCATE",
|
||||
"UNUSED__REPLICATION_FINISHED",
|
||||
"MIGRATION_REQUEST",
|
||||
|
||||
@@ -743,7 +743,7 @@
|
||||
"parameters":[
|
||||
{
|
||||
"name":"tag",
|
||||
"description":"The snapshot tag to delete. If omitted, all snapshots are removed.",
|
||||
"description":"the tag given to the snapshot",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
@@ -751,7 +751,7 @@
|
||||
},
|
||||
{
|
||||
"name":"kn",
|
||||
"description":"Comma-separated list of keyspace names to delete snapshots from. If omitted, snapshots are deleted from all keyspaces.",
|
||||
"description":"Comma-separated keyspaces name that their snapshot will be deleted",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
@@ -759,7 +759,7 @@
|
||||
},
|
||||
{
|
||||
"name":"cf",
|
||||
"description":"A table name used to filter which table's snapshots are deleted. If omitted or empty, snapshots for all tables are eligible. When provided together with 'kn', the table is looked up in each listed keyspace independently. For secondary indexes, the logical index name (e.g. 'myindex') can be used and is resolved automatically.",
|
||||
"description":"an optional table name that its snapshot will be deleted",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
@@ -1295,45 +1295,6 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/logstor_compaction",
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Trigger compaction of the key-value storage",
|
||||
"type":"void",
|
||||
"nickname":"logstor_compaction",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"major",
|
||||
"description":"When true, perform a major compaction",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"boolean",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/logstor_flush",
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Trigger flush of logstor storage",
|
||||
"type":"void",
|
||||
"nickname":"logstor_flush",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/active_repair/",
|
||||
"operations":[
|
||||
@@ -3124,125 +3085,6 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
{
|
||||
"path":"/storage_service/tablets/snapshots",
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Takes the snapshot for the given keyspaces/tables. A snapshot name must be specified.",
|
||||
"type":"void",
|
||||
"nickname":"take_cluster_snapshot",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"tag",
|
||||
"description":"the tag given to the snapshot",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"Keyspace(s) to snapshot. Multiple keyspaces can be provided using a comma-separated list. If omitted, snapshot all keyspaces.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"table",
|
||||
"description":"Table(s) to snapshot. Multiple tables (in a single keyspace) can be provided using a comma-separated list. If omitted, snapshot all tables in the given keyspace(s).",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
{
|
||||
"path":"/storage_service/vnode_tablet_migrations/keyspaces/{keyspace}",
|
||||
"operations":[{
|
||||
"method":"POST",
|
||||
"summary":"Start vnodes-to-tablets migration for all tables in a keyspace",
|
||||
"type":"void",
|
||||
"nickname":"create_vnode_tablet_migration",
|
||||
"produces":["application/json"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"Keyspace name",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Get a keyspace's vnodes-to-tablets migration status",
|
||||
"type":"vnode_tablet_migration_status",
|
||||
"nickname":"get_vnode_tablet_migration",
|
||||
"produces":["application/json"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"Keyspace name",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
}
|
||||
]
|
||||
}]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/vnode_tablet_migrations/node/storage_mode",
|
||||
"operations":[{
|
||||
"method":"PUT",
|
||||
"summary":"Set the intended storage mode for this node during vnodes-to-tablets migration",
|
||||
"type":"void",
|
||||
"nickname":"set_vnode_tablet_migration_node_storage_mode",
|
||||
"produces":["application/json"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"intended_mode",
|
||||
"description":"Intended storage mode (tablets or vnodes)",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/vnode_tablet_migrations/keyspaces/{keyspace}/finalization",
|
||||
"operations":[{
|
||||
"method":"POST",
|
||||
"summary":"Finalize vnodes-to-tablets migration for all tables in a keyspace",
|
||||
"type":"void",
|
||||
"nickname":"finalize_vnode_tablet_migration",
|
||||
"produces":["application/json"],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"Keyspace name",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
}
|
||||
]
|
||||
}]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/quiesce_topology",
|
||||
"operations":[
|
||||
@@ -3345,38 +3187,6 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/logstor_info",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Logstor segment information for one table",
|
||||
"type":"table_logstor_info",
|
||||
"nickname":"logstor_info",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"The keyspace",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"table",
|
||||
"description":"table name",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/retrain_dict",
|
||||
"operations":[
|
||||
@@ -3785,47 +3595,6 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"logstor_hist_bucket":{
|
||||
"id":"logstor_hist_bucket",
|
||||
"properties":{
|
||||
"bucket":{
|
||||
"type":"long"
|
||||
},
|
||||
"count":{
|
||||
"type":"long"
|
||||
},
|
||||
"min_data_size":{
|
||||
"type":"long"
|
||||
},
|
||||
"max_data_size":{
|
||||
"type":"long"
|
||||
}
|
||||
}
|
||||
},
|
||||
"table_logstor_info":{
|
||||
"id":"table_logstor_info",
|
||||
"description":"Per-table logstor segment distribution",
|
||||
"properties":{
|
||||
"keyspace":{
|
||||
"type":"string"
|
||||
},
|
||||
"table":{
|
||||
"type":"string"
|
||||
},
|
||||
"compaction_groups":{
|
||||
"type":"long"
|
||||
},
|
||||
"segments":{
|
||||
"type":"long"
|
||||
},
|
||||
"data_size_histogram":{
|
||||
"type":"array",
|
||||
"items":{
|
||||
"$ref":"logstor_hist_bucket"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"tablet_repair_result":{
|
||||
"id":"tablet_repair_result",
|
||||
"description":"Tablet repair result",
|
||||
@@ -3860,45 +3629,6 @@
|
||||
"description":"The resulting compression ratio (estimated on a random sample of files)"
|
||||
}
|
||||
}
|
||||
},
|
||||
"vnode_tablet_migration_node_status":{
|
||||
"id":"vnode_tablet_migration_node_status",
|
||||
"description":"Node storage mode info during vnodes-to-tablets migration",
|
||||
"properties":{
|
||||
"host_id":{
|
||||
"type":"string",
|
||||
"description":"The host ID"
|
||||
},
|
||||
"current_mode":{
|
||||
"type":"string",
|
||||
"description":"The current storage mode: `vnodes` or `tablets`"
|
||||
},
|
||||
"intended_mode":{
|
||||
"type":"string",
|
||||
"description":"The intended storage mode: `vnodes` or `tablets`"
|
||||
}
|
||||
}
|
||||
},
|
||||
"vnode_tablet_migration_status":{
|
||||
"id":"vnode_tablet_migration_status",
|
||||
"description":"Vnodes-to-tablets migration status for a keyspace",
|
||||
"properties":{
|
||||
"keyspace":{
|
||||
"type":"string",
|
||||
"description":"The keyspace name"
|
||||
},
|
||||
"status":{
|
||||
"type":"string",
|
||||
"description":"The migration status: `vnodes` (not started), `migrating_to_tablets` (in progress), or `tablets` (complete)"
|
||||
},
|
||||
"nodes":{
|
||||
"type":"array",
|
||||
"items":{
|
||||
"$ref":"vnode_tablet_migration_node_status"
|
||||
},
|
||||
"description":"Per-node storage mode information. Empty if the keyspace is not being migrated."
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,21 +209,6 @@
|
||||
"parameters":[]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/system/chosen_sstable_version",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Get sstable version currently chosen for use in new sstables",
|
||||
"type":"string",
|
||||
"nickname":"get_chosen_sstable_version",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
21
api/api.cc
21
api/api.cc
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "api.hh"
|
||||
@@ -37,7 +37,6 @@
|
||||
#include "raft.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "service_levels.hh"
|
||||
#include "client_routes.hh"
|
||||
|
||||
logging::logger apilog("api");
|
||||
|
||||
@@ -68,11 +67,9 @@ future<> set_server_init(http_context& ctx) {
|
||||
rb02->set_api_doc(r);
|
||||
rb02->register_api_file(r, "swagger20_header");
|
||||
rb02->register_api_file(r, "metrics");
|
||||
rb02->register_api_file(r, "client_routes");
|
||||
rb->register_function(r, "system",
|
||||
"The system related API");
|
||||
rb02->add_definitions_file(r, "metrics");
|
||||
rb02->add_definitions_file(r, "client_routes");
|
||||
set_system(ctx, r);
|
||||
rb->register_function(r, "error_injection",
|
||||
"The error injection API");
|
||||
@@ -122,9 +119,9 @@ future<> unset_thrift_controller(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_thrift_controller(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
|
||||
return ctx.http_server.set_routes([&ctx, &ss, &ssc, &group0_client] (routes& r) {
|
||||
set_storage_service(ctx, r, ss, ssc, group0_client);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
|
||||
return ctx.http_server.set_routes([&ctx, &ss, &group0_client] (routes& r) {
|
||||
set_storage_service(ctx, r, ss, group0_client);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -132,16 +129,6 @@ future<> unset_server_storage_service(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_storage_service(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr) {
|
||||
return ctx.http_server.set_routes([&ctx, &cr] (routes& r) {
|
||||
set_client_routes(ctx, r, cr);
|
||||
});
|
||||
}
|
||||
|
||||
future<> unset_server_client_routes(http_context& ctx) {
|
||||
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_client_routes(ctx, r); });
|
||||
}
|
||||
|
||||
future<> set_load_meter(http_context& ctx, service::load_meter& lm) {
|
||||
return ctx.http_server.set_routes([&ctx, &lm] (routes& r) { set_load_meter(ctx, r, lm); });
|
||||
}
|
||||
|
||||
27
api/api.hh
27
api/api.hh
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
@@ -23,6 +23,31 @@
|
||||
|
||||
namespace api {
|
||||
|
||||
template<class T>
|
||||
std::vector<T> map_to_key_value(const std::map<sstring, sstring>& map) {
|
||||
std::vector<T> res;
|
||||
res.reserve(map.size());
|
||||
|
||||
for (const auto& [key, value] : map) {
|
||||
res.push_back(T());
|
||||
res.back().key = key;
|
||||
res.back().value = value;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
template<class T, class MAP>
|
||||
std::vector<T>& map_to_key_value(const MAP& map, std::vector<T>& res) {
|
||||
res.reserve(res.size() + std::size(map));
|
||||
|
||||
for (const auto& [key, value] : map) {
|
||||
T val;
|
||||
val.key = fmt::to_string(key);
|
||||
val.value = fmt::to_string(value);
|
||||
res.push_back(val);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
template <typename T, typename S = T>
|
||||
T map_sum(T&& dest, const S& src) {
|
||||
for (const auto& i : src) {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
@@ -29,7 +29,6 @@ class storage_proxy;
|
||||
class storage_service;
|
||||
class raft_group0_client;
|
||||
class raft_group_registry;
|
||||
class client_routes_service;
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -98,10 +97,8 @@ future<> set_server_config(http_context& ctx, db::config& cfg);
|
||||
future<> unset_server_config(http_context& ctx);
|
||||
future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snitch);
|
||||
future<> unset_server_snitch(http_context& ctx);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
|
||||
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
|
||||
future<> unset_server_storage_service(http_context& ctx);
|
||||
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
|
||||
future<> unset_server_client_routes(http_context& ctx);
|
||||
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader);
|
||||
future<> unset_server_sstables_loader(http_context& ctx);
|
||||
future<> set_server_view_builder(http_context& ctx, sharded<db::view::view_builder>& vb, sharded<gms::gossiper>& g);
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "api/api-doc/authorization_cache.json.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "cache_service.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -1,176 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#include <seastar/http/short_streams.hh>
|
||||
|
||||
#include "client_routes.hh"
|
||||
#include "api/api.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/client_routes.hh"
|
||||
#include "utils/rjson.hh"
|
||||
|
||||
|
||||
#include "api/api-doc/client_routes.json.hh"
|
||||
|
||||
using namespace seastar::httpd;
|
||||
using namespace std::chrono_literals;
|
||||
using namespace json;
|
||||
|
||||
extern logging::logger apilog;
|
||||
|
||||
namespace api {
|
||||
|
||||
static void validate_client_routes_endpoint(sharded<service::client_routes_service>& cr, sstring endpoint_name) {
|
||||
if (!cr.local().get_feature_service().client_routes) {
|
||||
apilog.warn("{}: called before the cluster feature was enabled", endpoint_name);
|
||||
throw std::runtime_error(fmt::format("{} requires all nodes to support the CLIENT_ROUTES cluster feature", endpoint_name));
|
||||
}
|
||||
}
|
||||
|
||||
static sstring parse_string(const char* name, rapidjson::Value const& v) {
|
||||
const auto it = v.FindMember(name);
|
||||
if (it == v.MemberEnd()) {
|
||||
throw bad_param_exception(fmt::format("Missing '{}'", name));
|
||||
}
|
||||
if (!it->value.IsString()) {
|
||||
throw bad_param_exception(fmt::format("'{}' must be a string", name));
|
||||
}
|
||||
return {it->value.GetString(), it->value.GetStringLength()};
|
||||
}
|
||||
|
||||
static std::optional<uint32_t> parse_port(const char* name, rapidjson::Value const& v) {
|
||||
const auto it = v.FindMember(name);
|
||||
if (it == v.MemberEnd()) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (!it->value.IsInt()) {
|
||||
throw bad_param_exception(fmt::format("'{}' must be an integer", name));
|
||||
}
|
||||
auto port = it->value.GetInt();
|
||||
if (port < 1 || port > 65535) {
|
||||
throw bad_param_exception(fmt::format("'{}' value={} is outside the allowed port range", name, port));
|
||||
}
|
||||
return port;
|
||||
}
|
||||
|
||||
static std::vector<service::client_routes_service::client_route_entry> parse_set_client_array(const rapidjson::Document& root) {
|
||||
if (!root.IsArray()) {
|
||||
throw bad_param_exception("Body must be a JSON array");
|
||||
}
|
||||
|
||||
std::vector<service::client_routes_service::client_route_entry> v;
|
||||
v.reserve(root.GetArray().Size());
|
||||
for (const auto& element : root.GetArray()) {
|
||||
if (!element.IsObject()) { throw bad_param_exception("Each element must be object"); }
|
||||
|
||||
const auto port = parse_port("port", element);
|
||||
const auto tls_port = parse_port("tls_port", element);
|
||||
const auto alternator_port = parse_port("alternator_port", element);
|
||||
const auto alternator_https_port = parse_port("alternator_https_port", element);
|
||||
|
||||
if (!port.has_value() && !tls_port.has_value() && !alternator_port.has_value() && !alternator_https_port.has_value()) {
|
||||
throw bad_param_exception("At least one port field ('port', 'tls_port', 'alternator_port', 'alternator_https_port') must be specified");
|
||||
}
|
||||
|
||||
v.emplace_back(
|
||||
parse_string("connection_id", element),
|
||||
utils::UUID{parse_string("host_id", element)},
|
||||
parse_string("address", element),
|
||||
port,
|
||||
tls_port,
|
||||
alternator_port,
|
||||
alternator_https_port
|
||||
);
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "rest_set_client_routes");
|
||||
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
co_await cr.local().set_client_routes(parse_set_client_array(root));
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
static std::vector<service::client_routes_service::client_route_key> parse_delete_client_array(const rapidjson::Document& root) {
|
||||
if (!root.IsArray()) {
|
||||
throw bad_param_exception("Body must be a JSON array");
|
||||
}
|
||||
|
||||
std::vector<service::client_routes_service::client_route_key> v;
|
||||
v.reserve(root.GetArray().Size());
|
||||
for (const auto& element : root.GetArray()) {
|
||||
v.emplace_back(
|
||||
parse_string("connection_id", element),
|
||||
utils::UUID{parse_string("host_id", element)}
|
||||
);
|
||||
}
|
||||
|
||||
return v;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "delete_client_routes");
|
||||
|
||||
rapidjson::Document root;
|
||||
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
root.Parse(content.c_str());
|
||||
|
||||
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
|
||||
co_return seastar::json::json_void();
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_get_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
|
||||
validate_client_routes_endpoint(cr, "get_client_routes");
|
||||
|
||||
co_return co_await cr.invoke_on(0, [] (service::client_routes_service& cr) -> future<json::json_return_type> {
|
||||
co_return json::json_return_type(stream_range_as_array(co_await cr.get_client_routes(), [](const service::client_routes_service::client_route_entry & entry) {
|
||||
seastar::httpd::client_routes_json::client_routes_entry obj;
|
||||
obj.connection_id = entry.connection_id;
|
||||
obj.host_id = fmt::to_string(entry.host_id);
|
||||
obj.address = entry.address;
|
||||
if (entry.port.has_value()) { obj.port = entry.port.value(); }
|
||||
if (entry.tls_port.has_value()) { obj.tls_port = entry.tls_port.value(); }
|
||||
if (entry.alternator_port.has_value()) { obj.alternator_port = entry.alternator_port.value(); }
|
||||
if (entry.alternator_https_port.has_value()) { obj.alternator_https_port = entry.alternator_https_port.value(); }
|
||||
return obj;
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
void set_client_routes(http_context& ctx, routes& r, sharded<service::client_routes_service>& cr) {
|
||||
seastar::httpd::client_routes_json::set_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_set_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
seastar::httpd::client_routes_json::delete_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_delete_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
seastar::httpd::client_routes_json::get_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
|
||||
return rest_get_client_routes(ctx, cr, std::move(req));
|
||||
});
|
||||
}
|
||||
|
||||
void unset_client_routes(http_context& ctx, routes& r) {
|
||||
seastar::httpd::client_routes_json::set_client_routes.unset(r);
|
||||
seastar::httpd::client_routes_json::delete_client_routes.unset(r);
|
||||
seastar::httpd::client_routes_json::get_client_routes.unset(r);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,20 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/json/json_elements.hh>
|
||||
#include "api/api_init.hh"
|
||||
|
||||
namespace api {
|
||||
|
||||
void set_client_routes(http_context& ctx, httpd::routes& r, sharded<service::client_routes_service>& cr);
|
||||
void unset_client_routes(http_context& ctx, httpd::routes& r);
|
||||
|
||||
}
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "collectd.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <fmt/ranges.h>
|
||||
@@ -18,9 +18,7 @@
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/estimated_histogram.hh"
|
||||
#include <algorithm>
|
||||
#include <sstream>
|
||||
#include "db/data_listeners.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "storage_service.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
#include "unimplemented.hh"
|
||||
@@ -344,56 +342,6 @@ uint64_t accumulate_on_active_memtables(replica::table& t, noncopyable_function<
|
||||
return ret;
|
||||
}
|
||||
|
||||
static
|
||||
future<json::json_return_type>
|
||||
rest_toppartitions_generic(sharded<replica::database>& db, std::unique_ptr<http::request> req) {
|
||||
bool filters_provided = false;
|
||||
|
||||
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters {};
|
||||
if (auto filters = req->get_query_param("table_filters"); !filters.empty()) {
|
||||
filters_provided = true;
|
||||
std::stringstream ss { filters };
|
||||
std::string filter;
|
||||
while (!filters.empty() && ss.good()) {
|
||||
std::getline(ss, filter, ',');
|
||||
table_filters.emplace(parse_fully_qualified_cf_name(filter));
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<sstring> keyspace_filters {};
|
||||
if (auto filters = req->get_query_param("keyspace_filters"); !filters.empty()) {
|
||||
filters_provided = true;
|
||||
std::stringstream ss { filters };
|
||||
std::string filter;
|
||||
while (!filters.empty() && ss.good()) {
|
||||
std::getline(ss, filter, ',');
|
||||
keyspace_filters.emplace(std::move(filter));
|
||||
}
|
||||
}
|
||||
|
||||
// when the query is empty return immediately
|
||||
if (filters_provided && table_filters.empty() && keyspace_filters.empty()) {
|
||||
apilog.debug("toppartitions query: processing results");
|
||||
cf::toppartitions_query_results results;
|
||||
|
||||
results.read_cardinality = 0;
|
||||
results.write_cardinality = 0;
|
||||
|
||||
return make_ready_future<json::json_return_type>(results);
|
||||
}
|
||||
|
||||
api::req_param<std::chrono::milliseconds, unsigned> duration{*req, "duration", 1000ms};
|
||||
api::req_param<unsigned> capacity(*req, "capacity", 256);
|
||||
api::req_param<unsigned> list_size(*req, "list_size", 10);
|
||||
|
||||
apilog.info("toppartitions query: #table_filters={} #keyspace_filters={} duration={} list_size={} capacity={}",
|
||||
!table_filters.empty() ? std::to_string(table_filters.size()) : "all", !keyspace_filters.empty() ? std::to_string(keyspace_filters.size()) : "all", duration.value, list_size.value, capacity.value);
|
||||
|
||||
return seastar::do_with(db::toppartitions_query(db, std::move(table_filters), std::move(keyspace_filters), duration.value, list_size, capacity), [] (db::toppartitions_query& q) {
|
||||
return run_toppartitions_query(q);
|
||||
});
|
||||
}
|
||||
|
||||
void set_column_family(http_context& ctx, routes& r, sharded<replica::database>& db) {
|
||||
cf::get_column_family_name.set(r, [&db] (const_req req){
|
||||
std::vector<sstring> res;
|
||||
@@ -1099,10 +1047,6 @@ void set_column_family(http_context& ctx, routes& r, sharded<replica::database>&
|
||||
});
|
||||
});
|
||||
|
||||
ss::toppartitions_generic.set(r, [&db] (std::unique_ptr<http::request> req) {
|
||||
return rest_toppartitions_generic(db, std::move(req));
|
||||
});
|
||||
|
||||
cf::force_major_compaction.set(r, [&ctx, &db](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
if (!req->get_query_param("split_output").empty()) {
|
||||
fail(unimplemented::cause::API);
|
||||
@@ -1269,7 +1213,6 @@ void unset_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_sstable_count_per_level.unset(r);
|
||||
cf::get_sstables_for_key.unset(r);
|
||||
cf::toppartitions.unset(r);
|
||||
ss::toppartitions_generic.unset(r);
|
||||
cf::force_major_compaction.unset(r);
|
||||
ss::get_load.unset(r);
|
||||
ss::get_metrics_load.unset(r);
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "commitlog.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "api/api.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "build_mode.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#ifndef SCYLLA_BUILD_MODE_RELEASE
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "locator/snitch_base.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "api/api-doc/error_injection.json.hh"
|
||||
@@ -23,7 +23,7 @@ void set_error_injection(http_context& ctx, routes& r) {
|
||||
|
||||
hf::enable_injection.set(r, [](std::unique_ptr<request> req) -> future<json::json_return_type> {
|
||||
sstring injection = req->get_path_param("injection");
|
||||
bool one_shot = strcasecmp(req->get_query_param("one_shot").c_str(), "true") == 0;
|
||||
bool one_shot = req->get_query_param("one_shot") == "True";
|
||||
auto params = co_await util::read_entire_stream_contiguous(*req->content_stream);
|
||||
|
||||
const size_t max_params_size = 1024 * 1024;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "failure_detector.hh"
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <vector>
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user