Compare commits

..

8 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
309f1994db Squash commits into single clean commit
Squashed 7 commits into a single commit with a clean commit message
following repository conventions.

Co-authored-by: avikivity <1017210+avikivity@users.noreply.github.com>
2025-11-04 11:44:13 +00:00
copilot-swe-agent[bot]
70e83c7687 Remove temporary changes from .gitignore and tablets.cc
Removed leftover temporary comment and .gitignore entries that were
added during earlier push attempts.

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-11-01 10:27:52 +00:00
copilot-swe-agent[bot]
0f0d30cc34 Change process_one_row to return std::optional<tablet_id>
Per review feedback, returning std::optional is cleaner than using value_or.
Updated the function signature and call site to handle the optional properly.

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-11-01 09:18:49 +00:00
copilot-swe-agent[bot]
5faee478c4 Force push with updated commit message
User explicitly authorized force push to update commit 829b85e with Fixes line.

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-10-31 18:46:39 +00:00
copilot-swe-agent[bot]
88fb5130b3 Trigger push of updated commit message
Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-10-31 17:44:17 +00:00
copilot-swe-agent[bot]
e5db893125 Temporary commit to trigger push
Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-10-31 17:39:17 +00:00
copilot-swe-agent[bot]
829b85ee46 Fix tablet_id optional dereference for -O0 builds
Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-10-31 08:18:56 +00:00
copilot-swe-agent[bot]
009ab048f1 Initial plan 2025-10-31 08:10:33 +00:00
554 changed files with 6810 additions and 20435 deletions

View File

@@ -1,86 +0,0 @@
# ScyllaDB Development Instructions
## Project Context
High-performance distributed NoSQL database. Core values: performance, correctness, readability.
## Build System
### Modern Build (configure.py + ninja)
```bash
# Configure (run once per mode, or when switching modes)
./configure.py --mode=<mode> # mode: dev, debug, release, sanitize
# Build everything
ninja <mode>-build # e.g., ninja dev-build
# Build Scylla binary only (sufficient for Python integration tests)
ninja build/<mode>/scylla
# Build specific test
ninja build/<mode>/test/boost/<test_name>
```
## Running Tests
### C++ Unit Tests
```bash
# Run all tests in a file
./test.py --mode=<mode> test/<suite>/<test_name>.cc
# Run a single test case from a file
./test.py --mode=<mode> test/<suite>/<test_name>.cc::<test_case_name>
# Examples
./test.py --mode=dev test/boost/memtable_test.cc
./test.py --mode=dev test/raft/raft_server_test.cc::test_check_abort_on_client_api
```
**Important:**
- Use full path with `.cc` extension (e.g., `test/boost/test_name.cc`, not `boost/test_name`)
- To run a single test case, append `::<test_case_name>` to the file path
- If you encounter permission issues with cgroup metric gathering, add `--no-gather-metrics` flag
**Rebuilding Tests:**
- test.py does NOT automatically rebuild when test source files are modified
- Many tests are part of composite binaries (e.g., `combined_tests` in test/boost contains multiple test files)
- To find which binary contains a test, check `configure.py` in the repository root (primary source) or `test/<suite>/CMakeLists.txt`
- To rebuild a specific test binary: `ninja build/<mode>/test/<suite>/<binary_name>`
- Examples:
- `ninja build/dev/test/boost/combined_tests` (contains group0_voter_calculator_test.cc and others)
- `ninja build/dev/test/raft/replication_test` (standalone Raft test)
### Python Integration Tests
```bash
# Only requires Scylla binary (full build usually not needed)
ninja build/<mode>/scylla
# Run all tests in a file
./test.py --mode=<mode> <test_path>
# Run a single test case from a file
./test.py --mode=<mode> <test_path>::<test_function_name>
# Examples
./test.py --mode=dev alternator/
./test.py --mode=dev cluster/test_raft_voters::test_raft_limited_voters_retain_coordinator
# Optional flags
./test.py --mode=dev cluster/test_raft_no_quorum -v # Verbose output
./test.py --mode=dev cluster/test_raft_no_quorum --repeat 5 # Repeat test 5 times
```
**Important:**
- Use path without `.py` extension (e.g., `cluster/test_raft_no_quorum`, not `cluster/test_raft_no_quorum.py`)
- To run a single test case, append `::<test_function_name>` to the file path
- Add `-v` for verbose output
- Add `--repeat <num>` to repeat a test multiple times
- After modifying C++ source files, only rebuild the Scylla binary for Python tests - building the entire repository is unnecessary
## Code Philosophy
- Performance matters in hot paths (data read/write, inner loops)
- Self-documenting code through clear naming
- Comments explain "why", not "what"
- Prefer standard library over custom implementations
- Strive for simplicity and clarity, add complexity only when clearly justified
- Question requests: don't blindly implement requests - evaluate trade-offs, identify issues, and suggest better alternatives when appropriate
- Consider different approaches, weigh pros and cons, and recommend the best fit for the specific context

View File

@@ -1,115 +0,0 @@
---
applyTo: "**/*.{cc,hh}"
---
# C++ Guidelines
**Important:** Always match the style and conventions of existing code in the file and directory.
## Memory Management
- Prefer stack allocation whenever possible
- Use `std::unique_ptr` by default for dynamic allocations
- `new`/`delete` are forbidden (use RAII)
- Use `seastar::lw_shared_ptr` or `seastar::shared_ptr` for shared ownership within same shard
- Use `seastar::foreign_ptr` for cross-shard sharing
- Avoid `std::shared_ptr` except when interfacing with external C++ APIs
- Avoid raw pointers except for non-owning references or C API interop
## Seastar Asynchronous Programming
- Use `seastar::future<T>` for all async operations
- Prefer coroutines (`co_await`, `co_return`) over `.then()` chains for readability
- Coroutines are preferred over `seastar::do_with()` for managing temporary state
- In hot paths where futures are ready, continuations may be more efficient than coroutines
- Chain futures with `.then()`, don't block with `.get()` (unless in `seastar::thread` context)
- All I/O must be asynchronous (no blocking calls)
- Use `seastar::gate` for shutdown coordination
- Use `seastar::semaphore` for resource limiting (not `std::mutex`)
- Break long loops with `maybe_yield()` to avoid reactor stalls
## Coroutines
```cpp
seastar::future<T> func() {
auto result = co_await async_operation();
co_return result;
}
```
## Error Handling
- Throw exceptions for errors (futures propagate them automatically)
- In data path: avoid exceptions, use `std::expected` (or `boost::outcome`) instead
- Use standard exceptions (`std::runtime_error`, `std::invalid_argument`)
- Database-specific: throw appropriate schema/query exceptions
## Performance
- Pass large objects by `const&` or `&&` (move semantics)
- Use `std::string_view` for non-owning string references
- Avoid copies: prefer move semantics
- Use `utils::chunked_vector` instead of `std::vector` for large allocations (>128KB)
- Minimize dynamic allocations in hot paths
## Database-Specific Types
- Use `schema_ptr` for schema references
- Use `mutation` and `mutation_partition` for data modifications
- Use `partition_key` and `clustering_key` for keys
- Use `api::timestamp_type` for database timestamps
- Use `gc_clock` for garbage collection timing
## Style
- C++23 standard (prefer modern features, especially coroutines)
- Use `auto` when type is obvious from RHS
- Avoid `auto` when it obscures the type
- Use range-based for loops: `for (const auto& item : container)`
- Use standard algorithms when they clearly simplify code (e.g., replacing 10-line loops)
- Avoid chaining multiple algorithms if a straightforward loop is clearer
- Mark functions and variables `const` whenever possible
- Use scoped enums: `enum class` (not unscoped `enum`)
## Headers
- Use `#pragma once`
- Include order: own header, C++ std, Seastar, Boost, project headers
- Forward declare when possible
- Never `using namespace` in headers (exception: `using namespace seastar` is globally available via `seastarx.hh`)
## Documentation
- Public APIs require clear documentation
- Implementation details should be self-evident from code
- Use `///` or Doxygen `/** */` for public documentation, `//` for implementation notes - follow the existing style
## Naming
- `snake_case` for most identifiers (classes, functions, variables, namespaces)
- Template parameters: `CamelCase` (e.g., `template<typename ValueType>`)
- Member variables: prefix with `_` (e.g., `int _count;`)
- Structs (value-only): no `_` prefix on members
- Constants and `constexpr`: `snake_case` (e.g., `static constexpr int max_size = 100;`)
- Files: `.hh` for headers, `.cc` for source
## Formatting
- 4 spaces indentation, never tabs
- Opening braces on same line as control structure (except namespaces)
- Space after keywords: `if (`, `while (`, `return `
- Whitespace around operators matches precedence: `*a + *b` not `* a+* b`
- Line length: keep reasonable (<160 chars), use continuation lines with double indent if needed
- Brace all nested scopes, even single statements
- Minimal patches: only format code you modify, never reformat entire files
## Logging
- Use structured logging with appropriate levels: DEBUG, INFO, WARN, ERROR
- Include context in log messages (e.g., request IDs)
- Never log sensitive data (credentials, PII)
## Forbidden
- `malloc`/`free`
- `printf` family (use logging or fmt)
- Raw pointers for ownership
- `using namespace` in headers
- Blocking operations: `std::sleep`, `std::read`, `std::mutex` (use Seastar equivalents)
- `std::atomic` (reserved for very special circumstances only)
- Macros (use `inline`, `constexpr`, or templates instead)
## Testing
When modifying existing code, follow TDD: create/update test first, then implement.
- Examine existing tests for style and structure
- Use Boost.Test framework
- Use `SEASTAR_THREAD_TEST_CASE` for Seastar asynchronous tests
- Aim for high code coverage, especially for new features and bug fixes
- Maintain bisectability: all tests must pass in every commit. Mark failing tests with `BOOST_FAIL()` or similar, then fix in subsequent commit

View File

@@ -1,51 +0,0 @@
---
applyTo: "**/*.py"
---
# Python Guidelines
**Important:** Match existing code style. Some directories (like `test/cqlpy` and `test/alternator`) prefer simplicity over type hints and docstrings.
## Style
- Follow PEP 8
- Use type hints for function signatures (unless directory style omits them)
- Use f-strings for formatting
- Line length: 160 characters max
- 4 spaces for indentation
## Imports
Order: standard library, third-party, local imports
```python
import os
import sys
import pytest
from cassandra.cluster import Cluster
from test.utils import setup_keyspace
```
Never use `from module import *`
## Documentation
All public functions/classes need docstrings (unless the current directory conventions omit them):
```python
def my_function(arg1: str, arg2: int) -> bool:
"""
Brief summary of function purpose.
Args:
arg1: Description of first argument.
arg2: Description of second argument.
Returns:
Description of return value.
"""
pass
```
## Testing Best Practices
- Maintain bisectability: all tests must pass in every commit
- Mark currently-failing tests with `@pytest.mark.xfail`, unmark when fixed
- Use descriptive names that convey intent
- Docstrings/comments should explain what the test verifies and why, and if it reproduces a specific issue or how it fits into the larger test suite

View File

@@ -142,31 +142,20 @@ def backport(repo, pr, version, commits, backport_base_branch, is_collaborator):
def with_github_keyword_prefix(repo, pr):
# GitHub issue pattern: #123, scylladb/scylladb#123, or full GitHub URLs
github_pattern = rf"(?:fix(?:|es|ed))\s*:?\s*(?:(?:(?:{repo.full_name})?#)|https://github\.com/{repo.full_name}/issues/)(\d+)"
# JIRA issue pattern: PKG-92 or https://scylladb.atlassian.net/browse/PKG-92
jira_pattern = r"(?:fix(?:|es|ed))\s*:?\s*(?:(?:https://scylladb\.atlassian\.net/browse/)?([A-Z]+-\d+))"
# Check PR body for GitHub issues
github_match = re.findall(github_pattern, pr.body, re.IGNORECASE)
# Check PR body for JIRA issues
jira_match = re.findall(jira_pattern, pr.body, re.IGNORECASE)
match = github_match or jira_match
if match:
pattern = rf"(?:fix(?:|es|ed))\s*:?\s*(?:(?:(?:{repo.full_name})?#)|https://github\.com/{repo.full_name}/issues/)(\d+)"
match = re.findall(pattern, pr.body, re.IGNORECASE)
if not match:
for commit in pr.get_commits():
match = re.findall(pattern, commit.commit.message, re.IGNORECASE)
if match:
print(f'{pr.number} has a valid close reference in commit message {commit.sha}')
break
if not match:
print(f'No valid close reference for {pr.number}')
return False
else:
return True
for commit in pr.get_commits():
github_match = re.findall(github_pattern, commit.commit.message, re.IGNORECASE)
jira_match = re.findall(jira_pattern, commit.commit.message, re.IGNORECASE)
if github_match or jira_match:
print(f'{pr.number} has a valid close reference in commit message {commit.sha}')
return True
print(f'No valid close reference for {pr.number}')
return False
def main():
args = parse_args()

View File

@@ -1,34 +0,0 @@
name: Docs / Validate metrics
on:
pull_request:
branches:
- master
- enterprise
paths:
- '**/*.cc'
- 'scripts/metrics-config.yml'
- 'scripts/get_description.py'
- 'docs/_ext/scylladb_metrics.py'
jobs:
validate-metrics:
runs-on: ubuntu-latest
name: Check metrics documentation coverage
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.10'
- name: Install dependencies
run: pip install PyYAML
- name: Validate metrics
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml

View File

@@ -1,242 +0,0 @@
name: Trigger next gating
on:
pull_request_target:
types: [opened, reopened, synchronize]
issue_comment:
types: [created]
jobs:
trigger-ci:
runs-on: ubuntu-latest
steps:
- name: Dump GitHub context
env:
GITHUB_CONTEXT: ${{ toJson(github) }}
run: echo "$GITHUB_CONTEXT"
- name: Checkout PR code
uses: actions/checkout@v3
with:
fetch-depth: 0 # Needed to access full history
ref: ${{ github.event.pull_request.head.ref }}
- name: Fetch before commit if needed
run: |
if ! git cat-file -e ${{ github.event.before }} 2>/dev/null; then
echo "Fetching before commit ${{ github.event.before }}"
git fetch --depth=1 origin ${{ github.event.before }}
fi
- name: Compare commits for file changes
if: github.action == 'synchronize'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
echo "Base: ${{ github.event.before }}"
echo "Head: ${{ github.event.after }}"
TREE_BEFORE=$(git show -s --format=%T ${{ github.event.before }})
TREE_AFTER=$(git show -s --format=%T ${{ github.event.after }})
echo "TREE_BEFORE=$TREE_BEFORE" >> $GITHUB_ENV
echo "TREE_AFTER=$TREE_AFTER" >> $GITHUB_ENV
- name: Check if last push has file changes
run: |
if [[ "${{ env.TREE_BEFORE }}" == "${{ env.TREE_AFTER }}" ]]; then
echo "No file changes detected in the last push, only commit message edit."
echo "has_file_changes=false" >> $GITHUB_ENV
else
echo "File changes detected in the last push."
echo "has_file_changes=true" >> $GITHUB_ENV
fi
- name: Rule 1 - Check PR draft or conflict status
run: |
# Check if PR is in draft mode
IS_DRAFT="${{ github.event.pull_request.draft }}"
# Check if PR has 'conflict' label
HAS_CONFLICT_LABEL="false"
LABELS='${{ toJson(github.event.pull_request.labels) }}'
if echo "$LABELS" | jq -r '.[].name' | grep -q "^conflict$"; then
HAS_CONFLICT_LABEL="true"
fi
# Set draft_or_conflict variable
if [[ "$IS_DRAFT" == "true" || "$HAS_CONFLICT_LABEL" == "true" ]]; then
echo "draft_or_conflict=true" >> $GITHUB_ENV
echo "✅ Rule 1: PR is in draft mode or has conflict label - setting draft_or_conflict=true"
else
echo "draft_or_conflict=false" >> $GITHUB_ENV
echo "✅ Rule 1: PR is ready and has no conflict label - setting draft_or_conflict=false"
fi
echo "Draft status: $IS_DRAFT"
echo "Has conflict label: $HAS_CONFLICT_LABEL"
echo "Result: draft_or_conflict = $draft_or_conflict"
- name: Rule 2 - Check labels
run: |
# Check if PR has P0 or P1 labels
HAS_P0_P1_LABEL="false"
LABELS='${{ toJson(github.event.pull_request.labels) }}'
if echo "$LABELS" | jq -r '.[].name' | grep -E "^(P0|P1)$" > /dev/null; then
HAS_P0_P1_LABEL="true"
fi
# Check if PR already has force_on_cloud label
echo "HAS_FORCE_ON_CLOUD_LABEL=false" >> $GITHUB_ENV
if echo "$LABELS" | jq -r '.[].name' | grep -q "^force_on_cloud$"; then
HAS_FORCE_ON_CLOUD_LABEL="true"
echo "HAS_FORCE_ON_CLOUD_LABEL=true" >> $GITHUB_ENV
fi
echo "Has P0/P1 label: $HAS_P0_P1_LABEL"
echo "Has force_on_cloud label: $HAS_FORCE_ON_CLOUD_LABEL"
# Add force_on_cloud label if PR has P0/P1 and doesn't already have force_on_cloud
if [[ "$HAS_P0_P1_LABEL" == "true" && "$HAS_FORCE_ON_CLOUD_LABEL" == "false" ]]; then
echo "✅ Rule 2: PR has P0 or P1 label - adding force_on_cloud label"
curl -X POST \
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/labels" \
-d '{"labels":["force_on_cloud"]}'
elif [[ "$HAS_P0_P1_LABEL" == "true" && "$HAS_FORCE_ON_CLOUD_LABEL" == "true" ]]; then
echo "✅ Rule 2: PR has P0 or P1 label and already has force_on_cloud label - no action needed"
else
echo "✅ Rule 2: PR does not have P0 or P1 label - no force_on_cloud label needed"
fi
SKIP_UNIT_TEST_CUSTOM="false"
if echo "$LABELS" | jq -r '.[].name' | grep -q "^ci/skip_unit-tests_custom$"; then
SKIP_UNIT_TEST_CUSTOM="true"
fi
echo "SKIP_UNIT_TEST_CUSTOM=$SKIP_UNIT_TEST_CUSTOM" >> $GITHUB_ENV
- name: Rule 3 - Analyze changed files and set build requirements
run: |
# Get list of changed files
CHANGED_FILES=$(git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.event.pull_request.head.sha }})
echo "Changed files:"
echo "$CHANGED_FILES"
echo ""
# Initialize all requirements to false
REQUIRE_BUILD="false"
REQUIRE_DTEST="false"
REQUIRE_UNITTEST="false"
REQUIRE_ARTIFACTS="false"
REQUIRE_SCYLLA_GDB="false"
# Check each file against patterns
while IFS= read -r file; do
if [[ -n "$file" ]]; then
echo "Checking file: $file"
# Build pattern: ^(?!scripts\/pull_github_pr.sh).*$
# Everything except scripts/pull_github_pr.sh
if [[ "$file" != "scripts/pull_github_pr.sh" ]]; then
REQUIRE_BUILD="true"
echo " ✓ Matches build pattern"
fi
# Dtest pattern: ^(?!test(.py|\/)|dist\/docker\/|dist\/common\/scripts\/).*$
# Everything except test files, dist/docker/, dist/common/scripts/
if [[ ! "$file" =~ ^test\.(py|/).*$ ]] && [[ ! "$file" =~ ^dist/docker/.*$ ]] && [[ ! "$file" =~ ^dist/common/scripts/.*$ ]]; then
REQUIRE_DTEST="true"
echo " ✓ Matches dtest pattern"
fi
# Unittest pattern: ^(?!dist\/docker\/|dist\/common\/scripts).*$
# Everything except dist/docker/, dist/common/scripts/
if [[ ! "$file" =~ ^dist/docker/.*$ ]] && [[ ! "$file" =~ ^dist/common/scripts.*$ ]]; then
REQUIRE_UNITTEST="true"
echo " ✓ Matches unittest pattern"
fi
# Artifacts pattern: ^(?:dist|tools\/toolchain).*$
# Files starting with dist or tools/toolchain
if [[ "$file" =~ ^dist.*$ ]] || [[ "$file" =~ ^tools/toolchain.*$ ]]; then
REQUIRE_ARTIFACTS="true"
echo " ✓ Matches artifacts pattern"
fi
# Scylla GDB pattern: ^(scylla-gdb.py).*$
# Files starting with scylla-gdb.py
if [[ "$file" =~ ^scylla-gdb\.py.*$ ]]; then
REQUIRE_SCYLLA_GDB="true"
echo " ✓ Matches scylla_gdb pattern"
fi
fi
done <<< "$CHANGED_FILES"
# Set environment variables
echo "requireBuild=$REQUIRE_BUILD" >> $GITHUB_ENV
echo "requireDtest=$REQUIRE_DTEST" >> $GITHUB_ENV
echo "requireUnittest=$REQUIRE_UNITTEST" >> $GITHUB_ENV
echo "requireArtifacts=$REQUIRE_ARTIFACTS" >> $GITHUB_ENV
echo "requireScyllaGdb=$REQUIRE_SCYLLA_GDB" >> $GITHUB_ENV
echo ""
echo "✅ Rule 3: File analysis complete"
echo "Build required: $REQUIRE_BUILD"
echo "Dtest required: $REQUIRE_DTEST"
echo "Unittest required: $REQUIRE_UNITTEST"
echo "Artifacts required: $REQUIRE_ARTIFACTS"
echo "Scylla GDB required: $REQUIRE_SCYLLA_GDB"
- name: Determine Jenkins Job Name
run: |
if [[ "${{ github.ref_name }}" == "next" ]]; then
FOLDER_NAME="scylla-master"
elif [[ "${{ github.ref_name }}" == "next-enterprise" ]]; then
FOLDER_NAME="scylla-enterprise"
else
VERSION=$(echo "${{ github.ref_name }}" | awk -F'-' '{print $2}')
if [[ "$VERSION" =~ ^202[0-4]\.[0-9]+$ ]]; then
FOLDER_NAME="enterprise-$VERSION"
elif [[ "$VERSION" =~ ^[0-9]+\.[0-9]+$ ]]; then
FOLDER_NAME="scylla-$VERSION"
fi
fi
echo "JOB_NAME=${FOLDER_NAME}/job/scylla-ci" >> $GITHUB_ENV
- name: Trigger Jenkins Job
if: env.draft_or_conflict == 'false' && env.has_file_changes == 'true' && github.action == 'opened' || github.action == 'reopened'
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
echo "Triggering Jenkins Job: $JOB_NAME"
curl -X POST \
"$JENKINS_URL/job/$JOB_NAME/buildWithParameters? \
PR_NUMBER=$PR_NUMBER& \
RUN_DTEST=$REQUIRE_DTEST& \
RUN_ONLY_SCYLLA_GDB=$REQUIRE_SCYLLA_GDB& \
RUN_UNIT_TEST=$REQUIRE_UNITTEST& \
FORCE_ON_CLOUD=$HAS_FORCE_ON_CLOUD_LABEL& \
SKIP_UNIT_TEST_CUSTOM=$SKIP_UNIT_TEST_CUSTOM& \
RUN_ARTIFACT_TESTS=$REQUIRE_ARTIFACTS" \
--fail \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" \
-i -v
trigger-ci-via-comment:
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI Jenkins Job
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
curl -X POST "$JENKINS_URL/job/$JOB_NAME/buildWithParameters?PR_NUMBER=$PR_NUMBER" \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v

1
.gitignore vendored
View File

@@ -37,3 +37,4 @@ clang_build
.idea/
nuke
rust/target

View File

@@ -116,7 +116,6 @@ list(APPEND absl_cxx_flags
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
list(APPEND ABSL_GCC_FLAGS ${absl_cxx_flags})
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
list(APPEND absl_cxx_flags "-Wno-deprecated-builtins")
list(APPEND ABSL_LLVM_FLAGS ${absl_cxx_flags})
endif()
set(ABSL_DEFAULT_LINKOPTS
@@ -164,45 +163,7 @@ file(MAKE_DIRECTORY "${scylla_gen_build_dir}")
include(add_version_library)
generate_scylla_version()
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
add_library(scylla-precompiled-header STATIC exported_templates.cc)
target_link_libraries(scylla-precompiled-header PRIVATE
absl::headers
absl::btree
absl::hash
absl::raw_hash_set
Seastar::seastar
Snappy::snappy
systemd
ZLIB::ZLIB
lz4::lz4_static
zstd::zstd_static)
if (Scylla_USE_PRECOMPILED_HEADER)
set(Scylla_USE_PRECOMPILED_HEADER_USE ON)
find_program(DISTCC_EXEC NAMES distcc OPTIONAL)
if (DISTCC_EXEC)
if(DEFINED ENV{DISTCC_HOSTS})
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
message(STATUS "Disabling precompiled header usage because distcc exists and DISTCC_HOSTS is set, assuming you're using distributed compilation.")
else()
file(REAL_PATH "~/.distcc/hosts" DIST_CC_HOSTS_PATH EXPAND_TILDE)
if (EXISTS ${DIST_CC_HOSTS_PATH})
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
message(STATUS "Disabling precompiled header usage because distcc and ~/.distcc/hosts exists, assuming you're using distributed compilation.")
endif()
endif()
endif()
if (Scylla_USE_PRECOMPILED_HEADER_USE)
message(STATUS "Using precompiled header for Scylla - remember to add `sloppiness = pch_defines,time_macros` to ccache.conf, if you're using ccache.")
target_precompile_headers(scylla-precompiled-header PRIVATE "stdafx.hh")
target_compile_definitions(scylla-precompiled-header PRIVATE SCYLLA_USE_PRECOMPILED_HEADER)
endif()
else()
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
endif()
add_library(scylla-main STATIC)
target_sources(scylla-main
PRIVATE
absl-flat_hash_map.cc
@@ -247,7 +208,6 @@ target_link_libraries(scylla-main
ZLIB::ZLIB
lz4::lz4_static
zstd::zstd_static
scylla-precompiled-header
)
option(Scylla_CHECK_HEADERS

View File

@@ -1,182 +0,0 @@
# SCYLLA_ASSERT to scylla_assert() Conversion Summary
## Objective
Replace crash-inducing `SCYLLA_ASSERT` with exception-throwing `scylla_assert()` to prevent cluster-wide crashes and maintain availability.
## What Was Done
### 1. Infrastructure Implementation ✓
Created new `scylla_assert()` macro in `utils/assert.hh`:
- Based on `on_internal_error()` for exception-based error handling
- Supports optional custom error messages via variadic arguments
- Uses `seastar::format()` for string formatting
- Compatible with C++23 standard (uses `__VA_OPT__`)
**Key difference from SCYLLA_ASSERT:**
```cpp
// Old: Crashes the process immediately
SCYLLA_ASSERT(condition);
// New: Throws exception (or aborts based on config)
scylla_assert(condition);
scylla_assert(condition, "custom error message: {}", value);
```
### 2. Comprehensive Analysis ✓
Analyzed entire codebase to identify safe vs unsafe conversion locations:
**Statistics:**
- Total SCYLLA_ASSERT usages: ~1307 (including tests)
- Non-test usages: ~886
- **Unsafe to convert**: 223 usages (25%)
- In noexcept functions: 187 usages across 50 files
- In destructors: 36 usages across 25 files
- **Safe to convert**: ~668 usages (75%)
- **Converted in this PR**: 112 usages (16.8% of safe conversions)
### 3. Documentation ✓
Created comprehensive documentation:
1. **Conversion Guide** (`docs/dev/scylla_assert_conversion.md`)
- Explains safe vs unsafe contexts
- Provides conversion strategy
- Lists all completed conversions
- Includes testing guidance
2. **Unsafe Locations Report** (`docs/dev/unsafe_scylla_assert_locations.md`)
- Detailed listing of 223 unsafe locations
- Organized by file with line numbers
- Separated into noexcept and destructor categories
### 4. Sample Conversions ✓
Converted 112 safe SCYLLA_ASSERT usages across 32 files as demonstration:
| File | Conversions | Context |
|------|------------|---------|
| db/large_data_handler.{cc,hh} | 5 | Future-returning functions |
| db/schema_applier.cc | 1 | Coroutine function |
| db/system_distributed_keyspace.cc | 1 | Regular function |
| db/commitlog/commitlog_replayer.cc | 1 | Coroutine function |
| db/view/row_locking.cc | 2 | Regular function |
| db/size_estimates_virtual_reader.cc | 1 | Lambda in coroutine |
| db/corrupt_data_handler.cc | 2 | Lambdas in future-returning function |
| raft/tracker.cc | 2 | Unreachable code (switch defaults) |
| service/topology_coordinator.cc | 11 | Coroutine functions (topology operations) |
| service/storage_service.cc | 28 | Critical node lifecycle operations |
| sstables/* (22 files) | 58 | SSTable operations (read/write/compress/index) |
All conversions were in **safe contexts** (non-noexcept, non-destructor functions). 3 assertions in storage_service.cc remain as SCYLLA_ASSERT (in noexcept functions).
## Why These Cannot Be Converted
### Unsafe Context #1: noexcept Functions (187 usages)
**Problem**: Throwing from noexcept causes `std::terminate()`, same as crash.
**Example** (from `locator/production_snitch_base.hh`):
```cpp
virtual bool prefer_local() const noexcept override {
SCYLLA_ASSERT(_backreference != nullptr); // Cannot convert!
return _backreference->prefer_local();
}
```
**Solution for these**: Keep as SCYLLA_ASSERT or use `on_fatal_internal_error()`.
### Unsafe Context #2: Destructors (36 usages)
**Problem**: Destructors are implicitly noexcept, throwing causes `std::terminate()`.
**Example** (from `utils/file_lock.cc`):
```cpp
~file_lock() noexcept {
if (_fd.get() != -1) {
SCYLLA_ASSERT(_fd.get() != -1); // Cannot convert!
auto r = ::flock(_fd.get(), LOCK_UN);
SCYLLA_ASSERT(r == 0); // Cannot convert!
}
}
```
**Solution for these**: Keep as SCYLLA_ASSERT.
## Benefits of scylla_assert()
1. **Prevents Cluster-Wide Crashes**
- Exception can be caught and handled gracefully
- Failed node doesn't bring down entire cluster
2. **Maintains Availability**
- Service can continue with degraded functionality
- Better than complete crash
3. **Better Error Reporting**
- Includes backtrace via `on_internal_error()`
- Supports custom error messages
- Configurable abort-on-error for testing
4. **Backward Compatible**
- SCYLLA_ASSERT still exists for unsafe contexts
- Can be gradually adopted
## Testing
- Created manual test in `test/manual/test_scylla_assert.cc`
- Verifies passing and failing assertions
- Tests custom error messages
- Code review passed with improvements made
## Next Steps (Future Work)
1. **Gradual Conversion**
- Convert remaining ~653 safe SCYLLA_ASSERT usages incrementally
- Prioritize high-impact code paths first
2. **Review noexcept Functions**
- Evaluate if some can be made non-noexcept
- Consider using `on_fatal_internal_error()` where appropriate
3. **Integration Testing**
- Run full test suite with conversions
- Monitor for any unexpected behavior
- Validate exception propagation
4. **Automated Analysis Tool**
- Create tool to identify safe conversion candidates
- Generate conversion patches automatically
- Track conversion progress
## Files Modified in This PR
### Core Implementation
- `utils/assert.hh` - Added scylla_assert() macro
### Conversions
- `db/large_data_handler.cc`
- `db/large_data_handler.hh`
- `db/schema_applier.cc`
- `db/system_distributed_keyspace.cc`
- `db/commitlog/commitlog_replayer.cc`
- `db/view/row_locking.cc`
- `db/size_estimates_virtual_reader.cc`
- `db/corrupt_data_handler.cc`
- `raft/tracker.cc`
- `service/topology_coordinator.cc`
- `service/storage_service.cc`
- `sstables/` (22 files across trie/, mx/, and core sstables)
### Documentation
- `docs/dev/scylla_assert_conversion.md`
- `docs/dev/unsafe_scylla_assert_locations.md`
- `test/manual/test_scylla_assert.cc`
## Conclusion
This PR establishes the infrastructure and methodology for replacing SCYLLA_ASSERT with scylla_assert() to improve cluster availability. The sample conversions demonstrate the approach, while comprehensive documentation enables future work.
**Key Achievement**: Provided a safe path forward for converting 75% (~668) of SCYLLA_ASSERT usages to exception-based assertions, while clearly documenting the 25% (~223) that must remain as crash-inducing assertions due to language constraints. Converted 112 usages as demonstration (16.8% of safe conversions), prioritizing critical files like storage_service.cc (node lifecycle) and all sstables files (data persistence), with ~556 remaining.

View File

@@ -34,8 +34,5 @@ target_link_libraries(alternator
idl
absl::headers)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(alternator REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers alternator
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -136,7 +136,6 @@ future<> controller::start_server() {
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds,
_config.alternator_enforce_authorization,
_config.alternator_warn_authorization,
_config.alternator_max_users_query_size_in_trace_output,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);

View File

@@ -109,20 +109,6 @@ extern const sstring TTL_TAG_KEY("system:ttl_attribute");
// following ones are base table's keys added as needed or range key list will be empty.
static const sstring SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY("system:spurious_range_key_added_to_gsi_and_user_didnt_specify_range_key");
// The following tags also have the "system:" prefix but are NOT used
// by Alternator to store table properties - only the user ever writes to
// them, as a way to configure the table. As such, these tags are writable
// (and readable) by the user, and not hidden by tag_key_is_internal().
// The reason why both hidden (internal) and user-configurable tags share the
// same "system:" prefix is historic.
// Setting the tag with a numeric value will enable a specific initial number
// of tablets (setting the value to 0 means enabling tablets with
// an automatic selection of the best number of tablets).
// Setting this tag to any non-numeric value (e.g., an empty string or the
// word "none") will ask to disable tablets.
static constexpr auto INITIAL_TABLETS_TAG_KEY = "system:initial_tablets";
enum class table_status {
active = 0,
@@ -145,8 +131,7 @@ static std::string_view table_status_to_sstring(table_status tbl_status) {
return "UNKNOWN";
}
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type,
const std::map<sstring, sstring>& tags_map, const gms::feature_service& feat, const db::tablets_mode_t::mode tablets_mode);
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type, const std::map<sstring, sstring>& tags_map, const gms::feature_service& feat);
static map_type attrs_type() {
static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true);
@@ -260,8 +245,7 @@ executor::executor(gms::gossiper& gossiper,
_mm(mm),
_sdks(sdks),
_cdc_metadata(cdc_metadata),
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization),
_warn_authorization(_proxy.data_dictionary().get_config().alternator_warn_authorization),
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization()),
_ssg(ssg),
_parsed_expression_cache(std::make_unique<parsed::expression_cache>(
parsed::expression_cache::config{_proxy.data_dictionary().get_config().alternator_max_expression_cache_entries_per_shard},
@@ -888,7 +872,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
schema_ptr schema = get_table(_proxy, request);
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
rjson::value response = rjson::empty_object();
@@ -897,37 +881,15 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
co_return rjson::print(std::move(response));
}
// This function increments the authorization_failures counter, and may also
// log a warn-level message and/or throw an access_denied exception, depending
// on what enforce_authorization and warn_authorization are set to.
// Note that if enforce_authorization is false, this function will return
// without throwing. So a caller that doesn't want to continue after an
// authorization_error must explicitly return after calling this function.
static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) {
stats.authorization_failures++;
if (enforce_authorization) {
if (warn_authorization) {
elogger.warn("alternator_warn_authorization=true: {}", msg);
}
throw api_error::access_denied(std::move(msg));
} else {
if (warn_authorization) {
elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg);
}
}
}
// Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(
bool enforce_authorization,
bool warn_authorization,
const service::client_state& client_state,
const schema_ptr& schema,
auth::permission permission_to_check,
alternator::stats& stats) {
if (!enforce_authorization && !warn_authorization) {
auth::permission permission_to_check) {
if (!enforce_authorization) {
co_return;
}
// Unfortunately, the fix for issue #23218 did not modify the function
@@ -942,33 +904,31 @@ future<> verify_permission(
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
throw api_error::access_denied(fmt::format(
"Write access denied on internal table {}.{} to role {} because it is not a superuser",
schema->ks_name(), schema->cf_name(), username));
co_return;
}
}
auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name());
if (!client_state.user() || !client_state.user()->name ||
!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
if (!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
sstring username = "<anonymous>";
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
// Using exceptions for errors makes this function faster in the
// success path (when the operation is allowed).
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
"{} access on table {}.{} is denied to role {}, client address {}",
throw api_error::access_denied(format(
"{} access on table {}.{} is denied to role {}",
auth::permissions::to_string(permission_to_check),
schema->ks_name(), schema->cf_name(), username, client_state.get_client_address()));
schema->ks_name(), schema->cf_name(), username));
}
}
// Similar to verify_permission() above, but just for CREATE operations.
// Those do not operate on any specific table, so require permissions on
// ALL KEYSPACES instead of any specific table.
static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) {
if (!enforce_authorization && !warn_authorization) {
future<> verify_create_permission(bool enforce_authorization, const service::client_state& client_state) {
if (!enforce_authorization) {
co_return;
}
auto resource = auth::resource(auth::resource_kind::data);
@@ -977,7 +937,7 @@ static future<> verify_create_permission(bool enforce_authorization, bool warn_a
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
throw api_error::access_denied(format(
"CREATE access on ALL KEYSPACES is denied to role {}", username));
}
}
@@ -989,12 +949,12 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
std::string table_name = get_table_name(request);
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
tracing::add_alternator_table_name(trace_state, table_name);
tracing::add_table_name(trace_state, keyspace_name, table_name);
auto& p = _proxy.container();
schema_ptr schema = get_table(_proxy, request);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP);
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
size_t retries = mm.get_concurrent_ddl_retries();
for (;;) {
@@ -1008,8 +968,8 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
throw api_error::resource_not_found(fmt::format("Requested resource not found: Table: {} not found", table_name));
}
auto m = co_await service::prepare_column_family_drop_announcement(p.local(), keyspace_name, table_name, group0_guard.write_timestamp(), service::drop_views::yes);
auto m2 = co_await service::prepare_keyspace_drop_announcement(p.local(), keyspace_name, group0_guard.write_timestamp());
auto m = co_await service::prepare_column_family_drop_announcement(_proxy, keyspace_name, table_name, group0_guard.write_timestamp(), service::drop_views::yes);
auto m2 = co_await service::prepare_keyspace_drop_announcement(_proxy, keyspace_name, group0_guard.write_timestamp());
std::move(m2.begin(), m2.end(), std::back_inserter(m));
@@ -1246,13 +1206,12 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
// Alternator uses tags whose keys start with the "system:" prefix for
// internal purposes. Those should not be readable by ListTagsOfResource,
// nor writable with TagResource or UntagResource (see #24098).
// Only a few specific system tags, currently only "system:write_isolation"
// and "system:initial_tablets", are deliberately intended to be set and read
// by the user, so are not considered "internal".
// Only a few specific system tags, currently only system:write_isolation,
// are deliberately intended to be set and read by the user, so are not
// considered "internal".
static bool tag_key_is_internal(std::string_view tag_key) {
return tag_key.starts_with("system:")
&& tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY
&& tag_key != INITIAL_TABLETS_TAG_KEY;
return tag_key.starts_with("system:") &&
tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY;
}
enum class update_tags_action { add_tags, delete_tags };
@@ -1333,7 +1292,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
if (tags->Size() < 1) {
co_return api_error::validation("The number of tags must be at least 1") ;
}
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
});
@@ -1354,7 +1313,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
get_stats_from_schema(_proxy, *schema)->api_operations.untag_resource++;
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
});
@@ -1557,8 +1516,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
}
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) {
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
@@ -1583,7 +1541,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
std::unordered_set<std::string> unused_attribute_definitions =
validate_attribute_definitions("", *attribute_definitions);
tracing::add_alternator_table_name(trace_state, table_name);
tracing::add_table_name(trace_state, keyspace_name, table_name);
schema_builder builder(keyspace_name, table_name);
auto [hash_key, range_key] = parse_key_schema(request, "");
@@ -1764,7 +1722,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
set_table_creation_time(tags_map, db_clock::now());
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
co_await verify_create_permission(enforce_authorization, client_state);
schema_ptr schema = builder.build();
for (auto& view_builder : view_builders) {
@@ -1785,7 +1743,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
utils::chunked_vector<mutation> schema_mutations;
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features());
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
if (stream_specification && stream_specification->IsObject()) {
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
@@ -1795,7 +1753,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
if (rs->uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
"If you want to use streams, create a table with vnodes by setting the tag 'system:initial_tablets' set to 'none'.");
"If you want to use streams, create a table with vnodes by setting the tag 'experimental:initial_tablets' set to 'none'.");
}
}
}
@@ -1865,10 +1823,9 @@ future<executor::request_return_type> executor::create_table(client_state& clien
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization);
});
}
@@ -1921,7 +1878,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
verify_billing_mode(request);
}
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request, &e = this->container()]
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
schema_ptr schema;
size_t retries = mm.get_concurrent_ddl_retries();
@@ -1930,7 +1887,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
schema_ptr tab = get_table(p.local(), request);
tracing::add_alternator_table_name(gt, tab->cf_name());
tracing::add_table_name(gt, tab->ks_name(), tab->cf_name());
// the ugly but harmless conversion to string_view here is because
// Seastar's sstring is missing a find(std::string_view) :-()
@@ -1952,7 +1909,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
if (stream_enabled->GetBool()) {
if (p.local().local_db().find_keyspace(tab->ks_name()).get_replication_strategy().uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
"If you want to enable streams, re-create this table with vnodes (with the tag 'system:initial_tablets' set to 'none').");
"If you want to enable streams, re-create this table with vnodes (with the tag 'experimental:initial_tablets' set to 'none').");
}
if (tab->cdc_options().enabled()) {
co_return api_error::validation("Table already has an enabled stream: TableName: " + tab->cf_name());
@@ -2092,7 +2049,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified");
}
co_await verify_permission(enforce_authorization, warn_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER, e.local()._stats);
co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER);
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector<view_ptr>(), group0_guard.write_timestamp());
for (view_ptr view : new_views) {
auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp());
@@ -2624,14 +2581,14 @@ std::optional<service::cas_shard> rmw_operation::shard_for_execute(bool needs_re
// Build the return value from the different RMW operations (UpdateItem,
// PutItem, DeleteItem). All these return nothing by default, but can
// optionally return Attributes if requested via the ReturnValues option.
static executor::request_return_type rmw_operation_return(rjson::value&& attributes, const consumed_capacity_counter& consumed_capacity, uint64_t& metric) {
static future<executor::request_return_type> rmw_operation_return(rjson::value&& attributes, const consumed_capacity_counter& consumed_capacity, uint64_t& metric) {
rjson::value ret = rjson::empty_object();
consumed_capacity.add_consumed_capacity_to_response_if_needed(ret);
metric += consumed_capacity.get_consumed_capacity_units();
if (!attributes.IsNull()) {
rjson::add(ret, "Attributes", std::move(attributes));
}
return rjson::print(std::move(ret));
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
}
static future<std::unique_ptr<rjson::value>> get_previous_item(
@@ -2697,10 +2654,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
stats& global_stats,
stats& per_table_stats,
uint64_t& wcu_total) {
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = schema()->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
auto cdc_opts = cdc::per_request_options{};
if (needs_read_before_write) {
if (_write_isolation == write_isolation::FORBID_RMW) {
throw api_error::validation("Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
@@ -2731,6 +2685,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
if (!cas_shard) {
on_internal_error(elogger, "cas_shard is not set");
}
// If we're still here, we need to do this write using LWT:
global_stats.write_using_lwt++;
per_table_stats.write_using_lwt++;
@@ -2745,7 +2700,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
if (!is_applied) {
return make_ready_future<executor::request_return_type>(api_error::conditional_check_failed("The conditional request failed", std::move(_return_attributes)));
}
return make_ready_future<executor::request_return_type>(rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total));
return rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total);
});
}
@@ -2859,10 +2814,10 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
elogger.trace("put_item {}", request);
auto op = make_shared<put_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -2963,10 +2918,10 @@ future<executor::request_return_type> executor::delete_item(client_state& client
auto op = make_shared<delete_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *(op->schema()));
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -3057,9 +3012,6 @@ static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, serv
auto timeout = executor::default_timeout();
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return proxy.cas(schema, std::move(cas_shard), op, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
@@ -3110,10 +3062,8 @@ static future<> do_batch_write(service::storage_proxy& proxy,
utils::chunked_vector<mutation> mutations;
mutations.reserve(mutation_builders.size());
api::timestamp_type now = api::new_timestamp();
bool any_cdc_enabled = false;
for (auto& b : mutation_builders) {
mutations.push_back(b.second.build(b.first, now));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return proxy.mutate(std::move(mutations),
db::consistency_level::LOCAL_QUORUM,
@@ -3122,10 +3072,7 @@ static future<> do_batch_write(service::storage_proxy& proxy,
std::move(permit),
db::allow_per_partition_rate_limit::yes,
false,
cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
});
cdc::per_request_options{});
} else {
// Do the write via LWT:
// Multiple mutations may be destined for the same partition, adding
@@ -3215,7 +3162,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
per_table_stats->api_operations.batch_write_item++;
per_table_stats->api_operations.batch_write_item_batch_total += it->value.Size();
per_table_stats->api_operations.batch_write_item_histogram.add(it->value.Size());
tracing::add_alternator_table_name(trace_state, schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
1, primary_key_hash{schema}, primary_key_equal{schema});
@@ -3252,7 +3199,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
per_table_wcu.emplace_back(std::make_pair(per_table_stats, schema));
}
for (const auto& b : mutation_builders) {
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, b.first, auth::permission::MODIFY, _stats);
co_await verify_permission(_enforce_authorization, client_state, b.first, auth::permission::MODIFY);
}
// If alternator_force_read_before_write is true we will first get the previous item size
// and only then do send the mutation.
@@ -4475,10 +4422,10 @@ future<executor::request_return_type> executor::update_item(client_state& client
elogger.trace("update_item {}", request);
auto op = make_shared<update_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -4556,7 +4503,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
schema_ptr schema = get_table(_proxy, request);
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *schema);
per_table_stats->api_operations.get_item++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
rjson::value& query_key = request["Key"];
db::consistency_level cl = get_read_consistency(request);
@@ -4589,7 +4536,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "GetItem");
rcu_consumed_capacity_counter add_capacity(request, cl == db::consistency_level::LOCAL_QUORUM);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
service::storage_proxy::coordinator_query_result qr =
co_await _proxy.query(
schema, std::move(command), std::move(partition_ranges), cl,
@@ -4705,7 +4652,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
uint batch_size = 0;
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
table_requests rs(get_table_from_batch_request(_proxy, it));
tracing::add_alternator_table_name(trace_state, rs.schema->cf_name());
tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
rs.cl = get_read_consistency(it->value);
std::unordered_set<std::string> used_attribute_names;
rs.attrs_to_get = ::make_shared<const std::optional<attrs_to_get>>(calculate_attrs_to_get(it->value, *_parsed_expression_cache, used_attribute_names));
@@ -4721,7 +4668,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
}
for (const table_requests& tr : requests) {
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats);
co_await verify_permission(_enforce_authorization, client_state, tr.schema, auth::permission::SELECT);
}
_stats.api_operations.batch_get_item_batch_total += batch_size;
@@ -5141,15 +5088,13 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
}
auto pos = paging_state.get_position_in_partition();
if (pos.has_key()) {
// Alternator itself allows at most one column in clustering key, but
// user can use Alternator api to access system tables which might have
// multiple clustering key columns. So we need to handle that case here.
auto cdef_it = schema.clustering_key_columns().begin();
for(const auto &exploded_ck : pos.key().explode()) {
rjson::add_with_string_name(last_evaluated_key, std::string_view(cdef_it->name_as_text()), rjson::empty_object());
rjson::value& key_entry = last_evaluated_key[cdef_it->name_as_text()];
rjson::add_with_string_name(key_entry, type_to_string(cdef_it->type), json_key_column_value(exploded_ck, *cdef_it));
++cdef_it;
auto exploded_ck = pos.key().explode();
auto exploded_ck_it = exploded_ck.begin();
for (const column_definition& cdef : schema.clustering_key_columns()) {
rjson::add_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object());
rjson::value& key_entry = last_evaluated_key[cdef.name_as_text()];
rjson::add_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_ck_it, cdef));
++exploded_ck_it;
}
}
// To avoid possible conflicts (and thus having to reserve these names) we
@@ -5183,11 +5128,10 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
filter filter,
query::partition_slice::option_set custom_opts,
service::client_state& client_state,
alternator::stats& stats,
cql3::cql_stats& cql_stats,
tracing::trace_state_ptr trace_state,
service_permit permit,
bool enforce_authorization,
bool warn_authorization) {
bool enforce_authorization) {
lw_shared_ptr<service::pager::paging_state> old_paging_state = nullptr;
tracing::trace(trace_state, "Performing a database query");
@@ -5214,7 +5158,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
old_paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
}
co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats);
co_await verify_permission(enforce_authorization, client_state, table_schema, auth::permission::SELECT);
auto regular_columns =
table_schema->regular_columns() | std::views::transform(&column_definition::id)
@@ -5250,9 +5194,9 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state));
}
if (has_filter) {
stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
// update our "filtered_row_matched_total" for all the rows matched, despited the filter
stats.cql_stats.filtered_rows_matched_total += size;
cql_stats.filtered_rows_matched_total += size;
}
if (opt_items) {
if (opt_items->size() >= max_items_for_rapidjson_array) {
@@ -5309,7 +5253,6 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
elogger.trace("Scanning {}", request);
auto [schema, table_type] = get_table_or_view(_proxy, request);
tracing::add_alternator_table_name(trace_state, schema->cf_name());
get_stats_from_schema(_proxy, *schema)->api_operations.scan++;
auto segment = get_int_attribute(request, "Segment");
auto total_segments = get_int_attribute(request, "TotalSegments");
@@ -5377,7 +5320,7 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan");
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
std::move(filter), query::partition_slice::option_set(), client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization);
std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit), _enforce_authorization);
}
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) {
@@ -5789,7 +5732,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
auto [schema, table_type] = get_table_or_view(_proxy, request);
get_stats_from_schema(_proxy, *schema)->api_operations.query++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
rjson::value* exclusive_start_key = rjson::find(request, "ExclusiveStartKey");
db::consistency_level cl = get_read_consistency(request);
@@ -5858,7 +5801,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
query::partition_slice::option_set opts;
opts.set_if<query::partition_slice::option::reversed>(!forward);
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization);
std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit), _enforce_authorization);
}
future<executor::request_return_type> executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) {
@@ -5986,20 +5929,22 @@ future<executor::request_return_type> executor::describe_continuous_backups(clie
// of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3.
// A smaller cluster (presumably, a test only), gets RF=1. The user may
// manually create the keyspace to override this predefined behavior.
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type ts,
const std::map<sstring, sstring>& tags_map, const gms::feature_service& feat, const db::tablets_mode_t::mode tablets_mode) {
// Whether to use tablets for the table (actually for the keyspace of the
// table) is determined by tablets_mode (taken from the configuration
// option "tablets_mode_for_new_keyspaces"), as well as the presence and
// the value of a per-table tag system:initial_tablets
// (INITIAL_TABLETS_TAG_KEY).
// Setting the tag with a numeric value will enable a specific initial number
// of tablets (setting the value to 0 means enabling tablets with
// an automatic selection of the best number of tablets).
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type ts, const std::map<sstring, sstring>& tags_map, const gms::feature_service& feat) {
// Even if the "tablets" experimental feature is available, we currently
// do not enable tablets by default on Alternator tables because LWT is
// not yet fully supported with tablets.
// The user can override the choice of whether or not to use tablets at
// table-creation time by supplying the following tag with a numeric value
// (setting the value to 0 means enabling tablets with automatic selection
// of the best number of tablets).
// Setting this tag to any non-numeric value (e.g., an empty string or the
// word "none") will ask to disable tablets.
// When vnodes are asked for by the tag value, but tablets are enforced by config,
// throw an exception to the client.
// If we make this tag a permanent feature, it will get a "system:" prefix -
// until then we give it the "experimental:" prefix to not commit to it.
static constexpr auto INITIAL_TABLETS_TAG_KEY = "experimental:initial_tablets";
// initial_tablets currently defaults to unset, so tablets will not be
// used by default on new Alternator tables. Change this initialization
// to 0 enable tablets by default, with automatic number of tablets.
std::optional<unsigned> initial_tablets;
if (feat.tablets) {
auto it = tags_map.find(INITIAL_TABLETS_TAG_KEY);
@@ -6010,20 +5955,7 @@ static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_vie
try {
initial_tablets = std::stol(tags_map.at(INITIAL_TABLETS_TAG_KEY));
} catch (...) {
if (tablets_mode == db::tablets_mode_t::mode::enforced) {
throw api_error::validation(format("Tag {} containing non-numerical value requests vnodes, but vnodes are forbidden by configuration option `tablets_mode_for_new_keyspaces: enforced`", INITIAL_TABLETS_TAG_KEY));
}
initial_tablets = std::nullopt;
elogger.trace("Following {} tag containing non-numerical value, Alternator will attempt to create a keyspace {} with vnodes.", INITIAL_TABLETS_TAG_KEY, keyspace_name);
}
} else {
// No per-table tag present, use the value from config
if (tablets_mode == db::tablets_mode_t::mode::enabled || tablets_mode == db::tablets_mode_t::mode::enforced) {
initial_tablets = 0;
elogger.trace("Following the `tablets_mode_for_new_keyspaces` flag from the settings, Alternator will attempt to create a keyspace {} with tablets.", keyspace_name);
} else {
initial_tablets = std::nullopt;
elogger.trace("Following the `tablets_mode_for_new_keyspaces` flag from the settings, Alternator will attempt to create a keyspace {} with vnodes.", keyspace_name);
}
}
}

View File

@@ -139,7 +139,6 @@ class executor : public peering_sharded_service<executor> {
db::system_distributed_keyspace& _sdks;
cdc::metadata& _cdc_metadata;
utils::updateable_value<bool> _enforce_authorization;
utils::updateable_value<bool> _warn_authorization;
// An smp_service_group to be used for limiting the concurrency when
// forwarding Alternator request between shards - if necessary for LWT.
smp_service_group _ssg;
@@ -265,7 +264,7 @@ bool is_big(const rjson::value& val, int big_size = 100'000);
// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);
future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission);
/**
* Make return type for serializing the object "streamed",

View File

@@ -282,23 +282,15 @@ std::string type_to_string(data_type type) {
return it->second;
}
std::optional<bytes> try_get_key_column_value(const rjson::value& item, const column_definition& column) {
bytes get_key_column_value(const rjson::value& item, const column_definition& column) {
std::string column_name = column.name_as_text();
const rjson::value* key_typed_value = rjson::find(item, column_name);
if (!key_typed_value) {
return std::nullopt;
throw api_error::validation(fmt::format("Key column {} not found", column_name));
}
return get_key_from_typed_value(*key_typed_value, column);
}
bytes get_key_column_value(const rjson::value& item, const column_definition& column) {
auto value = try_get_key_column_value(item, column);
if (!value) {
throw api_error::validation(fmt::format("Key column {} not found", column.name_as_text()));
}
return std::move(*value);
}
// Parses the JSON encoding for a key value, which is a map with a single
// entry whose key is the type and the value is the encoded value.
// If this type does not match the desired "type_str", an api_error::validation
@@ -388,38 +380,20 @@ clustering_key ck_from_json(const rjson::value& item, schema_ptr schema) {
return clustering_key::make_empty();
}
std::vector<bytes> raw_ck;
// Note: it's possible to get more than one clustering column here, as
// Alternator can be used to read scylla internal tables.
// FIXME: this is a loop, but we really allow only one clustering key column.
for (const column_definition& cdef : schema->clustering_key_columns()) {
auto raw_value = get_key_column_value(item, cdef);
bytes raw_value = get_key_column_value(item, cdef);
raw_ck.push_back(std::move(raw_value));
}
return clustering_key::from_exploded(raw_ck);
}
clustering_key_prefix ck_prefix_from_json(const rjson::value& item, schema_ptr schema) {
if (schema->clustering_key_size() == 0) {
return clustering_key_prefix::make_empty();
}
std::vector<bytes> raw_ck;
for (const column_definition& cdef : schema->clustering_key_columns()) {
auto raw_value = try_get_key_column_value(item, cdef);
if (!raw_value) {
break;
}
raw_ck.push_back(std::move(*raw_value));
}
return clustering_key_prefix::from_exploded(raw_ck);
}
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema) {
const bool is_alternator_ks = is_alternator_keyspace(schema->ks_name());
if (is_alternator_ks) {
return position_in_partition::for_key(ck_from_json(item, schema));
auto ck = ck_from_json(item, schema);
if (is_alternator_keyspace(schema->ks_name())) {
return position_in_partition::for_key(std::move(ck));
}
const auto region_item = rjson::find(item, scylla_paging_region);
const auto weight_item = rjson::find(item, scylla_paging_weight);
if (bool(region_item) != bool(weight_item)) {
@@ -439,9 +413,8 @@ position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema)
} else {
throw std::runtime_error(fmt::format("Invalid value for weight: {}", weight_view));
}
return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(ck_prefix_from_json(item, schema)) : std::nullopt);
return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(std::move(ck)) : std::nullopt);
}
auto ck = ck_from_json(item, schema);
if (ck.is_empty()) {
return position_in_partition::for_partition_start();
}

View File

@@ -13,7 +13,6 @@
#include <seastar/http/function_handlers.hh>
#include <seastar/http/short_streams.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/short_streams.hh>
#include "seastarx.hh"
@@ -32,8 +31,6 @@
#include "utils/overloaded_functor.hh"
#include "utils/aws_sigv4.hh"
#include "client_data.hh"
#include "utils/updateable_value.hh"
#include <zlib.h>
static logging::logger slogger("alternator-server");
@@ -273,57 +270,24 @@ protected:
}
};
// This function increments the authentication_failures counter, and may also
// log a warn-level message and/or throw an exception, depending on what
// enforce_authorization and warn_authorization are set to.
// The username and client address are only used for logging purposes -
// they are not included in the error message returned to the client, since
// the client knows who it is.
// Note that if enforce_authorization is false, this function will return
// without throwing. So a caller that doesn't want to continue after an
// authentication_error must explicitly return after calling this function.
template<typename Exception>
static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) {
stats.authentication_failures++;
if (enforce_authorization) {
if (warn_authorization) {
slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address);
}
throw std::move(e);
} else {
if (warn_authorization) {
slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address);
}
}
}
future<std::string> server::verify_signature(const request& req, const chunked_content& content) {
if (!_enforce_authorization.get() && !_warn_authorization.get()) {
if (!_enforce_authorization) {
slogger.debug("Skipping authorization");
return make_ready_future<std::string>();
}
auto host_it = req._headers.find("Host");
if (host_it == req._headers.end()) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature("Host header is mandatory for signature verification"),
"", req.get_client_address());
return make_ready_future<std::string>();
throw api_error::invalid_signature("Host header is mandatory for signature verification");
}
auto authorization_it = req._headers.find("Authorization");
if (authorization_it == req._headers.end()) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::missing_authentication_token("Authorization header is mandatory for signature verification"),
"", req.get_client_address());
return make_ready_future<std::string>();
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
}
std::string host = host_it->second;
std::string_view authorization_header = authorization_it->second;
auto pos = authorization_header.find_first_of(' ');
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)),
"", req.get_client_address());
return make_ready_future<std::string>();
throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
}
authorization_header.remove_prefix(pos+1);
std::string credential;
@@ -358,9 +322,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
std::vector<std::string_view> credential_split = split(credential, '/');
if (credential_split.size() != 5) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address());
return make_ready_future<std::string>();
throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential));
}
std::string user(credential_split[0]);
std::string datestamp(credential_split[1]);
@@ -384,7 +346,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
return get_key_from_roles(proxy, as, std::move(username));
};
return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content,
user = std::move(user),
host = std::move(host),
datestamp = std::move(datestamp),
@@ -392,32 +354,18 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
signed_headers_map = std::move(signed_headers_map),
region = std::move(region),
service = std::move(service),
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
key_cache::value_ptr key_ptr(nullptr);
try {
key_ptr = key_ptr_fut.get();
} catch (const api_error& e) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
e, user, req.get_client_address());
return std::string();
}
user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) {
std::string signature;
try {
signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method,
datestamp, signed_headers_str, signed_headers_map, &content, region, service, "");
} catch (const std::exception& e) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())),
user, req.get_client_address());
return std::string();
throw api_error::invalid_signature(e.what());
}
if (signature != std::string_view(user_signature)) {
_key_cache.remove(user);
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::unrecognized_client("wrong signature"),
user, req.get_client_address());
return std::string();
throw api_error::unrecognized_client("The security token included in the request is invalid.");
}
return user;
});
@@ -553,106 +501,6 @@ read_entire_stream(input_stream<char>& inp, size_t length_limit) {
co_return ret;
}
// safe_gzip_stream is an exception-safe wrapper for zlib's z_stream.
// The "z_stream" struct is used by zlib to hold state while decompressing a
// stream of data. It allocates memory which must be freed with inflateEnd(),
// which the destructor of this class does.
class safe_gzip_zstream {
z_stream _zs;
public:
safe_gzip_zstream() {
memset(&_zs, 0, sizeof(_zs));
// The strange 16 + WMAX_BITS tells zlib to expect and decode
// a gzip header, not a zlib header.
if (inflateInit2(&_zs, 16 + MAX_WBITS) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
}
~safe_gzip_zstream() {
inflateEnd(&_zs);
}
z_stream* operator->() {
return &_zs;
}
z_stream* get() {
return &_zs;
}
void reset() {
inflateReset(&_zs);
}
};
// ungzip() takes a chunked_content with a gzip-compressed request body,
// uncompresses it, and returns the uncompressed content as a chunked_content.
// If the uncompressed content exceeds length_limit, an error is thrown.
static future<chunked_content>
ungzip(chunked_content&& compressed_body, size_t length_limit) {
chunked_content ret;
// output_buf can be any size - when uncompressing input_buf, it doesn't
// need to fit in a single output_buf, we'll use multiple output_buf for
// a single input_buf if needed.
constexpr size_t OUTPUT_BUF_SIZE = 4096;
temporary_buffer<char> output_buf;
safe_gzip_zstream strm;
bool complete_stream = false; // empty input is not a valid gzip
size_t total_out_bytes = 0;
for (const temporary_buffer<char>& input_buf : compressed_body) {
if (input_buf.empty()) {
continue;
}
complete_stream = false;
strm->next_in = (Bytef*) input_buf.get();
strm->avail_in = (uInt) input_buf.size();
do {
co_await coroutine::maybe_yield();
if (output_buf.empty()) {
output_buf = temporary_buffer<char>(OUTPUT_BUF_SIZE);
}
strm->next_out = (Bytef*) output_buf.get();
strm->avail_out = OUTPUT_BUF_SIZE;
int e = inflate(strm.get(), Z_NO_FLUSH);
size_t out_bytes = OUTPUT_BUF_SIZE - strm->avail_out;
if (out_bytes > 0) {
// If output_buf is nearly full, we save it as-is in ret. But
// if it only has little data, better copy to a small buffer.
if (out_bytes > OUTPUT_BUF_SIZE/2) {
ret.push_back(std::move(output_buf).prefix(out_bytes));
// output_buf is now empty. if this loop finds more input,
// we'll allocate a new output buffer.
} else {
ret.push_back(temporary_buffer<char>(output_buf.get(), out_bytes));
}
total_out_bytes += out_bytes;
if (total_out_bytes > length_limit) {
throw api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", length_limit));
}
}
if (e == Z_STREAM_END) {
// There may be more input after the first gzip stream - in
// either this input_buf or the next one. The additional input
// should be a second concatenated gzip. We need to allow that
// by resetting the gzip stream and continuing the input loop
// until there's no more input.
strm.reset();
if (strm->avail_in == 0) {
complete_stream = true;
break;
}
} else if (e != Z_OK && e != Z_BUF_ERROR) {
// DynamoDB returns an InternalServerError when given a bad
// gzip request body. See test test_broken_gzip_content
throw api_error::internal("Error during gzip decompression of request body");
}
} while (strm->avail_in > 0 || strm->avail_out == 0);
}
if (!complete_stream) {
// The gzip stream was not properly finished with Z_STREAM_END
throw api_error::internal("Truncated gzip in request body");
}
co_return ret;
}
future<executor::request_return_type> server::handle_api_request(std::unique_ptr<request> req) {
_executor._stats.total_operations++;
sstring target = req->get_header("X-Amz-Target");
@@ -690,21 +538,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
units.return_units(mem_estimate - new_mem_estimate);
}
auto username = co_await verify_signature(*req, content);
// If the request is compressed, uncompress it now, after we checked
// the signature (the signature is computed on the compressed content).
// We apply the request_content_length_limit again to the uncompressed
// content - we don't want to allow a tiny compressed request to
// expand to a huge uncompressed request.
sstring content_encoding = req->get_header("Content-Encoding");
if (content_encoding == "gzip") {
content = co_await ungzip(std::move(content), request_content_length_limit);
} else if (!content_encoding.empty()) {
// DynamoDB returns a 500 error for unsupported Content-Encoding.
// I'm not sure if this is the best error code, but let's do it too.
// See the test test_garbage_content_encoding confirming this case.
co_return api_error::internal("Unsupported Content-Encoding");
}
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().
@@ -785,6 +618,7 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
, _auth_service(auth_service)
, _sl_controller(sl_controller)
, _key_cache(1024, 1min, slogger)
, _enforce_authorization(false)
, _max_users_query_size_in_trace_output(1024)
, _enabled_servers{}
, _pending_requests("alternator::server::pending_requests")
@@ -866,11 +700,10 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
}
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
_enforce_authorization = std::move(enforce_authorization);
_warn_authorization = std::move(warn_authorization);
_max_concurrent_requests = std::move(max_concurrent_requests);
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
if (!port && !https_port) {

View File

@@ -47,7 +47,6 @@ class server : public peering_sharded_service<server> {
key_cache _key_cache;
utils::updateable_value<bool> _enforce_authorization;
utils::updateable_value<bool> _warn_authorization;
utils::updateable_value<uint64_t> _max_users_query_size_in_trace_output;
utils::small_vector<std::reference_wrapper<seastar::httpd::http_server>, 2> _enabled_servers;
named_gate _pending_requests;
@@ -100,7 +99,7 @@ public:
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();
// get_client_data() is called (on each shard separately) when the virtual

View File

@@ -188,16 +188,6 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses,
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty()
});
// Only register the following metrics for the global metrics, not per-table
if (!has_table) {
metrics.add_group("alternator", {
seastar::metrics::make_counter("authentication_failures", stats.authentication_failures,
seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_counter("authorization_failures", stats.authorization_failures,
seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
});
}
}
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) {

View File

@@ -105,17 +105,6 @@ public:
// The sizes are the the written items' sizes grouped per table.
utils::estimated_histogram batch_write_item_op_size_kb{30};
} operation_sizes;
// Count of authentication and authorization failures, counted if either
// alternator_enforce_authorization or alternator_warn_authorization are
// set to true. If both are false, no authentication or authorization
// checks are performed, so failures are not recognized or counted.
// "authentication" failure means the request was not signed with a valid
// user and key combination. "authorization" failure means the request was
// authenticated to a valid user - but this user did not have permissions
// to perform the operation (considering RBAC settings and the user's
// superuser status).
uint64_t authentication_failures = 0;
uint64_t authorization_failures = 0;
// Miscellaneous event counters
uint64_t total_operations = 0;
uint64_t unsupported_operations = 0;

View File

@@ -827,7 +827,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
db::consistency_level cl = db::consistency_level::LOCAL_QUORUM;
partition_key pk = iter.shard.id.to_partition_key(*schema);
@@ -1073,7 +1073,9 @@ bool executor::add_stream_options(const rjson::value& stream_specification, sche
}
if (stream_enabled->GetBool()) {
if (!sp.features().alternator_streams) {
auto db = sp.data_dictionary();
if (!db.features().alternator_streams) {
throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster.");
}

View File

@@ -68,7 +68,7 @@ extern const sstring TTL_TAG_KEY;
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.update_time_to_live++;
if (!_proxy.features().alternator_ttl) {
if (!_proxy.data_dictionary().features().alternator_ttl) {
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
}
@@ -95,7 +95,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
sstring attribute_name(v->GetString(), v->GetStringLength());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
if (enabled) {
if (tags_map.contains(TTL_TAG_KEY)) {
@@ -753,7 +753,7 @@ static future<bool> scan_table(
auto my_host_id = erm->get_topology().my_host_id();
const auto &tablet_map = erm->get_token_metadata().tablets().get_tablet_map(s->id());
for (std::optional tablet = tablet_map.first_tablet(); tablet; tablet = tablet_map.next_tablet(*tablet)) {
auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet, erm->get_topology());
auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet);
// check if this is the primary replica for the current tablet
if (tablet_primary_replica.host == my_host_id && tablet_primary_replica.shard == this_shard_id()) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);

View File

@@ -106,8 +106,5 @@ target_link_libraries(api
wasmtime_bindings
absl::headers)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(api REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers api
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -220,25 +220,6 @@
}
]
},
{
"path":"/storage_service/nodes/excluded",
"operations":[
{
"method":"GET",
"summary":"Retrieve host ids of nodes which are marked as excluded",
"type":"array",
"items":{
"type":"string"
},
"nickname":"get_excluded_nodes",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/storage_service/nodes/joining",
"operations":[
@@ -961,14 +942,6 @@
"type":"string",
"paramType":"query",
"enum": ["all", "dc", "rack", "node"]
},
{
"name":"primary_replica_only",
"description":"Load the sstables and stream to the primary replica node within the scope, if one is specified. If not, stream to the global primary replica.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
@@ -1055,7 +1028,7 @@
]
},
{
"path":"/storage_service/cleanup_all/",
"path":"/storage_service/cleanup_all",
"operations":[
{
"method":"POST",
@@ -1065,30 +1038,6 @@
"produces":[
"application/json"
],
"parameters":[
{
"name":"global",
"description":"true if cleanup of entire cluster is requested",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/mark_node_as_clean",
"operations":[
{
"method":"POST",
"summary":"Mark the node as clean. After that the node will not be considered as needing cleanup during automatic cleanup which is triggered by some topology operations",
"type":"void",
"nickname":"reset_cleanup_needed",
"produces":[
"application/json"
],
"parameters":[]
}
]
@@ -1622,30 +1571,6 @@
}
]
},
{
"path":"/storage_service/exclude_node",
"operations":[
{
"method":"POST",
"summary":"Marks the node as permanently down (excluded).",
"type":"void",
"nickname":"exclude_node",
"produces":[
"application/json"
],
"parameters":[
{
"name":"hosts",
"description":"Comma-separated list of host ids to exclude",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/removal_status",
"operations":[

View File

@@ -42,14 +42,6 @@
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"consider_only_existing_data",
"description":"Set to \"true\" to flush all memtables and force tombstone garbage collection to check only the sstables being compacted (false by default). The memtable, commitlog and other uncompacted sstables will not be checked during tombstone garbage collection.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}

View File

@@ -66,13 +66,6 @@ static future<json::json_return_type> get_cf_stats(sharded<replica::database>&
}, std::plus<int64_t>());
}
static future<json::json_return_type> get_cf_stats(sharded<replica::database>& db,
std::function<int64_t(const replica::column_family_stats&)> f) {
return map_reduce_cf(db, int64_t(0), [f](const replica::column_family& cf) {
return f(cf.get_stats());
}, std::plus<int64_t>());
}
static future<json::json_return_type> for_tables_on_all_shards(sharded<replica::database>& db, std::vector<table_info> tables, std::function<future<>(replica::table&)> set) {
return do_with(std::move(tables), [&db, set] (const std::vector<table_info>& tables) {
return db.invoke_on_all([&tables, set] (replica::database& db) {
@@ -1073,14 +1066,10 @@ void set_column_family(http_context& ctx, routes& r, sharded<replica::database>&
});
ss::get_load.set(r, [&db] (std::unique_ptr<http::request> req) {
return get_cf_stats(db, [](const replica::column_family_stats& stats) {
return stats.live_disk_space_used.on_disk;
});
return get_cf_stats(db, &replica::column_family_stats::live_disk_space_used);
});
ss::get_metrics_load.set(r, [&db] (std::unique_ptr<http::request> req) {
return get_cf_stats(db, [](const replica::column_family_stats& stats) {
return stats.live_disk_space_used.on_disk;
});
return get_cf_stats(db, &replica::column_family_stats::live_disk_space_used);
});
ss::get_keyspaces.set(r, [&db] (const_req req) {

View File

@@ -20,7 +20,6 @@
#include "utils/hash.hh"
#include <optional>
#include <sstream>
#include <stdexcept>
#include <time.h>
#include <algorithm>
#include <functional>
@@ -505,7 +504,6 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
auto bucket = req->get_query_param("bucket");
auto prefix = req->get_query_param("prefix");
auto scope = parse_stream_scope(req->get_query_param("scope"));
auto primary_replica_only = validate_bool_x(req->get_query_param("primary_replica_only"), false);
rjson::chunked_content content = co_await util::read_entire_stream(*req->content_stream);
rjson::value parsed = rjson::parse(std::move(content));
@@ -515,7 +513,7 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
auto sstables = parsed.GetArray() |
std::views::transform([] (const auto& s) { return sstring(rjson::to_string_view(s)); }) |
std::ranges::to<std::vector>();
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket, scope, primary_replica_only);
auto task_id = co_await sst_loader.local().download_new_sstables(keyspace, table, prefix, std::move(sstables), endpoint, bucket, scope);
co_return json::json_return_type(fmt::to_string(task_id));
});
@@ -765,14 +763,8 @@ rest_cdc_streams_check_and_repair(sharded<service::storage_service>& ss, std::un
static
future<json::json_return_type>
rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
bool global = true;
if (auto global_param = req->get_query_param("global"); !global_param.empty()) {
global = validate_bool(global_param);
}
apilog.info("cleanup_all global={}", global);
auto done = !global ? false : co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
apilog.info("cleanup_all");
auto done = co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<bool> {
if (!ss.is_topology_coordinator_enabled()) {
co_return false;
}
@@ -782,35 +774,14 @@ rest_cleanup_all(http_context& ctx, sharded<service::storage_service>& ss, std::
if (done) {
co_return json::json_return_type(0);
}
// fall back to the local cleanup if topology coordinator is not enabled or local cleanup is requested
// fall back to the local global cleanup if topology coordinator is not enabled
auto& db = ctx.db;
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::global_cleanup_compaction_task_impl>({}, db);
co_await task->done();
// Mark this node as clean
co_await ss.invoke_on(0, [] (service::storage_service& ss) -> future<> {
if (ss.is_topology_coordinator_enabled()) {
co_await ss.reset_cleanup_needed();
}
});
co_return json::json_return_type(0);
}
static
future<json::json_return_type>
rest_reset_cleanup_needed(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
apilog.info("reset_cleanup_needed");
co_await ss.invoke_on(0, [] (service::storage_service& ss) {
if (!ss.is_topology_coordinator_enabled()) {
throw std::runtime_error("mark_node_as_clean is only supported when topology over raft is enabled");
}
return ss.reset_cleanup_needed();
});
co_return json_void();
}
static
future<json::json_return_type>
rest_force_flush(http_context& ctx, std::unique_ptr<http::request> req) {
@@ -873,25 +844,6 @@ rest_remove_node(sharded<service::storage_service>& ss, std::unique_ptr<http::re
});
}
static
future<json::json_return_type>
rest_exclude_node(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto hosts = utils::split_comma_separated_list(req->get_query_param("hosts"))
| std::views::transform([] (const sstring& s) { return locator::host_id(utils::UUID(s)); })
| std::ranges::to<std::vector<locator::host_id>>();
auto& topo = ss.local().get_token_metadata().get_topology();
for (auto host : hosts) {
if (!topo.has_node(host)) {
throw bad_param_exception(fmt::format("Host ID {} does not belong to this cluster", host));
}
}
apilog.info("exclude_node: hosts={}", hosts);
co_await ss.local().mark_excluded(hosts);
co_return json_void();
}
static
future<json::json_return_type>
rest_get_removal_status(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
@@ -1812,13 +1764,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::get_natural_endpoints_v2.set(r, rest_bind(rest_get_natural_endpoints_v2, ctx, ss));
ss::cdc_streams_check_and_repair.set(r, rest_bind(rest_cdc_streams_check_and_repair, ss));
ss::cleanup_all.set(r, rest_bind(rest_cleanup_all, ctx, ss));
ss::reset_cleanup_needed.set(r, rest_bind(rest_reset_cleanup_needed, ctx, ss));
ss::force_flush.set(r, rest_bind(rest_force_flush, ctx));
ss::force_keyspace_flush.set(r, rest_bind(rest_force_keyspace_flush, ctx));
ss::decommission.set(r, rest_bind(rest_decommission, ss));
ss::move.set(r, rest_bind(rest_move, ss));
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
ss::get_removal_status.set(r, rest_bind(rest_get_removal_status, ss));
ss::force_remove_completion.set(r, rest_bind(rest_force_remove_completion, ss));
ss::set_logging_level.set(r, rest_bind(rest_set_logging_level));
@@ -1891,13 +1841,11 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::get_natural_endpoints.unset(r);
ss::cdc_streams_check_and_repair.unset(r);
ss::cleanup_all.unset(r);
ss::reset_cleanup_needed.unset(r);
ss::force_flush.unset(r);
ss::force_keyspace_flush.unset(r);
ss::decommission.unset(r);
ss::move.unset(r);
ss::remove_node.unset(r);
ss::exclude_node.unset(r);
ss::get_removal_status.unset(r);
ss::force_remove_completion.unset(r);
ss::set_logging_level.unset(r);

View File

@@ -38,78 +38,76 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
};
}
static future<shared_ptr<compaction::major_keyspace_compaction_task_impl>> force_keyspace_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
std::optional<compaction::flush_mode> fmopt;
if (!flush && !consider_only_existing_data) {
fmopt = compaction::flush_mode::skip;
}
return compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
}
static future<shared_ptr<compaction::upgrade_sstables_compaction_task_impl>> upgrade_sstables(http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) {
auto& db = ctx.db;
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
return compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
}
static future<shared_ptr<compaction::cleanup_keyspace_compaction_task_impl>> force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
if (rs.is_local() || !rs.is_vnode_based()) {
auto reason = rs.is_local() ? "require" : "support";
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
co_return nullptr;
}
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
auto msg = "Can not perform cleanup operation when topology changes";
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
co_await coroutine::return_exception(std::runtime_error(msg));
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
co_return co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
{}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
}
void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& snap_ctl) {
t::force_keyspace_compaction_async.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto task = co_await force_keyspace_compaction(ctx, std::move(req));
auto& db = ctx.db;
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
apilog.debug("force_keyspace_compaction_async: keyspace={} tables={}, flush={}", keyspace, table_infos, flush);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
std::optional<compaction::flush_mode> fmopt;
if (!flush) {
fmopt = compaction::flush_mode::skip;
}
auto task = co_await compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt);
co_return json::json_return_type(task->get_status().id.to_sstring());
});
ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto task = co_await force_keyspace_compaction(ctx, std::move(req));
auto& db = ctx.db;
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
std::optional<compaction::flush_mode> fmopt;
if (!flush && !consider_only_existing_data) {
fmopt = compaction::flush_mode::skip;
}
auto task = co_await compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
co_await task->done();
co_return json_void();
});
t::force_keyspace_cleanup_async.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
tasks::task_id id = tasks::task_id::create_null_id();
auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
if (task) {
id = task->get_status().id;
auto& db = ctx.db;
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
apilog.info("force_keyspace_cleanup_async: keyspace={} tables={}", keyspace, table_infos);
if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
auto msg = "Can not perform cleanup operation when topology changes";
apilog.warn("force_keyspace_cleanup_async: keyspace={} tables={}: {}", keyspace, table_infos, msg);
co_await coroutine::return_exception(std::runtime_error(msg));
}
co_return json::json_return_type(id.to_sstring());
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>({}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
co_return json::json_return_type(task->get_status().id.to_sstring());
});
ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
if (task) {
co_await task->done();
auto& db = ctx.db;
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
if (rs.is_local() || !rs.is_vnode_based()) {
auto reason = rs.is_local() ? "require" : "support";
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
co_return json::json_return_type(0);
}
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
auto msg = "Can not perform cleanup operation when topology changes";
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
co_await coroutine::return_exception(std::runtime_error(msg));
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
{}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
co_await task->done();
co_return json::json_return_type(0);
});
@@ -131,12 +129,25 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
}));
t::upgrade_sstables_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
auto& db = ctx.db;
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
co_return json::json_return_type(task->get_status().id.to_sstring());
}));
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
auto& db = ctx.db;
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
co_await task->done();
co_return json::json_return_type(0);
}));

View File

@@ -62,17 +62,6 @@ void set_token_metadata(http_context& ctx, routes& r, sharded<locator::shared_to
return addr | std::ranges::to<std::vector>();
});
ss::get_excluded_nodes.set(r, [&tm](const_req req) {
const auto& local_tm = *tm.local().get();
std::vector<sstring> eps;
local_tm.get_topology().for_each_node([&] (auto& node) {
if (node.is_excluded()) {
eps.push_back(node.host_id().to_sstring());
}
});
return eps;
});
ss::get_joining_nodes.set(r, [&tm, &g](const_req req) {
const auto& local_tm = *tm.local().get();
const auto& points = local_tm.get_bootstrap_tokens();
@@ -141,7 +130,6 @@ void unset_token_metadata(http_context& ctx, routes& r) {
ss::get_leaving_nodes.unset(r);
ss::get_moving_nodes.unset(r);
ss::get_joining_nodes.unset(r);
ss::get_excluded_nodes.unset(r);
ss::get_host_id_map.unset(r);
httpd::endpoint_snitch_info_json::get_datacenter.unset(r);
httpd::endpoint_snitch_info_json::get_rack.unset(r);

View File

@@ -5,7 +5,6 @@ target_sources(scylla_audit
PRIVATE
audit.cc
audit_cf_storage_helper.cc
audit_composite_storage_helper.cc
audit_syslog_storage_helper.cc)
target_include_directories(scylla_audit
PUBLIC
@@ -17,7 +16,4 @@ target_link_libraries(scylla_audit
PRIVATE
cql3)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(scylla_audit REUSE_FROM scylla-precompiled-header)
endif()
add_whole_archive(audit scylla_audit)

View File

@@ -13,11 +13,9 @@
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "storage_helper.hh"
#include "audit_cf_storage_helper.hh"
#include "audit_syslog_storage_helper.hh"
#include "audit_composite_storage_helper.hh"
#include "audit.hh"
#include "../db/config.hh"
#include "utils/class_registrator.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
@@ -28,47 +26,6 @@ namespace audit {
logging::logger logger("audit");
static std::set<sstring> parse_audit_modes(const sstring& data) {
std::set<sstring> result;
if (!data.empty()) {
std::vector<sstring> audit_modes;
boost::split(audit_modes, data, boost::is_any_of(","));
if (audit_modes.empty()) {
return {};
}
for (sstring& audit_mode : audit_modes) {
boost::trim(audit_mode);
if (audit_mode == "none") {
return {};
}
if (audit_mode != "table" && audit_mode != "syslog") {
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", audit_mode));
}
result.insert(std::move(audit_mode));
}
}
return result;
}
static std::unique_ptr<storage_helper> create_storage_helper(const std::set<sstring>& audit_modes, cql3::query_processor& qp, service::migration_manager& mm) {
SCYLLA_ASSERT(!audit_modes.empty() && !audit_modes.contains("none"));
std::vector<std::unique_ptr<storage_helper>> helpers;
for (const sstring& audit_mode : audit_modes) {
if (audit_mode == "table") {
helpers.emplace_back(std::make_unique<audit_cf_storage_helper>(qp, mm));
} else if (audit_mode == "syslog") {
helpers.emplace_back(std::make_unique<audit_syslog_storage_helper>(qp, mm));
}
}
SCYLLA_ASSERT(!helpers.empty());
if (helpers.size() == 1) {
return std::move(helpers.front());
}
return std::make_unique<audit_composite_storage_helper>(std::move(helpers));
}
static sstring category_to_string(statement_category category)
{
switch (category) {
@@ -146,9 +103,7 @@ static std::set<sstring> parse_audit_keyspaces(const sstring& data) {
}
audit::audit(locator::shared_token_metadata& token_metadata,
cql3::query_processor& qp,
service::migration_manager& mm,
std::set<sstring>&& audit_modes,
sstring&& storage_helper_name,
std::set<sstring>&& audited_keyspaces,
std::map<sstring, std::set<sstring>>&& audited_tables,
category_set&& audited_categories,
@@ -157,21 +112,28 @@ audit::audit(locator::shared_token_metadata& token_metadata,
, _audited_keyspaces(std::move(audited_keyspaces))
, _audited_tables(std::move(audited_tables))
, _audited_categories(std::move(audited_categories))
, _storage_helper_class_name(std::move(storage_helper_name))
, _cfg(cfg)
, _cfg_keyspaces_observer(cfg.audit_keyspaces.observe([this] (sstring const& new_value){ update_config<std::set<sstring>>(new_value, parse_audit_keyspaces, _audited_keyspaces); }))
, _cfg_tables_observer(cfg.audit_tables.observe([this] (sstring const& new_value){ update_config<std::map<sstring, std::set<sstring>>>(new_value, parse_audit_tables, _audited_tables); }))
, _cfg_categories_observer(cfg.audit_categories.observe([this] (sstring const& new_value){ update_config<category_set>(new_value, parse_audit_categories, _audited_categories); }))
{
_storage_helper_ptr = create_storage_helper(std::move(audit_modes), qp, mm);
}
{ }
audit::~audit() = default;
future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
std::set<sstring> audit_modes = parse_audit_modes(cfg.audit());
if (audit_modes.empty()) {
future<> audit::create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm) {
sstring storage_helper_name;
if (cfg.audit() == "table") {
storage_helper_name = "audit_cf_storage_helper";
} else if (cfg.audit() == "syslog") {
storage_helper_name = "audit_syslog_storage_helper";
} else if (cfg.audit() == "none") {
// Audit is off
logger.info("Audit is disabled");
return make_ready_future<>();
} else {
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", cfg.audit()));
}
category_set audited_categories = parse_audit_categories(cfg.audit_categories());
std::map<sstring, std::set<sstring>> audited_tables = parse_audit_tables(cfg.audit_tables());
@@ -181,20 +143,19 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
cfg.audit(), cfg.audit_categories(), cfg.audit_keyspaces(), cfg.audit_tables());
return audit_instance().start(std::ref(stm),
std::ref(qp),
std::ref(mm),
std::move(audit_modes),
std::move(storage_helper_name),
std::move(audited_keyspaces),
std::move(audited_tables),
std::move(audited_categories),
std::cref(cfg))
.then([&cfg] {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit.start(cfg);
});
std::cref(cfg));
}
future<> audit::start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg, &qp, &mm] (audit& local_audit) {
return local_audit.start(cfg, qp.local(), mm.local());
});
}
@@ -220,7 +181,15 @@ audit_info_ptr audit::create_no_audit_info() {
return audit_info_ptr();
}
future<> audit::start(const db::config& cfg) {
future<> audit::start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm) {
try {
_storage_helper_ptr = create_object<storage_helper>(_storage_helper_class_name, qp, mm);
} catch (no_such_class& e) {
logger.error("Can't create audit storage helper {}: not supported", _storage_helper_class_name);
throw;
} catch (...) {
throw;
}
return _storage_helper_ptr->start(cfg);
}

View File

@@ -102,6 +102,7 @@ class audit final : public seastar::async_sharded_service<audit> {
std::map<sstring, std::set<sstring>> _audited_tables;
category_set _audited_categories;
sstring _storage_helper_class_name;
std::unique_ptr<storage_helper> _storage_helper_ptr;
const db::config& _cfg;
@@ -124,20 +125,18 @@ public:
static audit& local_audit_instance() {
return audit_instance().local();
}
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm);
static future<> start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table);
static audit_info_ptr create_no_audit_info();
audit(locator::shared_token_metadata& stm,
cql3::query_processor& qp,
service::migration_manager& mm,
std::set<sstring>&& audit_modes,
audit(locator::shared_token_metadata& stm, sstring&& storage_helper_name,
std::set<sstring>&& audited_keyspaces,
std::map<sstring, std::set<sstring>>&& audited_tables,
category_set&& audited_categories,
const db::config& cfg);
~audit();
future<> start(const db::config& cfg);
future<> start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm);
future<> stop();
future<> shutdown();
bool should_log(const audit_info* audit_info) const;

View File

@@ -11,6 +11,7 @@
#include "cql3/query_processor.hh"
#include "data_dictionary/keyspace_metadata.hh"
#include "utils/UUID_gen.hh"
#include "utils/class_registrator.hh"
#include "cql3/query_options.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "service/migration_manager.hh"
@@ -197,4 +198,7 @@ cql3::query_options audit_cf_storage_helper::make_login_data(socket_address node
return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
using registry = class_registrator<storage_helper, audit_cf_storage_helper, cql3::query_processor&, service::migration_manager&>;
static registry registrator1("audit_cf_storage_helper");
}

View File

@@ -1,68 +0,0 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/loop.hh>
#include <seastar/core/future-util.hh>
#include "audit/audit_composite_storage_helper.hh"
#include "utils/class_registrator.hh"
namespace audit {
audit_composite_storage_helper::audit_composite_storage_helper(std::vector<std::unique_ptr<storage_helper>>&& storage_helpers)
: _storage_helpers(std::move(storage_helpers))
{}
future<> audit_composite_storage_helper::start(const db::config& cfg) {
auto res = seastar::parallel_for_each(
_storage_helpers,
[&cfg] (std::unique_ptr<storage_helper>& h) {
return h->start(cfg);
}
);
return res;
}
future<> audit_composite_storage_helper::stop() {
auto res = seastar::parallel_for_each(
_storage_helpers,
[] (std::unique_ptr<storage_helper>& h) {
return h->stop();
}
);
return res;
}
future<> audit_composite_storage_helper::write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
const sstring& username,
bool error) {
return seastar::parallel_for_each(
_storage_helpers,
[audit_info, node_ip, client_ip, cl, &username, error](std::unique_ptr<storage_helper>& h) {
return h->write(audit_info, node_ip, client_ip, cl, username, error);
}
);
}
future<> audit_composite_storage_helper::write_login(const sstring& username,
socket_address node_ip,
socket_address client_ip,
bool error) {
return seastar::parallel_for_each(
_storage_helpers,
[&username, node_ip, client_ip, error](std::unique_ptr<storage_helper>& h) {
return h->write_login(username, node_ip, client_ip, error);
}
);
}
} // namespace audit

View File

@@ -1,37 +0,0 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "audit/audit.hh"
#include <seastar/core/future.hh>
#include "storage_helper.hh"
namespace audit {
class audit_composite_storage_helper : public storage_helper {
std::vector<std::unique_ptr<storage_helper>> _storage_helpers;
public:
explicit audit_composite_storage_helper(std::vector<std::unique_ptr<storage_helper>>&&);
virtual ~audit_composite_storage_helper() = default;
virtual future<> start(const db::config& cfg) override;
virtual future<> stop() override;
virtual future<> write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
const sstring& username,
bool error) override;
virtual future<> write_login(const sstring& username,
socket_address node_ip,
socket_address client_ip,
bool error) override;
};
} // namespace audit

View File

@@ -21,6 +21,7 @@
#include <fmt/chrono.h>
#include "cql3/query_processor.hh"
#include "utils/class_registrator.hh"
namespace cql3 {
@@ -142,4 +143,7 @@ future<> audit_syslog_storage_helper::write_login(const sstring& username,
co_await syslog_send_helper(msg.c_str());
}
using registry = class_registrator<storage_helper, audit_syslog_storage_helper, cql3::query_processor&, service::migration_manager&>;
static registry registrator1("audit_syslog_storage_helper");
}

View File

@@ -9,7 +9,6 @@ target_sources(scylla_auth
allow_all_authorizer.cc
authenticated_user.cc
authenticator.cc
cache.cc
certificate_authenticator.cc
common.cc
default_authorizer.cc
@@ -45,8 +44,5 @@ target_link_libraries(scylla_auth
add_whole_archive(auth scylla_auth)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(scylla_auth REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers scylla_auth
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -23,7 +23,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -12,7 +12,6 @@
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "utils/alien_worker.hh"
@@ -30,7 +29,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&) {
}
virtual future<> start() override {

View File

@@ -1,180 +0,0 @@
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "auth/cache.hh"
#include "auth/common.hh"
#include "auth/roles-metadata.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
namespace auth {
logging::logger logger("auth-cache");
cache::cache(cql3::query_processor& qp) noexcept
: _current_version(0)
, _qp(qp) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
auto it = _roles.find(role);
if (it == _roles.end()) {
return {};
}
return it->second;
}
future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& role) const {
auto rec = make_lw_shared<role_record>();
rec->version = _current_version;
auto fetch = [this, &role](const sstring& q) {
return _qp.execute_internal(q, db::consistency_level::LOCAL_ONE,
internal_distributed_query_state(), {role},
cql3::query_processor::cache_internal::yes);
};
// roles
{
static const sstring q = format("SELECT * FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, meta::roles_table::name);
auto rs = co_await fetch(q);
if (!rs->empty()) {
auto& r = rs->one();
rec->is_superuser = r.get_or<bool>("is_superuser", false);
rec->can_login = r.get_or<bool>("can_login", false);
rec->salted_hash = r.get_or<sstring>("salted_hash", "");
if (r.has("member_of")) {
auto mo = r.get_set<sstring>("member_of");
rec->member_of.insert(
std::make_move_iterator(mo.begin()),
std::make_move_iterator(mo.end()));
}
} else {
// role got deleted
co_return nullptr;
}
}
// members
{
static const sstring q = format("SELECT role, member FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_MEMBERS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->members.insert(r.get_as<sstring>("member"));
co_await coroutine::maybe_yield();
}
}
// attributes
{
static const sstring q = format("SELECT role, name, value FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_ATTRIBUTES_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->attributes[r.get_as<sstring>("name")] =
r.get_as<sstring>("value");
co_await coroutine::maybe_yield();
}
}
// permissions
{
static const sstring q = format("SELECT role, resource, permissions FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, PERMISSIONS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
auto resource = r.get_as<sstring>("resource");
auto perms_strings = r.get_set<sstring>("permissions");
std::unordered_set<sstring> perms_set(perms_strings.begin(), perms_strings.end());
auto pset = permissions::from_strings(perms_set);
rec->permissions[std::move(resource)] = std::move(pset);
co_await coroutine::maybe_yield();
}
}
co_return rec;
}
future<> cache::prune_all() noexcept {
for (auto it = _roles.begin(); it != _roles.end(); ) {
if (it->second->version != _current_version) {
_roles.erase(it++);
co_await coroutine::maybe_yield();
} else {
++it;
}
}
co_return;
}
future<> cache::load_all() {
if (legacy_mode(_qp)) {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
++_current_version;
logger.info("Loading all roles");
const uint32_t page_size = 128;
auto loader = [this](const cql3::untyped_result_set::row& r) -> future<stop_iteration> {
const auto name = r.get_as<sstring>("role");
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
}
co_return stop_iteration::no;
};
co_await _qp.query_internal(format("SELECT * FROM {}.{}",
db::system_keyspace::NAME, meta::roles_table::name),
db::consistency_level::LOCAL_ONE, {}, page_size, loader);
co_await prune_all();
for (const auto& [name, role] : _roles) {
co_await distribute_role(name, role);
}
co_await container().invoke_on_others([this](cache& c) -> future<> {
c._current_version = _current_version;
co_await c.prune_all();
});
}
future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
}
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
} else {
_roles.erase(name);
}
co_await distribute_role(name, role);
}
}
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
auto role_ptr = role.get();
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
if (!role_ptr) {
c._roles.erase(name);
return;
}
auto role_copy = make_lw_shared<role_record>(*role_ptr);
c._roles[name] = std::move(role_copy);
});
}
bool cache::includes_table(const table_id& id) noexcept {
return id == db::system_keyspace::roles()->id()
|| id == db::system_keyspace::role_members()->id()
|| id == db::system_keyspace::role_attributes()->id()
|| id == db::system_keyspace::role_permissions()->id();
}
} // namespace auth

View File

@@ -1,61 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <unordered_set>
#include <unordered_map>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <absl/container/flat_hash_map.h>
#include "auth/permission.hh"
#include "auth/common.hh"
namespace cql3 { class query_processor; }
namespace auth {
class cache : public peering_sharded_service<cache> {
public:
using role_name_t = sstring;
using version_tag_t = char;
struct role_record {
bool can_login = false;
bool is_superuser = false;
std::unordered_set<role_name_t> member_of;
std::unordered_set<role_name_t> members;
sstring salted_hash;
std::unordered_map<sstring, sstring> attributes;
std::unordered_map<sstring, permission_set> permissions;
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
static bool includes_table(const table_id&) noexcept;
private:
using roles_map = absl::flat_hash_map<role_name_t, lw_shared_ptr<role_record>>;
roles_map _roles;
version_tag_t _current_version;
cql3::query_processor& _qp;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;
future<> distribute_role(const role_name_t& name, const lw_shared_ptr<role_record> role);
};
} // namespace auth

View File

@@ -48,10 +48,6 @@ extern constinit const std::string_view AUTH_PACKAGE_NAME;
} // namespace meta
constexpr std::string_view PERMISSIONS_CF = "role_permissions";
constexpr std::string_view ROLE_MEMBERS_CF = "role_members";
constexpr std::string_view ROLE_ATTRIBUTES_CF = "role_attributes";
// This is a helper to check whether auth-v2 is on.
bool legacy_mode(cql3::query_processor& qp);

View File

@@ -37,6 +37,7 @@ std::string_view default_authorizer::qualified_java_name() const {
static constexpr std::string_view ROLE_NAME = "role";
static constexpr std::string_view RESOURCE_NAME = "resource";
static constexpr std::string_view PERMISSIONS_NAME = "permissions";
static constexpr std::string_view PERMISSIONS_CF = "role_permissions";
static logging::logger alogger("default_authorizer");

View File

@@ -83,18 +83,17 @@ static const class_registrator<
ldap_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(ldap_role_manager_full_name);
::service::migration_manager&> registration(ldap_role_manager_full_name);
ldap_role_manager::ldap_role_manager(
std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
: _std_mgr(qp, rg0c, mm, cache), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
: _std_mgr(qp, rg0c, mm), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
, _bind_password(bind_password)
, _connection_factory(bind(std::mem_fn(&ldap_role_manager::reconnect), std::ref(*this))) {
}
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
: ldap_role_manager(
qp.db().get_config().ldap_url_template(),
qp.db().get_config().ldap_attr_role(),
@@ -102,8 +101,7 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
qp.db().get_config().ldap_bind_passwd(),
qp,
rg0c,
mm,
cache) {
mm) {
}
std::string_view ldap_role_manager::qualified_java_name() const noexcept {

View File

@@ -14,7 +14,6 @@
#include "ent/ldap/ldap_connection.hh"
#include "standard_role_manager.hh"
#include "auth/cache.hh"
namespace auth {
@@ -44,13 +43,12 @@ class ldap_role_manager : public role_manager {
std::string_view bind_password, ///< LDAP bind credentials.
cql3::query_processor& qp, ///< Passed to standard_role_manager.
::service::raft_group0_client& rg0c, ///< Passed to standard_role_manager.
::service::migration_manager& mm, ///< Passed to standard_role_manager.
cache& cache ///< Passed to standard_role_manager.
::service::migration_manager& mm ///< Passed to standard_role_manager.
);
/// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
/// class_registrator<role_manager>.
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm);
/// Thrown when query-template parsing fails.
struct url_error : public std::runtime_error {

View File

@@ -11,7 +11,6 @@
#include <seastar/core/future.hh>
#include <stdexcept>
#include <string_view>
#include "auth/cache.hh"
#include "cql3/description.hh"
#include "utils/class_registrator.hh"
@@ -24,8 +23,7 @@ static const class_registrator<
maintenance_socket_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(sstring{maintenance_socket_role_manager_name});
::service::migration_manager&> registration(sstring{maintenance_socket_role_manager_name});
std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {

View File

@@ -8,7 +8,6 @@
#pragma once
#include "auth/cache.hh"
#include "auth/resource.hh"
#include "auth/role_manager.hh"
#include <seastar/core/future.hh>
@@ -30,7 +29,7 @@ extern const std::string_view maintenance_socket_role_manager_name;
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
class maintenance_socket_role_manager final : public role_manager {
public:
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {}
virtual std::string_view qualified_java_name() const noexcept override;

View File

@@ -49,7 +49,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
@@ -64,11 +63,10 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
password_authenticator::~password_authenticator() {
}
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
, _hashing_worker(hashing_worker)
@@ -317,20 +315,11 @@ future<authenticated_user> password_authenticator::authenticate(
const sstring password = credentials.at(PASSWORD_KEY);
try {
std::optional<sstring> salted_hash;
if (legacy_mode(_qp)) {
salted_hash = co_await get_password_hash(username);
if (!salted_hash) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
} else {
auto role = _cache.get(username);
if (!role || role->salted_hash.empty()) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
salted_hash = role->salted_hash;
const std::optional<sstring> salted_hash = co_await get_password_hash(username);
if (!salted_hash) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash = std::move(salted_hash)]{
return passwords::check(password, *salted_hash);
});
if (!password_match) {

View File

@@ -16,7 +16,6 @@
#include "db/consistency_level_type.hh"
#include "auth/authenticator.hh"
#include "auth/passwords.hh"
#include "auth/cache.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
@@ -42,7 +41,6 @@ class password_authenticator : public authenticator {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
future<> _stopped;
abort_source _as;
std::string _superuser; // default superuser name from the config (may or may not be present in roles table)
@@ -55,7 +53,7 @@ public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
~password_authenticator();

View File

@@ -35,10 +35,9 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -11,7 +11,6 @@
#pragma once
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
@@ -30,7 +29,7 @@ namespace auth {
class saslauthd_authenticator : public authenticator {
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
public:
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
future<> start() override;

View File

@@ -17,7 +17,6 @@
#include <chrono>
#include <seastar/core/future-util.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
@@ -158,7 +157,6 @@ static future<> validate_role_exists(const service& ser, std::string_view role_n
service::service(
utils::loading_cache_config c,
cache& cache,
cql3::query_processor& qp,
::service::raft_group0_client& g0,
::service::migration_notifier& mn,
@@ -168,7 +166,6 @@ service::service(
maintenance_socket_enabled used_by_maintenance_socket)
: _loading_cache_config(std::move(c))
, _permissions_cache(nullptr)
, _cache(cache)
, _qp(qp)
, _group0_client(g0)
, _mnotifier(mn)
@@ -191,17 +188,15 @@ service::service(
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache,
utils::alien_worker& hashing_worker)
: service(
std::move(c),
cache,
qp,
g0,
mn,
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm),
used_by_maintenance_socket) {
}
@@ -237,9 +232,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
auto auth_version = co_await sys_ks.get_auth_version();
// version is set in query processor to be easily available in various places we call auth::legacy_mode check.
_qp.auth_version = auth_version;
if (this_shard_id() == 0) {
co_await _cache.load_all();
}
if (!_used_by_maintenance_socket) {
// this legacy keyspace is only used by cqlsh
// it's needed when executing `list roles` or `list users`

View File

@@ -21,7 +21,6 @@
#include "auth/authorizer.hh"
#include "auth/permission.hh"
#include "auth/permissions_cache.hh"
#include "auth/cache.hh"
#include "auth/role_manager.hh"
#include "auth/common.hh"
#include "cql3/description.hh"
@@ -78,7 +77,6 @@ public:
class service final : public seastar::peering_sharded_service<service> {
utils::loading_cache_config _loading_cache_config;
std::unique_ptr<permissions_cache> _permissions_cache;
cache& _cache;
cql3::query_processor& _qp;
@@ -109,7 +107,6 @@ class service final : public seastar::peering_sharded_service<service> {
public:
service(
utils::loading_cache_config,
cache& cache,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_notifier&,
@@ -131,7 +128,6 @@ public:
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&,
utils::alien_worker&);
future<> start(::service::migration_manager&, db::system_keyspace&);

View File

@@ -41,6 +41,21 @@
namespace auth {
namespace meta {
namespace role_members_table {
constexpr std::string_view name{"role_members" , 12};
}
namespace role_attributes_table {
constexpr std::string_view name{"role_attributes", 15};
}
}
static logging::logger log("standard_role_manager");
@@ -49,8 +64,7 @@ static const class_registrator<
standard_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
::service::migration_manager&> registration("org.apache.cassandra.auth.CassandraRoleManager");
struct record final {
sstring name;
@@ -107,11 +121,10 @@ static bool has_can_login(const cql3::untyped_result_set_row& row) {
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob_unfragmented("can_login")).is_null());
}
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
{}
@@ -123,7 +136,7 @@ std::string_view standard_role_manager::qualified_java_name() const noexcept {
const resource_set& standard_role_manager::protected_resources() const {
static const resource_set resources({
make_data_resource(meta::legacy::AUTH_KS, meta::roles_table::name),
make_data_resource(meta::legacy::AUTH_KS, ROLE_MEMBERS_CF)});
make_data_resource(meta::legacy::AUTH_KS, meta::role_members_table::name)});
return resources;
}
@@ -147,7 +160,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
" PRIMARY KEY (role, member)"
")",
meta::legacy::AUTH_KS,
ROLE_MEMBERS_CF);
meta::role_members_table::name);
static const sstring create_role_attributes_query = seastar::format(
"CREATE TABLE {}.{} ("
" role text,"
@@ -156,7 +169,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
" PRIMARY KEY(role, name)"
")",
meta::legacy::AUTH_KS,
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
return when_all_succeed(
create_legacy_metadata_table_if_missing(
meta::roles_table::name,
@@ -164,12 +177,12 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
create_roles_query,
_migration_manager),
create_legacy_metadata_table_if_missing(
ROLE_MEMBERS_CF,
meta::role_members_table::name,
_qp,
create_role_members_query,
_migration_manager),
create_legacy_metadata_table_if_missing(
ROLE_ATTRIBUTES_CF,
meta::role_attributes_table::name,
_qp,
create_role_attributes_query,
_migration_manager)).discard_result();
@@ -416,7 +429,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
const auto revoke_from_members = [this, role_name, &mc] () -> future<> {
const sstring query = seastar::format("SELECT member FROM {}.{} WHERE role = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
const auto members = co_await _qp.execute_internal(
query,
consistency_for_role(role_name),
@@ -448,7 +461,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
const auto remove_attributes_of = [this, role_name, &mc] () -> future<> {
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name)},
cql3::query_processor::cache_internal::yes).discard_result();
@@ -504,7 +517,7 @@ standard_role_manager::legacy_modify_membership(
case membership_change::add: {
const sstring insert_query = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
co_return co_await _qp.execute_internal(
insert_query,
consistency_for_role(role_name),
@@ -516,7 +529,7 @@ standard_role_manager::legacy_modify_membership(
case membership_change::remove: {
const sstring delete_query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
co_return co_await _qp.execute_internal(
delete_query,
consistency_for_role(role_name),
@@ -554,12 +567,12 @@ standard_role_manager::modify_membership(
case membership_change::add:
modify_role_members = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
break;
case membership_change::remove:
modify_role_members = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
break;
default:
on_internal_error(log, format("unknown membership_change value: {}", int(ch)));
@@ -653,7 +666,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
const sstring query = seastar::format("SELECT * FROM {}.{}",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
const auto results = co_await _qp.execute_internal(
query,
@@ -718,21 +731,15 @@ future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
}
future<bool> standard_role_manager::can_login(std::string_view role_name) {
if (legacy_mode(_qp)) {
const auto r = co_await require_record(_qp, role_name);
co_return r.can_login;
}
auto role = _cache.get(sstring(role_name));
if (!role) {
throw nonexistant_role(role_name);
}
co_return role->can_login;
return require_record(_qp, role_name).then([](record r) {
return r.can_login;
});
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
if (!result_set->empty()) {
const cql3::untyped_result_set_row &row = result_set->one();
@@ -763,7 +770,7 @@ future<> standard_role_manager::set_attribute(std::string_view role_name, std::s
}
const sstring query = seastar::format("INSERT INTO {}.{} (role, name, value) VALUES (?, ?, ?)",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, cql3::query_processor::cache_internal::yes).discard_result();
} else {
@@ -778,7 +785,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
}
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).discard_result();
} else {

View File

@@ -10,7 +10,6 @@
#include "auth/common.hh"
#include "auth/role_manager.hh"
#include "auth/cache.hh"
#include <string_view>
@@ -37,14 +36,13 @@ class standard_role_manager final : public role_manager {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
future<> _stopped;
abort_source _as;
std::string _superuser;
shared_promise<> _superuser_created_promise;
public:
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
virtual std::string_view qualified_java_name() const noexcept override;

View File

@@ -13,7 +13,6 @@
#include "auth/authorizer.hh"
#include "auth/default_authorizer.hh"
#include "auth/password_authenticator.hh"
#include "auth/cache.hh"
#include "auth/permission.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
@@ -38,8 +37,8 @@ class transitional_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, hashing_worker)) {
}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
@@ -241,7 +240,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&,
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<

View File

@@ -15,7 +15,6 @@
#include <cmath>
#include "seastarx.hh"
#include "backlog_controller_fwd.hh"
// Simple proportional controller to adjust shares for processes for which a backlog can be clearly
// defined.
@@ -129,21 +128,11 @@ public:
static constexpr unsigned normalization_factor = 30;
static constexpr float disable_backlog = std::numeric_limits<double>::infinity();
static constexpr float backlog_disabled(float backlog) { return std::isinf(backlog); }
static inline const std::vector<backlog_controller::control_point> default_control_points = {
backlog_controller::control_point{0.0, 50}, {1.5, 100}, {normalization_factor, default_compaction_maximum_shares}};
compaction_controller(backlog_controller::scheduling_group sg, float static_shares, std::optional<float> max_shares,
std::chrono::milliseconds interval, std::function<float()> current_backlog)
compaction_controller(backlog_controller::scheduling_group sg, float static_shares, std::chrono::milliseconds interval, std::function<float()> current_backlog)
: backlog_controller(std::move(sg), std::move(interval),
default_control_points,
std::vector<backlog_controller::control_point>({{0.0, 50}, {1.5, 100} , {normalization_factor, 1000}}),
std::move(current_backlog),
static_shares
)
{
if (max_shares) {
set_max_shares(*max_shares);
}
}
// Updates the maximum output value for control points.
void set_max_shares(float max_shares);
{}
};

View File

@@ -1,13 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <cstdint>
static constexpr uint64_t default_compaction_maximum_shares = 1000;

View File

@@ -17,8 +17,5 @@ target_link_libraries(cdc
PRIVATE
replica)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(cdc REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers cdc
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -1209,7 +1209,7 @@ future<mutation> create_table_streams_mutation(table_id table, db_clock::time_po
co_return std::move(m);
}
future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const utils::chunked_vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const std::vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
auto s = db::system_keyspace::cdc_streams_state();
mutation m(s, partition_key::from_single_value(*s,
@@ -1252,24 +1252,24 @@ future<> generation_service::load_cdc_tablet_streams(std::optional<std::unordere
tables_to_process = _cdc_metadata.get_tables_with_cdc_tablet_streams() | std::ranges::to<std::unordered_set<table_id>>();
}
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) -> future<> {
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) -> future<> {
if (tables) {
for (auto table : *tables) {
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
return f(table, base_ts, std::move(base_stream_set));
});
}
} else {
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
return f(table, base_ts, std::move(base_stream_set));
});
}
};
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
table_streams new_table_map;
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, utils::chunked_vector<cdc::stream_id> stream_set) {
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, std::vector<cdc::stream_id> stream_set) {
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(stream_tp.time_since_epoch()).count();
new_table_map[ts] = committed_stream_set {stream_tp, std::move(stream_set)};
};
@@ -1345,7 +1345,7 @@ future<> generation_service::query_cdc_timestamps(table_id table, bool ascending
}
}
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
const auto& all_tables = _cdc_metadata.get_all_tablet_streams();
auto table_it = all_tables.find(table);
if (table_it == all_tables.end()) {
@@ -1402,8 +1402,8 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
co_return;
}
utils::chunked_vector<cdc::stream_id> new_streams;
co_await utils::reserve_gently(new_streams, new_tablet_map.tablet_count());
std::vector<cdc::stream_id> new_streams;
new_streams.reserve(new_tablet_map.tablet_count());
for (auto tid : new_tablet_map.tablet_ids()) {
new_streams.emplace_back(new_tablet_map.get_last_token(tid), 0);
co_await coroutine::maybe_yield();
@@ -1425,7 +1425,7 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
muts.emplace_back(std::move(mut));
}
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
utils::chunked_vector<mutation> muts;
muts.reserve(2);

View File

@@ -143,12 +143,12 @@ stream_state read_stream_state(int8_t val);
struct committed_stream_set {
db_clock::time_point ts;
utils::chunked_vector<cdc::stream_id> streams;
std::vector<cdc::stream_id> streams;
};
struct cdc_stream_diff {
utils::chunked_vector<stream_id> closed_streams;
utils::chunked_vector<stream_id> opened_streams;
std::vector<stream_id> closed_streams;
std::vector<stream_id> opened_streams;
};
using table_streams = std::map<api::timestamp_type, committed_stream_set>;
@@ -220,11 +220,11 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const locator::tablet_map&, api::timestamp_type);
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const utils::chunked_vector<cdc::stream_id>&, api::timestamp_type);
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const std::vector<cdc::stream_id>&, api::timestamp_type);
utils::chunked_vector<mutation> make_drop_table_streams_mutations(table_id, api::timestamp_type ts);
future<mutation> get_switch_streams_mutation(table_id table, db_clock::time_point stream_ts, cdc_stream_diff diff, api::timestamp_type ts);
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
table_streams::const_iterator get_new_base_for_gc(const table_streams&, std::chrono::seconds ttl);
} // namespace cdc

View File

@@ -149,7 +149,7 @@ public:
future<> load_cdc_tablet_streams(std::optional<std::unordered_set<table_id>> changed_tables);
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
future<> generate_tablet_resize_update(utils::chunked_vector<canonical_mutation>& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts);

View File

@@ -25,7 +25,6 @@
#include "locator/abstract_replication_strategy.hh"
#include "locator/topology.hh"
#include "replica/database.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "gms/feature_service.hh"
#include "schema/schema.hh"
@@ -69,15 +68,10 @@ shared_ptr<locator::abstract_replication_strategy> generate_replication_strategy
return locator::abstract_replication_strategy::create_replication_strategy(ksm.strategy_name(), params, topo);
}
// When dropping a column from a CDC log table, we set the drop timestamp
// `column_drop_leeway` seconds into the future to ensure that for writes concurrent
// with column drop, the write timestamp is before the column drop timestamp.
constexpr auto column_drop_leeway = std::chrono::seconds(5);
} // anonymous namespace
namespace cdc {
static schema_ptr create_log_schema(const schema&, const replica::database&, const keyspace_metadata&, api::timestamp_type,
static schema_ptr create_log_schema(const schema&, const replica::database&, const keyspace_metadata&,
std::optional<table_id> = {}, schema_ptr = nullptr);
}
@@ -189,7 +183,7 @@ public:
muts.emplace_back(std::move(mut));
}
void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms, api::timestamp_type ts) override {
void on_pre_create_column_families(const keyspace_metadata& ksm, std::vector<schema_ptr>& cfms) override {
std::vector<schema_ptr> new_cfms;
for (auto sp : cfms) {
@@ -208,7 +202,7 @@ public:
}
// in seastar thread
auto log_schema = create_log_schema(schema, db, ksm, ts);
auto log_schema = create_log_schema(schema, db, ksm);
new_cfms.push_back(std::move(log_schema));
}
@@ -255,7 +249,7 @@ public:
}
std::optional<table_id> maybe_id = log_schema ? std::make_optional(log_schema->id()) : std::nullopt;
auto new_log_schema = create_log_schema(new_schema, db, *keyspace.metadata(), timestamp, std::move(maybe_id), log_schema);
auto new_log_schema = create_log_schema(new_schema, db, *keyspace.metadata(), std::move(maybe_id), log_schema);
auto log_mut = log_schema
? db::schema_tables::make_update_table_mutations(_ctxt._proxy, keyspace.metadata(), log_schema, new_log_schema, timestamp)
@@ -587,9 +581,11 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
}
static void set_default_properties_log_table(schema_builder& b, const schema& s,
const replica::database& db, const keyspace_metadata& ksm)
static schema_ptr create_log_schema(const schema& s, const replica::database& db,
const keyspace_metadata& ksm, std::optional<table_id> uuid, schema_ptr old)
{
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.with_partitioner(cdc::cdc_partitioner::classname);
b.set_compaction_strategy(compaction::compaction_strategy_type::time_window);
b.set_comment(fmt::format("CDC log for {}.{}", s.ks_name(), s.cf_name()));
auto ttl_seconds = s.cdc_options().ttl();
@@ -615,44 +611,13 @@ static void set_default_properties_log_table(schema_builder& b, const schema& s,
std::to_string(std::max(1, window_seconds / 2))},
});
}
b.set_caching_options(caching_options::get_disabled_caching_options());
auto rs = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, db.get_token_metadata(), false));
b.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
}
static void add_columns_to_cdc_log(schema_builder& b, const schema& s,
const api::timestamp_type timestamp, const schema_ptr old)
{
b.with_column(log_meta_column_name_bytes("stream_id"), bytes_type, column_kind::partition_key);
b.with_column(log_meta_column_name_bytes("time"), timeuuid_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
auto validate_new_column = [&] (const sstring& name) {
// When dropping a column from a CDC log table, we set the drop timestamp to be
// `column_drop_leeway` seconds into the future (see `create_log_schema`).
// Therefore, when recreating a column with the same name, we need to validate
// that it's not recreated too soon and that the drop timestamp has passed.
if (old && old->dropped_columns().contains(name)) {
const auto& drop_info = old->dropped_columns().at(name);
auto create_time = api::timestamp_clock::time_point(api::timestamp_clock::duration(timestamp));
auto drop_time = api::timestamp_clock::time_point(api::timestamp_clock::duration(drop_info.timestamp));
if (drop_time > create_time) {
throw exceptions::invalid_request_exception(format("Cannot add column {} because a column with the same name was dropped too recently. Please retry after {} seconds",
name, std::chrono::duration_cast<std::chrono::seconds>(drop_time - create_time).count() + 1));
}
}
};
auto add_column = [&] (sstring name, data_type type) {
validate_new_column(name);
b.with_column(to_bytes(name), type);
};
b.set_caching_options(caching_options::get_disabled_caching_options());
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
for (const auto& column : columns) {
auto type = column.type;
@@ -674,9 +639,9 @@ static void add_columns_to_cdc_log(schema_builder& b, const schema& s,
}
));
}
add_column(log_data_column_name(column.name_as_text()), type);
b.with_column(log_data_column_name_bytes(column.name()), type);
if (is_data_col) {
add_column(log_data_column_deleted_name(column.name_as_text()), boolean_type);
b.with_column(log_data_column_deleted_name_bytes(column.name()), boolean_type);
}
if (column.type->is_multi_cell()) {
auto dtype = visit(*type, make_visitor(
@@ -692,7 +657,7 @@ static void add_columns_to_cdc_log(schema_builder& b, const schema& s,
throw std::invalid_argument("Should not reach");
}
));
add_column(log_data_column_deleted_elements_name(column.name_as_text()), dtype);
b.with_column(log_data_column_deleted_elements_name_bytes(column.name()), dtype);
}
}
};
@@ -700,28 +665,15 @@ static void add_columns_to_cdc_log(schema_builder& b, const schema& s,
add_columns(s.clustering_key_columns());
add_columns(s.static_columns(), true);
add_columns(s.regular_columns(), true);
}
static schema_ptr create_log_schema(const schema& s, const replica::database& db,
const keyspace_metadata& ksm, api::timestamp_type timestamp, std::optional<table_id> uuid, schema_ptr old)
{
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.with_partitioner(cdc::cdc_partitioner::classname);
if (old) {
// If the user reattaches the log table, do not change its properties.
b.set_properties(old->get_properties());
} else {
set_default_properties_log_table(b, s, db, ksm);
}
add_columns_to_cdc_log(b, s, timestamp, old);
if (uuid) {
b.set_uuid(*uuid);
}
auto rs = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, db.get_token_metadata()));
b.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
/**
* #10473 - if we are redefining the log table, we need to ensure any dropped
* columns are registered in "dropped_columns" table, otherwise clients will not
@@ -731,8 +683,7 @@ static schema_ptr create_log_schema(const schema& s, const replica::database& db
// not super efficient, but we don't do this often.
for (auto& col : old->all_columns()) {
if (!b.has_column({col.name(), col.name_as_text() })) {
auto drop_ts = api::timestamp_clock::now() + column_drop_leeway;
b.without_column(col.name_as_text(), col.type, drop_ts.time_since_epoch().count());
b.without_column(col.name_as_text(), col.type, api::new_timestamp());
}
}
}
@@ -952,6 +903,9 @@ static managed_bytes merge(const abstract_type& type, const managed_bytes_opt& p
throw std::runtime_error(format("cdc merge: unknown type {}", type.name()));
}
using cell_map = std::unordered_map<const column_definition*, managed_bytes_opt>;
using row_states_map = std::unordered_map<clustering_key, cell_map, clustering_key::hashing, clustering_key::equality>;
static managed_bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) {
if (state) {
if (auto it = state->find(&cdef); it != state->end()) {
@@ -961,12 +915,7 @@ static managed_bytes_opt get_col_from_row_state(const cell_map* state, const col
return std::nullopt;
}
cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck) {
auto it = row_states.find(ck);
return it == row_states.end() ? nullptr : &it->second;
}
const cell_map* get_row_state(const row_states_map& row_states, const clustering_key& ck) {
static cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck) {
auto it = row_states.find(ck);
return it == row_states.end() ? nullptr : &it->second;
}
@@ -1436,8 +1385,6 @@ struct process_change_visitor {
row_states_map& _clustering_row_states;
cell_map& _static_row_state;
const bool _is_update = false;
const bool _generate_delta_values = true;
void static_row_cells(auto&& visit_row_cells) {
@@ -1461,13 +1408,12 @@ struct process_change_visitor {
struct clustering_row_cells_visitor : public process_row_visitor {
operation _cdc_op = operation::update;
operation _marker_op = operation::insert;
using process_row_visitor::process_row_visitor;
void marker(const row_marker& rm) {
_ttl_column = get_ttl(rm);
_cdc_op = _marker_op;
_cdc_op = operation::insert;
}
};
@@ -1475,9 +1421,6 @@ struct process_change_visitor {
log_ck, _touched_parts, _builder,
_enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey),
_clustering_row_states, _generate_delta_values);
if (_is_update && _request_options.alternator) {
v._marker_op = operation::update;
}
visit_row_cells(v);
if (_enable_updating_state) {
@@ -1631,11 +1574,6 @@ private:
row_states_map _clustering_row_states;
cell_map _static_row_state;
// True if the mutated row existed before applying the mutation. In other
// words, if the preimage is enabled and it isn't empty (otherwise, we
// assume that the row is non-existent). Used for Alternator Streams (see
// #6918).
bool _is_update = false;
const bool _uses_tablets;
@@ -1652,7 +1590,7 @@ public:
: _ctx(ctx)
, _schema(std::move(s))
, _dk(std::move(dk))
, _log_schema(_schema->cdc_schema() ? _schema->cdc_schema() : ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
, _log_schema(ctx._proxy.get_db().local().find_schema(_schema->ks_name(), log_name(_schema->cf_name())))
, _options(options)
, _clustering_row_states(0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema))
, _uses_tablets(ctx._proxy.get_db().local().find_keyspace(_schema->ks_name()).uses_tablets())
@@ -1762,7 +1700,6 @@ public:
._enable_updating_state = _enable_updating_state,
._clustering_row_states = _clustering_row_states,
._static_row_state = _static_row_state,
._is_update = _is_update,
._generate_delta_values = generate_delta_values(_builder->base_schema())
};
cdc::inspect_mutation(m, v);
@@ -1773,10 +1710,6 @@ public:
_builder->end_record();
}
const row_states_map& clustering_row_states() const override {
return _clustering_row_states;
}
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called on should not be used anymore.
std::tuple<utils::chunked_vector<mutation>, stats::part_type_set> finish() && {
@@ -1900,7 +1833,6 @@ public:
_static_row_state[&c] = std::move(*maybe_cell_view);
}
}
_is_update = true;
}
if (static_only) {
@@ -1988,7 +1920,6 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
return make_ready_future<>();
}
const bool alternator_increased_compatibility = options.alternator && options.alternator_streams_increased_compatibility;
transformer trans(_ctxt, s, m.decorated_key(), options);
auto f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(nullptr);
@@ -1996,7 +1927,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
// Preimage has been fetched by upper layers.
tracing::trace(tr_state, "CDC: Using a prefetched preimage");
f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(options.preimage);
} else if (s->cdc_options().preimage() || s->cdc_options().postimage() || alternator_increased_compatibility) {
} else if (s->cdc_options().preimage() || s->cdc_options().postimage()) {
// Note: further improvement here would be to coalesce the pre-image selects into one
// if a batch contains several modifications to the same table. Otoh, batch is rare(?)
// so this is premature.
@@ -2013,7 +1944,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key());
}
return f.then([alternator_increased_compatibility, trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
return f.then([trans = std::move(trans), &mutations, idx, tr_state, &details] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
auto& m = mutations[idx];
auto& s = m.schema();
@@ -2028,13 +1959,13 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
details.had_preimage |= preimage;
details.had_postimage |= postimage;
tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key());
if (should_split(m, options)) {
if (should_split(m)) {
tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key());
details.was_split = true;
process_changes_with_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
process_changes_with_splitting(m, trans, preimage, postimage);
} else {
tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key());
process_changes_without_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
process_changes_without_splitting(m, trans, preimage, postimage);
}
auto [log_mut, touched_parts] = std::move(trans).finish();
const int generated_count = log_mut.size();

View File

@@ -52,9 +52,6 @@ class database;
namespace cdc {
using cell_map = std::unordered_map<const column_definition*, managed_bytes_opt>;
using row_states_map = std::unordered_map<clustering_key, cell_map, clustering_key::hashing, clustering_key::equality>;
// cdc log table operation
enum class operation : int8_t {
// note: these values will eventually be read by a third party, probably not privvy to this
@@ -76,14 +73,6 @@ struct per_request_options {
// Scylla. Currently, only TTL expiration implementation for Alternator
// uses this.
const bool is_system_originated = false;
// True if this mutation was emitted by Alternator.
const bool alternator = false;
// Sacrifice performance for the sake of better compatibility with DynamoDB
// Streams. It's important for correctness that
// alternator_streams_increased_compatibility config flag be read once per
// request, because it's live-updateable. As a result, the flag may change
// between reads.
const bool alternator_streams_increased_compatibility = false;
};
struct operation_result_tracker;
@@ -153,7 +142,4 @@ bool is_cdc_metacolumn_name(const sstring& name);
utils::UUID generate_timeuuid(api::timestamp_type t);
cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck);
const cell_map* get_row_state(const row_states_map& row_states, const clustering_key& ck);
} // namespace cdc

View File

@@ -54,7 +54,7 @@ cdc::stream_id get_stream(
}
static cdc::stream_id get_stream(
const utils::chunked_vector<cdc::stream_id>& streams,
const std::vector<cdc::stream_id>& streams,
dht::token tok) {
if (streams.empty()) {
on_internal_error(cdc_log, "get_stream: streams empty");
@@ -159,7 +159,7 @@ cdc::stream_id cdc::metadata::get_vnode_stream(api::timestamp_type ts, dht::toke
return ret;
}
const utils::chunked_vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
const std::vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
auto now = api::new_timestamp();
if (ts > now + get_generation_leeway().count()) {
throw exceptions::invalid_request_exception(seastar::format(
@@ -259,10 +259,10 @@ bool cdc::metadata::prepare(db_clock::time_point tp) {
return !it->second;
}
future<utils::chunked_vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
utils::chunked_vector<cdc::stream_id> opened,
const utils::chunked_vector<cdc::stream_id>& closed) {
future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
const std::vector<cdc::stream_id>& prev_stream_set,
std::vector<cdc::stream_id> opened,
const std::vector<cdc::stream_id>& closed) {
if (closed.size() == prev_stream_set.size()) {
// all previous streams are closed, so the next stream set is just the opened streams.
@@ -273,8 +273,8 @@ future<utils::chunked_vector<cdc::stream_id>> cdc::metadata::construct_next_stre
// streams and removing the closed streams. we assume each stream set is
// sorted by token, and the result is sorted as well.
utils::chunked_vector<cdc::stream_id> next_stream_set;
co_await utils::reserve_gently(next_stream_set, prev_stream_set.size() + opened.size() - closed.size());
std::vector<cdc::stream_id> next_stream_set;
next_stream_set.reserve(prev_stream_set.size() + opened.size() - closed.size());
auto next_prev = prev_stream_set.begin();
auto next_closed = closed.begin();
@@ -318,8 +318,8 @@ std::vector<table_id> cdc::metadata::get_tables_with_cdc_tablet_streams() const
return _tablet_streams | std::views::keys | std::ranges::to<std::vector<table_id>>();
}
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const utils::chunked_vector<stream_id>& before, const utils::chunked_vector<stream_id>& after) {
utils::chunked_vector<stream_id> closed, opened;
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const std::vector<stream_id>& before, const std::vector<stream_id>& after) {
std::vector<stream_id> closed, opened;
auto before_it = before.begin();
auto after_it = after.begin();

View File

@@ -49,7 +49,7 @@ class metadata final {
container_t::const_iterator gen_used_at(api::timestamp_type ts) const;
const utils::chunked_vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
const std::vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
public:
/* Is a generation with the given timestamp already known or obsolete? It is obsolete if and only if
@@ -111,14 +111,14 @@ public:
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
static future<utils::chunked_vector<stream_id>> construct_next_stream_set(
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
utils::chunked_vector<cdc::stream_id> opened,
const utils::chunked_vector<cdc::stream_id>& closed);
static future<std::vector<stream_id>> construct_next_stream_set(
const std::vector<cdc::stream_id>& prev_stream_set,
std::vector<cdc::stream_id> opened,
const std::vector<cdc::stream_id>& closed);
static future<cdc_stream_diff> generate_stream_diff(
const utils::chunked_vector<stream_id>& before,
const utils::chunked_vector<stream_id>& after);
const std::vector<stream_id>& before,
const std::vector<stream_id>& after);
};

View File

@@ -6,28 +6,15 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "bytes.hh"
#include "bytes_fwd.hh"
#include "mutation/atomic_cell.hh"
#include "mutation/atomic_cell_or_collection.hh"
#include "mutation/collection_mutation.hh"
#include "mutation/mutation.hh"
#include "mutation/tombstone.hh"
#include "schema/schema.hh"
#include "seastar/core/sstring.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "types/user.hh"
#include "split.hh"
#include "log.hh"
#include "change_visitor.hh"
#include "utils/managed_bytes.hh"
#include <string_view>
#include <unordered_map>
extern logging::logger cdc_log;
struct atomic_column_update {
column_id id;
@@ -503,8 +490,6 @@ struct should_split_visitor {
// Otherwise we store the change's ttl.
std::optional<gc_clock::duration> _ttl = std::nullopt;
virtual ~should_split_visitor() = default;
inline bool finished() const { return _result; }
inline void stop() { _result = true; }
@@ -527,7 +512,7 @@ struct should_split_visitor {
void collection_tombstone(const tombstone& t) { visit(t.timestamp + 1); }
virtual void live_collection_cell(bytes_view, const atomic_cell_view& cell) {
void live_collection_cell(bytes_view, const atomic_cell_view& cell) {
if (_had_row_marker) {
// nonatomic updates cannot be expressed with an INSERT.
return stop();
@@ -537,7 +522,7 @@ struct should_split_visitor {
void dead_collection_cell(bytes_view, const atomic_cell_view& cell) { visit(cell); }
void collection_column(const column_definition&, auto&& visit_collection) { visit_collection(*this); }
virtual void marker(const row_marker& rm) {
void marker(const row_marker& rm) {
_had_row_marker = true;
visit(rm.timestamp(), get_ttl(rm));
}
@@ -578,29 +563,7 @@ struct should_split_visitor {
}
};
// This is the same as the above, but it doesn't split a row marker away from
// an update. As a result, updates that create an item appear as a single log
// row.
class alternator_should_split_visitor : public should_split_visitor {
public:
~alternator_should_split_visitor() override = default;
void live_collection_cell(bytes_view, const atomic_cell_view& cell) override {
visit(cell.timestamp());
}
void marker(const row_marker& rm) override {
visit(rm.timestamp());
}
};
bool should_split(const mutation& m, const per_request_options& options) {
if (options.alternator) {
alternator_should_split_visitor v;
cdc::inspect_mutation(m, v);
return v._result || v._ts == api::missing_timestamp;
}
bool should_split(const mutation& m) {
should_split_visitor v;
cdc::inspect_mutation(m, v);
@@ -610,109 +573,8 @@ bool should_split(const mutation& m, const per_request_options& options) {
|| v._ts == api::missing_timestamp;
}
// Returns true if the row state and the atomic and nonatomic entries represent
// an equivalent item.
static bool entries_match_row_state(const schema_ptr& base_schema, const cell_map& row_state, const std::vector<atomic_column_update>& atomic_entries,
std::vector<nonatomic_column_update>& nonatomic_entries) {
for (const auto& update : atomic_entries) {
const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id);
const auto it = row_state.find(&cdef);
if (it == row_state.end()) {
return false;
}
if (to_managed_bytes_opt(update.cell.value().linearize()) != it->second) {
return false;
}
}
if (nonatomic_entries.empty()) {
return true;
}
for (const auto& update : nonatomic_entries) {
const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id);
const auto it = row_state.find(&cdef);
if (it == row_state.end()) {
return false;
}
// The only collection used by Alternator is a non-frozen map.
auto current_raw_map = cdef.type->deserialize(*it->second);
map_type_impl::native_type current_values = value_cast<map_type_impl::native_type>(current_raw_map);
if (current_values.size() != update.cells.size()) {
return false;
}
std::unordered_map<sstring_view, bytes> current_values_map;
for (const auto& entry : current_values) {
const auto attr_name = std::string_view(value_cast<sstring>(entry.first));
current_values_map[attr_name] = value_cast<bytes>(entry.second);
}
for (const auto& [key, value] : update.cells) {
const auto key_str = to_string_view(key);
if (!value.is_live()) {
if (current_values_map.contains(key_str)) {
return false;
}
} else if (current_values_map[key_str] != value.value().linearize()) {
return false;
}
}
}
return true;
}
bool should_skip(batch& changes, const mutation& base_mutation, change_processor& processor) {
const schema_ptr& base_schema = base_mutation.schema();
// Alternator doesn't use static updates and clustered range deletions.
if (!changes.static_updates.empty() || !changes.clustered_range_deletions.empty()) {
return false;
}
for (clustered_row_insert& u : changes.clustered_inserts) {
const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key);
if (!row_state) {
return false;
}
if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) {
return false;
}
}
for (clustered_row_update& u : changes.clustered_updates) {
const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key);
if (!row_state) {
return false;
}
if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) {
return false;
}
}
// Skip only if the row being deleted does not exist (i.e. the deletion is a no-op).
for (const auto& row_deletion : changes.clustered_row_deletions) {
if (processor.clustering_row_states().contains(row_deletion.key)) {
return false;
}
}
// Don't skip if the item exists.
//
// Increased DynamoDB Streams compatibility guarantees that single-item
// operations will read the item and store it in the clustering row states.
// If it is not found there, we may skip CDC. This is safe as long as the
// assumptions of this operation's write isolation are not violated.
if (changes.partition_deletions && processor.clustering_row_states().contains(clustering_key::make_empty())) {
return false;
}
cdc_log.trace("Skipping CDC log for mutation {}", base_mutation);
return true;
}
void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor,
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) {
bool enable_preimage, bool enable_postimage) {
const auto base_schema = base_mutation.schema();
auto changes = extract_changes(base_mutation);
auto pk = base_mutation.key();
@@ -724,6 +586,9 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
const auto last_timestamp = changes.rbegin()->first;
for (auto& [change_ts, btch] : changes) {
const bool is_last = change_ts == last_timestamp;
processor.begin_timestamp(change_ts, is_last);
clustered_column_set affected_clustered_columns_per_row{clustering_key::less_compare(*base_schema)};
one_kind_column_set affected_static_columns{base_schema->static_columns_count()};
@@ -732,12 +597,6 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
affected_clustered_columns_per_row = btch.get_affected_clustered_columns_per_row(*base_mutation.schema());
}
if (alternator_strict_compatibility && should_skip(btch, base_mutation, processor)) {
continue;
}
const bool is_last = change_ts == last_timestamp;
processor.begin_timestamp(change_ts, is_last);
if (enable_preimage) {
if (affected_static_columns.count() > 0) {
processor.produce_preimage(nullptr, affected_static_columns);
@@ -825,13 +684,7 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
}
void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor,
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) {
if (alternator_strict_compatibility) {
auto changes = extract_changes(base_mutation);
if (should_skip(changes.begin()->second, base_mutation, processor)) {
return;
}
}
bool enable_preimage, bool enable_postimage) {
auto ts = find_timestamp(base_mutation);
processor.begin_timestamp(ts, true);

View File

@@ -9,7 +9,6 @@
#pragma once
#include <boost/dynamic_bitset.hpp> // IWYU pragma: keep
#include "cdc/log.hh"
#include "replica/database_fwd.hh"
#include "mutation/timestamp.hh"
@@ -66,14 +65,12 @@ public:
// Tells processor we have reached end of record - last part
// of a given timestamp batch
virtual void end_record() = 0;
virtual const row_states_map& clustering_row_states() const = 0;
};
bool should_split(const mutation& base_mutation, const per_request_options& options);
bool should_split(const mutation& base_mutation);
void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor,
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility);
bool enable_preimage, bool enable_postimage);
void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor,
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility);
bool enable_preimage, bool enable_postimage);
}

View File

@@ -21,8 +21,5 @@ target_link_libraries(compaction
mutation_writer
replica)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(compaction REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers compaction
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -867,8 +867,8 @@ auto fmt::formatter<compaction::compaction_task_executor>::format(const compacti
namespace compaction {
inline compaction_controller make_compaction_controller(const compaction_manager::scheduling_group& csg, uint64_t static_shares, std::optional<float> max_shares, std::function<double()> fn) {
return compaction_controller(csg, static_shares, max_shares, 250ms, std::move(fn));
inline compaction_controller make_compaction_controller(const compaction_manager::scheduling_group& csg, uint64_t static_shares, std::function<double()> fn) {
return compaction_controller(csg, static_shares, 250ms, std::move(fn));
}
compaction::compaction_state::~compaction_state() {
@@ -1014,7 +1014,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
, _sys_ks("compaction_manager::system_keyspace")
, _cfg(std::move(cfg))
, _compaction_submission_timer(compaction_sg(), compaction_submission_callback())
, _compaction_controller(make_compaction_controller(compaction_sg(), static_shares(), _cfg.max_shares.get(), [this] () -> float {
, _compaction_controller(make_compaction_controller(compaction_sg(), static_shares(), [this] () -> float {
_last_backlog = backlog();
auto b = _last_backlog / available_memory();
// This means we are using an unimplemented strategy
@@ -1033,10 +1033,6 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
, _throughput_updater(serialized_action([this] { return update_throughput(throughput_mbs()); }))
, _update_compaction_static_shares_action([this] { return update_static_shares(static_shares()); })
, _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer()))
, _compaction_max_shares_observer(_cfg.max_shares.observe([this] (const float& max_shares) {
cmlog.info("Updating max shares to {}", max_shares);
_compaction_controller.set_max_shares(max_shares);
}))
, _strategy_control(std::make_unique<strategy_control>(*this))
, _tombstone_gc_state(_shared_tombstone_gc_state) {
tm.register_module(_task_manager_module->get_name(), _task_manager_module);
@@ -1055,12 +1051,11 @@ compaction_manager::compaction_manager(tasks::task_manager& tm)
, _sys_ks("compaction_manager::system_keyspace")
, _cfg(config{ .available_memory = 1 })
, _compaction_submission_timer(compaction_sg(), compaction_submission_callback())
, _compaction_controller(make_compaction_controller(compaction_sg(), 1, std::nullopt, [] () -> float { return 1.0; }))
, _compaction_controller(make_compaction_controller(compaction_sg(), 1, [] () -> float { return 1.0; }))
, _backlog_manager(_compaction_controller)
, _throughput_updater(serialized_action([this] { return update_throughput(throughput_mbs()); }))
, _update_compaction_static_shares_action([] { return make_ready_future<>(); })
, _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer()))
, _compaction_max_shares_observer(_cfg.max_shares.observe([] (const float& max_shares) {}))
, _strategy_control(std::make_unique<strategy_control>(*this))
, _tombstone_gc_state(_shared_tombstone_gc_state) {
tm.register_module(_task_manager_module->get_name(), _task_manager_module);

View File

@@ -80,7 +80,6 @@ public:
scheduling_group maintenance_sched_group;
size_t available_memory = 0;
utils::updateable_value<float> static_shares = utils::updateable_value<float>(0);
utils::updateable_value<float> max_shares = utils::updateable_value<float>(0);
utils::updateable_value<uint32_t> throughput_mb_per_sec = utils::updateable_value<uint32_t>(0);
std::chrono::seconds flush_all_tables_before_major = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::days(1));
};
@@ -160,7 +159,6 @@ private:
std::optional<utils::observer<uint32_t>> _throughput_option_observer;
serialized_action _update_compaction_static_shares_action;
utils::observer<float> _compaction_static_shares_observer;
utils::observer<float> _compaction_max_shares_observer;
uint64_t _validation_errors = 0;
class strategy_control;
@@ -293,10 +291,6 @@ public:
return _cfg.static_shares.get();
}
float max_shares() const noexcept {
return _cfg.max_shares.get();
}
uint32_t throughput_mbs() const noexcept {
return _cfg.throughput_mb_per_sec.get();
}

View File

@@ -227,7 +227,7 @@ future<> run_table_tasks(replica::database& db, std::vector<table_tasks_info> ta
// Tables will be kept in descending order.
std::ranges::sort(table_tasks, std::greater<>(), [&] (const table_tasks_info& tti) {
try {
return db.find_column_family(tti.ti.id).get_stats().live_disk_space_used.on_disk;
return db.find_column_family(tti.ti.id).get_stats().live_disk_space_used;
} catch (const replica::no_such_column_family& e) {
return int64_t(-1);
}
@@ -281,7 +281,7 @@ future<> run_keyspace_tasks(replica::database& db, std::vector<keyspace_tasks_in
try {
return std::accumulate(kti.table_infos.begin(), kti.table_infos.end(), int64_t(0), [&] (int64_t sum, const table_info& t) {
try {
sum += db.find_column_family(t.id).get_stats().live_disk_space_used.on_disk;
sum += db.find_column_family(t.id).get_stats().live_disk_space_used;
} catch (const replica::no_such_column_family&) {
// ignore
}

View File

@@ -855,7 +855,7 @@ maintenance_socket: ignore
# enable_create_table_with_compact_storage: false
# Control tablets for new keyspaces.
# Can be set to: disabled|enabled|enforced
# Can be set to: disabled|enabled
#
# When enabled, newly created keyspaces will have tablets enabled by default.
# That can be explicitly disabled in the CREATE KEYSPACE query
@@ -888,18 +888,9 @@ rf_rack_valid_keyspaces: false
#
# Vector Store options
#
# HTTP and HTTPS schemes are supported. Port number is mandatory.
# If both `vector_store_primary_uri` and `vector_store_secondary_uri` are unset or empty, vector search is disabled.
#
# A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.
# A comma-separated list of URIs for the vector store using DNS name. Only HTTP schema is supported. Port number is mandatory.
# Default is empty, which means that the vector store is not used.
# vector_store_primary_uri: http://vector-store.dns.name:{port}
#
# A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.
# vector_store_secondary_uri: http://vector-store.dns.name:{port}
#
# Options for encrypted connections to the vector store. These options are used for HTTPS URIs in vector_store_primary_uri and vector_store_secondary_uri.
# vector_store_encryption_options:
# truststore: <not set, use system trust>
#
# io-streaming rate limiting

View File

@@ -445,7 +445,6 @@ ldap_tests = set([
scylla_tests = set([
'test/boost/combined_tests',
'test/boost/UUID_test',
'test/boost/url_parse_test',
'test/boost/advanced_rpc_compressor_test',
'test/boost/allocation_strategy_test',
'test/boost/alternator_unit_test',
@@ -643,30 +642,7 @@ raft_tests = set([
vector_search_tests = set([
'test/vector_search/vector_store_client_test',
'test/vector_search/load_balancer_test',
'test/vector_search/client_test'
])
vector_search_validator_bin = 'vector-search-validator/bin/vector-search-validator'
vector_search_validator_deps = set([
'test/vector_search_validator/build-validator',
'test/vector_search_validator/Cargo.toml',
'test/vector_search_validator/crates/validator/Cargo.toml',
'test/vector_search_validator/crates/validator/src/main.rs',
'test/vector_search_validator/crates/validator-scylla/Cargo.toml',
'test/vector_search_validator/crates/validator-scylla/src/lib.rs',
'test/vector_search_validator/crates/validator-scylla/src/cql.rs',
])
vector_store_bin = 'vector-search-validator/bin/vector-store'
vector_store_deps = set([
'test/vector_search_validator/build-env',
'test/vector_search_validator/build-vector-store',
])
vector_search_validator_bins = set([
vector_search_validator_bin,
vector_store_bin,
'test/vector_search/load_balancer_test'
])
wasms = set([
@@ -702,7 +678,7 @@ other = set([
'iotune',
])
all_artifacts = apps | cpp_apps | tests | other | wasms | vector_search_validator_bins
all_artifacts = apps | cpp_apps | tests | other | wasms
arg_parser = argparse.ArgumentParser('Configure scylla', add_help=False, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('--out', dest='buildfile', action='store', default='build.ninja',
@@ -786,7 +762,6 @@ arg_parser.add_argument('--use-cmake', action=argparse.BooleanOptionalAction, de
arg_parser.add_argument('--coverage', action = 'store_true', help = 'Compile scylla with coverage instrumentation')
arg_parser.add_argument('--build-dir', action='store', default='build',
help='Build directory path')
arg_parser.add_argument('--disable-precompiled-header', action='store_true', default=False, help='Disable precompiled header for scylla binary')
arg_parser.add_argument('-h', '--help', action='store_true', help='show this help message and exit')
args = arg_parser.parse_args()
if args.help:
@@ -1196,7 +1171,6 @@ scylla_core = (['message/messaging_service.cc',
'auth/allow_all_authorizer.cc',
'auth/authenticated_user.cc',
'auth/authenticator.cc',
'auth/cache.cc',
'auth/common.cc',
'auth/default_authorizer.cc',
'auth/resource.cc',
@@ -1221,7 +1195,6 @@ scylla_core = (['message/messaging_service.cc',
'table_helper.cc',
'audit/audit.cc',
'audit/audit_cf_storage_helper.cc',
'audit/audit_composite_storage_helper.cc',
'audit/audit_syslog_storage_helper.cc',
'tombstone_gc_options.cc',
'tombstone_gc.cc',
@@ -1292,9 +1265,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/disk_space_monitor.cc',
'vector_search/vector_store_client.cc',
'vector_search/dns.cc',
'vector_search/client.cc',
'vector_search/clients.cc',
'vector_search/truststore.cc'
] + [Antlr3Grammar('cql3/Cql.g')] \
+ scylla_raft_core
)
@@ -1438,8 +1408,6 @@ scylla_tests_dependencies = scylla_core + alternator + idls + scylla_tests_gener
'test/lib/key_utils.cc',
'test/lib/proc_utils.cc',
'test/lib/gcs_fixture.cc',
'test/lib/aws_kms_fixture.cc',
'test/lib/azure_kms_fixture.cc',
]
scylla_raft_dependencies = scylla_raft_core + ['utils/uuid.cc', 'utils/error_injection.cc', 'utils/exceptions.cc']
@@ -1605,7 +1573,6 @@ deps['test/boost/combined_tests'] += [
'test/boost/query_processor_test.cc',
'test/boost/reader_concurrency_semaphore_test.cc',
'test/boost/repair_test.cc',
'test/boost/replicator_test.cc',
'test/boost/restrictions_test.cc',
'test/boost/role_manager_test.cc',
'test/boost/row_cache_test.cc',
@@ -1648,7 +1615,6 @@ deps['test/boost/bytes_ostream_test'] = [
]
deps['test/boost/input_stream_test'] = ['test/boost/input_stream_test.cc']
deps['test/boost/UUID_test'] = ['clocks-impl.cc', 'utils/UUID_gen.cc', 'test/boost/UUID_test.cc', 'utils/uuid.cc', 'utils/dynamic_bitset.cc', 'utils/hashers.cc', 'utils/on_internal_error.cc']
deps['test/boost/url_parse_test'] = ['utils/http.cc', 'test/boost/url_parse_test.cc', ]
deps['test/boost/murmur_hash_test'] = ['bytes.cc', 'utils/murmur_hash.cc', 'test/boost/murmur_hash_test.cc']
deps['test/boost/allocation_strategy_test'] = ['test/boost/allocation_strategy_test.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc', 'utils/labels.cc']
deps['test/boost/log_heap_test'] = ['test/boost/log_heap_test.cc']
@@ -1694,7 +1660,6 @@ deps['test/raft/discovery_test'] = ['test/raft/discovery_test.cc',
deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vector_store_client_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
wasm_deps = {}
@@ -2213,15 +2178,7 @@ if os.path.exists(kmipc_lib):
user_cflags += f' -I{kmipc_dir}/include -DHAVE_KMIP'
def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
cxxflags = [
# we need this flag for correct precompiled header handling in connection with ccache (or similar)
# `git` tools don't preserve timestamps, so when using ccache it might be possible to add pch to ccache
# and then later (after for example rebase) get `stdafx.hh` with different timestamp, but the same content.
# this will tell ccache to bring pch from its cache. Later on clang will check if timestamps match and complain.
# Adding `-fpch-validate-input-files-content` tells clang to check content of stdafx.hh if timestamps don't match.
# The flag seems to be present in gcc as well.
"" if args.disable_precompiled_header else '-fpch-validate-input-files-content'
]
cxxflags = []
optimization_level = mode_config['optimization-level']
cxxflags.append(f'-O{optimization_level}')
@@ -2286,7 +2243,6 @@ def write_build_file(f,
scylla_version,
scylla_release,
args):
use_precompiled_header = not args.disable_precompiled_header
warnings = get_warning_options(args.cxx)
rustc_target = pick_rustc_target('wasm32-wasi', 'wasm32-wasip1')
f.write(textwrap.dedent('''\
@@ -2393,10 +2349,7 @@ def write_build_file(f,
for mode in build_modes:
modeval = modes[mode]
seastar_lib_ext = 'so' if modeval['build_seastar_shared_libs'] else 'a'
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
seastar_testing_dep = f'$builddir/{mode}/seastar/libseastar_testing.{seastar_lib_ext}'
abseil_dep = ' '.join(f'$builddir/{mode}/abseil/{lib}' for lib in abseil_libs)
fmt_lib = 'fmt'
f.write(textwrap.dedent('''\
cxx_ld_flags_{mode} = {cxx_ld_flags}
@@ -2409,14 +2362,6 @@ def write_build_file(f,
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags_{mode} $cxxflags $obj_cxxflags -c -o $out $in
description = CXX $out
depfile = $out.d
rule cxx_build_precompiled_header.{mode}
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags_{mode} $cxxflags $obj_cxxflags -c -o $out $in -Winvalid-pch -fpch-instantiate-templates -Xclang -emit-pch -DSCYLLA_USE_PRECOMPILED_HEADER
description = CXX-PRECOMPILED-HEADER $out
depfile = $out.d
rule cxx_with_pch.{mode}
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags_{mode} $cxxflags $obj_cxxflags -c -o $out $in -Winvalid-pch -Xclang -include-pch -Xclang $builddir/{mode}/stdafx.hh.pch
description = CXX $out
depfile = $out.d
rule link.{mode}
command = $cxx $ld_flags_{mode} $ldflags -o $out $in $libs $libs_{mode}
description = LINK $out
@@ -2450,7 +2395,7 @@ def write_build_file(f,
$builddir/{mode}/gen/${{stem}}Parser.cpp
description = ANTLR3 $in
rule checkhh.{mode}
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags $cxxflags_{mode} $obj_cxxflags --include $in -c -o $out $builddir/{mode}/gen/empty.cc -USCYLLA_USE_PRECOMPILED_HEADER
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags $cxxflags_{mode} $obj_cxxflags --include $in -c -o $out $builddir/{mode}/gen/empty.cc
description = CHECKHH $in
depfile = $out.d
rule test.{mode}
@@ -2464,11 +2409,10 @@ def write_build_file(f,
description = RUST_LIB $out
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, **modeval))
f.write(
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
'build {mode}-build: phony {artifacts} {wasms}\n'.format(
mode=mode,
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms - vector_search_validator_bins)]),
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms)]),
wasms = str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & wasms)]),
vector_search_validator_bins=str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & vector_search_validator_bins)]),
)
)
if profile_recipe := modes[mode].get('profile_recipe'):
@@ -2477,7 +2421,6 @@ def write_build_file(f,
include_dist_target = f'dist-{mode}' if args.enable_dist is None or args.enable_dist else ''
f.write(f'build {mode}: phony {include_cxx_target} {include_dist_target}\n')
compiles = {}
compiles_with_pch = set()
swaggers = set()
serializers = {}
ragels = {}
@@ -2492,16 +2435,16 @@ def write_build_file(f,
# object code. And we enable LTO when linking the main Scylla executable, while disable
# it when linking anything else.
seastar_lib_ext = 'so' if modeval['build_seastar_shared_libs'] else 'a'
for binary in sorted(build_artifacts):
if modeval['is_profile'] and binary != "scylla":
# Just to avoid clutter in build.ninja
continue
profile_dep = modes[mode].get('profile_target', "")
if binary in other or binary in wasms or binary in vector_search_validator_bins:
if binary in other or binary in wasms:
continue
srcs = deps[binary]
# 'scylla'
objs = ['$builddir/' + mode + '/' + src.replace('.cc', '.o')
for src in srcs
if src.endswith('.cc')]
@@ -2537,6 +2480,9 @@ def write_build_file(f,
continue
do_lto = modes[mode]['has_lto'] and binary in lto_binaries
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
seastar_testing_dep = f'$builddir/{mode}/seastar/libseastar_testing.{seastar_lib_ext}'
abseil_dep = ' '.join(f'$builddir/{mode}/abseil/{lib}' for lib in abseil_libs)
seastar_testing_libs = f'$seastar_testing_libs_{mode}'
local_libs = f'$seastar_libs_{mode} $libs'
@@ -2546,7 +2492,6 @@ def write_build_file(f,
local_libs += ' -flto=thin -ffat-lto-objects'
else:
local_libs += ' -fno-lto'
use_pch = use_precompiled_header and binary == 'scylla'
if binary in tests:
if binary in pure_boost_tests:
local_libs += ' ' + maybe_static(args.staticboost, '-lboost_unit_test_framework')
@@ -2575,8 +2520,6 @@ def write_build_file(f,
if src.endswith('.cc'):
obj = '$builddir/' + mode + '/' + src.replace('.cc', '.o')
compiles[obj] = src
if use_pch:
compiles_with_pch.add(obj)
elif src.endswith('.idl.hh'):
hh = '$builddir/' + mode + '/gen/' + src.replace('.idl.hh', '.dist.hh')
serializers[hh] = src
@@ -2609,11 +2552,10 @@ def write_build_file(f,
)
f.write(
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms} {vector_search_validator_bins} \n'.format(
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms}\n'.format(
mode=mode,
test_executables=' '.join(['$builddir/{}/{}'.format(mode, binary) for binary in sorted(tests)]),
wasms=' '.join([f'$builddir/{binary}' for binary in sorted(wasms)]),
vector_search_validator_bins=' '.join([f'$builddir/{binary}' for binary in sorted(vector_search_validator_bins)]),
)
)
f.write(
@@ -2656,9 +2598,7 @@ def write_build_file(f,
src = compiles[obj]
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
abseil_dep = ' '.join(f'$builddir/{mode}/abseil/{lib}' for lib in abseil_libs)
pch_dep = f'$builddir/{mode}/stdafx.hh.pch' if obj in compiles_with_pch else ''
cxx_cmd = 'cxx_with_pch' if obj in compiles_with_pch else 'cxx'
f.write(f'build {obj}: {cxx_cmd}.{mode} {src} | {profile_dep} {seastar_dep} {abseil_dep} {gen_headers_dep} {pch_dep}\n')
f.write(f'build {obj}: cxx.{mode} {src} | {profile_dep} || {seastar_dep} {abseil_dep} {gen_headers_dep}\n')
if src in modeval['per_src_extra_cxxflags']:
f.write(' cxxflags = {seastar_cflags} $cxxflags $cxxflags_{mode} {extra_cxxflags}\n'.format(mode=mode, extra_cxxflags=modeval["per_src_extra_cxxflags"][src], **modeval))
for swagger in swaggers:
@@ -2719,8 +2659,6 @@ def write_build_file(f,
f.write(' target = {lib}\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
f.write(f'build $builddir/{mode}/stdafx.hh.pch: cxx_build_precompiled_header.{mode} stdafx.hh | {profile_dep} {seastar_dep} {abseil_dep} {gen_headers_dep} {pch_dep}\n')
f.write('build $builddir/{mode}/seastar/apps/iotune/iotune: ninja $builddir/{mode}/seastar/build.ninja | $builddir/{mode}/seastar/libseastar.{seastar_lib_ext}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
@@ -2784,19 +2722,6 @@ def write_build_file(f,
'build compiler-training: phony {}\n'.format(' '.join(['{mode}-compiler-training'.format(mode=mode) for mode in default_modes]))
)
f.write(textwrap.dedent(f'''\
rule build-vector-search-validator
command = test/vector_search_validator/build-validator $builddir
rule build-vector-store
command = test/vector_search_validator/build-vector-store $builddir
'''))
f.write(
'build $builddir/{vector_search_validator_bin}: build-vector-search-validator {}\n'.format(' '.join([dep for dep in sorted(vector_search_validator_deps)]), vector_search_validator_bin=vector_search_validator_bin)
)
f.write(
'build $builddir/{vector_store_bin}: build-vector-store {}\n'.format(' '.join([dep for dep in sorted(vector_store_deps)]), vector_store_bin=vector_store_bin)
)
f.write(textwrap.dedent(f'''\
build dist-unified-tar: phony {' '.join([f'$builddir/{mode}/dist/tar/{scylla_product}-unified-{scylla_version}-{scylla_release}.{arch}.tar.gz' for mode in default_modes])}
build dist-unified: phony dist-unified-tar
@@ -3010,7 +2935,7 @@ def configure_using_cmake(args):
'CMAKE_DEFAULT_CONFIGS': selected_configs,
'CMAKE_C_COMPILER': args.cc,
'CMAKE_CXX_COMPILER': args.cxx,
'CMAKE_CXX_FLAGS': args.user_cflags + ("" if args.disable_precompiled_header else " -fpch-validate-input-files-content"),
'CMAKE_CXX_FLAGS': args.user_cflags,
'CMAKE_EXE_LINKER_FLAGS': args.user_ldflags,
'CMAKE_EXPORT_COMPILE_COMMANDS': 'ON',
'Scylla_CHECK_HEADERS': 'ON',
@@ -3019,7 +2944,6 @@ def configure_using_cmake(args):
'Scylla_TEST_REPEAT': args.test_repeat,
'Scylla_ENABLE_LTO': 'ON' if args.lto else 'OFF',
'Scylla_WITH_DEBUG_INFO' : 'ON' if args.debuginfo else 'OFF',
'Scylla_USE_PRECOMPILED_HEADER': 'OFF' if args.disable_precompiled_header else 'ON',
}
if args.date_stamp:
settings['Scylla_DATE_STAMP'] = args.date_stamp

View File

@@ -138,8 +138,5 @@ target_link_libraries(cql3
lang
transport)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(cql3 REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers cql3
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -575,15 +575,6 @@ usingTimeoutServiceLevelClauseObjective[std::unique_ptr<cql3::attributes::raw>&
| serviceLevel sl_name=serviceLevelOrRoleName { attrs->service_level = std::move(sl_name); }
;
usingTimeoutConcurrencyClause[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_USING usingTimeoutConcurrencyClauseObjective[attrs] ( K_AND usingTimeoutConcurrencyClauseObjective[attrs] )*
;
usingTimeoutConcurrencyClauseObjective[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_TIMEOUT to=term { attrs->timeout = std::move(to); }
| K_CONCURRENCY c=term { attrs->concurrency = std::move(c); }
;
/**
* UPDATE <CF>
* USING TIMESTAMP <long>
@@ -675,7 +666,7 @@ pruneMaterializedViewStatement returns [std::unique_ptr<raw::select_statement> e
auto attrs = std::make_unique<cql3::attributes::raw>();
expression wclause = conjunction{};
}
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingTimeoutConcurrencyClause[attrs] )?
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingClause[attrs] )?
{
auto params = make_lw_shared<raw::select_statement::parameters>(std::move(orderings), is_distinct, allow_filtering, statement_subtype, bypass_cache);
return std::make_unique<raw::select_statement>(std::move(cf), std::move(params),
@@ -1569,10 +1560,6 @@ serviceLevelOrRoleName returns [sstring name]
| t=QUOTED_NAME { $name = sstring($t.text); }
| k=unreserved_keyword { $name = k;
std::transform($name.begin(), $name.end(), $name.begin(), ::tolower);}
// The literal `default` will not be parsed by any of the previous
// rules, so we need to cover it manually. Needed by CREATE SERVICE
// LEVEL and ATTACH SERVICE LEVEL.
| t=K_DEFAULT { $name = sstring("default"); }
| QMARK {add_recognition_error("Bind variables cannot be used for service levels or role names");}
;
@@ -2379,7 +2366,6 @@ K_LIKE: L I K E;
K_TIMEOUT: T I M E O U T;
K_PRUNE: P R U N E;
K_CONCURRENCY: C O N C U R R E N C Y;
K_EXECUTE: E X E C U T E;

View File

@@ -20,21 +20,19 @@
namespace cql3 {
std::unique_ptr<attributes> attributes::none() {
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}, {}}};
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}}};
}
attributes::attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency)
std::optional<sstring> service_level)
: _timestamp_unset_guard(timestamp)
, _timestamp{std::move(timestamp)}
, _time_to_live_unset_guard(time_to_live)
, _time_to_live{std::move(time_to_live)}
, _timeout{std::move(timeout)}
, _service_level(std::move(service_level))
, _concurrency{std::move(concurrency)}
{ }
bool attributes::is_timestamp_set() const {
@@ -53,10 +51,6 @@ bool attributes::is_service_level_set() const {
return bool(_service_level);
}
bool attributes::is_concurrency_set() const {
return bool(_concurrency);
}
int64_t attributes::get_timestamp(int64_t now, const query_options& options) {
if (!_timestamp.has_value() || _timestamp_unset_guard.is_unset(options)) {
return now;
@@ -129,27 +123,6 @@ qos::service_level_options attributes::get_service_level(qos::service_level_cont
return sl_controller.get_service_level(sl_name).slo;
}
std::optional<int32_t> attributes::get_concurrency(const query_options& options) const {
if (!_concurrency.has_value()) {
return std::nullopt;
}
cql3::raw_value concurrency_raw = expr::evaluate(*_concurrency, options);
if (concurrency_raw.is_null()) {
throw exceptions::invalid_request_exception("Invalid null value of concurrency");
}
int32_t concurrency;
try {
concurrency = concurrency_raw.view().validate_and_deserialize<int32_t>(*int32_type);
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception("Invalid concurrency value");
}
if (concurrency <= 0) {
throw exceptions::invalid_request_exception("Concurrency must be a positive integer");
}
return concurrency;
}
void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timestamp.has_value()) {
expr::fill_prepare_context(*_timestamp, ctx);
@@ -160,13 +133,10 @@ void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timeout.has_value()) {
expr::fill_prepare_context(*_timeout, ctx);
}
if (_concurrency.has_value()) {
expr::fill_prepare_context(*_concurrency, ctx);
}
}
std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const {
std::optional<expr::expression> ts, ttl, to, conc;
std::optional<expr::expression> ts, ttl, to;
if (timestamp.has_value()) {
ts = prepare_expression(*timestamp, db, ks_name, nullptr, timestamp_receiver(ks_name, cf_name));
@@ -183,12 +153,7 @@ std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database d
verify_no_aggregate_functions(*timeout, "USING clause");
}
if (concurrency.has_value()) {
conc = prepare_expression(*concurrency, db, ks_name, nullptr, concurrency_receiver(ks_name, cf_name));
verify_no_aggregate_functions(*concurrency, "USING clause");
}
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level), std::move(conc)}};
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level)}};
}
lw_shared_ptr<column_specification> attributes::raw::timestamp_receiver(const sstring& ks_name, const sstring& cf_name) const {
@@ -203,8 +168,4 @@ lw_shared_ptr<column_specification> attributes::raw::timeout_receiver(const sstr
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[timeout]", true), duration_type);
}
lw_shared_ptr<column_specification> attributes::raw::concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const {
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[concurrency]", true), data_type_for<int32_t>());
}
}

View File

@@ -36,15 +36,13 @@ private:
std::optional<cql3::expr::expression> _time_to_live;
std::optional<cql3::expr::expression> _timeout;
std::optional<sstring> _service_level;
std::optional<cql3::expr::expression> _concurrency;
public:
static std::unique_ptr<attributes> none();
private:
attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency);
std::optional<sstring> service_level);
public:
bool is_timestamp_set() const;
@@ -54,8 +52,6 @@ public:
bool is_service_level_set() const;
bool is_concurrency_set() const;
int64_t get_timestamp(int64_t now, const query_options& options);
std::optional<int32_t> get_time_to_live(const query_options& options);
@@ -64,8 +60,6 @@ public:
qos::service_level_options get_service_level(qos::service_level_controller& sl_controller) const;
std::optional<int32_t> get_concurrency(const query_options& options) const;
void fill_prepare_context(prepare_context& ctx);
class raw final {
@@ -74,7 +68,6 @@ public:
std::optional<cql3::expr::expression> time_to_live;
std::optional<cql3::expr::expression> timeout;
std::optional<sstring> service_level;
std::optional<cql3::expr::expression> concurrency;
std::unique_ptr<attributes> prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const;
private:
@@ -83,8 +76,6 @@ public:
lw_shared_ptr<column_specification> time_to_live_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> timeout_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const;
};
};

View File

@@ -1349,7 +1349,7 @@ static managed_bytes reserialize_value(View value_bytes,
if (type.is_map()) {
std::vector<std::pair<managed_bytes, managed_bytes>> elements = partially_deserialize_map(value_bytes);
const map_type_impl& mapt = dynamic_cast<const map_type_impl&>(type);
const map_type_impl mapt = dynamic_cast<const map_type_impl&>(type);
const abstract_type& key_type = mapt.get_keys_type()->without_reversed();
const abstract_type& value_type = mapt.get_values_type()->without_reversed();
@@ -1391,7 +1391,7 @@ static managed_bytes reserialize_value(View value_bytes,
const vector_type_impl& vtype = dynamic_cast<const vector_type_impl&>(type);
std::vector<managed_bytes> elements = vtype.split_fragmented(value_bytes);
const auto& elements_type = vtype.get_elements_type()->without_reversed();
auto elements_type = vtype.get_elements_type()->without_reversed();
if (elements_type.bound_value_needs_to_be_reserialized()) {
for (size_t i = 0; i < elements.size(); i++) {

View File

@@ -37,12 +37,6 @@ future<::shared_ptr<cql_transport::messages::result_message>>
alter_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &, std::optional<service::group0_guard> guard) const {
if (_service_level == qos::service_level_controller::default_service_level_name) {
sstring reason = seastar::format("The default service level, {}, cannot be altered",
qos::service_level_controller::default_service_level_name);
throw exceptions::invalid_request_exception(std::move(reason));
}
service::group0_batch mc{std::move(guard)};
validate_shares_option(qp, _slo);
qos::service_level& sl = state.get_service_level_controller().get_service_level(_service_level);

View File

@@ -422,14 +422,7 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
throw exceptions::invalid_request_exception(format("The synchronous_updates option is only applicable to materialized views, not to base tables"));
}
if (is_cdc_log_table) {
auto gc_opts = _properties->get_tombstone_gc_options(schema_extensions);
if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on CDC log tables.");
}
}
_properties->apply_to_builder(cfm, std::move(schema_extensions), db, keyspace(), !is_cdc_log_table);
_properties->apply_to_builder(cfm, std::move(schema_extensions), db, keyspace());
}
break;

View File

@@ -55,29 +55,8 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
auto schema_extensions = _properties->make_schema_extensions(db.extensions());
_properties->validate(db, keyspace(), schema_extensions);
bool is_colocated = [&] {
if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
return false;
}
auto base_schema = db.find_schema(schema->view_info()->base_id());
if (!base_schema) {
return false;
}
return std::ranges::equal(
schema->partition_key_columns(),
base_schema->partition_key_columns(),
[](const column_definition& a, const column_definition& b) { return a.name() == b.name(); });
}();
if (is_colocated) {
auto gc_opts = _properties->get_tombstone_gc_options(schema_extensions);
if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables.");
}
}
auto builder = schema_builder(schema);
_properties->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);
_properties->apply_to_builder(builder, std::move(schema_extensions), db, keyspace());
if (builder.get_gc_grace_seconds() == 0) {
throw exceptions::invalid_request_exception(

View File

@@ -43,14 +43,6 @@ attach_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &,
std::optional<service::group0_guard> guard) const {
if (_service_level == qos::service_level_controller::default_service_level_name) {
sstring reason = seastar::format("The default service level, {}, cannot be "
"attached to a role. If you want to detach an attached service level, "
"use the DETACH SERVICE LEVEL statement",
qos::service_level_controller::default_service_level_name);
throw exceptions::invalid_request_exception(std::move(reason));
}
auto sli = co_await state.get_service_level_controller().get_distributed_service_level(_service_level);
if (sli.empty()) {
throw qos::nonexistant_service_level_exception(_service_level);

View File

@@ -145,7 +145,9 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
throw exceptions::configuration_exception(sstring("Missing sub-option '") + compression_parameters::SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
}
compression_parameters cp(*compression_options);
cp.validate(compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)));
cp.validate(
compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)),
compression_parameters::dicts_usage_allowed(db.get_config().sstable_compression_dictionaries_allow_in_ddl()));
}
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);
@@ -293,7 +295,7 @@ std::optional<db::tablet_options::map_type> cf_prop_defs::get_tablet_options() c
return std::nullopt;
}
void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name, bool supports_repair) const {
void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name) const {
if (has_property(KW_COMMENT)) {
builder.set_comment(get_string(KW_COMMENT, ""));
}
@@ -379,7 +381,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
}
// Set default tombstone_gc mode.
if (!schema_extensions.contains(tombstone_gc_extension::NAME)) {
auto ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(db, ks_name, supports_repair));
auto ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(db, ks_name));
schema_extensions.emplace(tombstone_gc_extension::NAME, std::move(ext));
}
builder.set_extensions(std::move(schema_extensions));

View File

@@ -110,7 +110,7 @@ public:
bool get_synchronous_updates_flag() const;
std::optional<db::tablet_options::map_type> get_tablet_options() const;
void apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name, bool supports_repair) const;
void apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name) const;
void validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const;
};

View File

@@ -201,14 +201,7 @@ view_ptr create_index_statement::create_view_for_index(const schema_ptr schema,
"";
builder.with_view_info(schema, false, where_clause);
bool is_colocated = [&] {
if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
return false;
}
return im.local();
}();
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(db, schema->ks_name(), !is_colocated));
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(db, schema->ks_name()));
builder.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
// A local secondary index should be backed by a *synchronous* view,
@@ -279,15 +272,11 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
}
// Regular secondary indexes require rf-rack-validity.
// Custom indexes need to validate this property themselves, if they need it.
if (!_properties || !_properties->custom_class) {
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
validate_for_local_index(*schema);
@@ -303,7 +292,7 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
throw exceptions::invalid_request_exception(format("Non-supported custom class \'{}\' provided", *(_properties->custom_class)));
}
auto custom_index = (*custom_index_factory)();
custom_index->validate(*schema, *_properties, targets, db.features(), db);
custom_index->validate(*schema, *_properties, targets, db.features());
_properties->index_version = custom_index->index_version(*schema);
}

View File

@@ -112,8 +112,14 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
ksm->strategy_name(),
locator::replication_strategy_params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option()),
tmptr->get_topology());
if (rs->uses_tablets() && ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
if (rs->uses_tablets()) {
warnings.push_back(
"Tables in this keyspace will be replicated using Tablets "
"and will not support counters features. To use counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
if (ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}
}
// If `rf_rack_valid_keyspaces` is enabled, it's forbidden to create an RF-rack-invalid keyspace.

View File

@@ -45,12 +45,6 @@ create_service_level_statement::execute(query_processor& qp,
throw exceptions::invalid_request_exception("Names starting with '$' are reserved for internal tenants. Use a different name.");
}
if (_service_level == qos::service_level_controller::default_service_level_name) {
sstring reason = seastar::format("The default service level, {}, already exists "
"and cannot be created", qos::service_level_controller::default_service_level_name);
throw exceptions::invalid_request_exception(std::move(reason));
}
service::group0_batch mc{std::move(guard)};
validate_shares_option(qp, _slo);

View File

@@ -128,7 +128,7 @@ void create_table_statement::apply_properties_to(schema_builder& builder, const
builder.set_compressor_params(db.get_config().sstable_compression_user_table_options());
}
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace(), true);
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace());
}
void create_table_statement::add_column_metadata_from_aliases(schema_builder& builder, std::vector<bytes> aliases, const std::vector<data_type>& types, column_kind kind) const
@@ -222,7 +222,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
throw exceptions::invalid_request_exception("Cannot set default_time_to_live on a table with counters");
}
if (ks_uses_tablets && pt.is_counter() && !db.features().counters_with_tablets) {
if (ks_uses_tablets && pt.is_counter()) {
throw exceptions::invalid_request_exception(format("Cannot use the 'counter' type for table {}.{}: Counters are not yet supported with tablets", keyspace(), cf_name));
}

View File

@@ -373,30 +373,7 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
db::view::create_virtual_column(builder, def->name(), def->type);
}
}
bool is_colocated = [&] {
if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
return false;
}
if (target_partition_keys.size() != schema->partition_key_columns().size()) {
return false;
}
for (size_t i = 0; i < target_partition_keys.size(); ++i) {
if (target_partition_keys[i] != &schema->partition_key_columns()[i]) {
return false;
}
}
return true;
}();
if (is_colocated) {
auto gc_opts = _properties.properties()->get_tombstone_gc_options(schema_extensions);
if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables.");
}
}
_properties.properties()->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);
_properties.properties()->apply_to_builder(builder, std::move(schema_extensions), db, keyspace());
if (builder.default_time_to_live().count() > 0) {
throw exceptions::invalid_request_exception(

View File

@@ -34,11 +34,6 @@ drop_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &,
std::optional<service::group0_guard> guard) const {
if (_service_level == qos::service_level_controller::default_service_level_name) {
sstring reason = seastar::format("The default service level, {}, cannot be dropped",
qos::service_level_controller::default_service_level_name);
throw exceptions::invalid_request_exception(std::move(reason));
}
service::group0_batch mc{std::move(guard)};
auto& sl = state.get_service_level_controller();
co_await sl.drop_distributed_service_level(_service_level, _if_exists, mc);

View File

@@ -8,7 +8,6 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "seastar/core/format.hh"
#include "seastar/core/sstring.hh"
#include "utils/assert.hh"
#include "cql3/statements/ks_prop_defs.hh"
@@ -114,17 +113,6 @@ static locator::replication_strategy_config_options prepare_options(
return options;
}
if (uses_tablets) {
for (const auto& opt: old_options) {
if (opt.first == ks_prop_defs::REPLICATION_FACTOR_KEY) {
on_internal_error(logger, format("prepare_options: old_options contains invalid key '{}'", ks_prop_defs::REPLICATION_FACTOR_KEY));
}
if (!options.contains(opt.first)) {
throw exceptions::configuration_exception(fmt::format("Attempted to implicitly drop replicas in datacenter {}. If this is the desired behavior, set replication factor to 0 in {} explicitly.", opt.first, opt.first));
}
}
}
// For users' convenience, expand the 'replication_factor' option into a replication factor for each DC.
// If the user simply switches from another strategy without providing any options,
// but the other strategy used the 'replication_factor' option, it will also be expanded.

View File

@@ -21,7 +21,7 @@ namespace cql3 {
namespace statements {
static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges, std::vector<query::clustering_range> clustering_bounds, view_ptr view,
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration, size_t concurrency) {
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration) {
auto key_columns = std::ranges::to<std::vector<const column_definition*>>(
view->all_columns()
| std::views::filter([] (const column_definition& cdef) { return cdef.is_primary_key(); })
@@ -35,7 +35,7 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges,
tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges);
auto p = service::pager::query_pagers::ghost_row_deleting_pager(schema_ptr(view), selection, state,
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration, concurrency);
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration);
int32_t page_size = std::max(options.get_page_size(), 1000);
auto now = gc_clock::now();
@@ -62,8 +62,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> prune_materialized
auto timeout_duration = get_timeout(state.get_client_state(), options);
dht::partition_range_vector key_ranges = _restrictions->get_partition_key_ranges(options);
std::vector<query::clustering_range> clustering_bounds = _restrictions->get_clustering_bounds(options);
size_t concurrency = _attrs->is_concurrency_set() ? _attrs->get_concurrency(options).value() : 1;
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration, concurrency).then([] {
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(::make_shared<cql_transport::messages::result_message::void_message>());
});
}

View File

@@ -2016,9 +2016,7 @@ vector_indexed_table_select_statement::vector_indexed_table_select_statement(sch
future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::do_execute(
query_processor& qp, service::query_state& state, const query_options& options) const {
auto limit = get_limit(options, _limit);
auto result = co_await measure_index_latency(*_schema, _index, [this, &qp, &state, &options, &limit](this auto) -> future<shared_ptr<cql_transport::messages::result_message>> {
return measure_index_latency(*_schema, _index, [this, &qp, &state, &options](this auto) -> future<shared_ptr<cql_transport::messages::result_message>> {
tracing::add_table_name(state.get_trace_state(), keyspace(), column_family());
validate_for_read(options.get_consistency());
@@ -2026,28 +2024,22 @@ future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table
update_stats();
auto limit = get_limit(options, _limit);
if (limit > max_ann_query_limit) {
co_await coroutine::return_exception(exceptions::invalid_request_exception(
fmt::format("Use of ANN OF in an ORDER BY clause requires a LIMIT that is not greater than {}. LIMIT was {}", max_ann_query_limit, limit)));
}
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
auto aoe = abort_on_expiry(timeout);
auto pkeys = co_await qp.vector_store_client().ann(
_schema->ks_name(), _index.metadata().name(), _schema, get_ann_ordering_vector(options), limit, aoe.abort_source());
auto as = abort_source();
auto pkeys = co_await qp.vector_store_client().ann(_schema->ks_name(), _index.metadata().name(), _schema, get_ann_ordering_vector(options), limit, as);
if (!pkeys.has_value()) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception(std::visit(vector_search::vector_store_client::ann_error_visitor{}, pkeys.error())));
}
co_return co_await query_base_table(qp, state, options, pkeys.value(), timeout);
co_return co_await query_base_table(qp, state, options, pkeys.value());
});
auto page_size = options.get_page_size();
if (page_size > 0 && (uint64_t) page_size < limit) {
result->add_warning("Paging is not supported for Vector Search queries. The entire result set has been returned.");
}
co_return result;
}
void vector_indexed_table_select_statement::update_stats() const {
@@ -2075,10 +2067,10 @@ std::vector<float> vector_indexed_table_select_statement::get_ann_ordering_vecto
return util::to_vector<float>(values);
}
future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::query_base_table(query_processor& qp,
service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys,
lowres_clock::time_point timeout) const {
future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::query_base_table(
query_processor& qp, service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys) const {
auto command = prepare_command_for_base_query(qp, state, options);
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
// For tables without clustering columns, we can optimize by querying
// partition ranges instead of individual primary keys, since the

View File

@@ -389,8 +389,8 @@ private:
std::vector<float> get_ann_ordering_vector(const query_options& options) const;
future<::shared_ptr<cql_transport::messages::result_message>> query_base_table(query_processor& qp, service::query_state& state,
const query_options& options, const std::vector<vector_search::primary_key>& pkeys, lowres_clock::time_point timeout) const;
future<::shared_ptr<cql_transport::messages::result_message>> query_base_table(
query_processor& qp, service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys) const;
future<::shared_ptr<cql_transport::messages::result_message>> query_base_table(query_processor& qp, service::query_state& state,
const query_options& options, lw_shared_ptr<query::read_command> command, lowres_clock::time_point timeout,

View File

@@ -12,8 +12,5 @@ target_link_libraries(data_dictionary
Seastar::seastar
xxHash::xxhash)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(data_dictionary REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers data_dictionary
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -60,8 +60,5 @@ target_link_libraries(db
data_dictionary
cql3)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(db REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers db
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -77,11 +77,9 @@ future<db::all_batches_replayed> db::batchlog_manager::do_batch_log_replay(post_
});
});
}
if (all_replayed == all_batches_replayed::yes) {
co_await bm.container().invoke_on_all([last_replay] (auto& bm) {
bm._last_replay = last_replay;
});
}
co_await bm.container().invoke_on_all([last_replay] (auto& bm) {
bm._last_replay = last_replay;
});
blogger.debug("Batchlog replay on shard {}: done", dest);
co_return all_replayed;
});
@@ -190,7 +188,6 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
all_replayed = all_batches_replayed::no;
co_return stop_iteration::no;
}

View File

@@ -3329,6 +3329,7 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
commit_load_reader_func func;
input_stream<char> fin;
replay_state::impl& state;
input_stream<char> r;
uint64_t id = 0;
size_t pos = 0;
size_t next = 0;
@@ -3461,15 +3462,12 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
clogger.debug("Read {} bytes of data ({}, {})", size, pos, rem);
while (rem < size) {
const auto initial_size = initial.size_bytes();
if (eof) {
auto reason = fmt::format("unexpected EOF, pos={}, rem={}, size={}, alignment={}, initial_size={}",
pos, rem, size, alignment, initial_size);
auto reason = fmt::format("unexpected EOF, rem={}, size={}", rem, size);
throw segment_truncation(std::move(reason), block_boundry);
}
auto block_size = alignment - initial_size;
auto block_size = alignment - initial.size_bytes();
// using a stream is perhaps not 100% effective, but we need to
// potentially address data in pages smaller than the current
// disk/fs we are reading from can handle (but please no).
@@ -3477,9 +3475,8 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
if (tmp.size_bytes() == 0) {
eof = true;
auto reason = fmt::format("read 0 bytes, while tried to read {} bytes. "
"pos={}, rem={}, size={}, alignment={}, initial_size={}",
block_size, pos, rem, size, alignment, initial_size);
auto reason = fmt::format("read 0 bytes, while tried to read {} bytes. rem={}, size={}",
block_size, rem, size);
throw segment_truncation(std::move(reason), block_boundry);
}
@@ -3515,13 +3512,13 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
auto checksum = crc.checksum();
if (check != checksum) {
auto reason = fmt::format("checksums do not match: {:x} vs. {:x}. pos={}, rem={}, size={}, alignment={}, initial_size={}",
check, checksum, pos, rem, size, alignment, initial_size);
auto reason = fmt::format("checksums do not match: {:x} vs. {:x}. rem={}, size={}",
check, checksum, rem, size);
throw segment_data_corruption_error(std::move(reason), alignment);
}
if (id != this->id) {
auto reason = fmt::format("IDs do not match: {} vs. {}. pos={}, rem={}, size={}, alignment={}, initial_size={}",
id, this->id, pos, rem, size, alignment, initial_size);
auto reason = fmt::format("IDs do not match: {} vs. {}. rem={}, size={}",
id, this->id, rem, size);
throw segment_truncation(std::move(reason), pos + rem);
}
}
@@ -3630,10 +3627,6 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
auto old = pos;
pos = next_pos(off);
clogger.trace("Pos {} -> {} ({})", old, pos, off);
// #24346 check eof status whenever we move file pos.
if (pos >= file_size) {
eof = true;
}
}
future<> read_entry() {

View File

@@ -165,7 +165,7 @@ future<> db::commitlog_replayer::impl::init() {
future<db::commitlog_replayer::impl::stats>
db::commitlog_replayer::impl::recover(const commitlog::descriptor& d, const commitlog::replay_state& rpstate) const {
scylla_assert(_column_mappings.local_is_initialized());
SCYLLA_ASSERT(_column_mappings.local_is_initialized());
replay_position rp{d};
auto gp = min_pos(rp.shard_id());

View File

@@ -36,7 +36,6 @@
#include "sstables/compressor.hh"
#include "utils/log.hh"
#include "service/tablet_allocator_fwd.hh"
#include "backlog_controller_fwd.hh"
#include "utils/config_file_impl.hh"
#include "exceptions/exceptions.hh"
#include <seastar/core/metrics_api.hh>
@@ -631,8 +630,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_max_shares(this, "compaction_max_shares", liveness::LiveUpdate, value_status::Used, default_compaction_maximum_shares,
"Set the maximum shares of regular compaction to the specific value. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold.")
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
@@ -1038,9 +1035,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Controls whether traffic between nodes is compressed. The valid values are:\n"
"* all: All traffic is compressed.\n"
"* dc : Traffic between data centers is compressed.\n"
"* rack : Traffic between racks is compressed.\n"
"* none : No compression.",
{"all", "dc", "rack", "none"})
{"all", "dc", "none"})
, internode_compression_zstd_max_cpu_fraction(this, "internode_compression_zstd_max_cpu_fraction", liveness::LiveUpdate, value_status::Used, 0.000,
"ZSTD compression of RPC will consume at most this fraction of each internode_compression_zstd_quota_refresh_period_ms time slice.\n"
"If you wish to try out zstd for RPC compression, 0.05 is a reasonable starting point.")
@@ -1172,17 +1168,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"* default_weight: (Default: 1 **) How many requests are handled during each turn of the RoundRobin.\n"
"* weights: (Default: Keyspace: 1) Takes a list of keyspaces. It sets how many requests are handled during each turn of the RoundRobin, based on the request_scheduler_id.")
/**
* @Group Vector search settings
* @GroupDescription Settings for configuring and tuning vector search functionality.
*/
, vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in `vector_store_primary_uri` and `vector_store_secondary_uri`. The available options are:\n"
"* truststore: (Default: <not set, use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
/**
* @Group Security properties
* @GroupDescription Server and client security settings.
*/
@@ -1330,15 +1315,15 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, sstable_format(this, "sstable_format", liveness::LiveUpdate, value_status::Used, "me", "Default sstable file format", {"md", "me", "ms"})
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts},
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{},
"Server-global user table compression options. If enabled, all user tables"
"will be compressed using the provided options, unless overridden"
"by compression options in the table schema. The available options are:\n"
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor (default), LZ4WithDictsCompressor, SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
"* compression_level: (Default: 3) Compression level for ZstdCompressor and ZstdWithDictsCompressor. Higher levels provide better compression ratios at the cost of speed. Allowed values are integers between 1 and 22.")
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Deprecated, true,
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"
"This is only enforced when this node validates a new DDL statement; disabling the option won't disable dictionary-based compression "
@@ -1440,15 +1425,9 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.")
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.")
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.")
, alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.")
, alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.")
, alternator_streams_increased_compatibility(this, "alternator_streams_increased_compatibility", liveness::LiveUpdate, value_status::Used, false,
"Increases compatibility with DynamoDB Streams at the cost of performance. "
"If enabled, Alternator compares the existing item with the new one during "
"data-modifying operations to determine which event type should be emitted. "
"This penalty is incurred only for tables with Alternator Streams enabled.")
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000,
"The server-side timeout for completing Alternator API requests.")
, alternator_ttl_period_in_seconds(this, "alternator_ttl_period_in_seconds", value_status::Used,
@@ -1470,6 +1449,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
, vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "", "A comma-separated list of vector store node URIs. If not set, vector search is disabled.")
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")
@@ -1545,9 +1525,9 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, error_injections_at_startup(this, "error_injections_at_startup", error_injection_value_status, {}, "List of error injections that should be enabled on startup.")
, topology_barrier_stall_detector_threshold_seconds(this, "topology_barrier_stall_detector_threshold_seconds", value_status::Used, 2, "Report sites blocking topology barrier if it takes longer than this.")
, enable_tablets(this, "enable_tablets", value_status::Used, false, "Enable tablets for newly created keyspaces. (deprecated)")
, tablets_mode_for_new_keyspaces(this, "tablets_mode_for_new_keyspaces", liveness::LiveUpdate, value_status::Used, tablets_mode_t::mode::unset, "Control tablets for new keyspaces. Can be set to the following values:\n"
, tablets_mode_for_new_keyspaces(this, "tablets_mode_for_new_keyspaces", value_status::Used, tablets_mode_t::mode::unset, "Control tablets for new keyspaces. Can be set to the following values:\n"
"\tdisabled: New keyspaces use vnodes by default, unless enabled by the tablets={'enabled':true} option\n"
"\tenabled: New keyspaces use tablets by default, unless disabled by the tablets={'enabled':false} option\n"
"\tenabled: New keyspaces use tablets by default, unless disabled by the tablets={'disabled':true} option\n"
"\tenforced: New keyspaces must use tablets. Tablets cannot be disabled using the CREATE KEYSPACE option")
, view_flow_control_delay_limit_in_ms(this, "view_flow_control_delay_limit_in_ms", liveness::LiveUpdate, value_status::Used, 1000,
"The maximal amount of time that materialized-view update flow control may delay responses "

View File

@@ -189,7 +189,6 @@ public:
named_value<bool> auto_adjust_flush_quota;
named_value<float> memtable_flush_static_shares;
named_value<float> compaction_static_shares;
named_value<float> compaction_max_shares;
named_value<bool> compaction_enforce_min_threshold;
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
named_value<sstring> cluster_name;
@@ -344,9 +343,6 @@ public:
named_value<sstring> request_scheduler;
named_value<sstring> request_scheduler_id;
named_value<string_map> request_scheduler_options;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<string_map> vector_store_encryption_options;
named_value<sstring> authenticator;
named_value<sstring> internode_authenticator;
named_value<sstring> authorizer;
@@ -462,10 +458,8 @@ public:
named_value<uint16_t> alternator_https_port;
named_value<sstring> alternator_address;
named_value<bool> alternator_enforce_authorization;
named_value<bool> alternator_warn_authorization;
named_value<sstring> alternator_write_isolation;
named_value<uint32_t> alternator_streams_time_window_s;
named_value<bool> alternator_streams_increased_compatibility;
named_value<uint32_t> alternator_timeout_in_ms;
named_value<double> alternator_ttl_period_in_seconds;
named_value<sstring> alternator_describe_endpoints;
@@ -474,6 +468,8 @@ public:
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
named_value<sstring> vector_store_primary_uri;
named_value<bool> abort_on_ebadf;
named_value<bool> sanitizer_report_backtrace;

Some files were not shown because too many files have changed in this diff Show More