Compare commits


1 Commit

Author: Yaniv Kaul
SHA1: 4b8631fa3d
Message: Potential fix for code scanning alert no. 142: Workflow does not contain permissions
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Date: 2025-12-22 12:29:24 +02:00
4410 changed files with 38528 additions and 77020 deletions

View File

@@ -55,26 +55,22 @@ ninja build/<mode>/test/boost/<test_name>
ninja build/<mode>/scylla
# Run all tests in a file
./test.py --mode=<mode> test/<suite>/<test_name>.py
./test.py --mode=<mode> <test_path>
# Run a single test case from a file
./test.py --mode=<mode> test/<suite>/<test_name>.py::<test_function_name>
# Run all tests in a directory
./test.py --mode=<mode> test/<suite>/
./test.py --mode=<mode> <test_path>::<test_function_name>
# Examples
./test.py --mode=dev test/alternator/
./test.py --mode=dev test/cluster/test_raft_voters.py::test_raft_limited_voters_retain_coordinator
./test.py --mode=dev test/cqlpy/test_json.py
./test.py --mode=dev alternator/
./test.py --mode=dev cluster/test_raft_voters::test_raft_limited_voters_retain_coordinator
# Optional flags
./test.py --mode=dev test/cluster/test_raft_no_quorum.py -v # Verbose output
./test.py --mode=dev test/cluster/test_raft_no_quorum.py --repeat 5 # Repeat test 5 times
./test.py --mode=dev cluster/test_raft_no_quorum -v # Verbose output
./test.py --mode=dev cluster/test_raft_no_quorum --repeat 5 # Repeat test 5 times
```
**Important:**
- Use full path with `.py` extension (e.g., `test/cluster/test_raft_no_quorum.py`, not `cluster/test_raft_no_quorum`)
- Use path without `.py` extension (e.g., `cluster/test_raft_no_quorum`, not `cluster/test_raft_no_quorum.py`)
- To run a single test case, append `::<test_function_name>` to the file path
- Add `-v` for verbose output
- Add `--repeat <num>` to repeat a test multiple times
@@ -88,14 +84,3 @@ ninja build/<mode>/scylla
- Strive for simplicity and clarity, add complexity only when clearly justified
- Question requests: don't blindly implement requests - evaluate trade-offs, identify issues, and suggest better alternatives when appropriate
- Consider different approaches, weigh pros and cons, and recommend the best fit for the specific context
## Test Philosophy
- Performance matters. Tests should run as quickly as possible. Sleeps in the code are highly discouraged and should be avoided, to reduce run time and flakiness.
- Stability matters. Tests should be stable. New tests should be executed 100 times at least to ensure they pass 100 out of 100 times. (use --repeat 100 --max-failures 1 when running it)
- Unit tests should ideally test one thing and one thing only.
- Tests for bug fixes should run before the fix - and show the failure and after the fix - and show they now pass.
- Tests for bug fixes should have in their comments which bug fixes (GitHub or JIRA issue) they test.
- Tests in debug are always slower, so if needed, reduce number of iterations, rows, data used, cycles, etc. in debug mode.
- Tests should strive to be repeatable, and not use random input that will make their results unpredictable (a seeded-PRNG sketch follows this list).
- Tests should consume as little resources as possible. Prefer running tests on a single node if it is sufficient, for example.
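A minimal sketch of the repeatability and debug-scaling points above, using plain Boost.Test; the seed, iteration counts, and test name are illustrative assumptions rather than repository conventions:

```cpp
#define BOOST_TEST_MODULE repeatability_sketch
#include <boost/test/included/unit_test.hpp>

#include <algorithm>
#include <functional>
#include <random>
#include <vector>

BOOST_AUTO_TEST_CASE(test_sort_is_repeatable) {
#ifndef NDEBUG
    constexpr int iterations = 100;   // debug builds are slower: scale down
#else
    constexpr int iterations = 10000;
#endif
    std::mt19937 rng(42);             // fixed seed: every run sees the same input
    for (int i = 0; i < iterations; ++i) {
        std::vector<unsigned> v(100);
        std::generate(v.begin(), v.end(), std::ref(rng));
        std::sort(v.begin(), v.end());
        BOOST_REQUIRE(std::is_sorted(v.begin(), v.end()));
    }
}
```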

View File

@@ -1,6 +1,6 @@
version: 2
updates:
- package-ecosystem: "uv"
- package-ecosystem: "pip"
directory: "/docs"
schedule:
interval: "daily"

View File

@@ -4,7 +4,7 @@
# Copyright (C) 2024-present ScyllaDB
#
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import argparse

View File

@@ -8,9 +8,6 @@ on:
jobs:
check-fixes-prefix:
runs-on: ubuntu-latest
permissions:
contents: read
issues: write
steps:
- name: Check PR body for "Fixes" prefix patterns
uses: actions/github-script@v7
@@ -21,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|(?:https://scylladb\\.atlassian\\.net/browse/)?([A-Z]+-\\d+))`;
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {
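The two pattern lines above are the before/after of this hunk; a minimal C++ translation (std::regex, ECMAScript grammar) of the variant without the Jira-URL alternative, assuming "scylladb/scylladb" as the repository full name, shows what separates the two forms:

```cpp
#include <cassert>
#include <regex>
#include <string>

int main() {
    const std::string repo = "scylladb/scylladb";  // assumed repo full name
    const std::regex pattern(
        "Fixes:? ((?:#|" + repo + "#|https://github\\.com/" + repo +
        "/issues/)(\\d+)|([A-Z]+-\\d+))");
    // Both variants accept plain issue references and bare Jira keys:
    assert(std::regex_search("Fixes #1234", pattern));
    assert(std::regex_search("Fixes: SCYLLADB-5678", pattern));
    // Only the variant that keeps the optional Jira-URL prefix also accepts:
    assert(!std::regex_search(
        "Fixes https://scylladb.atlassian.net/browse/SCYLLADB-5678", pattern));
    return 0;
}
```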

View File

@@ -1,53 +0,0 @@
name: Backport with Jira Integration
on:
push:
branches:
- master
- next-*.*
- branch-*.*
pull_request_target:
types: [labeled, closed]
branches:
- master
- next
- next-*.*
- branch-*.*
jobs:
backport-on-push:
if: github.event_name == 'push'
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'push'
base_branch: ${{ github.ref }}
commits: ${{ github.event.before }}..${{ github.sha }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
backport-on-label:
if: github.event_name == 'pull_request_target' && github.event.action == 'labeled'
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'labeled'
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
pull_request_number: ${{ github.event.pull_request.number }}
head_commit: ${{ github.event.pull_request.base.sha }}
label_name: ${{ github.event.label.name }}
pr_state: ${{ github.event.pull_request.state }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
backport-chain:
if: github.event_name == 'pull_request_target' && github.event.action == 'closed' && github.event.pull_request.merged == true
uses: scylladb/github-automation/.github/workflows/backport-with-jira.yaml@main
with:
event_type: 'chain'
base_branch: refs/heads/${{ github.event.pull_request.base.ref }}
pull_request_number: ${{ github.event.pull_request.number }}
pr_body: ${{ github.event.pull_request.body }}
secrets:
gh_token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,12 @@
name: Call Jira Status In Progress
on:
pull_request_target:
types: [opened]
jobs:
call-jira-status-in-progress:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_progress.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,14 @@
name: Call Jira Status In Review
on:
pull_request_target:
types: [ready_for_review, review_requested]
jobs:
call-jira-status-in-review:
permissions:
contents: read
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,12 @@
name: Call Jira Status Ready For Merge
on:
pull_request_target:
types: [labeled]
jobs:
call-jira-status-update:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_ready_for_merge.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,18 +0,0 @@
name: Sync Jira Based on PR Events
on:
pull_request_target:
types: [opened, edited, ready_for_review, review_requested, labeled, unlabeled, closed]
permissions:
contents: read
pull-requests: write
issues: write
jobs:
jira-sync:
uses: scylladb/github-automation/.github/workflows/main_pr_events_jira_sync.yml@main
with:
caller_action: ${{ github.event.action }}
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,22 +0,0 @@
name: Sync Jira Based on PR Milestone Events
on:
pull_request_target:
types: [milestoned, demilestoned]
permissions:
contents: read
pull-requests: read
jobs:
jira-sync-milestone-set:
if: github.event.action == 'milestoned'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_milestone_set.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-milestone-removed:
if: github.event.action == 'demilestoned'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_milestone_removed.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -2,13 +2,13 @@ name: Call Jira release creation for new milestone
on:
milestone:
types: [created, closed]
types: [created]
jobs:
sync-milestone-to-jira:
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
with:
# Comma-separated list of Jira project keys
jira_project_keys: "SCYLLADB,CUSTOMER,SMI,RELENG,VECTOR"
jira_project_keys: "SCYLLADB,CUSTOMER"
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,18 +0,0 @@
name: validate_pr_author_email
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
permissions:
contents: read
pull-requests: write
statuses: write
jobs:
validate_pr_author_email:
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main

View File

@@ -7,7 +7,7 @@ on:
env:
HEADER_CHECK_LINES: 10
LICENSE: "LicenseRef-ScyllaDB-Source-Available-1.1"
LICENSE: "LicenseRef-ScyllaDB-Source-Available-1.0"
CHECKED_EXTENSIONS: ".cc .hh .py"
jobs:

View File

@@ -1,62 +0,0 @@
name: Close issues created by Scylla associates
on:
issues:
types: [opened, reopened]
permissions:
issues: write
jobs:
comment-and-close:
runs-on: ubuntu-latest
steps:
- name: Comment and close if author email is scylladb.com
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
const issue = context.payload.issue;
const actor = context.actor;
// Get user data (only public email is available)
const { data: user } = await github.rest.users.getByUsername({
username: actor,
});
const email = user.email || "";
console.log(`Actor: ${actor}, public email: ${email || "<none>"}`);
// Only continue if email exists and ends with @scylladb.com
if (!email || !email.toLowerCase().endsWith("@scylladb.com")) {
console.log("User is not a scylladb.com email (or email not public); skipping.");
return;
}
const owner = context.repo.owner;
const repo = context.repo.repo;
const issue_number = issue.number;
const body = "Issues in this repository are closed automatically. Scylla associates should use Jira to manage issues.\nPlease move this issue to Jira https://scylladb.atlassian.net/jira/software/c/projects/SCYLLADB/list";
// Add the comment
await github.rest.issues.createComment({
owner,
repo,
issue_number,
body,
});
console.log(`Comment added to #${issue_number}`);
// Close the issue
await github.rest.issues.update({
owner,
repo,
issue_number,
state: "closed",
state_reason: "not_planned"
});
console.log(`Issue #${issue_number} closed.`);

View File

@@ -13,5 +13,5 @@ jobs:
- uses: codespell-project/actions-codespell@master
with:
only_warn: 1
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison,iif,tread"
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison"
skip: "./.git,./build,./tools,*.js,*.lock,./test,./licenses,./redis/lolwut.cc,*.svg"

View File

@@ -18,10 +18,6 @@ on:
jobs:
release:
permissions:
pages: write
id-token: write
contents: write
runs-on: ubuntu-latest
steps:
- name: Checkout
@@ -33,9 +29,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install uv
uses: astral-sh/setup-uv@v6
python-version: "3.10"
- name: Set up env
run: make -C docs FLAG="${{ env.FLAG }}" setupenv
- name: Build docs

View File

@@ -2,9 +2,6 @@ name: "Docs / Build PR"
# For more information,
# see https://sphinx-theme.scylladb.com/stable/deployment/production.html#available-workflows
permissions:
contents: read
env:
FLAG: ${{ github.repository == 'scylladb/scylla-enterprise' && 'enterprise' || 'opensource' }}
@@ -29,9 +26,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install uv
uses: astral-sh/setup-uv@v6
python-version: "3.10"
- name: Set up env
run: make -C docs FLAG="${{ env.FLAG }}" setupenv
- name: Build docs

View File

@@ -1,8 +1,5 @@
name: Docs / Validate metrics
permissions:
contents: read
on:
pull_request:
branches:

View File

@@ -14,8 +14,7 @@ env:
CLEANER_DIRS: test/unit exceptions alternator api auth cdc compaction db dht gms index lang message mutation mutation_writer node_ops raft redis replica service
SEASTAR_BAD_INCLUDE_OUTPUT_PATH: build/seastar-bad-include.log
permissions:
contents: read
permissions: {}
# cancel the in-progress run upon a repush
concurrency:
@@ -35,6 +34,8 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- run: |
sudo dnf -y install clang-tools-extra
- name: Generate compilation database
run: |
cmake \

View File

@@ -10,8 +10,6 @@ on:
jobs:
read-toolchain:
runs-on: ubuntu-latest
permissions:
contents: read
outputs:
image: ${{ steps.read.outputs.image }}
steps:

View File

@@ -1,6 +1,4 @@
name: Trigger Scylla CI Route
permissions:
contents: read
on:
issue_comment:
@@ -11,56 +9,16 @@ on:
jobs:
trigger-jenkins:
if: (github.event_name == 'issue_comment' && github.event.comment.user.login != 'scylladbbot') || github.event.label.name == 'conflicts'
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
runs-on: ubuntu-latest
steps:
- name: Verify Org Membership
id: verify_author
env:
EVENT_NAME: ${{ github.event_name }}
PR_AUTHOR: ${{ github.event.pull_request.user.login }}
PR_ASSOCIATION: ${{ github.event.pull_request.author_association }}
COMMENT_AUTHOR: ${{ github.event.comment.user.login }}
COMMENT_ASSOCIATION: ${{ github.event.comment.author_association }}
shell: bash
run: |
if [[ "$EVENT_NAME" == "pull_request_target" ]]; then
AUTHOR="$PR_AUTHOR"
ASSOCIATION="$PR_ASSOCIATION"
else
AUTHOR="$COMMENT_AUTHOR"
ASSOCIATION="$COMMENT_ASSOCIATION"
fi
if [[ "$ASSOCIATION" == "MEMBER" || "$ASSOCIATION" == "OWNER" ]]; then
echo "member=true" >> $GITHUB_OUTPUT
else
echo "::warning::${AUTHOR} is not a member of scylladb (association: ${ASSOCIATION}); skipping CI trigger."
echo "member=false" >> $GITHUB_OUTPUT
fi
- name: Validate Comment Trigger
if: github.event_name == 'issue_comment'
id: verify_comment
env:
COMMENT_BODY: ${{ github.event.comment.body }}
shell: bash
run: |
CLEAN_BODY=$(echo "$COMMENT_BODY" | grep -v '^[[:space:]]*>')
if echo "$CLEAN_BODY" | grep -qi '@scylladbbot' && echo "$CLEAN_BODY" | grep -qi 'trigger-ci'; then
echo "trigger=true" >> $GITHUB_OUTPUT
else
echo "trigger=false" >> $GITHUB_OUTPUT
fi
- name: Trigger Scylla-CI-Route Jenkins Job
if: steps.verify_author.outputs.member == 'true' && (github.event_name == 'pull_request_target' || steps.verify_comment.outputs.trigger == 'true')
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
PR_NUMBER: "${{ github.event.issue.number || github.event.pull_request.number }}"
PR_REPO_NAME: "${{ github.event.repository.full_name }}"
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
curl -X POST "$JENKINS_URL/job/releng/job/Scylla-CI-Route/buildWithParameters?PR_NUMBER=$PR_NUMBER&PR_REPO_NAME=$PR_REPO_NAME" \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v

View File

@@ -1,8 +1,5 @@
name: Trigger next gating
permissions:
contents: read
on:
push:
branches:

View File

@@ -2,12 +2,6 @@ cmake_minimum_required(VERSION 3.27)
project(scylla)
# Disable CMake's automatic -fcolor-diagnostics injection (CMake 3.24+ adds
# it for Clang+Ninja). configure.py does not add any color diagnostics flags,
# so we clear the internal CMake variable to prevent injection.
set(CMAKE_CXX_COMPILE_OPTIONS_COLOR_DIAGNOSTICS "")
set(CMAKE_C_COMPILE_OPTIONS_COLOR_DIAGNOSTICS "")
list(APPEND CMAKE_MODULE_PATH
${CMAKE_CURRENT_SOURCE_DIR}/cmake
${CMAKE_CURRENT_SOURCE_DIR}/seastar/cmake)
@@ -57,16 +51,6 @@ set(CMAKE_CXX_EXTENSIONS ON CACHE INTERNAL "")
set(CMAKE_CXX_SCAN_FOR_MODULES OFF CACHE INTERNAL "")
set(CMAKE_VISIBILITY_INLINES_HIDDEN ON)
# Global defines matching configure.py
# Since gcc 13, libgcc doesn't need the exception workaround
add_compile_definitions(SEASTAR_NO_EXCEPTION_HACK)
# Hacks needed to expose internal APIs for xxhash dependencies
add_compile_definitions(XXH_PRIVATE_API)
# SEASTAR_TESTING_MAIN is added later (after add_subdirectory(seastar) and
# add_subdirectory(abseil)) to avoid leaking into the seastar subdirectory.
# If SEASTAR_TESTING_MAIN is defined globally before seastar, it causes a
# duplicate 'main' symbol in seastar_testing.
if(is_multi_config)
find_package(Seastar)
# this is atypical compared to standard ExternalProject usage:
@@ -112,33 +96,12 @@ else()
set(Seastar_EXCLUDE_APPS_FROM_ALL ON CACHE BOOL "" FORCE)
set(Seastar_EXCLUDE_TESTS_FROM_ALL ON CACHE BOOL "" FORCE)
set(Seastar_IO_URING ON CACHE BOOL "" FORCE)
set(Seastar_SCHEDULING_GROUPS_COUNT 24 CACHE STRING "" FORCE)
set(Seastar_SCHEDULING_GROUPS_COUNT 21 CACHE STRING "" FORCE)
set(Seastar_UNUSED_RESULT_ERROR ON CACHE BOOL "" FORCE)
# Match configure.py's build_seastar_shared_libs: Debug and Dev
# build Seastar as a shared library, others build it static.
if(CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_BUILD_TYPE STREQUAL "Dev")
set(BUILD_SHARED_LIBS ON CACHE BOOL "" FORCE)
else()
set(BUILD_SHARED_LIBS OFF CACHE BOOL "" FORCE)
endif()
add_subdirectory(seastar)
# Coverage mode sets cmake_build_type='Debug' for Seastar
# (configure.py:515), so Seastar's pkg-config output includes sanitizer
# link flags in seastar_libs_coverage (configure.py:2514,2649).
# Seastar's own CMake only activates sanitizer targets for Debug/Sanitize
# configs, so we inject link options on the seastar target for Coverage.
# Using PUBLIC ensures they propagate to all targets linking Seastar
# (but not standalone tools like patchelf), matching configure.py's
# behavior. Compile-time flags and defines are handled globally in
# cmake/mode.Coverage.cmake.
if(CMAKE_BUILD_TYPE STREQUAL "Coverage")
target_link_options(seastar
PUBLIC
-fsanitize=address
-fsanitize=undefined
-fsanitize=vptr)
endif()
target_compile_definitions (seastar
PRIVATE
SEASTAR_NO_EXCEPTION_HACK)
endif()
set(ABSL_PROPAGATE_CXX_STD ON CACHE BOOL "" FORCE)
@@ -148,10 +111,8 @@ if(Scylla_ENABLE_LTO)
endif()
find_package(Sanitizers QUIET)
# Match configure.py:2192 — abseil gets sanitizer flags with -fno-sanitize=vptr
# to exclude vptr checks which are incompatible with abseil's usage.
list(APPEND absl_cxx_flags
$<$<CONFIG:Debug,Sanitize>:$<TARGET_PROPERTY:Sanitizers::address,INTERFACE_COMPILE_OPTIONS>;$<TARGET_PROPERTY:Sanitizers::undefined_behavior,INTERFACE_COMPILE_OPTIONS>;-fno-sanitize=vptr>)
$<$<CONFIG:Debug,Sanitize>:$<TARGET_PROPERTY:Sanitizers::address,INTERFACE_COMPILE_OPTIONS>;$<TARGET_PROPERTY:Sanitizers::undefined_behavior,INTERFACE_COMPILE_OPTIONS>>)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
list(APPEND ABSL_GCC_FLAGS ${absl_cxx_flags})
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
@@ -176,38 +137,9 @@ add_library(absl::headers ALIAS absl-headers)
# unfortunately.
set_target_properties(absl_strerror PROPERTIES EXCLUDE_FROM_ALL TRUE)
# Now that seastar and abseil subdirectories are fully processed, add
# SEASTAR_TESTING_MAIN globally. This matches configure.py's global define
# without leaking into seastar (which would cause duplicate main symbols).
add_compile_definitions(SEASTAR_TESTING_MAIN)
# System libraries dependencies
find_package(Boost REQUIRED
COMPONENTS filesystem program_options system thread regex unit_test_framework)
# When using shared Boost libraries, define BOOST_ALL_DYN_LINK (matching configure.py)
if(NOT Boost_USE_STATIC_LIBS)
add_compile_definitions(BOOST_ALL_DYN_LINK)
endif()
# CMake's Boost package config adds per-component defines like
# BOOST_UNIT_TEST_FRAMEWORK_DYN_LINK, BOOST_REGEX_DYN_LINK, etc. on the
# imported targets. configure.py only uses BOOST_ALL_DYN_LINK (which covers
# all components), so strip the per-component defines to align the two build
# systems.
foreach(_boost_target
Boost::unit_test_framework
Boost::regex
Boost::filesystem
Boost::program_options
Boost::system
Boost::thread)
if(TARGET ${_boost_target})
# Completely remove all INTERFACE_COMPILE_DEFINITIONS from the Boost target.
# This prevents per-component *_DYN_LINK and *_NO_LIB defines from
# propagating. BOOST_ALL_DYN_LINK (set globally) covers all components.
set_property(TARGET ${_boost_target} PROPERTY INTERFACE_COMPILE_DEFINITIONS)
endif()
endforeach()
target_link_libraries(Boost::regex
INTERFACE
ICU::i18n
@@ -234,9 +166,6 @@ generate_scylla_version()
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
add_library(scylla-precompiled-header STATIC exported_templates.cc)
target_include_directories(scylla-precompiled-header PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}"
"${scylla_gen_build_dir}")
target_link_libraries(scylla-precompiled-header PRIVATE
absl::headers
absl::btree
@@ -267,10 +196,6 @@ if (Scylla_USE_PRECOMPILED_HEADER)
message(STATUS "Using precompiled header for Scylla - remember to add `sloppiness = pch_defines,time_macros` to ccache.conf, if you're using ccache.")
target_precompile_headers(scylla-precompiled-header PRIVATE "stdafx.hh")
target_compile_definitions(scylla-precompiled-header PRIVATE SCYLLA_USE_PRECOMPILED_HEADER)
# Match configure.py: -fpch-validate-input-files-content tells the compiler
# to check content of stdafx.hh if timestamps don't match (important for
# ccache/git workflows where timestamps may not be preserved).
add_compile_options(-fpch-validate-input-files-content)
endif()
else()
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
@@ -375,6 +300,7 @@ add_subdirectory(locator)
add_subdirectory(message)
add_subdirectory(mutation)
add_subdirectory(mutation_writer)
add_subdirectory(node_ops)
add_subdirectory(readers)
add_subdirectory(replica)
add_subdirectory(raft)
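The SEASTAR_TESTING_MAIN ordering comments in this file reduce to a one-definition-rule problem; a stand-in sketch (the header and its macro behavior are assumptions for illustration, not Seastar's actual testing header):

```cpp
// --- testing_entry.hh: illustrative stand-in for a testing header that
// emits the test runner's entry point whenever the macro is visible.
#ifdef SEASTAR_TESTING_MAIN
int main() {
    return 0;  // would set up and run the registered tests
}
#endif

// --- tu_a.cc: #define SEASTAR_TESTING_MAIN / #include "testing_entry.hh"
// --- tu_b.cc: #define SEASTAR_TESTING_MAIN / #include "testing_entry.hh"
//
// Linking tu_a.o with tu_b.o fails with a duplicate definition of main().
// Defining the macro globally only after add_subdirectory(seastar) keeps
// seastar's own objects (e.g. seastar_testing) free of the second main.
```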

View File

@@ -1,8 +1,8 @@
## **SCYLLADB SOFTWARE LICENSE AGREEMENT**
| Version: | 1.1 |
| Version: | 1.0 |
| :---- | :---- |
| Last updated: | April 12, 2026 |
| Last updated: | December 18, 2024 |
**Your Acceptance**
@@ -12,48 +12,20 @@ The terms "**You**" or "**Licensee**" refer to any individual accessing or using
**Grant of License**
* **Definitions:**
1. **Software:** Software means the ScyllaDB software provided by Licensor, including the source code, object code, and any accompanying documentation or tools, or any part thereof, as made available under this Agreement.
2. **Commercial Customer**: means any legal entity (including its Affiliates) that has entered into a transaction with Licensor, or an authorized reseller/distributor, for the provision of any ScyllaDB products or services. This includes, without limitation: (a) Scope of Service: Any paid subscription, enterprise license, "BYOA" or Database-as-a-Service (DBaaS) offering, technical support, professional services, consulting, or training. (b) Scale and Volume: Any deployment regardless of size, capacity, or performance metrics (c) Payment Method: Any compensation model, including but not limited to, fixed-fee, consumption-based (On-Demand), committed spend, third-party marketplace credits (e.g., AWS, GCP, Azure), or promotional credits and discounts.
* **Grant of License:** Subject to the terms and conditions of this Agreement, including the Eligibility and Exclusive Use Restrictions clause, Licensor grants You a limited, non-exclusive, revocable, non-sublicensable, non-transferable, royalty free license to Use the Software, in each case solely for the purposes of:
* **Software Definitions:** Software means the ScyllaDB software provided by Licensor, including the source code, object code, and any accompanying documentation or tools, or any part thereof, as made available under this Agreement.
* **Grant of License:** Subject to the terms and conditions of this Agreement, Licensor grants You a limited, non-exclusive, revocable, non-sublicensable, non-transferable, royalty free license to Use the Software, in each case solely for the purposes of:
1) Copying, distributing, evaluating (including performing benchmarking or comparative tests or evaluations , subject to the limitations below) and improving the Software and ScyllaDB; and
2) create a modified version of the Software (each, a "**Licensed Work**"); provided however, that each such Licensed Work keeps all or substantially all of the functions and features of the Software, and/or using all or substantially all of the source code of the Software. You hereby agree that all the Licensed Work are, upon creation, considered Licensed Work of the Licensor, shall be the sole property of the Licensor and its assignees, and the Licensor and its assignees shall be the sole owner of all rights of any kind or nature, in connection with such Licensed Work. You hereby irrevocably and unconditionally assign to the Licensor all the Licensed Work and any part thereof. This License applies separately for each version of the Licensed Work, which shall be considered "Software" for the purpose of this Agreement.
* **Eligibility and Exclusive Use Restrictions**
i. Restricted to "Never Customers" Only. The license granted under this Agreement is strictly limited to Never Customers. For purposes of this Agreement, a "Never Customer" is an entity (including its Affiliates) that does not have, and has not had within the previous twelve (12) months, a paid commercial subscription, professional services agreement, or any other commercial relationship with Licensor. Satisfaction of the Never Customer criteria is a strict condition precedent to the effectiveness of this License.
ii. Total Prohibition for Existing Commercial Customers. If You (or any of Your Affiliates) are an existing Commercial Customer of Licensor within the last twelve (12) months, no license is deemed to have been offered or extended to You, and any download or installation of the Software by You is unauthorized. This prohibition applies to all deployments, including but not limited to:
(a) existing commercial workloads;
(b) any new use cases, new applications, or new workloads
iii. **No "Hybrid" Usage**. Licensee is expressly prohibited from combining free tier usage under this Agreement with paid commercial units.
If You are a Commercial Customer, all use of the Software across Your entire organization (and any of your Affiliates) must be governed by a valid, paid commercial agreement. Use of the Software under this license by a Commercial Customer (which is not a "Never Customer") shall:
(a) Void this license *ab initio*;
(b) Be deemed a material breach of both this Agreement and any existing commercial terms; and
(c) Entitle Licensor to invoice Licensee for such unauthorized usage at Licensor's standard list prices, retroactive to the date of first use.
Notwithstanding anything to the contrary in the Eligibility or License Limitations sections above a Commercial Customer may use the Software exclusively for non-production purposes, including Continuous Integration (CI), automated testing, and quality assurance environments, provided that such use at all times remains compliant with the Usage Limit.
iv. **Verification**. Licensor reserves the right to audit Licensee's environment and corporate identity to ensure compliance with these eligibility criteria.
For the purposes of this Agreement an "**Affiliate**" means any entity that directly or indirectly controls, is controlled by, or is under common control with a party, where "control" means ownership of more than 50% of the voting stock or decision-making authority
**License Limitations, Restrictions and Obligations:** The license grant above is subject to the following limitations, restrictions, and obligations. If Licensees Use of the Software does not comply with the above license grant or the terms of this section (including exceeding the Usage Limit set forth below), Licensee must: (i) refrain from any Use of the Software; and (ii) unless Licensee is a Never Customer, purchase a [commercial paid license](https://www.scylladb.com/scylladb-proprietary-software-license-agreement/) from the Licensor.
**License Limitations, Restrictions and Obligations:** The license grant above is subject to the following limitations, restrictions, and obligations. If Licensees Use of the Software does not comply with the above license grant or the terms of this section (including exceeding the Usage Limit set forth below), Licensee must: (i) refrain from any Use of the Software; and (ii) purchase a [commercial paid license](https://www.scylladb.com/scylladb-proprietary-software-license-agreement/) from the Licensor.
* **Updates:** You shall be solely responsible for providing all equipment, systems, assets, access, and ancillary goods and services needed to access and Use the Software. Licensor may modify or update the Software at any time, without notification, in its sole and absolute discretion. After the effective date of each such update, Licensor shall bear no obligation to run, provide or support legacy versions of the Software.
* **"Usage Limit":** Licensee's total overall available storage across all deployments and clusters of the Software and the Licensed Work under this License shall not exceed 10TB and/or an upper limit of 50 VCPUs (hyper threads).
* **IP Markings:** Licensee must retain all copyright, trademark, and other proprietary notices contained in the Software. You will not modify, delete, alter, remove, or obscure any intellectual property, including without limitations licensing, copyright, trademark, or any other notices of Licensor in the Software.
* **License Reproduction:** You must conspicuously display this Agreement on each copy of the Software. If You receive the Software from a third party, this Agreement still applies to Your Use of the Software. You will be responsible for any breach of this Agreement by any such third-party.
* Distribution of any Licensed Works is permitted, provided that: (i) You must include in any Licensed Work prominent notices stating that You have modified the Software, (ii) You include a copy of this Agreement with the Licensed Work, and (iii) You clearly identify all modifications made in the Licensed Work and provides attribution to the Licensor as the original author(s) of the Software.
* **Commercial Use Restrictions:** Licensee may not offer the Software as a software-as-a-service (SaaS) or commercial database-as-as-service (dBaaS) offering. Licensee may not use the Software to compete with Licensor's existing or future products or services. If your Use of the Software does not comply with the requirements currently in effect as described in this License, you must purchase a commercial license from the Licensor, its Affiliated entities, or you must refrain from using the Software and all Licensed Work. Furthermore, if You make any written claim of patent infringement relating to the Software, Your patent license for the Software granted under this Agreement terminates immediately.
* **Commercial Use Restrictions:** Licensee may not offer the Software as a software-as-a-service (SaaS) or commercial database-as-as-service (dBaaS) offering. Licensee may not use the Software to compete with Licensor's existing or future products or services. If your Use of the Software does not comply with the requirements currently in effect as described in this License, you must purchase a commercial license from the Licensor, its affiliated entities, or you must refrain from using the Software and all Licensed Work. Furthermore, if You make any written claim of patent infringement relating to the Software, Your patent license for the Software granted under this Agreement terminates immediately.
* Notwithstanding anything to the contrary, under the License granted hereunder, You shall not and shall not permit others to: (i) transfer the Software or any portions thereof to any other party except as expressly permitted herein; (ii) attempt to circumvent or overcome any technological protection measures incorporated into the Software; (iii) incorporate the Software into the structure, machinery or controls of any aircraft, other aerial device, military vehicle, hovercraft, waterborne craft or any medical equipment of any kind; or (iv) use the Software or any part thereof in any unlawful, harmful or illegal manner, or in a manner which infringes third parties rights in any way, including intellectual property rights.
**Monitoring; Audit**
@@ -69,14 +41,14 @@ For the purposes of this Agreement an "**Affiliate**" means any entity that dire
**Indemnity; Disclaimer; Limitation of Liability**
* **Indemnity:** Licensee hereby agrees to indemnify, defend and hold harmless Licensor and its Affiliates from any losses or damages incurred due to a third party claim arising out of: (i) Licensees breach of this Agreement; (ii) Licensees negligence, willful misconduct or violation of law, or (iii) Licensees products or services.
* **Indemnity:** Licensee hereby agrees to indemnify, defend and hold harmless Licensor and its affiliates from any losses or damages incurred due to a third party claim arising out of: (i) Licensees breach of this Agreement; (ii) Licensees negligence, willful misconduct or violation of law, or (iii) Licensees products or services.
* DISCLAIMER OF WARRANTIES: LICENSEE AGREES THAT LICENSOR HAS MADE NO EXPRESS WARRANTIES REGARDING THE SOFTWARE AND THAT THE SOFTWARE IS BEING PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. LICENSOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THE SOFTWARE, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF FITNESS FOR A PARTICULAR PURPOSE; TITLE; MERCHANTABILITY; OR NON-INFRINGEMENT OF THIRD PARTY RIGHTS. LICENSOR DOES NOT WARRANT THAT THE SOFTWARE WILL OPERATE UNINTERRUPTED OR ERROR FREE, OR THAT ALL ERRORS WILL BE CORRECTED. LICENSOR DOES NOT GUARANTEE ANY PARTICULAR RESULTS FROM THE USE OF THE SOFTWARE, AND DOES NOT WARRANT THAT THE SOFTWARE IS FIT FOR ANY PARTICULAR PURPOSE.
* LIMITATION OF LIABILITY: TO THE FULLEST EXTENT PERMISSIBLE UNDER APPLICABLE LAW, IN NO EVENT WILL LICENSOR AND/OR ITS AFFILIATES, EMPLOYEES, OFFICERS AND DIRECTORS BE LIABLE TO LICENSEE FOR (I) ANY LOSS OF USE OR DATA; INTERRUPTION OF BUSINESS; OR ANY INDIRECT; SPECIAL; INCIDENTAL; OR CONSEQUENTIAL DAMAGES OF ANY KIND (INCLUDING LOST PROFITS); AND (II) ANY DIRECT DAMAGES EXCEEDING THE TOTAL AMOUNT OF ONE THOUSAND US DOLLARS ($1,000). THE FOREGOING PROVISIONS LIMITING THE LIABILITY OF LICENSOR SHALL APPLY REGARDLESS OF THE FORM OR CAUSE OF ACTION, WHETHER IN STRICT LIABILITY, CONTRACT OR TORT.
**Proprietary Rights; No Other Rights**
* **Ownership:** Licensor retains sole and exclusive ownership of all rights, interests and title in the Software and any scripts, processes, techniques, methodologies, inventions, know-how, concepts, formatting, arrangements, visual attributes, ideas, database rights, copyrights, patents, trade secrets, and other intellectual property related thereto, and all derivatives, enhancements, modifications and improvements thereof. Except for the limited license rights granted herein, Licensee has no rights in or to the Software and/ or Licensors trademarks, logo, or branding and You acknowledge that such Software, trademarks, logo, or branding is the sole property of Licensor.
* **Feedback:** Licensee is not required to provide any suggestions, enhancement requests, recommendations or other feedback regarding the Software ("Feedback"). If, notwithstanding this policy, Licensee submits Feedback, Licensee understands and acknowledges that such Feedback is not submitted in confidence and Licensor assumes no obligation, expressed or implied, by considering it. All right in any trademark or logo of Licensor or its Affiliates and You shall make no claim of right to the Software or any part thereof to be supplied by Licensor hereunder and acknowledges that as between Licensor and You, such Software is the sole proprietary, title and interest in and to Licensor.such Feedback shall be assigned to, and shall become the sole and exclusive property of, Licensor upon its creation.
* **Feedback:** Licensee is not required to provide any suggestions, enhancement requests, recommendations or other feedback regarding the Software ("Feedback"). If, notwithstanding this policy, Licensee submits Feedback, Licensee understands and acknowledges that such Feedback is not submitted in confidence and Licensor assumes no obligation, expressed or implied, by considering it. All right in any trademark or logo of Licensor or its affiliates and You shall make no claim of right to the Software or any part thereof to be supplied by Licensor hereunder and acknowledges that as between Licensor and You, such Software is the sole proprietary, title and interest in and to Licensor.such Feedback shall be assigned to, and shall become the sole and exclusive property of, Licensor upon its creation.
* Except for the rights expressly granted to You under this Agreement, You are not granted any other licenses or rights in the Software or otherwise. This Agreement constitutes the entire agreement between You and the Licensor with respect to the subject matter hereof and supersedes all prior or contemporaneous communications, representations, or agreements, whether oral or written.
* **Third-Party Software:** Customer acknowledges that the Software may contain open and closed source components (“OSS Components”) that are governed separately by certain licenses, in each case as further provided by Company upon request. Any applicable OSS Component license is solely between Licensee and the applicable licensor of the OSS Component and Licensee shall comply with the applicable OSS Component license.
* If any provision of this Agreement is held to be invalid or unenforceable, such provision shall be struck and the remaining provisions shall remain in full force and effect.
@@ -84,7 +56,7 @@ For the purposes of this Agreement an "**Affiliate**" means any entity that dire
**Miscellaneous**
* **Miscellaneous:** This Agreement may be modified at any time by Licensor, and constitutes the entire agreement between the parties with respect to the subject matter hereof. Licensee may not assign or subcontract its rights or obligations under this Agreement. This Agreement does not, and shall not be construed to create any relationship, partnership, joint venture, employer-employee, agency, or franchisor-franchisee relationship between the parties.
* **Modifications**: Licensor reserves the right to modify this Agreement at any time. Changes will be effective upon posting to the Website or within the Software repository. Continued use of the Software after such changes constitutes acceptance.
* **Governing Law & Jurisdiction:** This Agreement shall be governed and construed in accordance with the laws of Israel, without giving effect to their respective conflicts of laws provisions, and the competent courts situated in Tel Aviv, Israel, shall have sole and exclusive jurisdiction over the parties and any conflict and/or dispute arising out of, or in connection to, this Agreement
\[*End of ScyllaDB Software License Agreement*\]

View File

@@ -43,7 +43,7 @@ For further information, please see:
[developer documentation]: HACKING.md
[build documentation]: docs/dev/building.md
[docker image build documentation]: dist/docker/redhat/README.md
[docker image build documentation]: dist/docker/debian/README.md
## Running Scylla

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.2.0-dev
VERSION=2026.1.0-dev
if test -f version
then

abseil

Submodule abseil updated: 255c84dadd...d7aaad83b4

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "absl-flat_hash_map.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -18,7 +18,6 @@ target_sources(alternator
consumed_capacity.cc
ttl.cc
parsed_expression_cache.cc
http_compression.cc
${cql_grammar_srcs})
target_include_directories(alternator
PUBLIC

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "alternator/error.hh"
@@ -13,8 +13,7 @@
#include <string_view>
#include "alternator/auth.hh"
#include <fmt/format.h>
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include "auth/password_authenticator.hh"
#include "service/storage_proxy.hh"
#include "alternator/executor.hh"
#include "cql3/selection/selection.hh"
@@ -26,8 +25,8 @@ namespace alternator {
static logging::logger alogger("alternator-auth");
future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::string username) {
schema_ptr schema = proxy.data_dictionary().find_schema(db::system_keyspace::NAME, "roles");
future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::service& as, std::string username) {
schema_ptr schema = proxy.data_dictionary().find_schema(auth::get_auth_ks_name(as.query_processor()), "roles");
partition_key pk = partition_key::from_single_value(*schema, utf8_type->decompose(username));
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
@@ -40,7 +39,7 @@ future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::strin
auto partition_slice = query::partition_slice(std::move(bounds), {}, query::column_id_vector{salted_hash_col->id, can_login_col->id}, selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice,
proxy.get_max_result_size(partition_slice), query::tombstone_limit(proxy.get_tombstone_limit()));
auto cl = db::consistency_level::LOCAL_ONE;
auto cl = auth::password_authenticator::consistency_for_user(username);
service::client_state client_state{service::client_state::internal_tag()};
service::storage_proxy::coordinator_query_result qr = co_await proxy.query(schema, std::move(command), std::move(partition_ranges), cl,

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -20,6 +20,6 @@ namespace alternator {
using key_cache = utils::loading_cache<std::string, std::string, 1>;
future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::string username);
future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::service& as, std::string username);
}

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <string_view>
@@ -618,7 +618,7 @@ conditional_operator_type get_conditional_operator(const rjson::value& req) {
// Check if the existing values of the item (previous_item) match the
// conditions given by the Expected and ConditionalOperator parameters
// (if they exist) in the request (an UpdateItem, PutItem or DeleteItem).
// This function can throw a ValidationException API error if there
// This function can throw an ValidationException API error if there
// are errors in the format of the condition itself.
bool verify_expected(const rjson::value& req, const rjson::value* previous_item) {
const rjson::value* expected = rjson::find(req, "Expected");

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
/*

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "consumed_capacity.hh"
@@ -45,7 +45,7 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
}
void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept {
if (_should_add_to_response) {
if (_should_add_to_reponse) {
auto consumption = rjson::empty_object();
rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units());
rjson::add(response, "ConsumedCapacity", std::move(consumption));

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -28,9 +28,9 @@ namespace alternator {
class consumed_capacity_counter {
public:
consumed_capacity_counter() = default;
consumed_capacity_counter(bool should_add_to_response) : _should_add_to_response(should_add_to_response){}
consumed_capacity_counter(bool should_add_to_reponse) : _should_add_to_reponse(should_add_to_reponse){}
bool operator()() const noexcept {
return _should_add_to_response;
return _should_add_to_reponse;
}
consumed_capacity_counter& operator +=(uint64_t bytes);
@@ -44,7 +44,7 @@ public:
uint64_t _total_bytes = 0;
static bool should_add_capacity(const rjson::value& request);
protected:
bool _should_add_to_response = false;
bool _should_add_to_reponse = false;
};
class rcu_consumed_capacity_counter : public consumed_capacity_counter {

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/with_scheduling_group.hh>
@@ -28,7 +28,6 @@ static logging::logger logger("alternator_controller");
controller::controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
@@ -40,7 +39,6 @@ controller::controller(
: protocol_server(sg)
, _gossiper(gossiper)
, _proxy(proxy)
, _ss(ss)
, _mm(mm)
, _sys_dist_ks(sys_dist_ks)
, _cdc_gen_svc(cdc_gen_svc)
@@ -91,7 +89,7 @@ future<> controller::start_server() {
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
return cfg.alternator_timeout_in_ms;
};
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
@@ -105,23 +103,11 @@ future<> controller::start_server() {
alternator_port = _config.alternator_port();
_listen_addresses.push_back({addr, *alternator_port});
}
std::optional<uint16_t> alternator_port_proxy_protocol;
if (_config.alternator_port_proxy_protocol()) {
alternator_port_proxy_protocol = _config.alternator_port_proxy_protocol();
_listen_addresses.push_back({addr, *alternator_port_proxy_protocol});
}
std::optional<uint16_t> alternator_https_port;
std::optional<uint16_t> alternator_https_port_proxy_protocol;
std::optional<tls::credentials_builder> creds;
if (_config.alternator_https_port() || _config.alternator_https_port_proxy_protocol()) {
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
_listen_addresses.push_back({addr, *alternator_https_port});
}
if (_config.alternator_https_port_proxy_protocol()) {
alternator_https_port_proxy_protocol = _config.alternator_https_port_proxy_protocol();
_listen_addresses.push_back({addr, *alternator_https_port_proxy_protocol});
}
if (_config.alternator_https_port()) {
alternator_https_port = _config.alternator_https_port();
_listen_addresses.push_back({addr, *alternator_https_port});
creds.emplace();
auto opts = _config.alternator_encryption_options();
if (opts.empty()) {
@@ -147,29 +133,20 @@ future<> controller::start_server() {
}
}
_server.invoke_on_all(
[this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds,
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds,
_config.alternator_enforce_authorization,
_config.alternator_warn_authorization,
_config.alternator_max_users_query_size_in_trace_output,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);
}).handle_exception([this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] (std::exception_ptr ep) {
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}, proxy-protocol port {}, TLS proxy-protocol port {}: {}",
addr,
alternator_port ? std::to_string(*alternator_port) : "OFF",
alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF",
ep);
}).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {
logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}: {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF", ep);
return stop_server().then([ep = std::move(ep)] { return make_exception_future<>(ep); });
}).then([addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}, proxy-protocol port {}, TLS proxy-protocol port {}",
addr,
alternator_port ? std::to_string(*alternator_port) : "OFF",
alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF");
}).then([addr, alternator_port, alternator_https_port] {
logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
}).get();
});
}
@@ -192,7 +169,7 @@ future<> controller::request_stop_server() {
});
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
future<utils::chunked_vector<client_data>> controller::get_client_data() {
return _server.local().get_client_data();
}

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -15,7 +15,6 @@
namespace service {
class storage_proxy;
class storage_service;
class migration_manager;
class memory_limiter;
}
@@ -58,7 +57,6 @@ class server;
class controller : public protocol_server {
sharded<gms::gossiper>& _gossiper;
sharded<service::storage_proxy>& _proxy;
sharded<service::storage_service>& _ss;
sharded<service::migration_manager>& _mm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<cdc::generation_service>& _cdc_gen_svc;
@@ -76,7 +74,6 @@ public:
controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
@@ -96,7 +93,7 @@ public:
// This virtual function is called (on each shard separately) when the
// virtual table "system.clients" is read. It is expected to generate a
// list of clients connected to this server (on this shard).
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
};
}

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <fmt/ranges.h>
@@ -17,7 +17,6 @@
#include "auth/service.hh"
#include "db/config.hh"
#include "db/view/view_build_status.hh"
#include "locator/tablets.hh"
#include "mutation/tombstone.hh"
#include "locator/abstract_replication_strategy.hh"
#include "utils/log.hh"
@@ -63,20 +62,11 @@
#include "types/types.hh"
#include "db/system_keyspace.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "alternator/ttl_tag.hh"
using namespace std::chrono_literals;
logging::logger elogger("alternator-executor");
namespace std {
template <> struct hash<std::pair<sstring, sstring>> {
size_t operator () (const std::pair<sstring, sstring>& p) const {
return std::hash<sstring>()(p.first) * 1009 + std::hash<sstring>()(p.second) * 3;
}
};
}
namespace alternator {
// Alternator-specific table properties stored as hidden table tags:
@@ -165,7 +155,7 @@ static map_type attrs_type() {
static const column_definition& attrs_column(const schema& schema) {
const column_definition* cdef = schema.get_column_definition(bytes(executor::ATTRS_COLUMN_NAME));
throwing_assert(cdef);
SCYLLA_ASSERT(cdef);
return *cdef;
}
@@ -238,7 +228,7 @@ static void validate_is_object(const rjson::value& value, const char* caller) {
}
// This function assumes the given value is an object and returns requested member value.
// If it is not possible, an api_error::validation is thrown.
// If it is not possible an api_error::validation is thrown.
static const rjson::value& get_member(const rjson::value& obj, const char* member_name, const char* caller) {
validate_is_object(obj, caller);
const rjson::value* ret = rjson::find(obj, member_name);
@@ -250,7 +240,7 @@ static const rjson::value& get_member(const rjson::value& obj, const char* membe
// This function assumes the given value is an object with a single member, and returns this member.
// In case the requirements are not met, an api_error::validation is thrown.
// In case the requirements are not met an api_error::validation is thrown.
static const rjson::value::Member& get_single_member(const rjson::value& v, const char* caller) {
if (!v.IsObject() || v.MemberCount() != 1) {
throw api_error::validation(format("{}: expected an object with a single member.", caller));
@@ -258,66 +248,14 @@ static const rjson::value::Member& get_single_member(const rjson::value& v, cons
return *(v.MemberBegin());
}
class executor::describe_table_info_manager : public service::migration_listener::empty_listener {
executor &_executor;
struct table_info {
utils::simple_value_with_expiry<std::uint64_t> size_in_bytes;
};
std::unordered_map<std::pair<sstring, sstring>, table_info> info_for_tables;
bool active = false;
public:
describe_table_info_manager(executor& executor) : _executor(executor) {
_executor._proxy.data_dictionary().real_database_ptr()->get_notifier().register_listener(this);
active = true;
}
describe_table_info_manager(const describe_table_info_manager &) = delete;
describe_table_info_manager(describe_table_info_manager&&) = delete;
~describe_table_info_manager() {
if (active) {
on_fatal_internal_error(elogger, "describe_table_info_manager was not stopped before destruction");
}
}
describe_table_info_manager &operator = (const describe_table_info_manager &) = delete;
describe_table_info_manager &operator = (describe_table_info_manager&&) = delete;
static std::chrono::high_resolution_clock::time_point now() {
return std::chrono::high_resolution_clock::now();
}
std::optional<std::uint64_t> get_cached_size_in_bytes(const sstring &ks_name, const sstring &cf_name) const {
auto it = info_for_tables.find({ks_name, cf_name});
if (it != info_for_tables.end()) {
return it->second.size_in_bytes.get();
}
return std::nullopt;
}
void cache_size_in_bytes(sstring ks_name, sstring cf_name, std::uint64_t size_in_bytes, std::chrono::high_resolution_clock::time_point expiry) {
info_for_tables[{std::move(ks_name), std::move(cf_name)}].size_in_bytes.set_if_longer_expiry(size_in_bytes, expiry);
}
future<> stop() {
co_await _executor._proxy.data_dictionary().real_database_ptr()->get_notifier().unregister_listener(this);
active = false;
co_return;
}
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
if (!ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX)) return;
info_for_tables.erase({ks_name, cf_name});
}
};
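// utils::simple_value_with_expiry is internal; this is a hedged standalone
// sketch of the contract the manager above appears to rely on - get() returns
// the value only while unexpired, and set_if_longer_expiry() only overwrites
// when the new expiry is later than the current one:
#include <chrono>
#include <optional>
template <typename T>
class value_with_expiry_sketch {
    using clock = std::chrono::high_resolution_clock;
    std::optional<T> _value;
    clock::time_point _expiry{};
public:
    std::optional<T> get() const {
        return (_value && clock::now() < _expiry) ? _value : std::nullopt;
    }
    void set_if_longer_expiry(T v, clock::time_point expiry) {
        if (expiry > _expiry) {
            _value = std::move(v);
            _expiry = expiry;
        }
    }
};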
executor::executor(gms::gossiper& gossiper,
service::storage_proxy& proxy,
service::storage_service& ss,
service::migration_manager& mm,
db::system_distributed_keyspace& sdks,
cdc::metadata& cdc_metadata,
smp_service_group ssg,
utils::updateable_value<uint32_t> default_timeout_in_ms)
: _gossiper(gossiper),
_ss(ss),
_proxy(proxy),
_mm(mm),
_sdks(sdks),
@@ -330,7 +268,6 @@ executor::executor(gms::gossiper& gossiper,
_stats))
{
s_default_timeout_in_ms = std::move(default_timeout_in_ms);
_describe_table_info_manager = std::make_unique<describe_table_info_manager>(*this);
register_metrics(_metrics, _stats);
}
@@ -683,7 +620,7 @@ static std::optional<int> get_int_attribute(const rjson::value& value, std::stri
}
// Sets a KeySchema object inside the given JSON parent describing the key
// attributes of the given schema as being either HASH or RANGE keys.
// attributes of the the given schema as being either HASH or RANGE keys.
// Additionally, adds to a given map mappings between the key attribute
// names and their type (as a DynamoDB type string).
void executor::describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>* attribute_types, const std::map<sstring, sstring> *tags) {
@@ -815,44 +752,12 @@ static future<bool> is_view_built(
}
future<> executor::cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl) {
auto expiry = describe_table_info_manager::now() + ttl;
return container().invoke_on_all(
[schema, size_in_bytes, expiry] (executor& exec) {
exec._describe_table_info_manager->cache_size_in_bytes(schema->ks_name(), schema->cf_name(), size_in_bytes, expiry);
});
}
future<> executor::fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting) {
auto cached_size = _describe_table_info_manager->get_cached_size_in_bytes(schema->ks_name(), schema->cf_name());
std::uint64_t total_size = 0;
if (cached_size) {
total_size = *cached_size;
} else {
// there's no point in trying to estimate the size of a table that is being deleted: other
// nodes will often complete the deletion before we finish calculating the size
if (!deleting) {
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
// Note: we don't care when the notification of the other shards finishes, as long as it eventually does.
// A race is possible: the next DescribeTable may arrive at another shard before that shard has the size
// cached, so it recalculates it. This is not a problem: that shard calls cache_newly_calculated_size_on_all_shards
// with its own expiry, which is extremely unlikely to equal the previous one, and every shard keeps whichever
// size arrives with the later expiry. If the expiries do happen to be equal, shards may keep different sizes,
// so DescribeTable will return different values depending on the shard - which is also fine, as the
// specification gives no precision guarantees of any kind.
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
}
}
rjson::add(table_description, "TableSizeBytes", total_size);
}
future<rjson::value> executor::fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
static future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
{
rjson::value table_description = rjson::empty_object();
auto tags_ptr = db::get_tags_of_table(schema);
rjson::add(table_description, "TableName", rjson::from_string(schema->cf_name()));
co_await fill_table_size(table_description, schema, tbl_status == table_status::deleting);
auto creation_timestamp = get_table_creation_time(*schema);
@@ -896,7 +801,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", wcu);
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
data_dictionary::table t = _proxy.data_dictionary().find_column_family(schema);
data_dictionary::table t = proxy.data_dictionary().find_column_family(schema);
if (tbl_status != table_status::deleting) {
rjson::add(table_description, "CreationDateTime", rjson::value(creation_timestamp));
@@ -917,7 +824,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
sstring index_name = cf_name.substr(delim_it + 1);
rjson::add(view_entry, "IndexName", rjson::from_string(index_name));
rjson::add(view_entry, "IndexArn", generate_arn_for_index(*schema, index_name));
// Add index's KeySchema and collect types for AttributeDefinitions:
// Add indexes's KeySchema and collect types for AttributeDefinitions:
executor::describe_key_schema(view_entry, *vptr, key_attribute_types, db::get_tags_of_table(vptr));
// Add projection type
rjson::value projection = rjson::empty_object();
@@ -933,7 +840,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
// (for a built view) or CREATING+Backfilling (if view building
// is in progress).
if (!is_lsi) {
if (co_await is_view_built(vptr, _proxy, client_state, trace_state, permit)) {
if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) {
rjson::add(view_entry, "IndexStatus", "ACTIVE");
} else {
rjson::add(view_entry, "IndexStatus", "CREATING");
@@ -961,8 +868,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
}
rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions));
}
executor::supplement_table_stream_info(table_description, *schema, _proxy);
executor::supplement_table_stream_info(table_description, *schema, proxy);
// FIXME: still missing some response fields (issue #5026)
co_return table_description;
}
@@ -982,7 +890,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
rjson::value table_description = co_await fill_table_description(schema, table_status::active, client_state, trace_state, permit);
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
rjson::value response = rjson::empty_object();
rjson::add(response, "Table", std::move(table_description));
elogger.trace("returning {}", response);
@@ -1085,7 +993,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
auto& p = _proxy.container();
schema_ptr schema = get_table(_proxy, request);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, client_state, trace_state, permit);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
size_t retries = mm.get_concurrent_ddl_retries();
@@ -1649,8 +1557,9 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
}
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
throwing_assert(this_shard_id() == 0);
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
// command. We can't inspect the current database schema at this point
@@ -1836,7 +1745,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
if (executor::add_stream_options(*stream_specification, builder, _proxy)) {
if (executor::add_stream_options(*stream_specification, builder, sp)) {
validate_cdc_log_name_length(builder.cf_name());
}
}
@@ -1855,7 +1764,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
set_table_creation_time(tags_map, db_clock::now());
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, _stats);
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
schema_ptr schema = builder.build();
for (auto& view_builder : view_builders) {
@@ -1871,49 +1780,38 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
}
size_t retries = _mm.get_concurrent_ddl_retries();
size_t retries = mm.get_concurrent_ddl_retries();
for (;;) {
auto group0_guard = co_await _mm.start_group0_operation();
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
utils::chunked_vector<mutation> schema_mutations;
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
if (stream_specification && stream_specification->IsObject()) {
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
const auto& topo = sp.local_db().get_token_metadata().get_topology();
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
if (rs->uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
"If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
}
}
}
// Creating an index in tablets mode requires the keyspace to be RF-rack-valid.
// GSI and LSI indexes are based on materialized views which require RF-rack-validity to avoid consistency issues.
if (!view_builders.empty() || _proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
try {
locator::assert_rf_rack_valid_keyspace(keyspace_name, _proxy.local_db().get_token_metadata_ptr(), *rs);
} catch (const std::invalid_argument& ex) {
if (!view_builders.empty()) {
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes and LocalSecondaryIndexes on a table "
"using tablets require the number of racks in the cluster to be either 1 or 3"));
} else {
co_return api_error::validation(fmt::format("Cannot create table '{}' with tablets: the configuration "
"option 'rf_rack_valid_keyspaces' is enabled, which enforces that tables using tablets can only be created in clusters "
"that have either 1 or 3 racks", table_name));
}
}
// Creating an index in tablets mode requires the rf_rack_valid_keyspaces option to be enabled.
// GSI and LSI indexes are based on materialized views which require this option to avoid consistency issues.
if (!view_builders.empty() && ksm->uses_tablets() && !sp.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
co_return api_error::validation("GlobalSecondaryIndexes and LocalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
}
try {
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
schema_mutations = service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
} catch (exceptions::already_exists_exception&) {
if (_proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
co_return api_error::resource_in_use(fmt::format("Table {} already exists", table_name));
}
}
if (_proxy.data_dictionary().try_find_table(schema->id())) {
if (sp.data_dictionary().try_find_table(schema->id())) {
// This should never happen, the ID is supposed to be unique
co_return api_error::internal(format("Table with ID {} already exists", schema->id()));
}
@@ -1922,9 +1820,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
for (schema_builder& view_builder : view_builders) {
schemas.push_back(view_builder.build());
}
co_await service::prepare_new_column_families_announcement(schema_mutations, _proxy, *ksm, schemas, ts);
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
if (ksm->uses_tablets()) {
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, _proxy);
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
}
// If a role is allowed to create a table, we must give it permissions to
@@ -1949,7 +1847,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
std::tie(schema_mutations, group0_guard) = co_await std::move(mc).extract();
try {
co_await _mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
break;
} catch (const service::group0_concurrent_modification& ex) {
elogger.info("Failed to execute CreateTable {} due to concurrent schema modifications. {}.",
@@ -1961,9 +1859,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
}
co_await _mm.wait_for_schema_agreement(_proxy.local_db(), db::timeout_clock::now() + 10s, nullptr);
co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
rjson::value status = rjson::empty_object();
executor::supplement_table_info(request, *schema, _proxy);
executor::supplement_table_info(request, *schema, sp);
rjson::add(status, "TableDescription", std::move(request));
co_return rjson::print(std::move(status));
}
@@ -1972,11 +1870,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
// `invoke_on` hopped us to shard 0, but `this` points to the `executor` from the 'old' shard, so we need to hop it too.
co_return co_await e.local().create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), enforce_authorization, warn_authorization, std::move(tablets_mode));
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
});
}
@@ -2127,12 +2024,9 @@ future<executor::request_return_type> executor::update_table(client_state& clien
co_return api_error::validation(fmt::format(
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
}
try {
locator::assert_rf_rack_valid_keyspace(keyspace_name, p.local().local_db().get_token_metadata_ptr(),
p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy());
} catch (const std::invalid_argument& ex) {
co_return api_error::validation(fmt::format("GlobalSecondaryIndexes on a table "
"using tablets require the number of racks in the cluster to be either 1 or 3"));
if (p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy().uses_tablets() &&
!p.local().data_dictionary().get_config().rf_rack_valid_keyspaces()) {
co_return api_error::validation("GlobalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
}
elogger.trace("Adding GSI {}", index_name);
@@ -2436,7 +2330,7 @@ std::unordered_map<bytes, std::string> si_key_attributes(data_dictionary::table
// case, this function simply won't be called for this attribute.)
//
// This function checks if the given attribute update is an update to some
// GSI's key, and if the value is unsuitable, an api_error::validation is
// GSI's key, and if the value is unsuitable, a api_error::validation is
// thrown. The checking here is similar to the checking done in
// get_key_from_typed_value() for the base table's key columns.
//
@@ -2838,12 +2732,14 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
}
} else if (_write_isolation != write_isolation::LWT_ALWAYS) {
std::optional<mutation> m = apply(nullptr, api::new_timestamp(), cdc_opts);
throwing_assert(m); // !needs_read_before_write, so apply() did not check a condition
SCYLLA_ASSERT(m); // !needs_read_before_write, so apply() did not check a condition
return proxy.mutate(utils::chunked_vector<mutation>{std::move(*m)}, db::consistency_level::LOCAL_QUORUM, executor::default_timeout(), trace_state, std::move(permit), db::allow_per_partition_rate_limit::yes, false, std::move(cdc_opts)).then([this, &wcu_total] () mutable {
return rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total);
});
}
throwing_assert(cas_shard);
if (!cas_shard) {
on_internal_error(elogger, "cas_shard is not set");
}
// If we're still here, we need to do this write using LWT:
global_stats.write_using_lwt++;
per_table_stats.write_using_lwt++;
@@ -3463,11 +3359,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
if (should_add_wcu) {
rjson::add(ret, "ConsumedCapacity", std::move(consumed_capacity));
}
auto duration = std::chrono::steady_clock::now() - start_time;
_stats.api_operations.batch_write_item_latency.mark(duration);
for (const auto& w : per_table_wcu) {
w.first->api_operations.batch_write_item_latency.mark(duration);
}
_stats.api_operations.batch_write_item_latency.mark(std::chrono::steady_clock::now() - start_time);
co_return rjson::print(std::move(ret));
}
@@ -3551,7 +3443,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
return true;
}
// Add a path to an attribute_path_map. Throws a validation error if the path
// Add a path to a attribute_path_map. Throws a validation error if the path
// "overlaps" with one already in the filter (one is a sub-path of the other)
// or "conflicts" with it (both a member and index is requested).
template<typename T>
@@ -4978,12 +4870,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
if (!some_succeeded && eptr) {
co_await coroutine::return_exception_ptr(std::move(eptr));
}
auto duration = std::chrono::steady_clock::now() - start_time;
_stats.api_operations.batch_get_item_latency.mark(duration);
for (const table_requests& rs : requests) {
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
per_table_stats->api_operations.batch_get_item_latency.mark(duration);
}
_stats.api_operations.batch_get_item_latency.mark(std::chrono::steady_clock::now() - start_time);
if (is_big(response)) {
co_return make_streamed(std::move(response));
} else {
@@ -5421,7 +5308,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
}
static dht::token token_for_segment(int segment, int total_segments) {
throwing_assert(total_segments > 1 && segment >= 0 && segment < total_segments);
SCYLLA_ASSERT(total_segments > 1 && segment >= 0 && segment < total_segments);
uint64_t delta = std::numeric_limits<uint64_t>::max() / total_segments;
return dht::token::from_int64(std::numeric_limits<int64_t>::min() + delta * segment);
}
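// Worked example of the mapping above, standalone: tokens are int64_t, so the
// ring spans [INT64_MIN, INT64_MAX] and segment boundaries are evenly spaced
// at INT64_MIN + (UINT64_MAX / total_segments) * segment. For
// total_segments = 4 this splits the ring into quarters:
#include <cstdint>
#include <cstdio>
#include <limits>
int main() {
    const uint64_t total_segments = 4;
    const uint64_t delta = std::numeric_limits<uint64_t>::max() / total_segments;
    for (uint64_t segment = 1; segment < total_segments; ++segment) {
        // unsigned arithmetic, then a (two's-complement) cast back to int64_t
        uint64_t raw = static_cast<uint64_t>(std::numeric_limits<int64_t>::min()) + delta * segment;
        std::printf("segment %llu starts at token %lld\n",
                    (unsigned long long)segment, (long long)(int64_t)raw);
    }
    // segment 0 implicitly starts at INT64_MIN; the assert above requires
    // total_segments > 1 and 0 <= segment < total_segments
}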
@@ -6009,11 +5896,6 @@ future<executor::request_return_type> executor::list_tables(client_state& client
_stats.api_operations.list_tables++;
elogger.trace("Listing tables {}", request);
co_await utils::get_local_injector().inject("alternator_list_tables", [] (auto& handler) -> future<> {
handler.set("waiting", true);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
});
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
@@ -6205,10 +6087,9 @@ future<> executor::start() {
}
future<> executor::stop() {
co_await _describe_table_info_manager->stop();
// disconnect from the value source, but keep the value unchanged.
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
co_await _parsed_expression_cache->stop();
return _parsed_expression_cache->stop();
}
} // namespace alternator

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -17,13 +17,11 @@
#include "service/client_state.hh"
#include "service_permit.hh"
#include "db/timeout_clock.hh"
#include "db/config.hh"
#include "alternator/error.hh"
#include "stats.hh"
#include "utils/rjson.hh"
#include "utils/updateable_value.hh"
#include "utils/simple_value_with_expiry.hh"
#include "tracing/trace_state.hh"
@@ -43,7 +41,6 @@ namespace cql3::selection {
namespace service {
class storage_proxy;
class cas_shard;
class storage_service;
}
namespace cdc {
@@ -60,7 +57,6 @@ class schema_builder;
namespace alternator {
enum class table_status;
class rmw_operation;
class put_or_delete_item;
@@ -140,7 +136,6 @@ class expression_cache;
class executor : public peering_sharded_service<executor> {
gms::gossiper& _gossiper;
service::storage_service& _ss;
service::storage_proxy& _proxy;
service::migration_manager& _mm;
db::system_distributed_keyspace& _sdks;
@@ -153,11 +148,6 @@ class executor : public peering_sharded_service<executor> {
std::unique_ptr<parsed::expression_cache> _parsed_expression_cache;
struct describe_table_info_manager;
std::unique_ptr<describe_table_info_manager> _describe_table_info_manager;
future<> cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl);
future<> fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting);
public:
using client_state = service::client_state;
// request_return_type is the return type of the executor methods, which
@@ -183,7 +173,6 @@ public:
executor(gms::gossiper& gossiper,
service::storage_proxy& proxy,
service::storage_service& ss,
service::migration_manager& mm,
db::system_distributed_keyspace& sdks,
cdc::metadata& cdc_metadata,
@@ -231,8 +220,6 @@ private:
friend class rmw_operation;
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode);
future<> do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "expressions.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
/*

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -50,7 +50,7 @@ public:
_operators.emplace_back(i);
check_depth_limit();
}
void add_dot(std::string name) {
void add_dot(std::string(name)) {
_operators.emplace_back(std::move(name));
check_depth_limit();
}
@@ -85,7 +85,7 @@ struct constant {
}
};
// "value" is a value used in the right hand side of an assignment
// "value" is is a value used in the right hand side of an assignment
// expression, "SET a = ...". It can be a constant (a reference to a value
// included in the request, e.g., ":val"), a path to an attribute from the
// existing item (e.g., "a.b[3].c"), or a function of other such values.
@@ -205,7 +205,7 @@ public:
// The supported primitive conditions are:
// 1. Binary operators - v1 OP v2, where OP is =, <>, <, <=, >, or >= and
// v1 and v2 are values - from the item (an attribute path), the query
// (a ":val" reference), or a function of the above (only the size()
// (a ":val" reference), or a function of the the above (only the size()
// function is supported).
// 2. Ternary operator - v1 BETWEEN v2 and v3 (means v1 >= v2 AND v1 <= v3).
// 3. N-ary operator - v1 IN ( v2, v3, ... )
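// Illustrative standalone sketch (not the parser's real representation) of
// how the non-binary conditions above reduce to binary comparisons:
#include <algorithm>
#include <initializer_list>
template <typename V>
bool between_sketch(const V& v1, const V& v2, const V& v3) {
    return v1 >= v2 && v1 <= v3;   // v1 BETWEEN v2 AND v3
}
template <typename V>
bool in_sketch(const V& v1, std::initializer_list<V> vs) {
    return std::any_of(vs.begin(), vs.end(),
                       [&](const V& v) { return v == v1; });  // v1 IN (v2, v3, ...)
}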

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -1,301 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include "alternator/http_compression.hh"
#include "alternator/server.hh"
#include <seastar/coroutine/maybe_yield.hh>
#include <zlib.h>
static logging::logger slogger("alternator-http-compression");
namespace alternator {
static constexpr size_t compressed_buffer_size = 1024;
class zlib_compressor {
z_stream _zs;
temporary_buffer<char> _output_buf;
noncopyable_function<future<>(temporary_buffer<char>&&)> _write_func;
public:
zlib_compressor(bool gzip, int compression_level, noncopyable_function<future<>(temporary_buffer<char>&&)> write_func)
: _write_func(std::move(write_func)) {
memset(&_zs, 0, sizeof(_zs));
if (deflateInit2(&_zs, std::clamp(compression_level, Z_NO_COMPRESSION, Z_BEST_COMPRESSION), Z_DEFLATED,
(gzip ? 16 : 0) + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
}
~zlib_compressor() {
deflateEnd(&_zs);
}
future<> close() {
return compress(nullptr, 0, true);
}
future<> compress(const char* buf, size_t len, bool is_last_chunk = false) {
_zs.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(buf));
_zs.avail_in = (uInt) len;
int mode = is_last_chunk ? Z_FINISH : Z_NO_FLUSH;
while (_zs.avail_in > 0 || is_last_chunk) {
co_await coroutine::maybe_yield();
if (_output_buf.empty()) {
if (is_last_chunk) {
uint32_t max_buffer_size = 0;
deflatePending(&_zs, &max_buffer_size, nullptr);
max_buffer_size += deflateBound(&_zs, _zs.avail_in) + 1;
_output_buf = temporary_buffer<char>(std::min(compressed_buffer_size, (size_t) max_buffer_size));
} else {
_output_buf = temporary_buffer<char>(compressed_buffer_size);
}
_zs.next_out = reinterpret_cast<unsigned char*>(_output_buf.get_write());
_zs.avail_out = compressed_buffer_size;
}
int e = deflate(&_zs, mode);
if (e < Z_OK) {
throw api_error::internal("Error during compression of response body");
}
if (e == Z_STREAM_END || _zs.avail_out < compressed_buffer_size / 4) {
_output_buf.trim(compressed_buffer_size - _zs.avail_out);
co_await _write_func(std::move(_output_buf));
if (e == Z_STREAM_END) {
break;
}
}
}
}
};
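// For comparison with the streaming class above, a hedged one-shot sketch of
// the same zlib configuration (gzip container via 16 + MAX_WBITS); assumes
// the whole input is in memory, so no chunking or yielding is needed:
#include <zlib.h>
#include <stdexcept>
#include <string>
#include <vector>
static std::string gzip_compress_sketch(const std::string& in, int level = Z_DEFAULT_COMPRESSION) {
    z_stream zs{};
    if (deflateInit2(&zs, level, Z_DEFLATED, 16 + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
        throw std::bad_alloc();  // should only happen if allocation fails
    }
    std::vector<char> out(deflateBound(&zs, in.size()));  // worst-case output size
    zs.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(in.data()));
    zs.avail_in = static_cast<uInt>(in.size());
    zs.next_out = reinterpret_cast<Bytef*>(out.data());
    zs.avail_out = static_cast<uInt>(out.size());
    int e = deflate(&zs, Z_FINISH);  // all input at once, finish the stream
    deflateEnd(&zs);
    if (e != Z_STREAM_END) {
        throw std::runtime_error("deflate failed");
    }
    return std::string(out.data(), out.size() - zs.avail_out);
}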
// Helper string_view functions for parsing Accept-Encoding header
struct case_insensitive_cmp_sv {
bool operator()(std::string_view s1, std::string_view s2) const {
return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(),
[](char a, char b) { return ::tolower(a) == ::tolower(b); });
}
};
static inline std::string_view trim_left(std::string_view sv) {
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.front())))
sv.remove_prefix(1);
return sv;
}
static inline std::string_view trim_right(std::string_view sv) {
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.back())))
sv.remove_suffix(1);
return sv;
}
static inline std::string_view trim(std::string_view sv) {
return trim_left(trim_right(sv));
}
inline std::vector<std::string_view> split(std::string_view text, char separator) {
std::vector<std::string_view> tokens;
if (text == "") {
return tokens;
}
while (true) {
auto pos = text.find_first_of(separator);
if (pos != std::string_view::npos) {
tokens.emplace_back(text.data(), pos);
text.remove_prefix(pos + 1);
} else {
tokens.emplace_back(text);
break;
}
}
return tokens;
}
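// A small standalone usage check of the helpers above, on a hypothetical
// Accept-Encoding value (split() keeps surrounding whitespace, hence the
// trim() calls below):
#include <cassert>
static void split_trim_sketch() {
    auto tokens = split("gzip;q=0.8, deflate", ',');
    assert(tokens.size() == 2 && trim(tokens[1]) == "deflate");
    auto params = split(tokens[0], ';');
    assert(params.size() == 2 && params[0] == "gzip" && trim(params[1]) == "q=0.8");
}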
constexpr response_compressor::compression_type response_compressor::get_compression_type(std::string_view encoding) {
for (size_t i = 0; i < static_cast<size_t>(compression_type::count); ++i) {
if (case_insensitive_cmp_sv{}(encoding, compression_names[i])) {
return static_cast<compression_type>(i);
}
}
return compression_type::unknown;
}
response_compressor::compression_type response_compressor::find_compression(std::string_view accept_encoding, size_t response_size) {
std::optional<float> ct_q[static_cast<size_t>(compression_type::count)];
ct_q[static_cast<size_t>(compression_type::none)] = std::numeric_limits<float>::min(); // enabled, but lowest priority
compression_type selected_ct = compression_type::none;
std::vector<std::string_view> entries = split(accept_encoding, ',');
for (auto& e : entries) {
std::vector<std::string_view> params = split(e, ';');
if (params.size() == 0) {
continue;
}
compression_type ct = get_compression_type(trim(params[0]));
if (ct == compression_type::unknown) {
continue; // ignore unknown encoding types
}
if (ct_q[static_cast<size_t>(ct)].has_value() && ct_q[static_cast<size_t>(ct)] != 0.0f) {
continue; // already processed this encoding
}
if (response_size < _threshold[static_cast<size_t>(ct)]) {
continue; // below the size threshold - ignore this encoding, as if unknown
}
for (size_t i = 1; i < params.size(); ++i) { // find "q=" parameter
auto pos = params[i].find("q=");
if (pos == std::string_view::npos) {
continue;
}
std::string_view param = params[i].substr(pos + 2);
param = trim(param);
// parse quality value
float q_value = 1.0f;
auto [ptr, ec] = std::from_chars(param.data(), param.data() + param.size(), q_value);
if (ec != std::errc() || ptr != param.data() + param.size()) {
continue;
}
if (q_value < 0.0) {
q_value = 0.0;
} else if (q_value > 1.0) {
q_value = 1.0;
}
ct_q[static_cast<size_t>(ct)] = q_value;
break; // we parsed quality value
}
if (!ct_q[static_cast<size_t>(ct)].has_value()) {
ct_q[static_cast<size_t>(ct)] = 1.0f; // default quality value
}
// prefer the encoding with the highest q-value; on a tie, a concrete encoding beats 'any'
if (selected_ct == compression_type::any) {
if (ct_q[static_cast<size_t>(ct)] >= ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = ct;
}
} else {
if (ct_q[static_cast<size_t>(ct)] > ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = ct;
}
}
}
if (selected_ct == compression_type::any) {
// the client accepts any encoding: prefer one it didn't mention explicitly, otherwise the highest quality
selected_ct = compression_type::none;
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
if (!ct_q[i].has_value()) {
return static_cast<compression_type>(i);
}
if (ct_q[i] > ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = static_cast<compression_type>(i);
}
}
}
return selected_ct;
}
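// Worked examples of the selection above (assuming the response size exceeds
// every configured threshold):
//   "gzip;q=0.5, deflate;q=0.8" -> deflate  (highest q-value wins)
//   "deflate"                   -> deflate  (a missing q defaults to 1.0)
//   "gzip;q=0, deflate"         -> deflate  (q=0 effectively disables gzip)
//   "*"                         -> gzip     ('any' resolves to an encoding the
//                                            client didn't list explicitly)
//   ""                          -> none     (no acceptable encoding offered)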
static future<chunked_content> compress(response_compressor::compression_type ct, const db::config& cfg, std::string str) {
chunked_content compressed;
auto write = [&compressed](temporary_buffer<char>&& buf) -> future<> {
compressed.push_back(std::move(buf));
return make_ready_future<>();
};
zlib_compressor compressor(ct != response_compressor::compression_type::deflate,
cfg.alternator_response_gzip_compression_level(), std::move(write));
co_await compressor.compress(str.data(), str.size(), true);
co_return compressed;
}
static sstring flatten(chunked_content&& cc) {
size_t total_size = 0;
for (const auto& chunk : cc) {
total_size += chunk.size();
}
sstring result = sstring{ sstring::initialized_later{}, total_size };
size_t offset = 0;
for (const auto& chunk : cc) {
std::copy(chunk.begin(), chunk.end(), result.begin() + offset);
offset += chunk.size();
}
return result;
}
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, std::string&& response_body) {
response_compressor::compression_type ct = find_compression(accept_encoding, response_body.size());
if (ct != response_compressor::compression_type::none) {
rep->add_header("Content-Encoding", get_encoding_name(ct));
rep->set_content_type(content_type);
return compress(ct, cfg, std::move(response_body)).then([rep = std::move(rep)] (chunked_content compressed) mutable {
rep->_content = flatten(std::move(compressed));
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
});
} else {
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(response_body);
rep->set_content_type(content_type);
}
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
template<typename Compressor>
class compressed_data_sink_impl : public data_sink_impl {
output_stream<char> _out;
Compressor _compressor;
public:
template<typename... Args>
compressed_data_sink_impl(output_stream<char>&& out, Args&&... args)
: _out(std::move(out)), _compressor(std::forward<Args>(args)..., [this](temporary_buffer<char>&& buf) {
return _out.write(std::move(buf));
}) { }
future<> put(std::span<temporary_buffer<char>> data) override {
return data_sink_impl::fallback_put(data, [this] (temporary_buffer<char>&& buf) {
return do_put(std::move(buf));
});
}
private:
future<> do_put(temporary_buffer<char> buf) {
co_return co_await _compressor.compress(buf.get(), buf.size());
}
future<> close() override {
return _compressor.close().then([this] {
return _out.close();
});
}
};
executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) {
return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream<char>&& out) mutable -> future<> {
output_stream_options opts;
opts.trim_to_size = true;
std::unique_ptr<data_sink_impl> data_sink_impl;
switch (ct) {
case response_compressor::compression_type::gzip:
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), true, level);
break;
case response_compressor::compression_type::deflate:
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), false, level);
break;
case response_compressor::compression_type::none:
case response_compressor::compression_type::any:
case response_compressor::compression_type::unknown:
on_internal_error(slogger,"Compression not selected");
default:
on_internal_error(slogger, "Unsupported compression type for data sink");
}
return bw(output_stream<char>(data_sink(std::move(data_sink_impl)), compressed_buffer_size, opts));
};
}
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) {
response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits<size_t>::max());
if (ct != response_compressor::compression_type::none) {
rep->add_header("Content-Encoding", get_encoding_name(ct));
rep->write_body(content_type, compress(ct, cfg, std::move(body_writer)));
} else {
rep->write_body(content_type, std::move(body_writer));
}
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
} // namespace alternator

View File

@@ -1,91 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include "alternator/executor.hh"
#include <seastar/http/httpd.hh>
#include "db/config.hh"
namespace alternator {
class response_compressor {
public:
enum class compression_type {
gzip,
deflate,
compressions_count,
any = compressions_count,
none,
count,
unknown = count
};
static constexpr std::string_view compression_names[] = {
"gzip",
"deflate",
"*",
"identity"
};
static sstring get_encoding_name(compression_type ct) {
return sstring(compression_names[static_cast<size_t>(ct)]);
}
static constexpr compression_type get_compression_type(std::string_view encoding);
sstring get_accepted_encoding(const http::request& req) {
if (get_threshold() == 0) {
return "";
}
return req.get_header("Accept-Encoding");
}
compression_type find_compression(std::string_view accept_encoding, size_t response_size);
response_compressor(const db::config& cfg)
: cfg(cfg)
,_gzip_level_observer(
cfg.alternator_response_gzip_compression_level.observe([this](int v) {
update_threshold();
}))
,_gzip_threshold_observer(
cfg.alternator_response_compression_threshold_in_bytes.observe([this](uint32_t v) {
update_threshold();
}))
{
update_threshold();
}
response_compressor(const response_compressor& rhs) : response_compressor(rhs.cfg) {}
private:
const db::config& cfg;
utils::observable<int>::observer _gzip_level_observer;
utils::observable<uint32_t>::observer _gzip_threshold_observer;
uint32_t _threshold[static_cast<size_t>(compression_type::count)];
size_t get_threshold() { return _threshold[static_cast<size_t>(compression_type::any)]; }
void update_threshold() {
_threshold[static_cast<size_t>(compression_type::none)] = std::numeric_limits<uint32_t>::max();
_threshold[static_cast<size_t>(compression_type::any)] = std::numeric_limits<uint32_t>::max();
uint32_t gzip = cfg.alternator_response_gzip_compression_level() <= 0 ? std::numeric_limits<uint32_t>::max()
: cfg.alternator_response_compression_threshold_in_bytes();
_threshold[static_cast<size_t>(compression_type::gzip)] = gzip;
_threshold[static_cast<size_t>(compression_type::deflate)] = gzip;
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
if (_threshold[i] < _threshold[static_cast<size_t>(compression_type::any)]) {
_threshold[static_cast<size_t>(compression_type::any)] = _threshold[i];
}
}
}
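// Worked example with hypothetical config values: with
// alternator_response_gzip_compression_level = 6 and
// alternator_response_compression_threshold_in_bytes = 1024:
//   _threshold[gzip] = _threshold[deflate] = 1024
//   _threshold[none] = UINT32_MAX (an explicit "identity" entry is ignored;
//                                  uncompressed is the default anyway)
//   _threshold[any]  = 1024 (the minimum over the concrete encodings)
// A level <= 0 sets the gzip/deflate thresholds to UINT32_MAX instead,
// disabling response compression entirely.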
public:
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
sstring accept_encoding, const char* content_type, std::string&& response_body);
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer);
};
}

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "expressions.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/base64.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -55,7 +55,7 @@ partition_key pk_from_json(const rjson::value& item, schema_ptr schema);
clustering_key ck_from_json(const rjson::value& item, schema_ptr schema);
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema);
// If v encodes a number (i.e., it is a {"N": [...]}), returns an object representing it. Otherwise,
// If v encodes a number (i.e., it is a {"N": [...]}, returns an object representing it. Otherwise,
// raises ValidationException with diagnostic.
big_decimal unwrap_number(const rjson::value& v, std::string_view diagnostic);
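// e.g. unwrap_number applied to {"N": "3.14"} yields the big_decimal 3.14,
// while {"S": "3.14"} raises ValidationException with the given diagnostic.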

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "alternator/server.hh"
@@ -34,7 +34,6 @@
#include "client_data.hh"
#include "utils/updateable_value.hh"
#include <zlib.h>
#include "alternator/http_compression.hh"
static logging::logger slogger("alternator-server");
@@ -112,12 +111,9 @@ class api_handler : public handler_base {
// type applies to all replies, both success and error.
static constexpr const char* REPLY_CONTENT_TYPE = "application/x-amz-json-1.0";
public:
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle,
const db::config& config) : _response_compressor(config), _f_handle(
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
[this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
sstring accept_encoding = _response_compressor.get_accepted_encoding(*req);
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped(
[this, rep = std::move(rep), accept_encoding=std::move(accept_encoding)](future<executor::request_return_type> resf) mutable {
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
if (resf.failed()) {
// Exceptions of type api_error are wrapped as JSON and
// returned to the client as expected. Other types of
@@ -137,20 +133,22 @@ public:
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
auto res = resf.get();
return std::visit(overloaded_functor {
std::visit(overloaded_functor {
[&] (std::string&& str) {
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
REPLY_CONTENT_TYPE, std::move(str));
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(str);
rep->set_content_type(REPLY_CONTENT_TYPE);
},
[&] (executor::body_writer&& body_writer) {
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
REPLY_CONTENT_TYPE, std::move(body_writer));
rep->write_body(REPLY_CONTENT_TYPE, std::move(body_writer));
},
[&] (const api_error& err) {
generate_error_reply(*rep, err);
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
}, std::move(res));
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}) { }
@@ -179,7 +177,6 @@ protected:
slogger.trace("api_handler error case: {}", rep._content);
}
response_compressor _response_compressor;
future_handler_function _f_handle;
};
@@ -374,45 +371,18 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
for (const auto& header : signed_headers) {
signed_headers_map.emplace(header, std::string_view());
}
std::vector<std::string> modified_values;
for (auto& header : req._headers) {
std::string header_str;
header_str.resize(header.first.size());
std::transform(header.first.begin(), header.first.end(), header_str.begin(), ::tolower);
auto it = signed_headers_map.find(header_str);
if (it != signed_headers_map.end()) {
// replace multiple spaces in the header value header.second with
// a single space, as required by AWS SigV4 header canonicalization.
// If we modify the value, we need to save it in modified_values
// to keep it alive.
std::string value;
value.reserve(header.second.size());
bool prev_space = false;
bool modified = false;
for (char ch : header.second) {
if (ch == ' ') {
if (!prev_space) {
value += ch;
prev_space = true;
} else {
modified = true; // skip a space
}
} else {
value += ch;
prev_space = false;
}
}
if (modified) {
modified_values.emplace_back(std::move(value));
it->second = std::string_view(modified_values.back());
} else {
it->second = std::string_view(header.second);
}
it->second = std::string_view(header.second);
}
}
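// The space-collapsing loop above can equivalently be written with
// std::unique; a standalone sketch of the same SigV4 canonicalization step:
#include <algorithm>
#include <string>
static std::string collapse_spaces_sketch(std::string value) {
    auto last = std::unique(value.begin(), value.end(),
        [](char a, char b) { return a == ' ' && b == ' '; });
    value.erase(last, value.end());
    return value;  // "a   b" -> "a b"
}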
auto cache_getter = [&proxy = _proxy] (std::string username) {
return get_key_from_roles(proxy, std::move(username));
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
return get_key_from_roles(proxy, as, std::move(username));
};
return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
user = std::move(user),
@@ -420,7 +390,6 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
datestamp = std::move(datestamp),
signed_headers_str = std::move(signed_headers_str),
signed_headers_map = std::move(signed_headers_map),
modified_values = std::move(modified_values),
region = std::move(region),
service = std::move(service),
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
@@ -591,11 +560,11 @@ read_entire_stream(input_stream<char>& inp, size_t length_limit) {
class safe_gzip_zstream {
z_stream _zs;
public:
// If gzip is true, decode a gzip header (for "Content-Encoding: gzip").
// Otherwise, a zlib header (for "Content-Encoding: deflate").
safe_gzip_zstream(bool gzip = true) {
safe_gzip_zstream() {
memset(&_zs, 0, sizeof(_zs));
if (inflateInit2(&_zs, gzip ? 16 + MAX_WBITS : MAX_WBITS) != Z_OK) {
// The strange 16 + MAX_WBITS tells zlib to expect and decode
// a gzip header, not a zlib header.
if (inflateInit2(&_zs, 16 + MAX_WBITS) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
@@ -614,21 +583,19 @@ public:
}
};
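// A standalone one-shot counterpart to the streaming class above, using the
// same 16 + MAX_WBITS setting; a hedged sketch assuming the whole input is in
// memory and `limit` bounds the uncompressed size:
#include <zlib.h>
#include <stdexcept>
#include <string>
#include <vector>
static std::string gunzip_sketch(const std::string& in, size_t limit) {
    z_stream zs{};
    if (inflateInit2(&zs, 16 + MAX_WBITS) != Z_OK) {
        throw std::bad_alloc();
    }
    std::vector<char> out(limit);
    zs.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(in.data()));
    zs.avail_in = static_cast<uInt>(in.size());
    zs.next_out = reinterpret_cast<Bytef*>(out.data());
    zs.avail_out = static_cast<uInt>(out.size());
    int e = inflate(&zs, Z_FINISH);  // needs the full output to fit in one pass
    inflateEnd(&zs);
    if (e != Z_STREAM_END) {
        throw std::runtime_error("bad gzip input, or output exceeds limit");
    }
    return std::string(out.data(), out.size() - zs.avail_out);
}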
// ungzip() takes a chunked_content of a compressed request body, and returns
// the uncompressed content as a chunked_content. If gzip is true, we expect
// gzip header (for "Content-Encoding: gzip"), if gzip is false, we expect a
// zlib header (for "Content-Encoding: deflate").
// ungzip() takes a chunked_content with a gzip-compressed request body,
// uncompresses it, and returns the uncompressed content as a chunked_content.
// If the uncompressed content exceeds length_limit, an error is thrown.
static future<chunked_content>
ungzip(chunked_content&& compressed_body, size_t length_limit, bool gzip = true) {
ungzip(chunked_content&& compressed_body, size_t length_limit) {
chunked_content ret;
// output_buf can be any size: the uncompressed data from one input_buf
// doesn't need to fit in a single output_buf - we'll use multiple
// output_bufs per input_buf if needed.
constexpr size_t OUTPUT_BUF_SIZE = 4096;
temporary_buffer<char> output_buf;
safe_gzip_zstream strm(gzip);
bool complete_stream = false; // empty input is not a valid gzip/deflate
safe_gzip_zstream strm;
bool complete_stream = false; // empty input is not a valid gzip
size_t total_out_bytes = 0;
for (const temporary_buffer<char>& input_buf : compressed_body) {
if (input_buf.empty()) {
@@ -699,17 +666,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// for such a size.
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
}
// Check the concurrency limit early, before acquiring memory and
// reading the request body, to avoid piling up memory from excess
// requests that will be rejected anyway. This mirrors the CQL
// transport which also checks concurrency before memory acquisition
// (transport/server.cc).
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
// JSON parsing can allocate up to roughly 2x the size of the raw
// document, plus a few bytes of bookkeeping overhead.
// If the Content-Length of the request is not available, we assume
@@ -721,7 +677,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
++_executor._stats.requests_blocked_memory;
}
auto units = co_await std::move(units_fut);
throwing_assert(req->content_stream);
SCYLLA_ASSERT(req->content_stream);
chunked_content content = co_await read_entire_stream(*req->content_stream, request_content_length_limit);
// If the request had no Content-Length, we reserved too many units
// so need to return some
@@ -742,8 +698,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
sstring content_encoding = req->get_header("Content-Encoding");
if (content_encoding == "gzip") {
content = co_await ungzip(std::move(content), request_content_length_limit);
} else if (content_encoding == "deflate") {
content = co_await ungzip(std::move(content), request_content_length_limit, false);
} else if (!content_encoding.empty()) {
// DynamoDB returns a 500 error for unsupported Content-Encoding.
// I'm not sure if this is the best error code, but let's do it too.
@@ -754,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
auto system_clients_entry = _ongoing_requests.emplace(
req->get_client_address(), std::move(user_agent_header),
req->get_client_address(), req->get_header("User-Agent"),
username, current_scheduling_group(),
req->get_protocol_name() == "https");
@@ -771,12 +721,18 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
_executor._stats.unsupported_operations++;
co_return api_error::unknown_operation(fmt::format("Unsupported operation {}", op));
}
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
executor::client_state client_state(service::client_state::external_tag(),
_auth_service, &_sl_controller, _timeout_config.current_values(), req->get_client_address());
if (!username.empty()) {
client_state.set_login(auth::authenticated_user(username));
}
client_state.maybe_update_per_service_level_params();
co_await client_state.maybe_update_per_service_level_params();
tracing::trace_state_ptr trace_state = maybe_trace_query(client_state, username, op, content, _max_users_query_size_in_trace_output.get());
tracing::trace(trace_state, "{}", op);
@@ -798,7 +754,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
void server::set_routes(routes& r) {
api_handler* req_handler = new api_handler([this] (std::unique_ptr<request> req) mutable {
return handle_api_request(std::move(req));
}, _proxy.data_dictionary().get_config());
});
r.put(operation_type::POST, "/", req_handler);
r.put(operation_type::GET, "/", new health_handler(_pending_requests));
@@ -909,9 +865,7 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
} {
}
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
std::optional<tls::credentials_builder> creds,
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
@@ -919,28 +873,20 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
_warn_authorization = std::move(warn_authorization);
_max_concurrent_requests = std::move(max_concurrent_requests);
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
if (!port && !https_port && !port_proxy_protocol && !https_port_proxy_protocol) {
if (!port && !https_port) {
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
" must be specified in order to init an alternator HTTP server instance"));
}
return seastar::async([this, addr, port, https_port, port_proxy_protocol, https_port_proxy_protocol, creds] {
return seastar::async([this, addr, port, https_port, creds] {
_executor.start().get();
if (port || port_proxy_protocol) {
if (port) {
set_routes(_http_server._routes);
_http_server.set_content_streaming(true);
if (port) {
_http_server.listen(socket_address{addr, *port}).get();
}
if (port_proxy_protocol) {
listen_options lo;
lo.reuse_address = true;
lo.proxy_protocol = true;
_http_server.listen(socket_address{addr, *port_proxy_protocol}, lo).get();
}
_http_server.listen(socket_address{addr, *port}).get();
_enabled_servers.push_back(std::ref(_http_server));
}
if (https_port || https_port_proxy_protocol) {
if (https_port) {
set_routes(_https_server._routes);
_https_server.set_content_streaming(true);
@@ -960,15 +906,7 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
} else {
_credentials = creds->build_server_credentials();
}
if (https_port) {
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
}
if (https_port_proxy_protocol) {
listen_options lo;
lo.reuse_address = true;
lo.proxy_protocol = true;
_https_server.listen(socket_address{addr, *https_port_proxy_protocol}, lo, _credentials).get();
}
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
_enabled_servers.push_back(std::ref(_https_server));
}
});
@@ -1041,15 +979,16 @@ client_data server::ongoing_request::make_client_data() const {
// and keep "driver_version" unset.
cd.driver_name = _user_agent;
// Leave "protocol_version" unset, it has no meaning in Alternator.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset for Alternator.
// Note: CQL sets ssl_protocol and ssl_cipher_suite via generic_server::connection base class.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
// As reported in issue #9216, we never set these fields in CQL
// either (see cql_server::connection::make_client_data()).
return cd;
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
future<utils::chunked_vector<client_data>> server::get_client_data() {
utils::chunked_vector<client_data> ret;
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
ret.emplace_back(r.make_client_data());
});
co_return ret;
}

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
// though it isn't really relevant for Alternator which defines its own
// timeouts separately. We can create this object only once.
updateable_timeout_config _timeout_config;
client_options_cache_type _connection_options_keys_and_values;
alternator_callbacks_map _callbacks;
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
// is called when reading the "system.clients" virtual table.
struct ongoing_request {
socket_address _client_address;
client_options_cache_entry_type _user_agent;
sstring _user_agent;
sstring _username;
scheduling_group _scheduling_group;
bool _is_https;
@@ -100,9 +99,7 @@ class server : public peering_sharded_service<server> {
public:
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
std::optional<tls::credentials_builder> creds,
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();
@@ -110,7 +107,7 @@ public:
// table "system.clients" is read. It is expected to generate a list of
// clients connected to this server (on this shard). This function is
// called by alternator::controller::get_client_data().
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
future<utils::chunked_vector<client_data>> get_client_data();
private:
void set_routes(seastar::httpd::routes& r);
// If verification succeeds, returns the authenticated user's username

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "stats.hh"
@@ -14,6 +14,20 @@
namespace alternator {
const char* ALTERNATOR_METRICS = "alternator";
static seastar::metrics::histogram estimated_histogram_to_metrics(const utils::estimated_histogram& histogram) {
seastar::metrics::histogram res;
res.buckets.resize(histogram.bucket_offsets.size());
uint64_t cumulative_count = 0;
res.sample_count = histogram._count;
res.sample_sum = histogram._sample_sum;
for (size_t i = 0; i < res.buckets.size(); i++) {
auto& v = res.buckets[i];
v.upper_bound = histogram.bucket_offsets[i];
cumulative_count += histogram.buckets[i];
v.count = cumulative_count;
}
return res;
}
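// A minimal self-contained sketch (plain C++, hypothetical flat_histogram
// type) of the conversion above: estimated_histogram stores per-bucket
// counts, while seastar/Prometheus-style histograms want each bucket to hold
// the cumulative count of all samples at or below its upper bound.
#include <cstddef>
#include <cstdint>
#include <vector>

struct flat_histogram {
    std::vector<int64_t> bucket_offsets; // upper bound of each bucket
    std::vector<uint64_t> buckets;       // per-bucket sample counts
};

struct cumulative_bucket {
    int64_t upper_bound;
    uint64_t count; // samples at or below upper_bound
};

std::vector<cumulative_bucket> to_cumulative(const flat_histogram& h) {
    std::vector<cumulative_bucket> out;
    out.reserve(h.bucket_offsets.size());
    uint64_t running = 0;
    for (std::size_t i = 0; i < h.bucket_offsets.size(); i++) {
        running += h.buckets[i]; // per-bucket -> cumulative
        out.push_back({h.bucket_offsets[i], running});
    }
    return out;
}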
static seastar::metrics::label column_family_label("cf");
static seastar::metrics::label keyspace_label("ks");
@@ -137,21 +151,21 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"), labels,
stats.api_operations.batch_get_item_batch_total)(op("BatchGetItem")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_histogram("batch_item_count_histogram", seastar::metrics::description("Histogram of the number of items in a batch request"), labels,
[&stats]{ return to_metrics_histogram(stats.api_operations.batch_get_item_histogram);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
[&stats]{ return estimated_histogram_to_metrics(stats.api_operations.batch_get_item_histogram);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("batch_item_count_histogram", seastar::metrics::description("Histogram of the number of items in a batch request"), labels,
[&stats]{ return to_metrics_histogram(stats.api_operations.batch_write_item_histogram);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
[&stats]{ return estimated_histogram_to_metrics(stats.api_operations.batch_write_item_histogram);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return to_metrics_histogram(stats.operation_sizes.get_item_op_size_kb);})(op("GetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.get_item_op_size_kb);})(op("GetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return to_metrics_histogram(stats.operation_sizes.put_item_op_size_kb);})(op("PutItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.put_item_op_size_kb);})(op("PutItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return to_metrics_histogram(stats.operation_sizes.delete_item_op_size_kb);})(op("DeleteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.delete_item_op_size_kb);})(op("DeleteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return to_metrics_histogram(stats.operation_sizes.update_item_op_size_kb);})(op("UpdateItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.update_item_op_size_kb);})(op("UpdateItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return to_metrics_histogram(stats.operation_sizes.batch_get_item_op_size_kb);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.batch_get_item_op_size_kb);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return to_metrics_histogram(stats.operation_sizes.batch_write_item_op_size_kb);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.batch_write_item_op_size_kb);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
});
seastar::metrics::label expression_label("expression");

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -16,8 +16,6 @@
#include "cql3/stats.hh"
namespace alternator {
using batch_histogram = utils::estimated_histogram_with_max<128>;
using op_size_histogram = utils::estimated_histogram_with_max<512>;
// Object holding per-shard statistics related to Alternator.
// While this object is alive, these metrics are also registered to be
@@ -78,34 +76,34 @@ public:
utils::timed_rate_moving_average_summary_and_histogram batch_get_item_latency;
utils::timed_rate_moving_average_summary_and_histogram get_records_latency;
batch_histogram batch_get_item_histogram;
batch_histogram batch_write_item_histogram;
utils::estimated_histogram batch_get_item_histogram{22}; // a histogram that covers the range 1 - 100
utils::estimated_histogram batch_write_item_histogram{22}; // a histogram that covers the range 1 - 100
} api_operations;
// Operation size metrics
struct {
// Item size statistics collected per table and aggregated per node.
// Each histogram covers the range 0 - 512. Resolves #25143.
// Each histogram covers the range 0 - 446. Resolves #25143.
// A size is the retrieved item's size.
op_size_histogram get_item_op_size_kb;
utils::estimated_histogram get_item_op_size_kb{30};
// A size is the maximum of the new item's size and the old item's size.
op_size_histogram put_item_op_size_kb;
utils::estimated_histogram put_item_op_size_kb{30};
// A size is the deleted item's size. If the deleted item's size is
// unknown (i.e. read-before-write wasn't necessary and it wasn't
// forced by a configuration option), it won't be recorded on the
// histogram.
op_size_histogram delete_item_op_size_kb;
utils::estimated_histogram delete_item_op_size_kb{30};
// A size is the maximum of existing item's size and the estimated size
// of the update. This will be changed to the maximum of the existing item's
// size and the new item's size in a subsequent PR.
op_size_histogram update_item_op_size_kb;
utils::estimated_histogram update_item_op_size_kb{30};
// A size is the sum of the sizes of all items per table. This means
// that a single BatchGetItem / BatchWriteItem updates the histogram
// for each table that it has items in.
// The sizes are the retrieved items' sizes grouped per table.
op_size_histogram batch_get_item_op_size_kb;
utils::estimated_histogram batch_get_item_op_size_kb{30};
// The sizes are the written items' sizes grouped per table.
op_size_histogram batch_write_item_op_size_kb;
utils::estimated_histogram batch_write_item_op_size_kb{30};
} operation_sizes;
// Count of authentication and authorization failures, counted if either
// alternator_enforce_authorization or alternator_warn_authorization are
@@ -142,7 +140,7 @@ public:
cql3::cql_stats cql_stats;
// Enumeration of expression types only for stats
// if needed it can be extended e.g. per operation
// if needed it can be extended e.g. per operation
enum expression_types {
UPDATE_EXPRESSION,
CONDITION_EXPRESSION,
@@ -166,7 +164,7 @@ struct table_stats {
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats);
inline uint64_t bytes_to_kb_ceil(uint64_t bytes) {
return (bytes) / 1024;
return (bytes + 1023) / 1024;
}
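// A worked aside on the `(bytes + 1023) / 1024` form above: it rounds up in
// integer division, so a partial kilobyte still counts as a full one, e.g.
//   (0 + 1023) / 1024    == 0
//   (1 + 1023) / 1024    == 1   (plain bytes / 1024 would give 0)
//   (1024 + 1023) / 1024 == 1
//   (1025 + 1023) / 1024 == 2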
}

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <type_traits>
@@ -33,8 +33,6 @@
#include "data_dictionary/data_dictionary.hh"
#include "utils/rjson.hh"
static logging::logger slogger("alternator-streams");
/**
* Base template type to implement rapidjson::internal::TypeHelper<...>:s
* for types that are ostreamable/string constructible/castable.
@@ -430,25 +428,6 @@ using namespace std::chrono_literals;
// Dynamo docs says no data shall live longer than 24h.
static constexpr auto dynamodb_streams_max_window = 24h;
// find the parent shard in previous generation for the given child shard
// takes care of wrap-around case in vnodes
// prev_streams must be sorted by token
const cdc::stream_id& find_parent_shard_in_previous_generation(db_clock::time_point prev_timestamp, const utils::chunked_vector<cdc::stream_id> &prev_streams, const cdc::stream_id &child) {
if (prev_streams.empty()) {
// something is really wrong - streams are empty
// let's try internal_error in the hope it will be noticed and fixed
on_internal_error(slogger, fmt::format("streams are empty for cdc generation at {} ({})", prev_timestamp, prev_timestamp.time_since_epoch().count()));
}
auto it = std::lower_bound(prev_streams.begin(), prev_streams.end(), child.token(), [](const cdc::stream_id& id, const dht::token& t) {
return id.token() < t;
});
if (it == prev_streams.end()) {
// wrap around case - take first
it = prev_streams.begin();
}
return *it;
}
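// A minimal self-contained analogue (plain C++, int tokens standing in for
// dht::token) of the sorted-ring lookup above: std::lower_bound finds the
// first boundary at or past the value, and falling off the end wraps around
// to the first element. It assumes a non-empty ring, as the caller above
// also asserts.
#include <algorithm>
#include <vector>

int find_owner_on_ring(const std::vector<int>& sorted_boundaries, int token) {
    auto it = std::lower_bound(sorted_boundaries.begin(), sorted_boundaries.end(), token);
    if (it == sorted_boundaries.end()) {
        it = sorted_boundaries.begin(); // wrap-around: past the last boundary
    }
    return *it;
}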
future<executor::request_return_type> executor::describe_stream(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.describe_stream++;
@@ -512,7 +491,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
if (!opts.enabled()) {
rjson::add(ret, "StreamDescription", std::move(stream_desc));
co_return rjson::print(std::move(ret));
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
}
// TODO: label
@@ -523,113 +502,123 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
// filter out cdc generations older than the table or now() - cdc::ttl (typically dynamodb_streams_max_window - 24h)
auto low_ts = std::max(as_timepoint(schema->id()), db_clock::now() - ttl);
std::map<db_clock::time_point, cdc::streams_version> topologies = co_await _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners });
auto e = topologies.end();
auto prev = e;
auto shards = rjson::empty_array();
return _sdks.cdc_get_versioned_streams(low_ts, { normal_token_owners }).then([db, shard_start, limit, ret = std::move(ret), stream_desc = std::move(stream_desc)] (std::map<db_clock::time_point, cdc::streams_version> topologies) mutable {
std::optional<shard_id> last;
auto e = topologies.end();
auto prev = e;
auto shards = rjson::empty_array();
auto i = topologies.begin();
// if we're a paged query, skip to the generation where we left off.
if (shard_start) {
i = topologies.find(shard_start->time);
}
std::optional<shard_id> last;
// for parent-child stuff we need ids to be sorted by token
// (see explanation above) since we want to find closest
// token boundary when determining parent.
// #7346 - we processed and searched children/parents in
// stored order, which is not necessarily token order,
// so the finding of "closest" token boundary (using upper bound)
// could give somewhat weird results.
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return id1.token() < id2.token();
};
auto i = topologies.begin();
// if we're a paged query, skip to the generation where we left off.
if (shard_start) {
i = topologies.find(shard_start->time);
}
// #7409 - shards must be returned in lexicographical order,
// normal bytes compare is string_traits<int8_t>::compare.
// thus bytes 0x8000 is less than 0x0000. By doing an unsigned
// compare instead we will, in effect, sort in string-lexical order.
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0;
};
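// A small self-contained illustration (plain C++) of the signed-vs-unsigned
// point above: compared as signed int8_t, the byte 0x80 (-128) sorts before
// 0x00, but in the string-lexical order clients expect, 0x80 comes after it.
#include <algorithm>
#include <cstdint>
#include <vector>

bool bytes_less_lexical(const std::vector<uint8_t>& a, const std::vector<uint8_t>& b) {
    // unsigned element compare == lexicographic byte-string order
    return std::lexicographical_compare(a.begin(), a.end(), b.begin(), b.end());
}
// bytes_less_lexical({0x00}, {0x80}) == true; a signed compare would invert it.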
// need a prev even if we are skipping stuff
if (i != topologies.begin()) {
prev = std::prev(i);
}
for (; limit > 0 && i != e; prev = i, ++i) {
auto& [ts, sv] = *i;
last = std::nullopt;
auto lo = sv.streams.begin();
auto end = sv.streams.end();
// for parent-child stuff we need ids to be sorted by token
// (see explanation above) since we want to find closest
// token boundary when determining parent.
// #7346 - we processed and searched children/parents in
// stored order, which is not necessarily token order,
// so the finding of "closest" token boundary (using upper bound)
// could give somewhat weird results.
static auto token_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return id1.token() < id2.token();
};
// #7409 - shards must be returned in lexicographical order,
std::sort(lo, end, id_cmp);
// normal bytes compare is string_traits<int8_t>::compare.
// thus bytes 0x8000 is less than 0x0000. By doing an unsigned
// compare instead we will, in effect, sort in string-lexical order.
static auto id_cmp = [](const cdc::stream_id& id1, const cdc::stream_id& id2) {
return compare_unsigned(id1.to_bytes(), id2.to_bytes()) < 0;
};
if (shard_start) {
// find next shard position
lo = std::upper_bound(lo, end, shard_start->id, id_cmp);
shard_start = std::nullopt;
// need a prev even if we are skipping stuff
if (i != topologies.begin()) {
prev = std::prev(i);
}
if (lo != end && prev != e) {
// We want older stuff sorted in token order so we can find matching
// token range when determining parent shard.
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
}
auto expired = [&]() -> std::optional<db_clock::time_point> {
auto j = std::next(i);
if (j == e) {
return std::nullopt;
}
// add this so we sort of match potential
// sequence numbers in get_records result.
return j->first + confidence_interval(db);
}();
while (lo != end) {
auto& id = *lo++;
auto shard = rjson::empty_object();
if (prev != e) {
auto &pid = find_parent_shard_in_previous_generation(prev->first, prev->second.streams, id);
rjson::add(shard, "ParentShardId", shard_id(prev->first, pid));
}
last.emplace(ts, id);
rjson::add(shard, "ShardId", *last);
auto range = rjson::empty_object();
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
if (expired) {
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
}
rjson::add(shard, "SequenceNumberRange", std::move(range));
rjson::push_back(shards, std::move(shard));
if (--limit == 0) {
break;
}
for (; limit > 0 && i != e; prev = i, ++i) {
auto& [ts, sv] = *i;
last = std::nullopt;
auto lo = sv.streams.begin();
auto end = sv.streams.end();
// #7409 - shards must be returned in lexicographical order,
std::sort(lo, end, id_cmp);
if (shard_start) {
// find next shard position
lo = std::upper_bound(lo, end, shard_start->id, id_cmp);
shard_start = std::nullopt;
}
if (lo != end && prev != e) {
// We want older stuff sorted in token order so we can find matching
// token range when determining parent shard.
std::stable_sort(prev->second.streams.begin(), prev->second.streams.end(), token_cmp);
}
auto expired = [&]() -> std::optional<db_clock::time_point> {
auto j = std::next(i);
if (j == e) {
return std::nullopt;
}
// add this so we sort of match potential
// sequence numbers in get_records result.
return j->first + confidence_interval(db);
}();
while (lo != end) {
auto& id = *lo++;
auto shard = rjson::empty_object();
if (prev != e) {
auto& pids = prev->second.streams;
auto pid = std::upper_bound(pids.begin(), pids.end(), id.token(), [](const dht::token& t, const cdc::stream_id& id) {
return t < id.token();
});
if (pid != pids.begin()) {
pid = std::prev(pid);
}
if (pid != pids.end()) {
rjson::add(shard, "ParentShardId", shard_id(prev->first, *pid));
}
}
last.emplace(ts, id);
rjson::add(shard, "ShardId", *last);
auto range = rjson::empty_object();
rjson::add(range, "StartingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(ts.time_since_epoch())));
if (expired) {
rjson::add(range, "EndingSequenceNumber", sequence_number(utils::UUID_gen::min_time_UUID(expired->time_since_epoch())));
}
rjson::add(shard, "SequenceNumberRange", std::move(range));
rjson::push_back(shards, std::move(shard));
if (--limit == 0) {
break;
}
last = std::nullopt;
}
}
}
if (last) {
rjson::add(stream_desc, "LastEvaluatedShardId", *last);
}
if (last) {
rjson::add(stream_desc, "LastEvaluatedShardId", *last);
}
rjson::add(stream_desc, "Shards", std::move(shards));
rjson::add(ret, "StreamDescription", std::move(stream_desc));
co_return rjson::print(std::move(ret));
rjson::add(stream_desc, "Shards", std::move(shards));
rjson::add(ret, "StreamDescription", std::move(stream_desc));
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
});
}
enum class shard_iterator_type {
@@ -787,18 +776,16 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
struct event_id {
cdc::stream_id stream;
utils::UUID timestamp;
size_t index = 0;
static constexpr auto marker = 'E';
event_id(cdc::stream_id s, utils::UUID ts, size_t index)
event_id(cdc::stream_id s, utils::UUID ts)
: stream(s)
, timestamp(ts)
, index(index)
{}
friend std::ostream& operator<<(std::ostream& os, const event_id& id) {
fmt::print(os, "{}{}:{}:{}", marker, id.stream.to_bytes(), id.timestamp, id.index);
fmt::print(os, "{}{}:{}", marker, id.stream.to_bytes(), id.timestamp);
return os;
}
};
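// A hypothetical, self-contained sketch (std::ostringstream instead of fmt)
// of the id layout above: appending the per-timestamp index keeps event ids
// distinct when several records share one timestamp (e.g. BatchWriteItem).
#include <cstddef>
#include <sstream>
#include <string>

std::string make_event_id(const std::string& stream_bytes, const std::string& timestamp, std::size_t index) {
    std::ostringstream os;
    os << 'E' << stream_bytes << ':' << timestamp << ':' << index;
    return os.str();
}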
@@ -810,19 +797,7 @@ struct rapidjson::internal::TypeHelper<ValueType, alternator::event_id>
{};
namespace alternator {
namespace {
struct managed_bytes_ptr_hash {
size_t operator()(const managed_bytes *k) const noexcept {
return std::hash<managed_bytes>{}(*k);
}
};
struct managed_bytes_ptr_equal {
bool operator()(const managed_bytes *a, const managed_bytes *b) const noexcept {
return *a == *b;
}
};
}
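// A self-contained analogue (plain C++, std::string in place of managed_bytes)
// of the pointer-keyed hashing above: the map keys are pointers, but hashing
// and equality look through them, so distinct pointers to equal byte strings
// land in the same entry without copying the keys.
#include <cstddef>
#include <string>
#include <unordered_map>

struct str_ptr_hash {
    std::size_t operator()(const std::string* p) const noexcept { return std::hash<std::string>{}(*p); }
};
struct str_ptr_equal {
    bool operator()(const std::string* a, const std::string* b) const noexcept { return *a == *b; }
};
using by_value_map = std::unordered_map<const std::string*, int, str_ptr_hash, str_ptr_equal>;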
future<executor::request_return_type> executor::get_records(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
_stats.api_operations.get_records++;
auto start_time = std::chrono::steady_clock::now();
@@ -893,12 +868,6 @@ future<executor::request_return_type> executor::get_records(client_state& client
auto pks = schema->partition_key_columns();
auto cks = schema->clustering_key_columns();
auto base_cks = base->clustering_key_columns();
if (base_cks.size() > 1) {
throw api_error::internal(fmt::format("invalid alternator table, clustering key count ({}) is bigger than one", base_cks.size()));
}
const bytes *clustering_key_column_name = !base_cks.empty() ? &base_cks.front().name() : nullptr;
std::transform(pks.begin(), pks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
std::transform(cks.begin(), cks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
@@ -929,184 +898,172 @@ future<executor::request_return_type> executor::get_records(client_state& client
auto command = ::make_lw_shared<query::read_command>(schema->id(), schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
query::tombstone_limit(_proxy.get_tombstone_limit()), query::row_limit(limit * mul));
service::storage_proxy::coordinator_query_result qr = co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state));
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
co_return co_await _proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(default_timeout(), std::move(permit), client_state)).then(
[this, schema, partition_slice = std::move(partition_slice), selection = std::move(selection), start_time = std::move(start_time), limit, key_names = std::move(key_names), attr_names = std::move(attr_names), type, iter, high_ts] (service::storage_proxy::coordinator_query_result qr) mutable {
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
auto result_set = builder.build();
auto records = rjson::empty_array();
auto result_set = builder.build();
auto records = rjson::empty_array();
auto& metadata = result_set->get_metadata();
auto& metadata = result_set->get_metadata();
auto op_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == op_column_name;
})
);
auto ts_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == timestamp_column_name;
})
);
auto eor_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == eor_column_name;
})
);
auto clustering_key_index = clustering_key_column_name ? std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [&](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == *clustering_key_column_name;
})
) : 0;
auto op_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == op_column_name;
})
);
auto ts_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == timestamp_column_name;
})
);
auto eor_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == eor_column_name;
})
);
std::optional<utils::UUID> timestamp;
struct Record {
rjson::value record;
rjson::value dynamodb;
};
const managed_bytes empty_managed_bytes;
std::unordered_map<const managed_bytes*, Record, managed_bytes_ptr_hash, managed_bytes_ptr_equal> records_map;
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
std::optional<utils::UUID> timestamp;
auto dynamodb = rjson::empty_object();
auto record = rjson::empty_object();
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
using op_utype = std::underlying_type_t<cdc::operation>;
using op_utype = std::underlying_type_t<cdc::operation>;
for (auto& row : result_set->rows()) {
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
const managed_bytes* cs_ptr = clustering_key_column_name ? &*row[clustering_key_index] : &empty_managed_bytes;
auto records_it = records_map.emplace(cs_ptr, Record{});
auto &record = records_it.first->second;
auto maybe_add_record = [&] {
if (!dynamodb.ObjectEmpty()) {
rjson::add(record, "dynamodb", std::move(dynamodb));
dynamodb = rjson::empty_object();
}
if (!record.ObjectEmpty()) {
rjson::add(record, "awsRegion", rjson::from_string(dc_name));
rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp));
rjson::add(record, "eventSource", "scylladb:alternator");
rjson::add(record, "eventVersion", "1.1");
rjson::push_back(records, std::move(record));
record = rjson::empty_object();
--limit;
}
};
if (records_it.second) {
record.dynamodb = rjson::empty_object();
record.record = rjson::empty_object();
auto keys = rjson::empty_object();
describe_single_item(*selection, row, key_names, keys);
rjson::add(record.dynamodb, "Keys", std::move(keys));
rjson::add(record.dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
rjson::add(record.dynamodb, "SequenceNumber", sequence_number(ts));
rjson::add(record.dynamodb, "StreamViewType", type);
// TODO: SizeBytes
}
for (auto& row : result_set->rows()) {
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
/**
* We merge rows with same timestamp into a single event.
* This is pretty much needed, because a CDC row typically
* encodes ~half the info of an alternator write.
*
* A big, big downside to how alternator records are written
* (i.e. CQL), is that the distinction between INSERT and UPDATE
* is somewhat lost/unmappable to actual eventName.
* A write (currently) always looks like an insert+modify
* regardless of whether we wrote an existing record or not.
*
* Maybe RMW ops could be done slightly differently so
* we can distinguish them here...
*
* For now, all writes will become MODIFY.
*
* Note: we do not check the current pre/post
* flags on CDC log, instead we use data to
* drive what is returned. This is (afaict)
* consistent with dynamo streams
*
* Note: BatchWriteItem will generate multiple records with
* the same timestamp, when write isolation is set to always
* (which triggers lwt), so we need to unpack them based on clustering key.
*/
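// A minimal self-contained sketch (plain C++, hypothetical row/event types)
// of the merge described above: consecutive CDC rows sharing a timestamp
// fold into one logical event, and the end-of-record flag closes the group.
#include <cstddef>
#include <cstdint>
#include <vector>

struct cdc_row { int64_t ts; bool end_of_record; };
struct merged_event { int64_t ts = 0; std::size_t rows = 0; };

std::vector<merged_event> merge_rows(const std::vector<cdc_row>& rows) {
    std::vector<merged_event> events;
    merged_event current{};
    for (const cdc_row& r : rows) {
        current.ts = r.ts;
        current.rows++;
        if (r.end_of_record) { // flush the merged event
            events.push_back(current);
            current = merged_event{};
        }
    }
    return events;
}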
switch (op) {
case cdc::operation::pre_image:
case cdc::operation::post_image:
{
auto item = rjson::empty_object();
describe_single_item(*selection, row, attr_names, item, nullptr, true);
describe_single_item(*selection, row, key_names, item);
rjson::add(record.dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
break;
}
case cdc::operation::update:
rjson::add(record.record, "eventName", "MODIFY");
break;
case cdc::operation::insert:
rjson::add(record.record, "eventName", "INSERT");
break;
case cdc::operation::service_row_delete:
case cdc::operation::service_partition_delete:
{
auto user_identity = rjson::empty_object();
rjson::add(user_identity, "Type", "Service");
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
rjson::add(record.record, "userIdentity", std::move(user_identity));
rjson::add(record.record, "eventName", "REMOVE");
break;
}
default:
rjson::add(record.record, "eventName", "REMOVE");
break;
}
if (eor) {
size_t index = 0;
for (auto& [_, rec] : records_map) {
rjson::add(rec.record, "awsRegion", rjson::from_string(dc_name));
rjson::add(rec.record, "eventID", event_id(iter.shard.id, *timestamp, index++));
rjson::add(rec.record, "eventSource", "scylladb:alternator");
rjson::add(rec.record, "eventVersion", "1.1");
rjson::add(rec.record, "dynamodb", std::move(rec.dynamodb));
rjson::push_back(records, std::move(rec.record));
if (!dynamodb.HasMember("Keys")) {
auto keys = rjson::empty_object();
describe_single_item(*selection, row, key_names, keys);
rjson::add(dynamodb, "Keys", std::move(keys));
rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
rjson::add(dynamodb, "SequenceNumber", sequence_number(ts));
rjson::add(dynamodb, "StreamViewType", type);
// TODO: SizeBytes
}
records_map.clear();
timestamp = ts;
if (records.Size() >= limit) {
// Note: we might have more than limit rows here - BatchWriteItem will emit multiple items
// with the same timestamp and we have no way of resuming iteration midway through those,
// so we return all of them here.
/**
* We merge rows with same timestamp into a single event.
* This is pretty much needed, because a CDC row typically
* encodes ~half the info of an alternator write.
*
* A big, big downside to how alternator records are written
* (i.e. CQL), is that the distinction between INSERT and UPDATE
* is somewhat lost/unmappable to actual eventName.
* A write (currently) always looks like an insert+modify
* regardless of whether we wrote an existing record or not.
*
* Maybe RMW ops could be done slightly differently so
* we can distinguish them here...
*
* For now, all writes will become MODIFY.
*
* Note: we do not check the current pre/post
* flags on CDC log, instead we use data to
* drive what is returned. This is (afaict)
* consistent with dynamo streams
*/
switch (op) {
case cdc::operation::pre_image:
case cdc::operation::post_image:
{
auto item = rjson::empty_object();
describe_single_item(*selection, row, attr_names, item, nullptr, true);
describe_single_item(*selection, row, key_names, item);
rjson::add(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
break;
}
case cdc::operation::update:
rjson::add(record, "eventName", "MODIFY");
break;
case cdc::operation::insert:
rjson::add(record, "eventName", "INSERT");
break;
case cdc::operation::service_row_delete:
case cdc::operation::service_partition_delete:
{
auto user_identity = rjson::empty_object();
rjson::add(user_identity, "Type", "Service");
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
rjson::add(record, "userIdentity", std::move(user_identity));
rjson::add(record, "eventName", "REMOVE");
break;
}
default:
rjson::add(record, "eventName", "REMOVE");
break;
}
if (eor) {
maybe_add_record();
timestamp = ts;
if (limit == 0) {
break;
}
}
}
}
auto ret = rjson::empty_object();
rjson::add(ret, "Records", std::move(records));
auto ret = rjson::empty_object();
auto nrecords = records.Size();
rjson::add(ret, "Records", std::move(records));
if (timestamp) {
// #9642. Set next iterators threshold to > last
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
// Note that here we unconditionally return NextShardIterator,
// without checking if maybe we reached the end-of-shard. If the
// shard did end, then the next read will have nrecords == 0 and
// will notice the end of shard and not return NextShardIterator.
rjson::add(ret, "NextShardIterator", next_iter);
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
co_return rjson::print(std::move(ret));
}
if (nrecords != 0) {
// #9642. Set next iterators threshold to > last
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
// Note that here we unconditionally return NextShardIterator,
// without checking if maybe we reached the end-of-shard. If the
// shard did end, then the next read will have nrecords == 0 and
// will notice the end of shard and not return NextShardIterator.
rjson::add(ret, "NextShardIterator", next_iter);
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
}
// ugh. figure out if we are at end-of-shard
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
// ugh. figure out if we are at end-of-shard
auto normal_token_owners = _proxy.get_token_metadata_ptr()->count_normal_token_owners();
db_clock::time_point ts = co_await _sdks.cdc_current_generation_timestamp({ normal_token_owners });
auto& shard = iter.shard;
return _sdks.cdc_current_generation_timestamp({ normal_token_owners }).then([this, iter, high_ts, start_time, ret = std::move(ret)](db_clock::time_point ts) mutable {
auto& shard = iter.shard;
if (shard.time < ts && ts < high_ts) {
// The DynamoDB documentation states that when a shard is
// closed, reading it until the end has NextShardIterator
// "set to null". Our test test_streams_closed_read
// confirms that by "null" they meant not set at all.
} else {
// We could have returned the same iterator again, but we did
// a search from it until high_ts and found nothing, so we
// can also start the next search from high_ts.
// TODO: but why? It's simpler just to leave the iterator be.
shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true);
rjson::add(ret, "NextShardIterator", iter);
}
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
if (is_big(ret)) {
co_return make_streamed(std::move(ret));
}
co_return rjson::print(std::move(ret));
if (shard.time < ts && ts < high_ts) {
// The DynamoDB documentation states that when a shard is
// closed, reading it until the end has NextShardIterator
// "set to null". Our test test_streams_closed_read
// confirms that by "null" they meant not set at all.
} else {
// We could have returned the same iterator again, but we did
// a search from it until high_ts and found nothing, so we
// can also start the next search from high_ts.
// TODO: but why? It's simpler just to leave the iterator be.
shard_iterator next_iter(iter.table, iter.shard, utils::UUID_gen::min_time_UUID(high_ts.time_since_epoch()), true);
rjson::add(ret, "NextShardIterator", iter);
}
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
if (is_big(ret)) {
return make_ready_future<executor::request_return_type>(make_streamed(std::move(ret)));
}
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
});
});
}
bool executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) {
@@ -1122,7 +1079,6 @@ bool executor::add_stream_options(const rjson::value& stream_specification, sche
cdc::options opts;
opts.enabled(true);
// cdc::delta_mode is ignored by Alternator, so aim for the least overhead.
opts.set_delta_mode(cdc::delta_mode::keys);
opts.ttl(std::chrono::duration_cast<std::chrono::seconds>(dynamodb_streams_max_window).count());

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <chrono>
@@ -46,7 +46,6 @@
#include "alternator/executor.hh"
#include "alternator/controller.hh"
#include "alternator/serialization.hh"
#include "alternator/ttl_tag.hh"
#include "dht/sharder.hh"
#include "db/config.hh"
#include "db/tags/utils.hh"
@@ -58,10 +57,19 @@ static logging::logger tlogger("alternator_ttl");
namespace alternator {
// We write the expiration-time attribute enabled on a table in a
// tag TTL_TAG_KEY.
// Currently, the *value* of this tag is simply the name of the attribute,
// and the expiration scanner interprets it as an Alternator attribute name -
// It can refer to a real column or if that doesn't exist, to a member of
// the ":attrs" map column. Although this is designed for Alternator, it may
// be good enough for CQL as well (there, the ":attrs" column won't exist).
extern const sstring TTL_TAG_KEY;
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.update_time_to_live++;
if (!_proxy.features().alternator_ttl) {
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Upgrade all nodes to a version that supports it.");
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
}
schema_ptr schema = get_table(_proxy, request);
@@ -133,7 +141,7 @@ future<executor::request_return_type> executor::describe_time_to_live(client_sta
// expiration_service is a sharded service responsible for cleaning up expired
// items in all tables with per-item expiration enabled. Currently, this means
// Alternator tables with TTL configured via an UpdateTimeToLive request.
// Alternator tables with TTL configured via a UpdateTimeToLive request.
//
// Here is a brief overview of how the expiration service works:
//
@@ -316,7 +324,9 @@ static future<std::vector<std::pair<dht::token_range, locator::host_id>>> get_se
const auto& tm = *erm->get_token_metadata_ptr();
const auto& sorted_tokens = tm.sorted_tokens();
std::vector<std::pair<dht::token_range, locator::host_id>> ret;
throwing_assert(!sorted_tokens.empty());
if (sorted_tokens.empty()) {
on_internal_error(tlogger, "Token metadata is empty");
}
auto prev_tok = sorted_tokens.back();
for (const auto& tok : sorted_tokens) {
co_await coroutine::maybe_yield();
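// A minimal self-contained sketch (plain C++, int tokens) of the ring-range
// construction pattern above: each token is paired with its predecessor, and
// the first range wraps around from the last token.
#include <utility>
#include <vector>

std::vector<std::pair<int, int>> ring_ranges(const std::vector<int>& sorted_tokens) {
    std::vector<std::pair<int, int>> ranges; // each entry is (prev, tok]
    if (sorted_tokens.empty()) {
        return ranges; // the caller above treats an empty ring as an internal error
    }
    int prev = sorted_tokens.back(); // wrap-around predecessor of the first token
    for (int tok : sorted_tokens) {
        ranges.push_back({prev, tok});
        prev = tok;
    }
    return ranges;
}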
@@ -553,7 +563,7 @@ static future<> scan_table_ranges(
expiration_service::stats& expiration_stats)
{
const schema_ptr& s = scan_ctx.s;
throwing_assert(partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
SCYLLA_ASSERT (partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
auto p = service::pager::query_pagers::pager(proxy, s, scan_ctx.selection, *scan_ctx.query_state_ptr,
*scan_ctx.query_options, scan_ctx.command, std::move(partition_ranges), nullptr);
while (!p->is_exhausted()) {
@@ -583,7 +593,7 @@ static future<> scan_table_ranges(
if (retries >= 10) {
// Don't get stuck forever asking the same page, maybe there's
// a bug or a real problem in several replicas. Give up on
// this scan and retry the scan from a random position later,
// this scan an retry the scan from a random position later,
// in the next scan period.
throw runtime_exception("scanner thread failed after too many timeouts for the same page");
}
@@ -630,38 +640,13 @@ static future<> scan_table_ranges(
}
} else {
// For a real column to contain an expiration time, it
// must be a numeric type. We currently support decimal
// (used by Alternator TTL) as well as bigint, int and
// timestamp (used by CQL per-row TTL).
switch (meta[*expiration_column]->type->get_kind()) {
case abstract_type::kind::decimal:
// Used by Alternator TTL for key columns not stored
// in the map. The value is in seconds, fractional
// part is ignored.
expired = is_expired(value_cast<big_decimal>(v), now);
break;
case abstract_type::kind::long_kind:
// Used by CQL per-row TTL. The value is in seconds.
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int64_t>(v))), now);
break;
case abstract_type::kind::int32:
// Used by CQL per-row TTL. The value is in seconds.
// Using int type is not recommended because it will
// overflow in 2038, but we support it to allow users
// to use existing int columns for expiration.
expired = is_expired(gc_clock::time_point(std::chrono::seconds(value_cast<int32_t>(v))), now);
break;
case abstract_type::kind::timestamp:
// Used by CQL per-row TTL. The value is in milliseconds
// but we truncate it to gc_clock's precision (whole seconds).
expired = is_expired(gc_clock::time_point(std::chrono::duration_cast<gc_clock::duration>(value_cast<db_clock::time_point>(v).time_since_epoch())), now);
break;
default:
// Should never happen - we verified the column's type
// before starting the scan.
[[unlikely]]
on_internal_error(tlogger, format("expiration scanner value of unsupported type {} in column {}", meta[*expiration_column]->type->cql3_type_name(), scan_ctx.column_name) );
}
// must be a numeric type.
// FIXME: Currently we only support decimal_type (which is
// what Alternator uses), but other numeric types can be
// supported as well to make this feature more useful in CQL.
// Note that kind::decimal is also checked above.
big_decimal n = value_cast<big_decimal>(v);
expired = is_expired(n, now);
}
if (expired) {
expiration_stats.items_deleted++;
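// A minimal self-contained sketch (std::chrono only, hypothetical helpers) of
// the expiry comparisons in the multi-type variant above: every supported
// representation reduces to seconds since the UNIX epoch (milliseconds
// truncated to whole seconds), compared against "now".
#include <chrono>
#include <cstdint>

bool expired_at_seconds(int64_t secs_since_epoch, std::chrono::system_clock::time_point now) {
    auto when = std::chrono::system_clock::time_point(std::chrono::seconds(secs_since_epoch));
    return when <= now;
}

bool expired_at_millis(int64_t millis_since_epoch, std::chrono::system_clock::time_point now) {
    // truncate to whole seconds, as the timestamp branch does
    return expired_at_seconds(millis_since_epoch / 1000, now);
}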
@@ -723,12 +708,16 @@ static future<bool> scan_table(
co_return false;
}
// attribute_name may be one of the schema's columns (in Alternator, this
// means a key column, in CQL it's a regular column), or an element in
// Alternator's attrs map encoded in Alternator's JSON encoding (which we
// decode). If attribute_name is a real column, in Alternator it will have
// the type decimal, counting seconds since the UNIX epoch, while in CQL
// it will be one of the types bigint or int (counting seconds) or timestamp
// (counting milliseconds).
// means it's a key column), or an element in Alternator's attrs map
// encoded in Alternator's JSON encoding.
// FIXME: To make this less Alternator-specific, we should encode in the
// single key's value three things:
// 1. The name of a column
// 2. Optionally if column is a map, a member in the map
// 3. The deserializer for the value: CQL or Alternator (JSON).
// The deserializer can be guessed: If the given column or map item is
// numeric, it can be used directly. If it is a "bytes" type, it needs to
// be deserialized using Alternator's deserializer.
bytes column_name = to_bytes(*attribute_name);
const column_definition *cd = s->get_column_definition(column_name);
std::optional<std::string> member;
@@ -747,14 +736,11 @@ static future<bool> scan_table(
data_type column_type = cd->type;
// Verify that the column has the right type: If "member" exists
// the column must be a map, and if it doesn't, the column must
// be decimal_type (Alternator), bigint, int or timestamp (CQL).
// If the column has the wrong type nothing can get expired in
// this table, and it's pointless to scan it.
// (currently) be a decimal_type. If the column has the wrong type
// nothing can get expired in this table, and it's pointless to
// scan it.
if ((member && column_type->get_kind() != abstract_type::kind::map) ||
(!member && column_type->get_kind() != abstract_type::kind::decimal &&
column_type->get_kind() != abstract_type::kind::long_kind &&
column_type->get_kind() != abstract_type::kind::int32 &&
column_type->get_kind() != abstract_type::kind::timestamp)) {
(!member && column_type->get_kind() != abstract_type::kind::decimal)) {
tlogger.info("table {} TTL column has unsupported type, not scanning", s->cf_name());
co_return false;
}
@@ -781,7 +767,7 @@ static future<bool> scan_table(
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet, erm->get_topology()); // throws if no secondary replica
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet); // throws if no secondary replica
if (tablet_secondary_replica.host == my_host_id && tablet_secondary_replica.shard == this_shard_id()) {
if (!gossiper.is_alive(tablet_primary_replica.host)) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);
@@ -892,10 +878,12 @@ future<> expiration_service::run() {
future<> expiration_service::start() {
// Called by main() on each shard to start the expiration-service
// thread. Just runs run() in the background and allows stop().
if (!shutting_down()) {
_end = run().handle_exception([] (std::exception_ptr ep) {
tlogger.error("expiration_service failed: {}", ep);
});
if (_db.features().alternator_ttl) {
if (!shutting_down()) {
_end = run().handle_exception([] (std::exception_ptr ep) {
tlogger.error("expiration_service failed: {}", ep);
});
}
}
return make_ready_future<>();
}

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -30,7 +30,7 @@ namespace alternator {
// expiration_service is a sharded service responsible for cleaning up expired
// items in all tables with per-item expiration enabled. Currently, this means
// Alternator tables with TTL configured via an UpdateTimeToLive request.
// Alternator tables with TTL configured via a UpdateTimeToLeave request.
class expiration_service final : public seastar::peering_sharded_service<expiration_service> {
public:
// Object holding per-shard statistics related to the expiration service.
@@ -52,7 +52,7 @@ private:
data_dictionary::database _db;
service::storage_proxy& _proxy;
gms::gossiper& _gossiper;
// _end is set by start(), and resolves when the background service
// _end is set by start(), and resolves when the the background service
// started by it ends. To ask the background service to end, _abort_source
// should be triggered. stop() below uses both _abort_source and _end.
std::optional<future<>> _end;

View File

@@ -1,26 +0,0 @@
/*
* Copyright 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include "seastarx.hh"
#include <seastar/core/sstring.hh>
namespace alternator {
// We use the table tag TTL_TAG_KEY ("system:ttl_attribute") to remember
// which attribute was chosen as the expiration-time attribute for
// Alternator's TTL and CQL's per-row TTL features.
// Currently, the *value* of this tag is simply the name of the attribute:
// It can refer to a real column or if that doesn't exist, to a member of
// the ":attrs" map column (which Alternator uses).
extern const sstring TTL_TAG_KEY;
} // namespace alternator
// let users use TTL_TAG_KEY without the "alternator::" prefix,
// to make it easier to move it to a different namespace later.
using alternator::TTL_TAG_KEY;

View File

@@ -12,7 +12,7 @@
"operations":[
{
"method":"POST",
"summary":"Resets authorized prepared statements cache",
"summary":"Reset cache",
"type":"void",
"nickname":"authorization_cache_reset",
"produces":[

View File

@@ -243,7 +243,7 @@
"GOSSIP_DIGEST_SYN",
"GOSSIP_DIGEST_ACK2",
"GOSSIP_SHUTDOWN",
"UNUSED__DEFINITIONS_UPDATE",
"DEFINITIONS_UPDATE",
"TRUNCATE",
"UNUSED__REPLICATION_FINISHED",
"MIGRATION_REQUEST",

View File

@@ -743,7 +743,7 @@
"parameters":[
{
"name":"tag",
"description":"The snapshot tag to delete. If omitted, all snapshots are removed.",
"description":"the tag given to the snapshot",
"required":false,
"allowMultiple":false,
"type":"string",
@@ -751,7 +751,7 @@
},
{
"name":"kn",
"description":"Comma-separated list of keyspace names to delete snapshots from. If omitted, snapshots are deleted from all keyspaces.",
"description":"Comma-separated keyspaces name that their snapshot will be deleted",
"required":false,
"allowMultiple":false,
"type":"string",
@@ -759,7 +759,7 @@
},
{
"name":"cf",
"description":"A table name used to filter which table's snapshots are deleted. If omitted or empty, snapshots for all tables are eligible. When provided together with 'kn', the table is looked up in each listed keyspace independently. For secondary indexes, the logical index name (e.g. 'myindex') can be used and is resolved automatically.",
"description":"an optional table name that its snapshot will be deleted",
"required":false,
"allowMultiple":false,
"type":"string",
@@ -1295,45 +1295,6 @@
}
]
},
{
"path":"/storage_service/logstor_compaction",
"operations":[
{
"method":"POST",
"summary":"Trigger compaction of the key-value storage",
"type":"void",
"nickname":"logstor_compaction",
"produces":[
"application/json"
],
"parameters":[
{
"name":"major",
"description":"When true, perform a major compaction",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/logstor_flush",
"operations":[
{
"method":"POST",
"summary":"Trigger flush of logstor storage",
"type":"void",
"nickname":"logstor_flush",
"produces":[
"application/json"
],
"parameters":[]
}
]
},
{
"path":"/storage_service/active_repair/",
"operations":[
@@ -3090,7 +3051,7 @@
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
"required":false,
"allowMultiple":false,
"type":"string",
@@ -3124,125 +3085,6 @@
}
]
},
{
"path":"/storage_service/tablets/snapshots",
"operations":[
{
"method":"POST",
"summary":"Takes the snapshot for the given keyspaces/tables. A snapshot name must be specified.",
"type":"void",
"nickname":"take_cluster_snapshot",
"produces":[
"application/json"
],
"parameters":[
{
"name":"tag",
"description":"the tag given to the snapshot",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"keyspace",
"description":"Keyspace(s) to snapshot. Multiple keyspaces can be provided using a comma-separated list. If omitted, snapshot all keyspaces.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"table",
"description":"Table(s) to snapshot. Multiple tables (in a single keyspace) can be provided using a comma-separated list. If omitted, snapshot all tables in the given keyspace(s).",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/vnode_tablet_migrations/keyspaces/{keyspace}",
"operations":[{
"method":"POST",
"summary":"Start vnodes-to-tablets migration for all tables in a keyspace",
"type":"void",
"nickname":"create_vnode_tablet_migration",
"produces":["application/json"],
"parameters":[
{
"name":"keyspace",
"description":"Keyspace name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
}
]
},
{
"method":"GET",
"summary":"Get a keyspace's vnodes-to-tablets migration status",
"type":"vnode_tablet_migration_status",
"nickname":"get_vnode_tablet_migration",
"produces":["application/json"],
"parameters":[
{
"name":"keyspace",
"description":"Keyspace name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
}
]
}]
},
{
"path":"/storage_service/vnode_tablet_migrations/node/storage_mode",
"operations":[{
"method":"PUT",
"summary":"Set the intended storage mode for this node during vnodes-to-tablets migration",
"type":"void",
"nickname":"set_vnode_tablet_migration_node_storage_mode",
"produces":["application/json"],
"parameters":[
{
"name":"intended_mode",
"description":"Intended storage mode (tablets or vnodes)",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}]
},
{
"path":"/storage_service/vnode_tablet_migrations/keyspaces/{keyspace}/finalization",
"operations":[{
"method":"POST",
"summary":"Finalize vnodes-to-tablets migration for all tables in a keyspace",
"type":"void",
"nickname":"finalize_vnode_tablet_migration",
"produces":["application/json"],
"parameters":[
{
"name":"keyspace",
"description":"Keyspace name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
}
]
}]
},
{
"path":"/storage_service/quiesce_topology",
"operations":[
@@ -3345,38 +3187,6 @@
}
]
},
{
"path":"/storage_service/logstor_info",
"operations":[
{
"method":"GET",
"summary":"Logstor segment information for one table",
"type":"table_logstor_info",
"nickname":"logstor_info",
"produces":[
"application/json"
],
"parameters":[
{
"name":"keyspace",
"description":"The keyspace",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"table",
"description":"table name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/retrain_dict",
"operations":[
@@ -3785,47 +3595,6 @@
}
}
},
"logstor_hist_bucket":{
"id":"logstor_hist_bucket",
"properties":{
"bucket":{
"type":"long"
},
"count":{
"type":"long"
},
"min_data_size":{
"type":"long"
},
"max_data_size":{
"type":"long"
}
}
},
"table_logstor_info":{
"id":"table_logstor_info",
"description":"Per-table logstor segment distribution",
"properties":{
"keyspace":{
"type":"string"
},
"table":{
"type":"string"
},
"compaction_groups":{
"type":"long"
},
"segments":{
"type":"long"
},
"data_size_histogram":{
"type":"array",
"items":{
"$ref":"logstor_hist_bucket"
}
}
}
},
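To make the two models above concrete, a response conforming to table_logstor_info might look like the following; every value here is invented for illustration:

```bash
# Print an illustrative table_logstor_info document (all values invented).
cat <<'EOF'
{
  "keyspace": "ks",
  "table": "t1",
  "compaction_groups": 4,
  "segments": 128,
  "data_size_histogram": [
    {"bucket": 0, "count": 96, "min_data_size": 512,   "max_data_size": 65536},
    {"bucket": 1, "count": 32, "min_data_size": 65537, "max_data_size": 1048576}
  ]
}
EOF
```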
"tablet_repair_result":{
"id":"tablet_repair_result",
"description":"Tablet repair result",
@@ -3860,45 +3629,6 @@
"description":"The resulting compression ratio (estimated on a random sample of files)"
}
}
},
"vnode_tablet_migration_node_status":{
"id":"vnode_tablet_migration_node_status",
"description":"Node storage mode info during vnodes-to-tablets migration",
"properties":{
"host_id":{
"type":"string",
"description":"The host ID"
},
"current_mode":{
"type":"string",
"description":"The current storage mode: `vnodes` or `tablets`"
},
"intended_mode":{
"type":"string",
"description":"The intended storage mode: `vnodes` or `tablets`"
}
}
},
"vnode_tablet_migration_status":{
"id":"vnode_tablet_migration_status",
"description":"Vnodes-to-tablets migration status for a keyspace",
"properties":{
"keyspace":{
"type":"string",
"description":"The keyspace name"
},
"status":{
"type":"string",
"description":"The migration status: `vnodes` (not started), `migrating_to_tablets` (in progress), or `tablets` (complete)"
},
"nodes":{
"type":"array",
"items":{
"$ref":"vnode_tablet_migration_node_status"
},
"description":"Per-node storage mode information. Empty if the keyspace is not being migrated."
}
}
}
}
}
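Likewise, a vnode_tablet_migration_status document observed mid-migration might look like this; the host IDs and keyspace name are invented:

```bash
# Print an illustrative vnode_tablet_migration_status document.
cat <<'EOF'
{
  "keyspace": "ks",
  "status": "migrating_to_tablets",
  "nodes": [
    {"host_id": "0e3a7f52-0000-4000-8000-000000000001",
     "current_mode": "tablets", "intended_mode": "tablets"},
    {"host_id": "0e3a7f52-0000-4000-8000-000000000002",
     "current_mode": "vnodes",  "intended_mode": "tablets"}
  ]
}
EOF
```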

View File

@@ -209,21 +209,6 @@
"parameters":[]
}
]
},
{
"path":"/system/chosen_sstable_version",
"operations":[
{
"method":"GET",
"summary":"Get sstable version currently chosen for use in new sstables",
"type":"string",
"nickname":"get_chosen_sstable_version",
"produces":[
"application/json"
],
"parameters":[]
}
]
}
]
}
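The endpoint removed above was a plain parameterless GET. A one-line sketch, assuming the default API address:

```bash
# Ask the node which sstable version it currently uses for new sstables.
curl "http://127.0.0.1:10000/system/chosen_sstable_version"
```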

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "api.hh"
@@ -122,9 +122,9 @@ future<> unset_thrift_controller(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_thrift_controller(ctx, r); });
}
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& ssc, service::raft_group0_client& group0_client) {
return ctx.http_server.set_routes([&ctx, &ss, &ssc, &group0_client] (routes& r) {
set_storage_service(ctx, r, ss, ssc, group0_client);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client& group0_client) {
return ctx.http_server.set_routes([&ctx, &ss, &group0_client] (routes& r) {
set_storage_service(ctx, r, ss, group0_client);
});
}

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -23,6 +23,31 @@
namespace api {
template<class T>
std::vector<T> map_to_key_value(const std::map<sstring, sstring>& map) {
std::vector<T> res;
res.reserve(map.size());
for (const auto& [key, value] : map) {
res.push_back(T());
res.back().key = key;
res.back().value = value;
}
return res;
}
template<class T, class MAP>
std::vector<T>& map_to_key_value(const MAP& map, std::vector<T>& res) {
res.reserve(res.size() + std::size(map));
for (const auto& [key, value] : map) {
T val;
val.key = fmt::to_string(key);
val.value = fmt::to_string(value);
res.push_back(val);
}
return res;
}
template <typename T, typename S = T>
T map_sum(T&& dest, const S& src) {
for (const auto& i : src) {

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
@@ -98,7 +98,7 @@ future<> set_server_config(http_context& ctx, db::config& cfg);
future<> unset_server_config(http_context& ctx);
future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snitch);
future<> unset_server_snitch(http_context& ctx);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>&, service::raft_group0_client&);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
future<> unset_server_storage_service(http_context& ctx);
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
future<> unset_server_client_routes(http_context& ctx);

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "api/api-doc/authorization_cache.json.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "cache_service.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -4,7 +4,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/http/short_streams.hh>
@@ -100,8 +100,9 @@ rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service
rapidjson::Document root;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
const auto route_entries = parse_set_client_array(root);
co_await cr.local().set_client_routes(parse_set_client_array(root));
co_await cr.local().set_client_routes(route_entries);
co_return seastar::json::json_void();
}
@@ -131,7 +132,8 @@ rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_serv
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
const auto route_keys = parse_delete_client_array(root);
co_await cr.local().delete_client_routes(route_keys);
co_return seastar::json::json_void();
}

View File

@@ -4,7 +4,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "collectd.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <fmt/ranges.h>
@@ -18,9 +18,7 @@
#include "utils/assert.hh"
#include "utils/estimated_histogram.hh"
#include <algorithm>
#include <sstream>
#include "db/data_listeners.hh"
#include "utils/hash.hh"
#include "storage_service.hh"
#include "compaction/compaction_manager.hh"
#include "unimplemented.hh"
@@ -344,56 +342,6 @@ uint64_t accumulate_on_active_memtables(replica::table& t, noncopyable_function<
return ret;
}
static
future<json::json_return_type>
rest_toppartitions_generic(sharded<replica::database>& db, std::unique_ptr<http::request> req) {
bool filters_provided = false;
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters {};
if (auto filters = req->get_query_param("table_filters"); !filters.empty()) {
filters_provided = true;
std::stringstream ss { filters };
std::string filter;
while (!filters.empty() && ss.good()) {
std::getline(ss, filter, ',');
table_filters.emplace(parse_fully_qualified_cf_name(filter));
}
}
std::unordered_set<sstring> keyspace_filters {};
if (auto filters = req->get_query_param("keyspace_filters"); !filters.empty()) {
filters_provided = true;
std::stringstream ss { filters };
std::string filter;
while (!filters.empty() && ss.good()) {
std::getline(ss, filter, ',');
keyspace_filters.emplace(std::move(filter));
}
}
// when the query is empty return immediately
if (filters_provided && table_filters.empty() && keyspace_filters.empty()) {
apilog.debug("toppartitions query: processing results");
cf::toppartitions_query_results results;
results.read_cardinality = 0;
results.write_cardinality = 0;
return make_ready_future<json::json_return_type>(results);
}
api::req_param<std::chrono::milliseconds, unsigned> duration{*req, "duration", 1000ms};
api::req_param<unsigned> capacity(*req, "capacity", 256);
api::req_param<unsigned> list_size(*req, "list_size", 10);
apilog.info("toppartitions query: #table_filters={} #keyspace_filters={} duration={} list_size={} capacity={}",
!table_filters.empty() ? std::to_string(table_filters.size()) : "all", !keyspace_filters.empty() ? std::to_string(keyspace_filters.size()) : "all", duration.value, list_size.value, capacity.value);
return seastar::do_with(db::toppartitions_query(db, std::move(table_filters), std::move(keyspace_filters), duration.value, list_size, capacity), [] (db::toppartitions_query& q) {
return run_toppartitions_query(q);
});
}
void set_column_family(http_context& ctx, routes& r, sharded<replica::database>& db) {
cf::get_column_family_name.set(r, [&db] (const_req req){
std::vector<sstring> res;
@@ -1099,10 +1047,6 @@ void set_column_family(http_context& ctx, routes& r, sharded<replica::database>&
});
});
ss::toppartitions_generic.set(r, [&db] (std::unique_ptr<http::request> req) {
return rest_toppartitions_generic(db, std::move(req));
});
cf::force_major_compaction.set(r, [&ctx, &db](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
if (!req->get_query_param("split_output").empty()) {
fail(unimplemented::cause::API);
@@ -1269,7 +1213,6 @@ void unset_column_family(http_context& ctx, routes& r) {
cf::get_sstable_count_per_level.unset(r);
cf::get_sstables_for_key.unset(r);
cf::toppartitions.unset(r);
ss::toppartitions_generic.unset(r);
cf::force_major_compaction.unset(r);
ss::get_load.unset(r);
ss::get_metrics_load.unset(r);
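The removed rest_toppartitions_generic handler above documents its own wire protocol: comma-separated table_filters (fully qualified keyspace:table names) and keyspace_filters, plus duration (milliseconds), capacity, and list_size with defaults of 1000, 256, and 10. A sketch of a call it used to serve, assuming the route was exposed at /storage_service/toppartitions (the path itself is not visible in this hunk) and using placeholder names:

```bash
# Sample the hottest partitions of ks.t1 and ks.t2 for 5 seconds.
curl "http://127.0.0.1:10000/storage_service/toppartitions?table_filters=ks:t1,ks:t2&duration=5000&capacity=256&list_size=10"
```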

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "commitlog.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/coroutine.hh>

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "api/api.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "build_mode.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#ifndef SCYLLA_BUILD_MODE_RELEASE

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "locator/snitch_base.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "api/api-doc/error_injection.json.hh"
@@ -23,7 +23,7 @@ void set_error_injection(http_context& ctx, routes& r) {
hf::enable_injection.set(r, [](std::unique_ptr<request> req) -> future<json::json_return_type> {
sstring injection = req->get_path_param("injection");
bool one_shot = strcasecmp(req->get_query_param("one_shot").c_str(), "true") == 0;
bool one_shot = req->get_query_param("one_shot") == "True";
auto params = co_await util::read_entire_stream_contiguous(*req->content_stream);
const size_t max_params_size = 1024 * 1024;
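One practical note on the comparison change above: a caller that sends one_shot=True satisfies both variants, since the literal "True" matches the exact comparison as well as the case-insensitive one. A sketch, assuming the route is mounted at the v2 error-injection path and using a placeholder injection name:

```bash
# Enable a one-shot error injection on this node.
curl -X POST "http://127.0.0.1:10000/v2/error_injection/injection/my_injection?one_shot=True"
```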

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "failure_detector.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/coroutine.hh>

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <vector>

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "api/api-doc/lsa.json.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "messaging_service.hh"

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once

View File

@@ -3,7 +3,7 @@
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/coroutine.hh>

Some files were not shown because too many files have changed in this diff.