Compare commits

..

1 Commits

Author SHA1 Message Date
Yaron Kaikov
dcbc6c839d ./github/scripts/auto-backport.py: don't remove backport label when backport process has an error
Today, when the `Fixes` prefix is missing or the developer is not a collaborator with `scylladbbot` we remove the backport labels to prevent the process from starting and notifying the developers.

Developers are worried that removing these backport labels will cause us to forget we need to do these backports. @nyh suggested to add a `scylladbbot/backport_error` label instead

Applied those changes, so when a `Fixes` prefix is missing we will add a `scylladbbot/backport_error` label and stop the process

When a user doesn't accept the invite we will still open the PR but he will not be assigned and will not be able to edit the branch when we have conflicts

Fixes: https://github.com/scylladb/scylla-pkg/issues/4898
Fixes: https://github.com/scylladb/scylla-pkg/issues/4897
2025-03-18 13:58:59 +02:00
2097 changed files with 39813 additions and 147547 deletions

14
.github/CODEOWNERS vendored
View File

@@ -1,5 +1,5 @@
# AUTH
auth/* @nuivall @ptrsmrn
auth/* @nuivall @ptrsmrn @KrzaQ
# CACHE
row_cache* @tgrabiec
@@ -25,15 +25,15 @@ compaction/* @raphaelsc
transport/*
# CQL QUERY LANGUAGE
cql3/* @tgrabiec @nuivall @ptrsmrn
cql3/* @tgrabiec @nuivall @ptrsmrn @KrzaQ
# COUNTERS
counters* @nuivall @ptrsmrn
tests/counter_test* @nuivall @ptrsmrn
counters* @nuivall @ptrsmrn @KrzaQ
tests/counter_test* @nuivall @ptrsmrn @KrzaQ
# DOCS
docs/* @annastuchlik @tzach
docs/alternator @annastuchlik @tzach @nyh
docs/alternator @annastuchlik @tzach @nyh @nuivall @ptrsmrn @KrzaQ
# GOSSIP
gms/* @tgrabiec @asias @kbr-scylla
@@ -74,8 +74,8 @@ streaming/* @tgrabiec @asias
service/storage_service.* @tgrabiec @asias
# ALTERNATOR
alternator/* @nyh
test/alternator/* @nyh
alternator/* @nyh @nuivall @ptrsmrn @KrzaQ
test/alternator/* @nyh @nuivall @ptrsmrn @KrzaQ
# HINTED HANDOFF
db/hints/* @piodul @vladzcloudius @eliransin

View File

@@ -1,86 +1,15 @@
name: "Report a bug"
description: "File a bug report."
title: "[Bug]: "
type: "bug"
labels: bug
body:
- type: checkboxes
id: terms
attributes:
label: Code of Conduct
description: "This is Scylla's bug tracker, to be used for reporting bugs only.
This is Scylla's bug tracker, to be used for reporting bugs only.
If you have a question about Scylla, and not a bug, please ask it in
our forum at https://forum.scylladb.com/ or in our slack channel https://slack.scylladb.com/ "
options:
- label: I have read the disclaimer above and am reporting a suspected malfunction in Scylla.
required: true
our mailing-list at scylladb-dev@googlegroups.com or in our slack channel.
- type: input
id: product-version
attributes:
label: product version
description: Scylla version (or git commit hash)
placeholder: ex. scylla-6.1.1
validations:
required: true
- type: input
id: cluster-size
attributes:
label: Cluster Size
validations:
required: true
- type: input
id: os
attributes:
label: OS
placeholder: RHEL/CentOS/Ubuntu/AWS AMI
validations:
required: true
- type: textarea
id: additional-data
attributes:
label: Additional Environmental Data
#description:
placeholder: Add additional data
value: "Platform (physical/VM/cloud instance type/docker):\n
Hardware: sockets= cores= hyperthreading= memory=\n
Disks: (SSD/HDD, count)"
validations:
required: false
- type: textarea
id: reproducer-steps
attributes:
label: Reproduction Steps
placeholder: Describe how to reproduce the problem
value: "The steps to reproduce the problem are:"
validations:
required: true
- type: textarea
id: the-problem
attributes:
label: What is the problem?
placeholder: Describe the problem you found
value: "The problem is that"
validations:
required: true
- type: textarea
id: what-happened
attributes:
label: Expected behavior?
placeholder: Describe what should have happened
value: "I expected that "
validations:
required: true
- type: textarea
id: logs
attributes:
label: Relevant log output
description: Please copy and paste any relevant log output. This will be automatically formatted into code, so no need for backticks.
render: shell
- [] I have read the disclaimer above, and I am reporting a suspected malfunction in Scylla.
*Installation details*
Scylla version (or git commit hash):
Cluster size:
OS (RHEL/CentOS/Ubuntu/AWS AMI):
*Hardware details (for performance issues)* Delete if unneeded
Platform (physical/VM/cloud instance type/docker):
Hardware: sockets= cores= hyperthreading= memory=
Disks: (SSD/HDD, count)

View File

@@ -1,86 +0,0 @@
# ScyllaDB Development Instructions
## Project Context
High-performance distributed NoSQL database. Core values: performance, correctness, readability.
## Build System
### Modern Build (configure.py + ninja)
```bash
# Configure (run once per mode, or when switching modes)
./configure.py --mode=<mode> # mode: dev, debug, release, sanitize
# Build everything
ninja <mode>-build # e.g., ninja dev-build
# Build Scylla binary only (sufficient for Python integration tests)
ninja build/<mode>/scylla
# Build specific test
ninja build/<mode>/test/boost/<test_name>
```
## Running Tests
### C++ Unit Tests
```bash
# Run all tests in a file
./test.py --mode=<mode> test/<suite>/<test_name>.cc
# Run a single test case from a file
./test.py --mode=<mode> test/<suite>/<test_name>.cc::<test_case_name>
# Examples
./test.py --mode=dev test/boost/memtable_test.cc
./test.py --mode=dev test/raft/raft_server_test.cc::test_check_abort_on_client_api
```
**Important:**
- Use full path with `.cc` extension (e.g., `test/boost/test_name.cc`, not `boost/test_name`)
- To run a single test case, append `::<test_case_name>` to the file path
- If you encounter permission issues with cgroup metric gathering, add `--no-gather-metrics` flag
**Rebuilding Tests:**
- test.py does NOT automatically rebuild when test source files are modified
- Many tests are part of composite binaries (e.g., `combined_tests` in test/boost contains multiple test files)
- To find which binary contains a test, check `configure.py` in the repository root (primary source) or `test/<suite>/CMakeLists.txt`
- To rebuild a specific test binary: `ninja build/<mode>/test/<suite>/<binary_name>`
- Examples:
- `ninja build/dev/test/boost/combined_tests` (contains group0_voter_calculator_test.cc and others)
- `ninja build/dev/test/raft/replication_test` (standalone Raft test)
### Python Integration Tests
```bash
# Only requires Scylla binary (full build usually not needed)
ninja build/<mode>/scylla
# Run all tests in a file
./test.py --mode=<mode> <test_path>
# Run a single test case from a file
./test.py --mode=<mode> <test_path>::<test_function_name>
# Examples
./test.py --mode=dev alternator/
./test.py --mode=dev cluster/test_raft_voters::test_raft_limited_voters_retain_coordinator
# Optional flags
./test.py --mode=dev cluster/test_raft_no_quorum -v # Verbose output
./test.py --mode=dev cluster/test_raft_no_quorum --repeat 5 # Repeat test 5 times
```
**Important:**
- Use path without `.py` extension (e.g., `cluster/test_raft_no_quorum`, not `cluster/test_raft_no_quorum.py`)
- To run a single test case, append `::<test_function_name>` to the file path
- Add `-v` for verbose output
- Add `--repeat <num>` to repeat a test multiple times
- After modifying C++ source files, only rebuild the Scylla binary for Python tests - building the entire repository is unnecessary
## Code Philosophy
- Performance matters in hot paths (data read/write, inner loops)
- Self-documenting code through clear naming
- Comments explain "why", not "what"
- Prefer standard library over custom implementations
- Strive for simplicity and clarity, add complexity only when clearly justified
- Question requests: don't blindly implement requests - evaluate trade-offs, identify issues, and suggest better alternatives when appropriate
- Consider different approaches, weigh pros and cons, and recommend the best fit for the specific context

View File

@@ -1,115 +0,0 @@
---
applyTo: "**/*.{cc,hh}"
---
# C++ Guidelines
**Important:** Always match the style and conventions of existing code in the file and directory.
## Memory Management
- Prefer stack allocation whenever possible
- Use `std::unique_ptr` by default for dynamic allocations
- `new`/`delete` are forbidden (use RAII)
- Use `seastar::lw_shared_ptr` or `seastar::shared_ptr` for shared ownership within same shard
- Use `seastar::foreign_ptr` for cross-shard sharing
- Avoid `std::shared_ptr` except when interfacing with external C++ APIs
- Avoid raw pointers except for non-owning references or C API interop
## Seastar Asynchronous Programming
- Use `seastar::future<T>` for all async operations
- Prefer coroutines (`co_await`, `co_return`) over `.then()` chains for readability
- Coroutines are preferred over `seastar::do_with()` for managing temporary state
- In hot paths where futures are ready, continuations may be more efficient than coroutines
- Chain futures with `.then()`, don't block with `.get()` (unless in `seastar::thread` context)
- All I/O must be asynchronous (no blocking calls)
- Use `seastar::gate` for shutdown coordination
- Use `seastar::semaphore` for resource limiting (not `std::mutex`)
- Break long loops with `maybe_yield()` to avoid reactor stalls
## Coroutines
```cpp
seastar::future<T> func() {
auto result = co_await async_operation();
co_return result;
}
```
## Error Handling
- Throw exceptions for errors (futures propagate them automatically)
- In data path: avoid exceptions, use `std::expected` (or `boost::outcome`) instead
- Use standard exceptions (`std::runtime_error`, `std::invalid_argument`)
- Database-specific: throw appropriate schema/query exceptions
## Performance
- Pass large objects by `const&` or `&&` (move semantics)
- Use `std::string_view` for non-owning string references
- Avoid copies: prefer move semantics
- Use `utils::chunked_vector` instead of `std::vector` for large allocations (>128KB)
- Minimize dynamic allocations in hot paths
## Database-Specific Types
- Use `schema_ptr` for schema references
- Use `mutation` and `mutation_partition` for data modifications
- Use `partition_key` and `clustering_key` for keys
- Use `api::timestamp_type` for database timestamps
- Use `gc_clock` for garbage collection timing
## Style
- C++23 standard (prefer modern features, especially coroutines)
- Use `auto` when type is obvious from RHS
- Avoid `auto` when it obscures the type
- Use range-based for loops: `for (const auto& item : container)`
- Use standard algorithms when they clearly simplify code (e.g., replacing 10-line loops)
- Avoid chaining multiple algorithms if a straightforward loop is clearer
- Mark functions and variables `const` whenever possible
- Use scoped enums: `enum class` (not unscoped `enum`)
## Headers
- Use `#pragma once`
- Include order: own header, C++ std, Seastar, Boost, project headers
- Forward declare when possible
- Never `using namespace` in headers (exception: `using namespace seastar` is globally available via `seastarx.hh`)
## Documentation
- Public APIs require clear documentation
- Implementation details should be self-evident from code
- Use `///` or Doxygen `/** */` for public documentation, `//` for implementation notes - follow the existing style
## Naming
- `snake_case` for most identifiers (classes, functions, variables, namespaces)
- Template parameters: `CamelCase` (e.g., `template<typename ValueType>`)
- Member variables: prefix with `_` (e.g., `int _count;`)
- Structs (value-only): no `_` prefix on members
- Constants and `constexpr`: `snake_case` (e.g., `static constexpr int max_size = 100;`)
- Files: `.hh` for headers, `.cc` for source
## Formatting
- 4 spaces indentation, never tabs
- Opening braces on same line as control structure (except namespaces)
- Space after keywords: `if (`, `while (`, `return `
- Whitespace around operators matches precedence: `*a + *b` not `* a+* b`
- Line length: keep reasonable (<160 chars), use continuation lines with double indent if needed
- Brace all nested scopes, even single statements
- Minimal patches: only format code you modify, never reformat entire files
## Logging
- Use structured logging with appropriate levels: DEBUG, INFO, WARN, ERROR
- Include context in log messages (e.g., request IDs)
- Never log sensitive data (credentials, PII)
## Forbidden
- `malloc`/`free`
- `printf` family (use logging or fmt)
- Raw pointers for ownership
- `using namespace` in headers
- Blocking operations: `std::sleep`, `std::read`, `std::mutex` (use Seastar equivalents)
- `std::atomic` (reserved for very special circumstances only)
- Macros (use `inline`, `constexpr`, or templates instead)
## Testing
When modifying existing code, follow TDD: create/update test first, then implement.
- Examine existing tests for style and structure
- Use Boost.Test framework
- Use `SEASTAR_THREAD_TEST_CASE` for Seastar asynchronous tests
- Aim for high code coverage, especially for new features and bug fixes
- Maintain bisectability: all tests must pass in every commit. Mark failing tests with `BOOST_FAIL()` or similar, then fix in subsequent commit

View File

@@ -1,51 +0,0 @@
---
applyTo: "**/*.py"
---
# Python Guidelines
**Important:** Match existing code style. Some directories (like `test/cqlpy` and `test/alternator`) prefer simplicity over type hints and docstrings.
## Style
- Follow PEP 8
- Use type hints for function signatures (unless directory style omits them)
- Use f-strings for formatting
- Line length: 160 characters max
- 4 spaces for indentation
## Imports
Order: standard library, third-party, local imports
```python
import os
import sys
import pytest
from cassandra.cluster import Cluster
from test.utils import setup_keyspace
```
Never use `from module import *`
## Documentation
All public functions/classes need docstrings (unless the current directory conventions omit them):
```python
def my_function(arg1: str, arg2: int) -> bool:
"""
Brief summary of function purpose.
Args:
arg1: Description of first argument.
arg2: Description of second argument.
Returns:
Description of return value.
"""
pass
```
## Testing Best Practices
- Maintain bisectability: all tests must pass in every commit
- Mark currently-failing tests with `@pytest.mark.xfail`, unmark when fixed
- Use descriptive names that convey intent
- Docstrings/comments should explain what the test verifies and why, and if it reproduces a specific issue or how it fits into the larger test suite

View File

@@ -47,29 +47,13 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
draft=is_draft
)
logging.info(f"Pull request created: {backport_pr.html_url}")
labels_to_add = []
priority_labels = {"P0", "P1"}
parent_pr_labels = [label.name for label in pr.labels]
for label in priority_labels:
if label in parent_pr_labels:
labels_to_add.append(label)
labels_to_add.append("force_on_cloud")
logging.info(f"Adding {label} and force_on_cloud labels from parent PR to backport PR")
break # Only apply the highest priority label
if is_collaborator:
backport_pr.add_to_assignees(pr.user)
if is_draft:
labels_to_add.append("conflicts")
backport_pr.add_to_labels("conflicts")
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
pr_comment += "Please resolve them and mark this PR as ready for review"
backport_pr.create_issue_comment(pr_comment)
# Apply all labels at once if we have any
if labels_to_add:
backport_pr.add_to_labels(*labels_to_add)
logging.info(f"Added labels to backport PR: {labels_to_add}")
logging.info(f"Assigned PR to original author: {pr.user}")
return backport_pr
except GithubException as e:
@@ -128,45 +112,29 @@ def backport(repo, pr, version, commits, backport_base_branch, is_collaborator):
is_draft = True
repo_local.git.add(A=True)
repo_local.git.cherry_pick('--continue')
# Check if the branch already exists in the remote fork
remote_refs = repo_local.git.ls_remote('--heads', fork_repo, new_branch_name)
if not remote_refs:
# Branch does not exist, create it with a regular push
repo_local.git.push(fork_repo, new_branch_name)
create_pull_request(repo, new_branch_name, backport_base_branch, pr, backport_pr_title, commits,
is_draft, is_collaborator)
else:
logging.info(f"Remote branch {new_branch_name} already exists in fork. Skipping push.")
repo_local.git.push(fork_repo, new_branch_name, force=True)
create_pull_request(repo, new_branch_name, backport_base_branch, pr, backport_pr_title, commits,
is_draft, is_collaborator)
except GitCommandError as e:
logging.warning(f"GitCommandError: {e}")
def with_github_keyword_prefix(repo, pr):
# GitHub issue pattern: #123, scylladb/scylladb#123, or full GitHub URLs
github_pattern = rf"(?:fix(?:|es|ed))\s*:?\s*(?:(?:(?:{repo.full_name})?#)|https://github\.com/{repo.full_name}/issues/)(\d+)"
# JIRA issue pattern: PKG-92 or https://scylladb.atlassian.net/browse/PKG-92
jira_pattern = r"(?:fix(?:|es|ed))\s*:?\s*(?:(?:https://scylladb\.atlassian\.net/browse/)?([A-Z]+-\d+))"
# Check PR body for GitHub issues
github_match = re.findall(github_pattern, pr.body, re.IGNORECASE)
# Check PR body for JIRA issues
jira_match = re.findall(jira_pattern, pr.body, re.IGNORECASE)
match = github_match or jira_match
if match:
pattern = rf"(?:fix(?:|es|ed))\s*:?\s*(?:(?:(?:{repo.full_name})?#)|https://github\.com/{repo.full_name}/issues/)(\d+)"
match = re.findall(pattern, pr.body, re.IGNORECASE)
if not match:
for commit in pr.get_commits():
match = re.findall(pattern, commit.commit.message, re.IGNORECASE)
if match:
print(f'{pr.number} has a valid close reference in commit message {commit.sha}')
break
if not match:
print(f'No valid close reference for {pr.number}')
return False
else:
return True
for commit in pr.get_commits():
github_match = re.findall(github_pattern, commit.commit.message, re.IGNORECASE)
jira_match = re.findall(jira_pattern, commit.commit.message, re.IGNORECASE)
if github_match or jira_match:
print(f'{pr.number} has a valid close reference in commit message {commit.sha}')
return True
print(f'No valid close reference for {pr.number}')
return False
def main():
args = parse_args()

View File

@@ -30,13 +30,8 @@ def copy_labels_from_linked_issues(repo, pr_number):
try:
issue = repo.get_issue(int(issue_number))
for label in issue.labels:
# Copy ALL labels from issues to PR when PR is opened
pr.add_to_labels(label.name)
print(f"Copied label '{label.name}' from issue #{issue_number} to PR #{pr_number}")
if label.name in ['P0', 'P1']:
pr.add_to_labels('force_on_cloud')
print(f"Added force_on_cloud label to PR #{pr_number} due to {label.name} label")
print(f"All labels from issue #{issue_number} copied to PR #{pr_number}")
print(f"Labels from issue #{issue_number} copied to PR #{pr_number}")
except Exception as e:
print(f"Error processing issue #{issue_number}: {e}")
@@ -79,22 +74,9 @@ def sync_labels(repo, number, label, action, is_issue=False):
target = repo.get_issue(int(pr_or_issue_number))
if action == 'labeled':
target.add_to_labels(label)
if label in ['P0', 'P1'] and is_issue:
# Only add force_on_cloud to PRs when P0/P1 is added to an issue
target.add_to_labels('force_on_cloud')
print(f"Added 'force_on_cloud' label to PR #{pr_or_issue_number} due to {label} label")
print(f"Label '{label}' successfully added.")
elif action == 'unlabeled':
target.remove_from_labels(label)
if label in ['P0', 'P1'] and is_issue:
# Check if any other P0/P1 labels remain before removing force_on_cloud
remaining_priority_labels = [l.name for l in target.labels if l.name in ['P0', 'P1']]
if not remaining_priority_labels:
try:
target.remove_from_labels('force_on_cloud')
print(f"Removed 'force_on_cloud' label from PR #{pr_or_issue_number} as no P0/P1 labels remain")
except Exception as e:
print(f"Warning: Could not remove force_on_cloud label: {e}")
print(f"Label '{label}' successfully removed.")
elif action == 'opened':
copy_labels_from_linked_issues(repo, number)

View File

@@ -1,16 +0,0 @@
{
"problemMatcher": [
{
"owner": "seastar-bad-include",
"severity": "error",
"pattern": [
{
"regexp": "^(.+):(\\d+):(.+)$",
"file": 1,
"line": 2,
"message": 3
}
]
}
]
}

View File

@@ -54,13 +54,10 @@ jobs:
GITHUB_TOKEN: ${{ secrets.AUTO_BACKPORT_TOKEN }}
run: python .github/scripts/auto-backport.py --repo ${{ github.repository }} --base-branch ${{ github.ref }} --commits ${{ github.event.before }}..${{ github.sha }}
- name: Check if a valid backport label exists and no backport_error
env:
LABELS_JSON: ${{ toJson(github.event.pull_request.labels) }}
id: check_label
run: |
labels_json="$LABELS_JSON"
echo "Checking labels:"
echo "$labels_json" | jq -r '.[].name'
labels_json='${{ toJson(github.event.pull_request.labels) }}'
echo "Checking labels: $(echo "$labels_json" | jq -r '.[].name')"
# Check if a valid backport label exists
if echo "$labels_json" | jq -e 'any(.[] | .name; test("backport/[0-9]+\\.[0-9]+$"))' > /dev/null; then

View File

@@ -1,12 +0,0 @@
name: Call Jira Status In Progress
on:
pull_request_target:
types: [opened]
jobs:
call-jira-status-in-progress:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_progress.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,12 +0,0 @@
name: Call Jira Status In Review
on:
pull_request_target:
types: [ready_for_review, review_requested]
jobs:
call-jira-status-in-review:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,12 +0,0 @@
name: Call Jira Status Ready For Merge
on:
pull_request_target:
types: [labeled]
jobs:
call-jira-status-update:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_ready_for_merge.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,16 +1,9 @@
name: Notify PR Authors of Conflicts
permissions:
issues: write
pull-requests: write
on:
push:
branches:
- 'master'
- 'branch-*'
schedule:
- cron: '0 10 * * 1' # Runs every Monday at 10:00am
- cron: '0 10 * * 1,4' # Runs every Monday and Thursday at 10:00am
workflow_dispatch: # Manual trigger for testing
jobs:
notify_conflict_prs:
@@ -21,134 +14,32 @@ jobs:
uses: actions/github-script@v7
with:
script: |
console.log("Starting conflict reminder script...");
// Print trigger event
if (process.env.GITHUB_EVENT_NAME) {
console.log(`Workflow triggered by: ${process.env.GITHUB_EVENT_NAME}`);
} else {
console.log("Could not determine workflow trigger event.");
}
const isPushEvent = process.env.GITHUB_EVENT_NAME === 'push';
console.log(`isPushEvent: ${isPushEvent}`);
const twoMonthsAgo = new Date();
twoMonthsAgo.setMonth(twoMonthsAgo.getMonth() - 2);
const prs = await github.paginate(github.rest.pulls.list, {
owner: context.repo.owner,
repo: context.repo.repo,
state: 'open',
per_page: 100
});
console.log(`Fetched ${prs.length} open PRs`);
const recentPrs = prs.filter(pr => new Date(pr.created_at) >= twoMonthsAgo);
const validBaseBranches = ['master'];
const branchPrefix = 'branch-';
const oneWeekAgo = new Date();
const conflictLabel = 'conflicts';
oneWeekAgo.setDate(oneWeekAgo.getDate() - 7);
console.log(`One week ago: ${oneWeekAgo.toISOString()}`);
for (const pr of recentPrs) {
console.log(`Checking PR #${pr.number} on base branch '${pr.base.ref}'`);
const isBranchX = pr.base.ref.startsWith(branchPrefix);
const isMaster = validBaseBranches.includes(pr.base.ref);
if (!(isBranchX || isMaster)) {
console.log(`PR #${pr.number} skipped: base branch is not 'master' or does not start with '${branchPrefix}'`);
continue;
}
const threeDaysAgo = new Date();
const conflictLabel = 'conflicts';
threeDaysAgo.setDate(threeDaysAgo.getDate() - 3);
for (const pr of prs) {
if (!pr.base.ref.startsWith(branchPrefix)) continue;
const hasConflictLabel = pr.labels.some(label => label.name === conflictLabel);
if (!hasConflictLabel) continue;
const updatedDate = new Date(pr.updated_at);
console.log(`PR #${pr.number} last updated at: ${updatedDate.toISOString()}`);
if (!isPushEvent && updatedDate >= oneWeekAgo) {
console.log(`PR #${pr.number} skipped: updated within last week`);
continue;
}
if (pr.assignee === null) {
console.log(`PR #${pr.number} skipped: no assignee`);
continue;
}
// Fetch PR details to check mergeability
let { data: prDetails } = await github.rest.pulls.get({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: pr.number,
});
console.log(`PR #${pr.number} mergeable: ${prDetails.mergeable}`);
// Wait and re-fetch if mergeable is null
if (prDetails.mergeable === null) {
console.log(`PR #${pr.number} mergeable is null, waiting 2 seconds and retrying...`);
await new Promise(resolve => setTimeout(resolve, 2000)); // wait 2 seconds
prDetails = (await github.rest.pulls.get({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: pr.number,
})).data;
console.log(`PR #${pr.number} mergeable after retry: ${prDetails.mergeable}`);
}
if (prDetails.mergeable === false) {
const hasConflictLabel = pr.labels.some(label => label.name === conflictLabel);
console.log(`PR #${pr.number} has conflict label: ${hasConflictLabel}`);
// Fetch comments to check for existing notifications
const comments = await github.paginate(github.rest.issues.listComments, {
if (updatedDate >= threeDaysAgo) continue;
if (pr.assignee === null) continue;
const assignee = pr.assignee.login;
if (assignee) {
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: pr.number,
per_page: 100,
body: `@${assignee}, this PR has been open with conflicts. Please resolve the conflicts so we can merge it.`,
});
// Find last notification comment from the bot
const notificationPrefix = `@${pr.assignee.login}, this PR has merge conflicts with the base branch.`;
const lastNotification = comments
.filter(c =>
c.user.type === "Bot" &&
c.body.startsWith(notificationPrefix)
)
.sort((a, b) => new Date(b.created_at) - new Date(a.created_at))[0];
// Check if we should skip notification based on recent notification
let shouldSkipNotification = false;
if (lastNotification) {
const lastNotified = new Date(lastNotification.created_at);
if (lastNotified >= oneWeekAgo) {
console.log(`PR #${pr.number} skipped: last notification was less than 1 week ago`);
shouldSkipNotification = true;
}
}
// Additional check for push events on draft PRs with conflict labels
if (
isPushEvent &&
pr.draft === true &&
hasConflictLabel &&
shouldSkipNotification
) {
continue;
}
if (!hasConflictLabel) {
await github.rest.issues.addLabels({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: pr.number,
labels: [conflictLabel],
});
console.log(`Added 'conflicts' label to PR #${pr.number}`);
}
const assignee = pr.assignee.login;
if (assignee && !shouldSkipNotification) {
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: pr.number,
body: `@${assignee}, this PR has merge conflicts with the base branch. Please resolve the conflicts so we can merge it.`,
});
console.log(`Notified @${assignee} for PR #${pr.number}`);
}
} else {
console.log(`PR #${pr.number} is mergeable, no action needed.`);
}
console.log(`Notified @${assignee} for PR #${pr.number}`);
}
}
console.log(`Total PRs checked: ${prs.length}`);

View File

@@ -1,34 +0,0 @@
name: Docs / Validate metrics
on:
pull_request:
branches:
- master
- enterprise
paths:
- '**/*.cc'
- 'scripts/metrics-config.yml'
- 'scripts/get_description.py'
- 'docs/_ext/scylladb_metrics.py'
jobs:
validate-metrics:
runs-on: ubuntu-latest
name: Check metrics documentation coverage
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.10'
- name: Install dependencies
run: pip install PyYAML
- name: Validate metrics
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml

View File

@@ -11,8 +11,7 @@ env:
CLEANER_OUTPUT_PATH: build/clang-include-cleaner.log
# the "idl" subdirectory does not contain C++ source code. the .hh files in it are
# supposed to be processed by idl-compiler.py, so we don't check them using the cleaner
CLEANER_DIRS: test/unit exceptions alternator api auth cdc compaction db dht gms index lang message mutation mutation_writer node_ops raft redis replica service
SEASTAR_BAD_INCLUDE_OUTPUT_PATH: build/seastar-bad-include.log
CLEANER_DIRS: test/unit exceptions alternator api auth cdc compaction db dht gms index lang message mutation mutation_writer node_ops redis replica
permissions: {}
@@ -81,24 +80,7 @@ jobs:
done
- run: |
echo "::remove-matcher owner=clang-include-cleaner::"
- run: |
echo "::add-matcher::.github/seastar-bad-include.json"
- name: check for seastar includes
run: |
git -c safe.directory="$PWD" \
grep -nE '#include +"seastar/' \
| tee "$SEASTAR_BAD_INCLUDE_OUTPUT_PATH"
- run: |
echo "::remove-matcher owner=seastar-bad-include::"
- uses: actions/upload-artifact@v4
with:
name: Logs
path: |
${{ env.CLEANER_OUTPUT_PATH }}
${{ env.SEASTAR_BAD_INCLUDE_OUTPUT_PATH }}
- name: fail if seastar headers are included as an internal library
run: |
if [ -s "$SEASTAR_BAD_INCLUDE_OUTPUT_PATH" ]; then
echo "::error::Found #include \"seastar/ in the source code. Use angle brackets instead."
exit 1
fi
name: Logs (clang-include-cleaner)
path: "./${{ env.CLEANER_OUTPUT_PATH }}"

View File

@@ -16,13 +16,6 @@ jobs:
pull-requests: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
repository: ${{ github.repository }}
ref: ${{ env.DEFAULT_BRANCH }}
token: ${{ secrets.AUTO_BACKPORT_TOKEN }}
fetch-depth: 1
- name: Mark pull request as ready for review
run: gh pr ready "${{ github.event.pull_request.number }}"
env:

View File

@@ -13,8 +13,6 @@ jobs:
issues: write
pull-requests: write
steps:
- name: Wait for label to be added
run: sleep 1m
- uses: mheap/github-action-required-labels@v5
with:
mode: minimum

View File

@@ -37,13 +37,13 @@ jobs:
run: python .github/scripts/sync_labels.py --repo ${{ github.repository }} --number ${{ github.event.number }} --action ${{ github.event.action }}
- name: Pull request labeled or unlabeled event
if: github.event_name == 'pull_request_target' && (startsWith(github.event.label.name, 'backport/') || github.event.label.name == 'P0' || github.event.label.name == 'P1')
if: github.event_name == 'pull_request_target' && startsWith(github.event.label.name, 'backport/')
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: python .github/scripts/sync_labels.py --repo ${{ github.repository }} --number ${{ github.event.number }} --action ${{ github.event.action }} --label ${{ github.event.label.name }}
- name: Issue labeled or unlabeled event
if: github.event_name == 'issues' && (startsWith(github.event.label.name, 'backport/') || github.event.label.name == 'P0' || github.event.label.name == 'P1')
if: github.event_name == 'issues' && startsWith(github.event.label.name, 'backport/')
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: python .github/scripts/sync_labels.py --repo ${{ github.repository }} --number ${{ github.event.issue.number }} --action ${{ github.event.action }} --is_issue --label ${{ github.event.label.name }}

View File

@@ -1,21 +0,0 @@
name: Trigger Scylla CI Route
on:
issue_comment:
types: [created]
jobs:
trigger-jenkins:
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI-Route Jenkins Job
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
curl -X POST "$JENKINS_URL/job/releng/job/Scylla-CI-Route/buildWithParameters?PR_NUMBER=$PR_NUMBER&PR_REPO_NAME=$PR_REPO_NAME" \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v

View File

@@ -1,242 +0,0 @@
name: Trigger next gating
on:
pull_request_target:
types: [opened, reopened, synchronize]
issue_comment:
types: [created]
jobs:
trigger-ci:
runs-on: ubuntu-latest
steps:
- name: Dump GitHub context
env:
GITHUB_CONTEXT: ${{ toJson(github) }}
run: echo "$GITHUB_CONTEXT"
- name: Checkout PR code
uses: actions/checkout@v3
with:
fetch-depth: 0 # Needed to access full history
ref: ${{ github.event.pull_request.head.ref }}
- name: Fetch before commit if needed
run: |
if ! git cat-file -e ${{ github.event.before }} 2>/dev/null; then
echo "Fetching before commit ${{ github.event.before }}"
git fetch --depth=1 origin ${{ github.event.before }}
fi
- name: Compare commits for file changes
if: github.action == 'synchronize'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
echo "Base: ${{ github.event.before }}"
echo "Head: ${{ github.event.after }}"
TREE_BEFORE=$(git show -s --format=%T ${{ github.event.before }})
TREE_AFTER=$(git show -s --format=%T ${{ github.event.after }})
echo "TREE_BEFORE=$TREE_BEFORE" >> $GITHUB_ENV
echo "TREE_AFTER=$TREE_AFTER" >> $GITHUB_ENV
- name: Check if last push has file changes
run: |
if [[ "${{ env.TREE_BEFORE }}" == "${{ env.TREE_AFTER }}" ]]; then
echo "No file changes detected in the last push, only commit message edit."
echo "has_file_changes=false" >> $GITHUB_ENV
else
echo "File changes detected in the last push."
echo "has_file_changes=true" >> $GITHUB_ENV
fi
- name: Rule 1 - Check PR draft or conflict status
run: |
# Check if PR is in draft mode
IS_DRAFT="${{ github.event.pull_request.draft }}"
# Check if PR has 'conflict' label
HAS_CONFLICT_LABEL="false"
LABELS='${{ toJson(github.event.pull_request.labels) }}'
if echo "$LABELS" | jq -r '.[].name' | grep -q "^conflict$"; then
HAS_CONFLICT_LABEL="true"
fi
# Set draft_or_conflict variable
if [[ "$IS_DRAFT" == "true" || "$HAS_CONFLICT_LABEL" == "true" ]]; then
echo "draft_or_conflict=true" >> $GITHUB_ENV
echo "✅ Rule 1: PR is in draft mode or has conflict label - setting draft_or_conflict=true"
else
echo "draft_or_conflict=false" >> $GITHUB_ENV
echo "✅ Rule 1: PR is ready and has no conflict label - setting draft_or_conflict=false"
fi
echo "Draft status: $IS_DRAFT"
echo "Has conflict label: $HAS_CONFLICT_LABEL"
echo "Result: draft_or_conflict = $draft_or_conflict"
- name: Rule 2 - Check labels
run: |
# Check if PR has P0 or P1 labels
HAS_P0_P1_LABEL="false"
LABELS='${{ toJson(github.event.pull_request.labels) }}'
if echo "$LABELS" | jq -r '.[].name' | grep -E "^(P0|P1)$" > /dev/null; then
HAS_P0_P1_LABEL="true"
fi
# Check if PR already has force_on_cloud label
echo "HAS_FORCE_ON_CLOUD_LABEL=false" >> $GITHUB_ENV
if echo "$LABELS" | jq -r '.[].name' | grep -q "^force_on_cloud$"; then
HAS_FORCE_ON_CLOUD_LABEL="true"
echo "HAS_FORCE_ON_CLOUD_LABEL=true" >> $GITHUB_ENV
fi
echo "Has P0/P1 label: $HAS_P0_P1_LABEL"
echo "Has force_on_cloud label: $HAS_FORCE_ON_CLOUD_LABEL"
# Add force_on_cloud label if PR has P0/P1 and doesn't already have force_on_cloud
if [[ "$HAS_P0_P1_LABEL" == "true" && "$HAS_FORCE_ON_CLOUD_LABEL" == "false" ]]; then
echo "✅ Rule 2: PR has P0 or P1 label - adding force_on_cloud label"
curl -X POST \
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/labels" \
-d '{"labels":["force_on_cloud"]}'
elif [[ "$HAS_P0_P1_LABEL" == "true" && "$HAS_FORCE_ON_CLOUD_LABEL" == "true" ]]; then
echo "✅ Rule 2: PR has P0 or P1 label and already has force_on_cloud label - no action needed"
else
echo "✅ Rule 2: PR does not have P0 or P1 label - no force_on_cloud label needed"
fi
SKIP_UNIT_TEST_CUSTOM="false"
if echo "$LABELS" | jq -r '.[].name' | grep -q "^ci/skip_unit-tests_custom$"; then
SKIP_UNIT_TEST_CUSTOM="true"
fi
echo "SKIP_UNIT_TEST_CUSTOM=$SKIP_UNIT_TEST_CUSTOM" >> $GITHUB_ENV
- name: Rule 3 - Analyze changed files and set build requirements
run: |
# Get list of changed files
CHANGED_FILES=$(git diff --name-only ${{ github.event.pull_request.base.sha }} ${{ github.event.pull_request.head.sha }})
echo "Changed files:"
echo "$CHANGED_FILES"
echo ""
# Initialize all requirements to false
REQUIRE_BUILD="false"
REQUIRE_DTEST="false"
REQUIRE_UNITTEST="false"
REQUIRE_ARTIFACTS="false"
REQUIRE_SCYLLA_GDB="false"
# Check each file against patterns
while IFS= read -r file; do
if [[ -n "$file" ]]; then
echo "Checking file: $file"
# Build pattern: ^(?!scripts\/pull_github_pr.sh).*$
# Everything except scripts/pull_github_pr.sh
if [[ "$file" != "scripts/pull_github_pr.sh" ]]; then
REQUIRE_BUILD="true"
echo " ✓ Matches build pattern"
fi
# Dtest pattern: ^(?!test(.py|\/)|dist\/docker\/|dist\/common\/scripts\/).*$
# Everything except test files, dist/docker/, dist/common/scripts/
if [[ ! "$file" =~ ^test\.(py|/).*$ ]] && [[ ! "$file" =~ ^dist/docker/.*$ ]] && [[ ! "$file" =~ ^dist/common/scripts/.*$ ]]; then
REQUIRE_DTEST="true"
echo " ✓ Matches dtest pattern"
fi
# Unittest pattern: ^(?!dist\/docker\/|dist\/common\/scripts).*$
# Everything except dist/docker/, dist/common/scripts/
if [[ ! "$file" =~ ^dist/docker/.*$ ]] && [[ ! "$file" =~ ^dist/common/scripts.*$ ]]; then
REQUIRE_UNITTEST="true"
echo " ✓ Matches unittest pattern"
fi
# Artifacts pattern: ^(?:dist|tools\/toolchain).*$
# Files starting with dist or tools/toolchain
if [[ "$file" =~ ^dist.*$ ]] || [[ "$file" =~ ^tools/toolchain.*$ ]]; then
REQUIRE_ARTIFACTS="true"
echo " ✓ Matches artifacts pattern"
fi
# Scylla GDB pattern: ^(scylla-gdb.py).*$
# Files starting with scylla-gdb.py
if [[ "$file" =~ ^scylla-gdb\.py.*$ ]]; then
REQUIRE_SCYLLA_GDB="true"
echo " ✓ Matches scylla_gdb pattern"
fi
fi
done <<< "$CHANGED_FILES"
# Set environment variables
echo "requireBuild=$REQUIRE_BUILD" >> $GITHUB_ENV
echo "requireDtest=$REQUIRE_DTEST" >> $GITHUB_ENV
echo "requireUnittest=$REQUIRE_UNITTEST" >> $GITHUB_ENV
echo "requireArtifacts=$REQUIRE_ARTIFACTS" >> $GITHUB_ENV
echo "requireScyllaGdb=$REQUIRE_SCYLLA_GDB" >> $GITHUB_ENV
echo ""
echo "✅ Rule 3: File analysis complete"
echo "Build required: $REQUIRE_BUILD"
echo "Dtest required: $REQUIRE_DTEST"
echo "Unittest required: $REQUIRE_UNITTEST"
echo "Artifacts required: $REQUIRE_ARTIFACTS"
echo "Scylla GDB required: $REQUIRE_SCYLLA_GDB"
- name: Determine Jenkins Job Name
run: |
if [[ "${{ github.ref_name }}" == "next" ]]; then
FOLDER_NAME="scylla-master"
elif [[ "${{ github.ref_name }}" == "next-enterprise" ]]; then
FOLDER_NAME="scylla-enterprise"
else
VERSION=$(echo "${{ github.ref_name }}" | awk -F'-' '{print $2}')
if [[ "$VERSION" =~ ^202[0-4]\.[0-9]+$ ]]; then
FOLDER_NAME="enterprise-$VERSION"
elif [[ "$VERSION" =~ ^[0-9]+\.[0-9]+$ ]]; then
FOLDER_NAME="scylla-$VERSION"
fi
fi
echo "JOB_NAME=${FOLDER_NAME}/job/scylla-ci" >> $GITHUB_ENV
- name: Trigger Jenkins Job
if: env.draft_or_conflict == 'false' && env.has_file_changes == 'true' && github.action == 'opened' || github.action == 'reopened'
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
echo "Triggering Jenkins Job: $JOB_NAME"
curl -X POST \
"$JENKINS_URL/job/$JOB_NAME/buildWithParameters? \
PR_NUMBER=$PR_NUMBER& \
RUN_DTEST=$REQUIRE_DTEST& \
RUN_ONLY_SCYLLA_GDB=$REQUIRE_SCYLLA_GDB& \
RUN_UNIT_TEST=$REQUIRE_UNITTEST& \
FORCE_ON_CLOUD=$HAS_FORCE_ON_CLOUD_LABEL& \
SKIP_UNIT_TEST_CUSTOM=$SKIP_UNIT_TEST_CUSTOM& \
RUN_ARTIFACT_TESTS=$REQUIRE_ARTIFACTS" \
--fail \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" \
-i -v
trigger-ci-via-comment:
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI Jenkins Job
env:
JENKINS_USER: ${{ secrets.JENKINS_USERNAME }}
JENKINS_API_TOKEN: ${{ secrets.JENKINS_TOKEN }}
JENKINS_URL: "https://jenkins.scylladb.com"
run: |
PR_NUMBER=${{ github.event.issue.number }}
PR_REPO_NAME=${{ github.event.repository.full_name }}
curl -X POST "$JENKINS_URL/job/$JOB_NAME/buildWithParameters?PR_NUMBER=$PR_NUMBER" \
--user "$JENKINS_USER:$JENKINS_API_TOKEN" --fail -i -v

View File

@@ -2,7 +2,7 @@ name: Urgent Issue Reminder
on:
schedule:
- cron: '10 8 * * *' # Runs daily at 8 AM
- cron: '10 8 * * 1' # Runs every Monday at 8 AM
jobs:
reminder:

2
.gitignore vendored
View File

@@ -35,5 +35,3 @@ compile_commands.json
.envrc
clang_build
.idea/
nuke
rust/target

3
.gitmodules vendored
View File

@@ -9,6 +9,9 @@
[submodule "abseil"]
path = abseil
url = ../abseil-cpp
[submodule "scylla-tools"]
path = tools/java
url = ../scylla-tools-java
[submodule "scylla-python3"]
path = tools/python3
url = ../scylla-python3

View File

@@ -49,7 +49,7 @@ include(limit_jobs)
set(CMAKE_CXX_STANDARD "23" CACHE INTERNAL "")
set(CMAKE_CXX_EXTENSIONS ON CACHE INTERNAL "")
set(CMAKE_CXX_SCAN_FOR_MODULES OFF CACHE INTERNAL "")
set(CMAKE_VISIBILITY_INLINES_HIDDEN ON)
set(CMAKE_CXX_VISIBILITY_PRESET hidden)
if(is_multi_config)
find_package(Seastar)
@@ -90,13 +90,13 @@ if(is_multi_config)
add_dependencies(Seastar::seastar_testing Seastar)
else()
set(Seastar_TESTING ON CACHE BOOL "" FORCE)
set(Seastar_API_LEVEL 9 CACHE STRING "" FORCE)
set(Seastar_API_LEVEL 7 CACHE STRING "" FORCE)
set(Seastar_DEPRECATED_OSTREAM_FORMATTERS OFF CACHE BOOL "" FORCE)
set(Seastar_APPS ON CACHE BOOL "" FORCE)
set(Seastar_EXCLUDE_APPS_FROM_ALL ON CACHE BOOL "" FORCE)
set(Seastar_EXCLUDE_TESTS_FROM_ALL ON CACHE BOOL "" FORCE)
set(Seastar_IO_URING ON CACHE BOOL "" FORCE)
set(Seastar_SCHEDULING_GROUPS_COUNT 21 CACHE STRING "" FORCE)
set(Seastar_SCHEDULING_GROUPS_COUNT 19 CACHE STRING "" FORCE)
set(Seastar_UNUSED_RESULT_ERROR ON CACHE BOOL "" FORCE)
add_subdirectory(seastar)
target_compile_definitions (seastar
@@ -116,7 +116,6 @@ list(APPEND absl_cxx_flags
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
list(APPEND ABSL_GCC_FLAGS ${absl_cxx_flags})
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
list(APPEND absl_cxx_flags "-Wno-deprecated-builtins")
list(APPEND ABSL_LLVM_FLAGS ${absl_cxx_flags})
endif()
set(ABSL_DEFAULT_LINKOPTS
@@ -164,68 +163,47 @@ file(MAKE_DIRECTORY "${scylla_gen_build_dir}")
include(add_version_library)
generate_scylla_version()
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
add_library(scylla-precompiled-header STATIC exported_templates.cc)
target_link_libraries(scylla-precompiled-header PRIVATE
absl::headers
absl::btree
absl::hash
absl::raw_hash_set
add_library(scylla-zstd STATIC
zstd.cc)
target_link_libraries(scylla-zstd
PRIVATE
db
Seastar::seastar
Snappy::snappy
systemd
ZLIB::ZLIB
lz4::lz4_static
zstd::zstd_static)
if (Scylla_USE_PRECOMPILED_HEADER)
set(Scylla_USE_PRECOMPILED_HEADER_USE ON)
find_program(DISTCC_EXEC NAMES distcc OPTIONAL)
if (DISTCC_EXEC)
if(DEFINED ENV{DISTCC_HOSTS})
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
message(STATUS "Disabling precompiled header usage because distcc exists and DISTCC_HOSTS is set, assuming you're using distributed compilation.")
else()
file(REAL_PATH "~/.distcc/hosts" DIST_CC_HOSTS_PATH EXPAND_TILDE)
if (EXISTS ${DIST_CC_HOSTS_PATH})
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
message(STATUS "Disabling precompiled header usage because distcc and ~/.distcc/hosts exists, assuming you're using distributed compilation.")
endif()
endif()
endif()
if (Scylla_USE_PRECOMPILED_HEADER_USE)
message(STATUS "Using precompiled header for Scylla - remember to add `sloppiness = pch_defines,time_macros` to ccache.conf, if you're using ccache.")
target_precompile_headers(scylla-precompiled-header PRIVATE "stdafx.hh")
target_compile_definitions(scylla-precompiled-header PRIVATE SCYLLA_USE_PRECOMPILED_HEADER)
endif()
else()
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
endif()
zstd::libzstd)
add_library(scylla-main STATIC)
target_sources(scylla-main
PRIVATE
absl-flat_hash_map.cc
bytes.cc
client_data.cc
clocks-impl.cc
sstable_dict_autotrainer.cc
collection_mutation.cc
compress.cc
converting_mutation_partition_applier.cc
counters.cc
direct_failure_detector/failure_detector.cc
duration.cc
exceptions/exceptions.cc
frozen_schema.cc
generic_server.cc
debug.cc
init.cc
keys/keys.cc
keys.cc
multishard_mutation_query.cc
mutation_query.cc
node_ops/task_manager_module.cc
partition_slice_builder.cc
query/query.cc
querier.cc
query.cc
query_ranges_to_vnodes.cc
query/query-result-set.cc
query-result-set.cc
tombstone_gc_options.cc
tombstone_gc.cc
reader_concurrency_semaphore.cc
reader_concurrency_semaphore_group.cc
schema_mutations.cc
serializer.cc
service/direct_failure_detector/failure_detector.cc
sstables_loader.cc
table_helper.cc
tasks/task_handler.cc
@@ -236,6 +214,7 @@ target_sources(scylla-main
vint-serialization.cc)
target_link_libraries(scylla-main
PRIVATE
"$<LINK_LIBRARY:WHOLE_ARCHIVE,scylla-zstd>"
db
absl::headers
absl::btree
@@ -247,7 +226,6 @@ target_link_libraries(scylla-main
ZLIB::ZLIB
lz4::lz4_static
zstd::zstd_static
scylla-precompiled-header
)
option(Scylla_CHECK_HEADERS
@@ -302,6 +280,7 @@ add_subdirectory(mutation)
add_subdirectory(mutation_writer)
add_subdirectory(node_ops)
add_subdirectory(readers)
add_subdirectory(redis)
add_subdirectory(replica)
add_subdirectory(raft)
add_subdirectory(repair)
@@ -316,7 +295,6 @@ add_subdirectory(tracing)
add_subdirectory(transport)
add_subdirectory(types)
add_subdirectory(utils)
add_subdirectory(vector_search)
add_version_library(scylla_version
release.cc)
@@ -346,6 +324,7 @@ set(scylla_libs
mutation_writer
raft
readers
redis
repair
replica
schema
@@ -358,8 +337,7 @@ set(scylla_libs
tracing
transport
types
utils
vector_search)
utils)
target_link_libraries(scylla PRIVATE
${scylla_libs})
@@ -393,6 +371,3 @@ endif()
if(Scylla_BUILD_INSTRUMENTED)
add_subdirectory(pgo)
endif()
add_executable(patchelf
tools/patchelf.cc)

View File

@@ -12,7 +12,7 @@ Please use the [issue tracker](https://github.com/scylladb/scylla/issues/) to re
## Contributing code to Scylla
Before you can contribute code to Scylla for the first time, you should sign the [Contributor License Agreement](https://www.scylladb.com/open-source/contributor-agreement/) and send the signed form to cla@scylladb.com. You can then submit your changes as patches to the [scylladb-dev mailing list](https://groups.google.com/forum/#!forum/scylladb-dev) or as a pull request to the [Scylla project on github](https://github.com/scylladb/scylla).
Before you can contribute code to Scylla for the first time, you should sign the [Contributor License Agreement](https://www.scylladb.com/open-source/contributor-agreement/) and send the signed form cla@scylladb.com. You can then submit your changes as patches to the [scylladb-dev mailing list](https://groups.google.com/forum/#!forum/scylladb-dev) or as a pull request to the [Scylla project on github](https://github.com/scylladb/scylla).
If you need help formatting or sending patches, [check out these instructions](https://github.com/scylladb/scylla/wiki/Formatting-and-sending-patches).
The Scylla C++ source code uses the [Seastar coding style](https://github.com/scylladb/seastar/blob/master/coding-style.md) so please adhere to that in your patches. Note that Scylla code is written with `using namespace seastar`, so should not explicitly add the `seastar::` prefix to Seastar symbols. You will usually not need to add `using namespace seastar` to new source files, because most Scylla header files have `#include "seastarx.hh"`, which does this.

View File

@@ -43,7 +43,7 @@ $ ./tools/toolchain/dbuild ninja build/release/scylla
$ ./tools/toolchain/dbuild ./build/release/scylla --developer-mode 1
```
Note: do not mix environments - either perform all your work with dbuild, or natively on the host.
Note: do not mix environemtns - either perform all your work with dbuild, or natively on the host.
Note2: you can get to an interactive shell within dbuild by running it without any parameters:
```bash
$ ./tools/toolchain/dbuild
@@ -91,7 +91,7 @@ You can also specify a single mode. For example
$ ninja-build release
```
Will build everything in release mode. The valid modes are
Will build everytihng in release mode. The valid modes are
* Debug: Enables [AddressSanitizer](https://github.com/google/sanitizers/wiki/AddressSanitizer)
and other sanity checks. It has no optimizations, which allows for debugging with tools like
@@ -220,9 +220,28 @@ On a development machine, one might run Scylla as
$ SCYLLA_HOME=$HOME/scylla build/release/scylla --overprovisioned --developer-mode=yes
```
To interact with scylla it is recommended to build our version of
cqlsh. It is available at
https://github.com/scylladb/scylla-cqlsh and is available as a submodule.
To interact with scylla it is recommended to build our versions of
cqlsh and nodetool. They are available at
https://github.com/scylladb/scylla-tools-java and can be built with
```bash
$ sudo ./install-dependencies.sh
$ ant jar
```
cqlsh should work out of the box, but nodetool depends on a running
scylla-jmx (https://github.com/scylladb/scylla-jmx). It can be build
with
```bash
$ mvn package
```
and must be started with
```bash
$ ./scripts/scylla-jmx
```
### Branches and tags
@@ -361,7 +380,7 @@ avoid that the gold linker can be told to create an index with
More info at https://gcc.gnu.org/wiki/DebugFission.
Both options can be enabled by passing `--split-dwarf` to configure.py.
Both options can be enable by passing `--split-dwarf` to configure.py.
Note that distcc is *not* compatible with it, but icecream
(https://github.com/icecc/icecream) is.
@@ -370,7 +389,7 @@ Note that distcc is *not* compatible with it, but icecream
Sometimes Scylla development is closely tied with a feature being developed in Seastar. It can be useful to compile Scylla with a particular check-out of Seastar.
One way to do this is to create a local remote for the Seastar submodule in the Scylla repository:
One way to do this it to create a local remote for the Seastar submodule in the Scylla repository:
```bash
$ cd $HOME/src/scylla

View File

@@ -1,6 +1,9 @@
This project includes code developed by the Apache Software Foundation (http://www.apache.org/),
especially Apache Cassandra.
It includes files from https://github.com/antonblanchard/crc32-vpmsum (author Anton Blanchard <anton@au.ibm.com>, IBM).
These files are located in utils/arch/powerpc/crc32-vpmsum. Their license may be found in licenses/LICENSE-crc32-vpmsum.TXT.
It includes modified code from https://gitbox.apache.org/repos/asf?p=cassandra-dtest.git (owned by The Apache Software Foundation)
It includes modified tests from https://github.com/etcd-io/etcd.git (owned by The etcd Authors)

View File

@@ -18,7 +18,7 @@ Scylla is fairly fussy about its build environment, requiring very recent
versions of the C++23 compiler and of many libraries to build. The document
[HACKING.md](HACKING.md) includes detailed information on building and
developing Scylla, but to get Scylla building quickly on (almost) any build
machine, Scylla offers a [frozen toolchain](tools/toolchain/README.md).
machine, Scylla offers a [frozen toolchain](tools/toolchain/README.md),
This is a pre-configured Docker image which includes recent versions of all
the required compilers, libraries and build tools. Using the frozen toolchain
allows you to avoid changing anything in your build machine to meet Scylla's

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.1.0-dev
VERSION=2025.2.0-dev
if test -f version
then

View File

@@ -1,109 +0,0 @@
# Analysis of unimplemented::cause Enum Values
This document provides an analysis of the `unimplemented::cause` enum values after cleanup.
## Removed Unused Enum Values (20 values removed)
The following enum values had **zero usages** in the codebase and have been removed:
- `LWT` - Lightweight transactions
- `PAGING` - Query result paging
- `AUTH` - Authentication
- `PERMISSIONS` - Permission checking
- `COUNTERS` - Counter columns
- `MIGRATIONS` - Schema migrations
- `GOSSIP` - Gossip protocol
- `TOKEN_RESTRICTION` - Token-based restrictions
- `LEGACY_COMPOSITE_KEYS` - Legacy composite key handling
- `COLLECTION_RANGE_TOMBSTONES` - Collection range tombstones
- `RANGE_DELETES` - Range deletion operations
- `COMPRESSION` - Compression features
- `NONATOMIC` - Non-atomic operations
- `CONSISTENCY` - Consistency level handling
- `WRAP_AROUND` - Token wrap-around handling
- `STORAGE_SERVICE` - Storage service operations
- `SCHEMA_CHANGE` - Schema change operations
- `MIXED_CF` - Mixed column family operations
- `SSTABLE_FORMAT_M` - SSTable format M
## Remaining Enum Values (8 values kept)
### 1. `API` (4 usages)
**Impact**: REST API features that are not fully implemented.
**Usages**:
- `api/column_family.cc:1052` - Fails when `split_output` parameter is used in major compaction
- `api/compaction_manager.cc:100,146,216` - Warns when force_user_defined_compaction or related operations are called
**User Impact**: Some REST API endpoints for compaction management are stubs and will warn or fail.
### 2. `INDEXES` (6 usages)
**Impact**: Secondary index features not fully supported.
**Usages**:
- `api/column_family.cc:433,440,449,456` - Warns about index-related operations
- `cql3/restrictions/statement_restrictions.cc:1158` - Fails when attempting filtering on collection columns without proper indexing
- `cql3/statements/update_statement.cc:149` - Warns about index operations
**User Impact**: Some advanced secondary index features (especially filtering on collections) are not available.
### 3. `TRIGGERS` (2 usages)
**Impact**: Trigger support is not implemented.
**Usages**:
- `db/schema_tables.cc:2017` - Warns when loading trigger metadata from schema tables
- `service/storage_proxy.cc:4166` - Warns when processing trigger-related operations
**User Impact**: Cassandra triggers (stored procedures that execute on data changes) are not supported.
### 4. `METRICS` (1 usage)
**Impact**: Some query processor metrics are not collected.
**Usages**:
- `cql3/query_processor.cc:585` - Warns about missing metrics implementation
**User Impact**: Minor - some internal metrics may not be available.
### 5. `VALIDATION` (4 usages)
**Impact**: Schema validation checks are partially implemented.
**Usages**:
- `cql3/functions/token_fct.hh:38` - Warns about validation in token functions
- `cql3/statements/drop_keyspace_statement.cc:40` - Warns when dropping keyspace
- `cql3/statements/truncate_statement.cc:87` - Warns when truncating table
- `service/migration_manager.cc:750` - Warns during schema migrations
**User Impact**: Some schema validation checks are skipped (with warnings logged).
### 6. `REVERSED` (1 usage)
**Impact**: Reversed type support in CQL protocol.
**Usages**:
- `transport/server.cc:2085` - Fails when trying to use reversed types in CQL protocol
**User Impact**: Reversed types are not supported in the CQL protocol implementation.
### 7. `HINT` (1 usage)
**Impact**: Hint replaying is not implemented.
**Usages**:
- `db/batchlog_manager.cc:251` - Warns when attempting to replay hints
**User Impact**: Cassandra hints (temporary storage of writes when nodes are down) are not supported.
### 8. `SUPER` (2 usages)
**Impact**: Super column families are not supported.
**Usages**:
- `db/legacy_schema_migrator.cc:157` - Fails when encountering super column family in legacy schema
- `db/schema_tables.cc:2288` - Fails when encountering super column family in schema tables
**User Impact**: Super column families (legacy Cassandra feature) will cause errors if encountered in legacy data or schema migrations.
## Summary
- **Removed**: 20 unused enum values (76% reduction)
- **Kept**: 8 actively used enum values (24% remaining)
- **Total lines removed**: ~40 lines from enum definition and switch statement
The remaining enum values represent actual unimplemented features that users may encounter, with varying impacts ranging from warnings (TRIGGERS, METRICS, VALIDATION, HINT) to failures (API split_output, INDEXES on collections, REVERSED types, SUPER tables).

View File

@@ -17,7 +17,6 @@ target_sources(alternator
streams.cc
consumed_capacity.cc
ttl.cc
parsed_expression_cache.cc
${cql_grammar_srcs})
target_include_directories(alternator
PUBLIC
@@ -34,8 +33,5 @@ target_link_libraries(alternator
idl
absl::headers)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(alternator REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers alternator
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -11,6 +11,7 @@
#include "utils/log.hh"
#include <string>
#include <string_view>
#include "bytes.hh"
#include "alternator/auth.hh"
#include <fmt/format.h>
#include "auth/password_authenticator.hh"

View File

@@ -24,7 +24,7 @@ static constexpr uint64_t KB = 1024ULL;
static constexpr uint64_t RCU_BLOCK_SIZE_LENGTH = 4*KB;
static constexpr uint64_t WCU_BLOCK_SIZE_LENGTH = 1*KB;
bool consumed_capacity_counter::should_add_capacity(const rjson::value& request) {
static bool should_add_capacity(const rjson::value& request) {
const rjson::value* return_consumed = rjson::find(request, "ReturnConsumedCapacity");
if (!return_consumed) {
return false;
@@ -62,22 +62,15 @@ static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_by
rcu_consumed_capacity_counter::rcu_consumed_capacity_counter(const rjson::value& request, bool is_quorum) :
consumed_capacity_counter(should_add_capacity(request)),_is_quorum(is_quorum) {
}
uint64_t rcu_consumed_capacity_counter::get_half_units(uint64_t total_bytes, bool is_quorum) noexcept {
return calculate_half_units(RCU_BLOCK_SIZE_LENGTH, total_bytes, is_quorum);
}
uint64_t rcu_consumed_capacity_counter::get_half_units() const noexcept {
return get_half_units(_total_bytes, _is_quorum);
return calculate_half_units(RCU_BLOCK_SIZE_LENGTH, _total_bytes, _is_quorum);
}
uint64_t wcu_consumed_capacity_counter::get_half_units() const noexcept {
return calculate_half_units(WCU_BLOCK_SIZE_LENGTH, _total_bytes, true);
}
uint64_t wcu_consumed_capacity_counter::get_units(uint64_t total_bytes) noexcept {
return calculate_half_units(WCU_BLOCK_SIZE_LENGTH, total_bytes, true) * HALF_UNIT_MULTIPLIER;
}
wcu_consumed_capacity_counter::wcu_consumed_capacity_counter(const rjson::value& request) :
consumed_capacity_counter(should_add_capacity(request)) {
}

View File

@@ -42,25 +42,21 @@ public:
*/
virtual uint64_t get_half_units() const noexcept = 0;
uint64_t _total_bytes = 0;
static bool should_add_capacity(const rjson::value& request);
protected:
bool _should_add_to_reponse = false;
};
class rcu_consumed_capacity_counter : public consumed_capacity_counter {
virtual uint64_t get_half_units() const noexcept;
bool _is_quorum = false;
public:
rcu_consumed_capacity_counter(const rjson::value& request, bool is_quorum);
rcu_consumed_capacity_counter(): consumed_capacity_counter(false), _is_quorum(false){}
virtual uint64_t get_half_units() const noexcept;
static uint64_t get_half_units(uint64_t total_bytes, bool is_quorum) noexcept;
};
class wcu_consumed_capacity_counter : public consumed_capacity_counter {
virtual uint64_t get_half_units() const noexcept;
public:
wcu_consumed_capacity_counter(const rjson::value& request);
static uint64_t get_units(uint64_t total_bytes) noexcept;
};
}

View File

@@ -136,8 +136,6 @@ future<> controller::start_server() {
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds,
_config.alternator_enforce_authorization,
_config.alternator_warn_authorization,
_config.alternator_max_users_query_size_in_trace_output,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);
}).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {
@@ -169,8 +167,4 @@ future<> controller::request_stop_server() {
});
}
future<utils::chunked_vector<client_data>> controller::get_client_data() {
return _server.local().get_client_data();
}
}

View File

@@ -11,7 +11,7 @@
#include <seastar/core/sharded.hh>
#include <seastar/core/smp.hh>
#include "transport/protocol_server.hh"
#include "protocol_server.hh"
namespace service {
class storage_proxy;
@@ -90,10 +90,6 @@ public:
virtual future<> start_server() override;
virtual future<> stop_server() override;
virtual future<> request_stop_server() override;
// This virtual function is called (on each shard separately) when the
// virtual table "system.clients" is read. It is expected to generate a
// list of clients connected to this server (on this shard).
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
};
}

View File

@@ -94,9 +94,6 @@ public:
static api_error internal(std::string msg) {
return api_error("InternalServerError", std::move(msg), http::reply::status_type::internal_server_error);
}
static api_error payload_too_large(std::string msg) {
return api_error("PayloadTooLarge", std::move(msg), status_type::payload_too_large);
}
// Provide the "std::exception" interface, to make it easier to print this
// exception in log messages. Note that this function is *not* used to

File diff suppressed because it is too large Load Diff

View File

@@ -10,8 +10,8 @@
#include <seastar/core/future.hh>
#include "seastarx.hh"
#include <seastar/json/json_elements.hh>
#include <seastar/core/sharded.hh>
#include <seastar/util/noncopyable_function.hh>
#include "service/migration_manager.hh"
#include "service/client_state.hh"
@@ -58,6 +58,33 @@ namespace alternator {
class rmw_operation;
struct make_jsonable : public json::jsonable {
rjson::value _value;
public:
explicit make_jsonable(rjson::value&& value);
std::string to_json() const override;
};
/**
* Make return type for serializing the object "streamed",
* i.e. direct to HTTP output stream. Note: only useful for
* (very) large objects as there are overhead issues with this
* as well, but for massive lists of return objects this can
* help avoid large allocations/many re-allocs
*/
json::json_return_type make_streamed(rjson::value&&);
struct json_string : public json::jsonable {
std::string _value;
public:
explicit json_string(std::string&& value);
std::string to_json() const override;
};
namespace parsed {
class path;
};
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
// Wraps the db::get_tags_of_table and throws if the table is missing the tags extension.
@@ -128,9 +155,6 @@ using attrs_to_get_node = attribute_path_map_node<std::monostate>;
// optional means we should get all attributes, not specific ones.
using attrs_to_get = attribute_path_map<std::monostate>;
namespace parsed {
class expression_cache;
}
class executor : public peering_sharded_service<executor> {
gms::gossiper& _gossiper;
@@ -139,32 +163,14 @@ class executor : public peering_sharded_service<executor> {
db::system_distributed_keyspace& _sdks;
cdc::metadata& _cdc_metadata;
utils::updateable_value<bool> _enforce_authorization;
utils::updateable_value<bool> _warn_authorization;
// An smp_service_group to be used for limiting the concurrency when
// forwarding Alternator request between shards - if necessary for LWT.
smp_service_group _ssg;
std::unique_ptr<parsed::expression_cache> _parsed_expression_cache;
public:
using client_state = service::client_state;
// request_return_type is the return type of the executor methods, which
// can be one of:
// 1. A string, which is the response body for the request.
// 2. A body_writer, an asynchronous function (returning future<>) that
// takes an output_stream and writes the response body into it.
// 3. An api_error, which is an error response that should be returned to
// the client.
// The body_writer is used for streaming responses, where the response body
// is written in chunks to the output_stream. This allows for efficient
// handling of large responses without needing to allocate a large buffer
// in memory.
using body_writer = noncopyable_function<future<>(output_stream<char>&&)>;
using request_return_type = std::variant<std::string, body_writer, api_error>;
using request_return_type = std::variant<json::json_return_type, api_error>;
stats _stats;
// The metric_groups object holds this stat object's metrics registered
// as long as the stats object is alive.
seastar::metrics::metric_groups _metrics;
static constexpr auto ATTRS_COLUMN_NAME = ":attrs";
static constexpr auto KEYSPACE_NAME_PREFIX = "alternator_";
static constexpr std::string_view INTERNAL_TABLE_PREFIX = ".scylla.alternator.";
@@ -176,7 +182,6 @@ public:
cdc::metadata& cdc_metadata,
smp_service_group ssg,
utils::updateable_value<uint32_t> default_timeout_in_ms);
~executor();
future<request_return_type> create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
future<request_return_type> describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
@@ -204,23 +209,26 @@ public:
future<request_return_type> describe_continuous_backups(client_state& client_state, service_permit permit, rjson::value request);
future<> start();
future<> stop();
future<> stop() {
// disconnect from the value source, but keep the value unchanged.
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
return make_ready_future<>();
}
static sstring table_name(const schema&);
static db::timeout_clock::time_point default_timeout();
private:
static thread_local utils::updateable_value<uint32_t> s_default_timeout_in_ms;
public:
static schema_ptr find_table(service::storage_proxy&, std::string_view table_name);
static schema_ptr find_table(service::storage_proxy&, const rjson::value& request);
private:
friend class rmw_operation;
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr);
public:
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&);
static std::optional<rjson::value> describe_single_item(schema_ptr,
const query::partition_slice&,
@@ -229,15 +237,11 @@ public:
const std::optional<attrs_to_get>&,
uint64_t* = nullptr);
// Converts a multi-row selection result to JSON compatible with DynamoDB.
// For each row, this method calls item_callback, which takes the size of
// the item as the parameter.
static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
const query::partition_slice&& slice,
shared_ptr<cql3::selection::selection> selection,
foreign_ptr<lw_shared_ptr<query::result>> query_result,
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
noncopyable_function<void(uint64_t)> item_callback = {});
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get);
static void describe_single_item(const cql3::selection::selection&,
const std::vector<managed_bytes_opt>&,
@@ -246,7 +250,7 @@ public:
uint64_t* item_length_in_bytes = nullptr,
bool = false);
static bool add_stream_options(const rjson::value& stream_spec, schema_builder&, service::storage_proxy& sp);
static void add_stream_options(const rjson::value& stream_spec, schema_builder&, service::storage_proxy& sp);
static void supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp);
static void supplement_table_stream_info(rjson::value& descr, const schema& schema, const service::storage_proxy& sp);
};
@@ -265,15 +269,6 @@ bool is_big(const rjson::value& val, int big_size = 100'000);
// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);
/**
* Make return type for serializing the object "streamed",
* i.e. direct to HTTP output stream. Note: only useful for
* (very) large objects as there are overhead issues with this
* as well, but for massive lists of return objects this can
* help avoid large allocations/many re-allocs
*/
executor::body_writer make_streamed(rjson::value&&);
future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission);
}

View File

@@ -165,9 +165,7 @@ static std::optional<std::string> resolve_path_component(const std::string& colu
fmt::format("ExpressionAttributeNames missing entry '{}' required by expression", column_name));
}
used_attribute_names.emplace(column_name);
auto result = std::string(rjson::to_string_view(*value));
validate_attr_name_length("", result.size(), false, "ExpressionAttributeNames contains invalid value: ");
return result;
return std::string(rjson::to_string_view(*value));
}
return std::nullopt;
}
@@ -739,26 +737,6 @@ rjson::value calculate_value(const parsed::set_rhs& rhs,
return rjson::null_value();
}
void validate_attr_name_length(std::string_view supplementary_context, size_t attr_name_length, bool is_key, std::string_view error_msg_prefix) {
constexpr const size_t DYNAMODB_KEY_ATTR_NAME_SIZE_MAX = 255;
constexpr const size_t DYNAMODB_NONKEY_ATTR_NAME_SIZE_MAX = 65535;
const size_t max_length = is_key ? DYNAMODB_KEY_ATTR_NAME_SIZE_MAX : DYNAMODB_NONKEY_ATTR_NAME_SIZE_MAX;
if (attr_name_length > max_length) {
std::string error_msg;
if (!error_msg_prefix.empty()) {
error_msg += error_msg_prefix;
}
if (!supplementary_context.empty()) {
error_msg += "in ";
error_msg += supplementary_context;
error_msg += " - ";
}
error_msg += fmt::format("Attribute name is too large, must be less than {} bytes", std::to_string(max_length + 1));
throw api_error::validation(error_msg);
}
}
} // namespace alternator
auto fmt::formatter<alternator::parsed::path>::format(const alternator::parsed::path& p, fmt::format_context& ctx) const

View File

@@ -91,18 +91,6 @@ options {
throw expressions_syntax_error(format("{} at char {}", err,
ex->get_charPositionInLine()));
}
// ANTLR3 tries to recover missing tokens - it tries to finish parsing
// and create valid objects, as if the missing token was there.
// But it has a bug and leaks these tokens.
// We override offending method and handle abandoned pointers.
std::vector<std::unique_ptr<TokenType>> _missing_tokens;
TokenType* getMissingSymbol(IntStreamType* istream, ExceptionBaseType* e,
ANTLR_UINT32 expectedTokenType, BitsetListType* follow) {
auto token = BaseType::getMissingSymbol(istream, e, expectedTokenType, follow);
_missing_tokens.emplace_back(token);
return token;
}
}
@lexer::context {
void displayRecognitionError(ANTLR_UINT8** token_names, ExceptionBaseType* ex) {
@@ -196,13 +184,7 @@ path_component: NAME | NAMEREF;
path returns [parsed::path p]:
root=path_component { $p.set_root($root.text); }
( '.' name=path_component { $p.add_dot($name.text); }
| '[' INTEGER ']' {
try {
$p.add_index(std::stoi($INTEGER.text));
} catch(std::out_of_range&) {
throw expressions_syntax_error("list index out of integer range");
}
}
| '[' INTEGER ']' { $p.add_index(std::stoi($INTEGER.text)); }
)*;
/* See comment above why the "depth" counter was needed here */
@@ -248,7 +230,7 @@ update_expression_clause returns [parsed::update_expression e]:
// Note the "EOF" token at the end of the update expression. We want to the
// parser to match the entire string given to it - not just its beginning!
update_expression returns [parsed::update_expression e]:
(update_expression_clause { e.append($update_expression_clause.e); })+ EOF;
(update_expression_clause { e.append($update_expression_clause.e); })* EOF;
projection_expression returns [std::vector<parsed::path> v]:
p=path { $v.push_back(std::move($p.p)); }
@@ -275,13 +257,6 @@ primitive_condition returns [parsed::primitive_condition c]:
(',' v=value[0] { $c.add_value(std::move($v.v)); })*
')'
)?
{
// Post-parse check to reject non-function single values
if ($c._op == parsed::primitive_condition::type::VALUE &&
!$c._values.front().is_func()) {
throw expressions_syntax_error("Single value must be a function");
}
}
;
// The following rules for parsing boolean expressions are verbose and

View File

@@ -18,8 +18,6 @@
#include "expressions_types.hh"
#include "utils/rjson.hh"
#include "utils/updateable_value.hh"
#include "stats.hh"
namespace alternator {
@@ -28,26 +26,6 @@ public:
using runtime_error::runtime_error;
};
namespace parsed {
class expression_cache_impl;
class expression_cache {
std::unique_ptr<expression_cache_impl> _impl;
public:
struct config {
utils::updateable_value<uint32_t> max_cache_entries;
};
expression_cache(config cfg, stats& stats);
~expression_cache();
// stop background tasks, if any
future<> stop();
update_expression parse_update_expression(std::string_view query);
std::vector<path> parse_projection_expression(std::string_view query);
condition_expression parse_condition_expression(std::string_view query, const char* caller);
};
} // namespace parsed
// Preferably use parsed::expression_cache instance instead of this free functions.
parsed::update_expression parse_update_expression(std::string_view query);
std::vector<parsed::path> parse_projection_expression(std::string_view query);
parsed::condition_expression parse_condition_expression(std::string_view query, const char* caller);
@@ -113,7 +91,5 @@ rjson::value calculate_value(const parsed::value& v,
rjson::value calculate_value(const parsed::set_rhs& rhs,
const rjson::value* previous_item);
void validate_attr_name_length(std::string_view supplementary_context, size_t attr_name_length, bool is_key, std::string_view error_msg_prefix = {});
} /* namespace alternator */

View File

@@ -209,7 +209,9 @@ public:
// function is supported).
// 2. Ternary operator - v1 BETWEEN v2 and v3 (means v1 >= v2 AND v1 <= v3).
// 3. N-ary operator - v1 IN ( v2, v3, ... )
// 4. A single function call (attribute_exists etc.).
// 4. A single function call (attribute_exists etc.). The parser actually
// accepts a more general "value" here but later stages reject a value
// which is not a function call (because DynamoDB does it too).
class primitive_condition {
public:
enum class type {

View File

@@ -13,7 +13,7 @@
#include "utils/rjson.hh"
#include "serialization.hh"
#include "schema/column_computation.hh"
#include "column_computation.hh"
#include "db/view/regular_column_transformation.hh"
namespace alternator {

View File

@@ -1,109 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "expressions.hh"
#include "utils/log.hh"
#include "utils/lru_string_map.hh"
#include <variant>
static logging::logger logger_("parsed-expression-cache");
namespace alternator::parsed {
struct expression_cache_impl {
stats& _stats;
using cached_expressions_types = std::variant<
update_expression,
condition_expression,
std::vector<path>
>;
sized_lru_string_map<cached_expressions_types> _cached_entries;
utils::observable<uint32_t>::observer _max_cache_entries_observer;
expression_cache_impl(expression_cache::config cfg, stats& stats);
// to define the specialized return type of `get_or_create()`
template <typename Func, typename... Args>
using ParseResult = std::invoke_result_t<Func, std::string_view, Args...>;
// Caching layer for parsed expressions
// The expression type is determined by the type of the parsing function passed as a parameter,
// and the return type is exactly the same as the return type of this parsing function.
// StatsType is used only to update appropriate statistics - currently it is aligned with the expression type,
// but it could be extended in the future if needed, e.g. split per operation.
template <stats::expression_types StatsType, typename Func, typename... Args>
ParseResult<Func, Args...> get_or_create(std::string_view query, Func&& parse_func, Args&&... other_args) {
if (_cached_entries.disabled()) {
return parse_func(query, std::forward<Args>(other_args)...);
}
if (!_cached_entries.sanity_check()) {
_stats.expression_cache.requests[StatsType].misses++;
return parse_func(query, std::forward<Args>(other_args)...);
}
auto value = _cached_entries.find(query);
if (value) {
logger_.trace("Cache hit for query: {}", query);
_stats.expression_cache.requests[StatsType].hits++;
try {
return std::get<ParseResult<Func, Args...>>(value->get());
} catch (const std::bad_variant_access&) {
// User can reach this code, by sending the same query string as a different expression type.
// In practice valid queries are different enough to not collide.
// Entries in cache are only valid queries.
// This request will fail at parsing below.
// If, by any chance this is a valid query, it will be updated below with the new value.
logger_.trace("Cache hit for '{}', but type mismatch.", query);
_stats.expression_cache.requests[StatsType].hits--;
}
} else {
logger_.trace("Cache miss for query: {}", query);
}
ParseResult<Func, Args...> expr = parse_func(query, std::forward<Args>(other_args)...);
// Invalid query will throw here ^
_stats.expression_cache.requests[StatsType].misses++;
if (value) [[unlikely]] {
value->get() = cached_expressions_types{expr};
} else {
_cached_entries.insert(query, cached_expressions_types{expr});
}
return expr;
}
};
expression_cache_impl::expression_cache_impl(expression_cache::config cfg, stats& stats) :
_stats(stats), _cached_entries(logger_, _stats.expression_cache.evictions),
_max_cache_entries_observer(cfg.max_cache_entries.observe([this] (uint32_t max_value) {
_cached_entries.set_max_size(max_value);
})) {
_cached_entries.set_max_size(cfg.max_cache_entries());
}
expression_cache::expression_cache(expression_cache::config cfg, stats& stats) :
_impl(std::make_unique<expression_cache_impl>(std::move(cfg), stats)) {
}
expression_cache::~expression_cache() = default;
future<> expression_cache::stop() {
return _impl->_cached_entries.stop();
}
update_expression expression_cache::parse_update_expression(std::string_view query) {
return _impl->get_or_create<stats::expression_types::UPDATE_EXPRESSION>(query, alternator::parse_update_expression);
}
std::vector<path> expression_cache::parse_projection_expression(std::string_view query) {
return _impl->get_or_create<stats::expression_types::PROJECTION_EXPRESSION>(query, alternator::parse_projection_expression);
}
condition_expression expression_cache::parse_condition_expression(std::string_view query, const char* caller) {
return _impl->get_or_create<stats::expression_types::CONDITION_EXPRESSION>(query, alternator::parse_condition_expression, caller);
}
} // namespace alternator::parsed

View File

@@ -8,16 +8,13 @@
#pragma once
#include "cdc/cdc_options.hh"
#include "cdc/log.hh"
#include "seastarx.hh"
#include "service/paxos/cas_request.hh"
#include "service/cas_shard.hh"
#include "utils/rjson.hh"
#include "consumed_capacity.hh"
#include "executor.hh"
#include "tracing/trace_state.hh"
#include "keys/keys.hh"
#include "keys.hh"
namespace alternator {
@@ -58,7 +55,7 @@ public:
static write_isolation get_write_isolation_for_schema(schema_ptr schema);
static write_isolation default_write_isolation;
public:
static void set_default_write_isolation(std::string_view mode);
protected:
@@ -109,27 +106,21 @@ public:
// violating this). We mark apply() "const" to let the compiler validate
// this for us. The output-only field _return_attributes is marked
// "mutable" above so that apply() can still write to it.
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const = 0;
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts) const = 0;
// Convert the above apply() into the signature needed by cas_request:
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override;
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts) override;
virtual ~rmw_operation() = default;
const wcu_consumed_capacity_counter& consumed_capacity() const noexcept { return _consumed_capacity; }
schema_ptr schema() const { return _schema; }
const rjson::value& request() const { return _request; }
rjson::value&& move_request() && { return std::move(_request); }
future<executor::request_return_type> execute(service::storage_proxy& proxy,
std::optional<service::cas_shard> cas_shard,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit,
bool needs_read_before_write,
stats& global_stats,
stats& per_table_stats,
stats& stats,
uint64_t& wcu_total);
std::optional<service::cas_shard> shard_for_execute(bool needs_read_before_write);
private:
inline bool should_fill_preimage() const { return _schema->cdc_options().enabled(); }
std::optional<shard_id> shard_for_execute(bool needs_read_before_write);
};
} // namespace alternator

View File

@@ -11,8 +11,8 @@
#include "utils/log.hh"
#include "serialization.hh"
#include "error.hh"
#include "types/concrete_types.hh"
#include "types/json_utils.hh"
#include "concrete_types.hh"
#include "cql3/type_json.hh"
#include "mutation/position_in_partition.hh"
static logging::logger slogger("alternator-serialization");
@@ -282,23 +282,15 @@ std::string type_to_string(data_type type) {
return it->second;
}
std::optional<bytes> try_get_key_column_value(const rjson::value& item, const column_definition& column) {
bytes get_key_column_value(const rjson::value& item, const column_definition& column) {
std::string column_name = column.name_as_text();
const rjson::value* key_typed_value = rjson::find(item, column_name);
if (!key_typed_value) {
return std::nullopt;
throw api_error::validation(fmt::format("Key column {} not found", column_name));
}
return get_key_from_typed_value(*key_typed_value, column);
}
bytes get_key_column_value(const rjson::value& item, const column_definition& column) {
auto value = try_get_key_column_value(item, column);
if (!value) {
throw api_error::validation(fmt::format("Key column {} not found", column.name_as_text()));
}
return std::move(*value);
}
// Parses the JSON encoding for a key value, which is a map with a single
// entry whose key is the type and the value is the encoded value.
// If this type does not match the desired "type_str", an api_error::validation
@@ -388,38 +380,20 @@ clustering_key ck_from_json(const rjson::value& item, schema_ptr schema) {
return clustering_key::make_empty();
}
std::vector<bytes> raw_ck;
// Note: it's possible to get more than one clustering column here, as
// Alternator can be used to read scylla internal tables.
// FIXME: this is a loop, but we really allow only one clustering key column.
for (const column_definition& cdef : schema->clustering_key_columns()) {
auto raw_value = get_key_column_value(item, cdef);
bytes raw_value = get_key_column_value(item, cdef);
raw_ck.push_back(std::move(raw_value));
}
return clustering_key::from_exploded(raw_ck);
}
clustering_key_prefix ck_prefix_from_json(const rjson::value& item, schema_ptr schema) {
if (schema->clustering_key_size() == 0) {
return clustering_key_prefix::make_empty();
}
std::vector<bytes> raw_ck;
for (const column_definition& cdef : schema->clustering_key_columns()) {
auto raw_value = try_get_key_column_value(item, cdef);
if (!raw_value) {
break;
}
raw_ck.push_back(std::move(*raw_value));
}
return clustering_key_prefix::from_exploded(raw_ck);
}
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema) {
const bool is_alternator_ks = is_alternator_keyspace(schema->ks_name());
if (is_alternator_ks) {
return position_in_partition::for_key(ck_from_json(item, schema));
auto ck = ck_from_json(item, schema);
if (is_alternator_keyspace(schema->ks_name())) {
return position_in_partition::for_key(std::move(ck));
}
const auto region_item = rjson::find(item, scylla_paging_region);
const auto weight_item = rjson::find(item, scylla_paging_weight);
if (bool(region_item) != bool(weight_item)) {
@@ -439,9 +413,8 @@ position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema)
} else {
throw std::runtime_error(fmt::format("Invalid value for weight: {}", weight_view));
}
return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(ck_prefix_from_json(item, schema)) : std::nullopt);
return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(std::move(ck)) : std::nullopt);
}
auto ck = ck_from_json(item, schema);
if (ck.is_empty()) {
return position_in_partition::for_partition_start();
}

View File

@@ -13,7 +13,7 @@
#include <optional>
#include "types/types.hh"
#include "schema/schema_fwd.hh"
#include "keys/keys.hh"
#include "keys.hh"
#include "utils/rjson.hh"
#include "utils/big_decimal.hh"

View File

@@ -13,7 +13,7 @@
#include <seastar/http/function_handlers.hh>
#include <seastar/http/short_streams.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/json/json_elements.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/short_streams.hh>
#include "seastarx.hh"
@@ -31,9 +31,6 @@
#include "gms/gossiper.hh"
#include "utils/overloaded_functor.hh"
#include "utils/aws_sigv4.hh"
#include "client_data.hh"
#include "utils/updateable_value.hh"
#include <zlib.h>
static logging::logger slogger("alternator-server");
@@ -103,13 +100,6 @@ static void handle_CORS(const request& req, reply& rep, bool preflight) {
// the user directly. Other exceptions are unexpected, and reported as
// Internal Server Error.
class api_handler : public handler_base {
// Although the the DynamoDB API responses are JSON, additional
// conventions apply to these responses. For this reason, DynamoDB uses
// the content type "application/x-amz-json-1.0" instead of the standard
// "application/json". Some other AWS services use later versions instead
// of "1.0", but DynamoDB currently uses "1.0". Note that this content
// type applies to all replies, both success and error.
static constexpr const char* REPLY_CONTENT_TYPE = "application/x-amz-json-1.0";
public:
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
[this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
@@ -134,19 +124,22 @@ public:
}
auto res = resf.get();
std::visit(overloaded_functor {
[&] (std::string&& str) {
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(str);
rep->set_content_type(REPLY_CONTENT_TYPE);
},
[&] (executor::body_writer&& body_writer) {
rep->write_body(REPLY_CONTENT_TYPE, std::move(body_writer));
},
[&] (const api_error& err) {
generate_error_reply(*rep, err);
}
}, std::move(res));
[&] (const json::json_return_type& json_return_value) {
slogger.trace("api_handler success case");
if (json_return_value._body_writer) {
// Unfortunately, write_body() forces us to choose
// from a fixed and irrelevant list of "mime-types"
// at this point. But we'll override it with the
// one (application/x-amz-json-1.0) below.
rep->write_body("json", std::move(json_return_value._body_writer));
} else {
rep->_content += json_return_value._res;
}
},
[&] (const api_error& err) {
generate_error_reply(*rep, err);
}
}, res);
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
@@ -158,6 +151,7 @@ public:
handle_CORS(*req, *rep, false);
return _f_handle(std::move(req), std::move(rep)).then(
[](std::unique_ptr<reply> rep) {
rep->set_mime_type("application/x-amz-json-1.0");
rep->done();
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
@@ -173,7 +167,6 @@ protected:
rjson::add(results, "message", err._msg);
rep._content = rjson::print(std::move(results));
rep._status = err._http_code;
rep.set_content_type(REPLY_CONTENT_TYPE);
slogger.trace("api_handler error case: {}", rep._content);
}
@@ -235,8 +228,9 @@ protected:
// If the rack does not exist, we return an empty list - not an error.
sstring query_rack = req->get_query_param("rack");
for (auto& id : local_dc_nodes) {
auto ip = _gossiper.get_address_map().get(id);
if (!query_rack.empty()) {
auto rack = _gossiper.get_application_state_value(id, gms::application_state::RACK);
auto rack = _gossiper.get_application_state_value(ip, gms::application_state::RACK);
if (rack != query_rack) {
continue;
}
@@ -244,10 +238,10 @@ protected:
// Note that it's not enough for the node to be is_alive() - a
// node joining the cluster is also "alive" but not responsive to
// requests. We alive *and* normal. See #19694, #21538.
if (_gossiper.is_alive(id) && _gossiper.is_normal(id)) {
if (_gossiper.is_alive(ip) && _gossiper.is_normal(ip)) {
// Use the gossiped broadcast_rpc_address if available instead
// of the internal IP address "ip". See discussion in #18711.
rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(id)));
rjson::push_back(results, rjson::from_string(_gossiper.get_rpc_address(ip)));
}
}
rep->set_status(reply::status_type::ok);
@@ -273,57 +267,24 @@ protected:
}
};
// This function increments the authentication_failures counter, and may also
// log a warn-level message and/or throw an exception, depending on what
// enforce_authorization and warn_authorization are set to.
// The username and client address are only used for logging purposes -
// they are not included in the error message returned to the client, since
// the client knows who it is.
// Note that if enforce_authorization is false, this function will return
// without throwing. So a caller that doesn't want to continue after an
// authentication_error must explicitly return after calling this function.
template<typename Exception>
static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) {
stats.authentication_failures++;
if (enforce_authorization) {
if (warn_authorization) {
slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address);
}
throw std::move(e);
} else {
if (warn_authorization) {
slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address);
}
}
}
future<std::string> server::verify_signature(const request& req, const chunked_content& content) {
if (!_enforce_authorization.get() && !_warn_authorization.get()) {
if (!_enforce_authorization) {
slogger.debug("Skipping authorization");
return make_ready_future<std::string>();
}
auto host_it = req._headers.find("Host");
if (host_it == req._headers.end()) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature("Host header is mandatory for signature verification"),
"", req.get_client_address());
return make_ready_future<std::string>();
throw api_error::invalid_signature("Host header is mandatory for signature verification");
}
auto authorization_it = req._headers.find("Authorization");
if (authorization_it == req._headers.end()) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::missing_authentication_token("Authorization header is mandatory for signature verification"),
"", req.get_client_address());
return make_ready_future<std::string>();
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
}
std::string host = host_it->second;
std::string_view authorization_header = authorization_it->second;
auto pos = authorization_header.find_first_of(' ');
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)),
"", req.get_client_address());
return make_ready_future<std::string>();
throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
}
authorization_header.remove_prefix(pos+1);
std::string credential;
@@ -358,9 +319,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
std::vector<std::string_view> credential_split = split(credential, '/');
if (credential_split.size() != 5) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address());
return make_ready_future<std::string>();
throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential));
}
std::string user(credential_split[0]);
std::string datestamp(credential_split[1]);
@@ -384,7 +343,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
return get_key_from_roles(proxy, as, std::move(username));
};
return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content,
user = std::move(user),
host = std::move(host),
datestamp = std::move(datestamp),
@@ -392,32 +351,18 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
signed_headers_map = std::move(signed_headers_map),
region = std::move(region),
service = std::move(service),
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
key_cache::value_ptr key_ptr(nullptr);
try {
key_ptr = key_ptr_fut.get();
} catch (const api_error& e) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
e, user, req.get_client_address());
return std::string();
}
user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) {
std::string signature;
try {
signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method,
datestamp, signed_headers_str, signed_headers_map, &content, region, service, "");
} catch (const std::exception& e) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())),
user, req.get_client_address());
return std::string();
throw api_error::invalid_signature(e.what());
}
if (signature != std::string_view(user_signature)) {
_key_cache.remove(user);
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::unrecognized_client("wrong signature"),
user, req.get_client_address());
return std::string();
throw api_error::unrecognized_client("The security token included in the request is invalid.");
}
return user;
});
@@ -430,82 +375,35 @@ static tracing::trace_state_ptr create_tracing_session(tracing::tracing& tracing
return tracing_instance.create_session(tracing::trace_type::QUERY, props);
}
// A helper class to represent a potentially truncated view of a chunked_content.
// If the content is short enough and single chunked, it just holds a view into the content.
// Otherwise it will be copied into an internal buffer, possibly truncated (depending on maximum allowed size passed in),
// and the view will point into that buffer.
// `as_view()` method will return the view.
// `take_as_sstring()` will either move out the internal buffer (if any), or create a new sstring from the view.
// You should consider `as_view()` valid as long both the original chunked_content and the truncated_content object are alive.
class truncated_content {
std::string_view _view;
sstring _content_maybe;
void copy_from_content(const chunked_content& content) {
size_t offset = 0;
for(auto &tmp : content) {
size_t to_copy = std::min(tmp.size(), _content_maybe.size() - offset);
std::copy(tmp.get(), tmp.get() + to_copy, _content_maybe.data() + offset);
offset += to_copy;
if (offset >= _content_maybe.size()) {
break;
}
}
// truncated_content_view() prints a potentially long chunked_content for
// debugging purposes. In the common case when the content is not excessively
// long, it just returns a view into the given content, without any copying.
// But when the content is very long, it is truncated after some arbitrary
// max_len (or one chunk, whichever comes first), with "<truncated>" added at
// the end. To do this modification to the string, we need to create a new
// std::string, so the caller must pass us a reference to one, "buf", where
// we can store the content. The returned view is only alive for as long this
// buf is kept alive.
static std::string_view truncated_content_view(const chunked_content& content, std::string& buf) {
constexpr size_t max_len = 1024;
if (content.empty()) {
return std::string_view();
} else if (content.size() == 1 && content.begin()->size() <= max_len) {
return std::string_view(content.begin()->get(), content.begin()->size());
} else {
buf = std::string(content.begin()->get(), std::min(content.begin()->size(), max_len)) + "<truncated>";
return std::string_view(buf);
}
public:
truncated_content(const chunked_content& content, size_t max_len = std::numeric_limits<size_t>::max()) {
if (content.empty()) return;
if (content.size() == 1 && content.begin()->size() <= max_len) {
_view = std::string_view(content.begin()->get(), content.begin()->size());
return;
}
constexpr std::string_view truncated_text = "<truncated>";
size_t content_size = 0;
for(auto &tmp : content) {
content_size += tmp.size();
}
if (content_size <= max_len) {
_content_maybe = sstring{ sstring::initialized_later{}, content_size };
copy_from_content(content);
}
else {
_content_maybe = sstring{ sstring::initialized_later{}, max_len + truncated_text.size() };
copy_from_content(content);
std::copy(truncated_text.begin(), truncated_text.end(), _content_maybe.data() + _content_maybe.size() - truncated_text.size());
}
_view = std::string_view(_content_maybe);
}
std::string_view as_view() const { return _view; }
sstring take_as_sstring() && {
if (_content_maybe.empty() && !_view.empty()) {
return sstring{_view};
}
return std::move(_content_maybe);
}
};
// `truncated_content_view` will produce an object representing a view to a passed content
// possibly truncated at some length. The value returned is used in two ways:
// - to print it in logs (use `as_view()` method for this)
// - to pass it to tracing object, where it will be stored and used later
// (use `take_as_sstring()` method as this produces a copy in form of a sstring)
// `truncated_content` delays constructing `sstring` object until it's actually needed.
// `truncated_content` is valid as long as passed `content` is alive.
// if the content is truncated, `<truncated>` will be appended at the maximum size limit
// and total size will be `max_users_query_size_in_trace_output() + strlen("<truncated>")`.
static truncated_content truncated_content_view(const chunked_content& content, size_t max_size) {
return truncated_content{content, max_size};
}
static tracing::trace_state_ptr maybe_trace_query(service::client_state& client_state, std::string_view username, std::string_view op, const chunked_content& query, size_t max_users_query_size_in_trace_output) {
static tracing::trace_state_ptr maybe_trace_query(service::client_state& client_state, std::string_view username, std::string_view op, const chunked_content& query) {
tracing::trace_state_ptr trace_state;
tracing::tracing& tracing_instance = tracing::tracing::get_local_tracing_instance();
if (tracing_instance.trace_next_query() || tracing_instance.slow_query_tracing_enabled()) {
trace_state = create_tracing_session(tracing_instance);
std::string buf;
tracing::add_session_param(trace_state, "alternator_op", op);
tracing::add_query(trace_state, truncated_content_view(query, max_users_query_size_in_trace_output).take_as_sstring());
tracing::add_query(trace_state, truncated_content_view(query, buf));
tracing::begin(trace_state, seastar::format("Alternator {}", op), client_state.get_client_address());
if (!username.empty()) {
tracing::set_username(trace_state, auth::authenticated_user(username));
@@ -514,207 +412,30 @@ static tracing::trace_state_ptr maybe_trace_query(service::client_state& client_
return trace_state;
}
// This read_entire_stream() is similar to Seastar's read_entire_stream()
// which reads the given content_stream until its end into non-contiguous
// memory. The difference is that this implementation takes an extra length
// limit, and throws an error if we read more than this limit.
// This length-limited variant would not have been needed if Seastar's HTTP
// server's set_content_length_limit() worked in every case, but unfortunately
// it does not - it only works if the request has a Content-Length header (see
// issue #8196). In contrast this function can limit the request's length no
// matter how it's encoded. We need this limit to protect Alternator from
// oversized requests that can deplete memory.
static future<chunked_content>
read_entire_stream(input_stream<char>& inp, size_t length_limit) {
chunked_content ret;
// We try to read length_limit + 1 bytes, so that we can throw an
// exception if we managed to read more than length_limit.
ssize_t remain = length_limit + 1;
do {
temporary_buffer<char> buf = co_await inp.read_up_to(remain);
if (buf.empty()) {
break;
}
remain -= buf.size();
ret.push_back(std::move(buf));
} while (remain > 0);
// If we read the full length_limit + 1 bytes, we went over the limit:
if (remain <= 0) {
// By throwing here an error, we may send a reply (the error message)
// without having read the full request body. Seastar's httpd will
// realize that we have not read the entire content stream, and
// correctly mark the connection unreusable, i.e., close it.
// This means we are currently exposed to issue #12166 caused by
// Seastar issue 1325), where the client may get an RST instead of
// a FIN, and may rarely get a "Connection reset by peer" before
// reading the error we send.
throw api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", length_limit));
}
co_return ret;
}
// safe_gzip_stream is an exception-safe wrapper for zlib's z_stream.
// The "z_stream" struct is used by zlib to hold state while decompressing a
// stream of data. It allocates memory which must be freed with inflateEnd(),
// which the destructor of this class does.
class safe_gzip_zstream {
z_stream _zs;
public:
safe_gzip_zstream() {
memset(&_zs, 0, sizeof(_zs));
// The strange 16 + WMAX_BITS tells zlib to expect and decode
// a gzip header, not a zlib header.
if (inflateInit2(&_zs, 16 + MAX_WBITS) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
}
~safe_gzip_zstream() {
inflateEnd(&_zs);
}
z_stream* operator->() {
return &_zs;
}
z_stream* get() {
return &_zs;
}
void reset() {
inflateReset(&_zs);
}
};
// ungzip() takes a chunked_content with a gzip-compressed request body,
// uncompresses it, and returns the uncompressed content as a chunked_content.
// If the uncompressed content exceeds length_limit, an error is thrown.
static future<chunked_content>
ungzip(chunked_content&& compressed_body, size_t length_limit) {
chunked_content ret;
// output_buf can be any size - when uncompressing input_buf, it doesn't
// need to fit in a single output_buf, we'll use multiple output_buf for
// a single input_buf if needed.
constexpr size_t OUTPUT_BUF_SIZE = 4096;
temporary_buffer<char> output_buf;
safe_gzip_zstream strm;
bool complete_stream = false; // empty input is not a valid gzip
size_t total_out_bytes = 0;
for (const temporary_buffer<char>& input_buf : compressed_body) {
if (input_buf.empty()) {
continue;
}
complete_stream = false;
strm->next_in = (Bytef*) input_buf.get();
strm->avail_in = (uInt) input_buf.size();
do {
co_await coroutine::maybe_yield();
if (output_buf.empty()) {
output_buf = temporary_buffer<char>(OUTPUT_BUF_SIZE);
}
strm->next_out = (Bytef*) output_buf.get();
strm->avail_out = OUTPUT_BUF_SIZE;
int e = inflate(strm.get(), Z_NO_FLUSH);
size_t out_bytes = OUTPUT_BUF_SIZE - strm->avail_out;
if (out_bytes > 0) {
// If output_buf is nearly full, we save it as-is in ret. But
// if it only has little data, better copy to a small buffer.
if (out_bytes > OUTPUT_BUF_SIZE/2) {
ret.push_back(std::move(output_buf).prefix(out_bytes));
// output_buf is now empty. if this loop finds more input,
// we'll allocate a new output buffer.
} else {
ret.push_back(temporary_buffer<char>(output_buf.get(), out_bytes));
}
total_out_bytes += out_bytes;
if (total_out_bytes > length_limit) {
throw api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", length_limit));
}
}
if (e == Z_STREAM_END) {
// There may be more input after the first gzip stream - in
// either this input_buf or the next one. The additional input
// should be a second concatenated gzip. We need to allow that
// by resetting the gzip stream and continuing the input loop
// until there's no more input.
strm.reset();
if (strm->avail_in == 0) {
complete_stream = true;
break;
}
} else if (e != Z_OK && e != Z_BUF_ERROR) {
// DynamoDB returns an InternalServerError when given a bad
// gzip request body. See test test_broken_gzip_content
throw api_error::internal("Error during gzip decompression of request body");
}
} while (strm->avail_in > 0 || strm->avail_out == 0);
}
if (!complete_stream) {
// The gzip stream was not properly finished with Z_STREAM_END
throw api_error::internal("Truncated gzip in request body");
}
co_return ret;
}
future<executor::request_return_type> server::handle_api_request(std::unique_ptr<request> req) {
_executor._stats.total_operations++;
sstring target = req->get_header("X-Amz-Target");
// target is DynamoDB API version followed by a dot '.' and operation type (e.g. CreateTable)
auto dot = target.find('.');
std::string_view op = (dot == sstring::npos) ? std::string_view() : std::string_view(target).substr(dot+1);
if (req->content_length > request_content_length_limit) {
// If we have a Content-Length header and know the request will be too
// long, we don't need to wait for read_entire_stream() below to
// discover it. And we definitely mustn't try to get_units() below for
// for such a size.
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
}
// JSON parsing can allocate up to roughly 2x the size of the raw
// document, + a couple of bytes for maintenance.
// If the Content-Length of the request is not available, we assume
// the largest possible request (request_content_length_limit, i.e., 16 MB)
// and after reading the request we return_units() the excess.
size_t mem_estimate = (req->content_length ? req->content_length : request_content_length_limit) * 2 + 8000;
// TODO: consider the case where req->content_length is missing. Maybe
// we need to take the content_length_limit and return some of the units
// when we finish read_content_and_verify_signature?
size_t mem_estimate = req->content_length * 2 + 8000;
auto units_fut = get_units(*_memory_limiter, mem_estimate);
if (_memory_limiter->waiters()) {
++_executor._stats.requests_blocked_memory;
}
auto units = co_await std::move(units_fut);
SCYLLA_ASSERT(req->content_stream);
chunked_content content = co_await read_entire_stream(*req->content_stream, request_content_length_limit);
// If the request had no Content-Length, we reserved too many units
// so need to return some
if (req->content_length == 0) {
size_t content_length = 0;
for (const auto& chunk : content) {
content_length += chunk.size();
}
size_t new_mem_estimate = content_length * 2 + 8000;
units.return_units(mem_estimate - new_mem_estimate);
}
chunked_content content = co_await util::read_entire_stream(*req->content_stream);
auto username = co_await verify_signature(*req, content);
// If the request is compressed, uncompress it now, after we checked
// the signature (the signature is computed on the compressed content).
// We apply the request_content_length_limit again to the uncompressed
// content - we don't want to allow a tiny compressed request to
// expand to a huge uncompressed request.
sstring content_encoding = req->get_header("Content-Encoding");
if (content_encoding == "gzip") {
content = co_await ungzip(std::move(content), request_content_length_limit);
} else if (!content_encoding.empty()) {
// DynamoDB returns a 500 error for unsupported Content-Encoding.
// I'm not sure if this is the best error code, but let's do it too.
// See the test test_garbage_content_encoding confirming this case.
co_return api_error::internal("Unsupported Content-Encoding");
}
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().
auto system_clients_entry = _ongoing_requests.emplace(
req->get_client_address(), req->get_header("User-Agent"),
username, current_scheduling_group(),
req->get_protocol_name() == "https");
if (slogger.is_enabled(log_level::trace)) {
slogger.trace("Request: {} {} {}", op, truncated_content_view(content, _max_users_query_size_in_trace_output).as_view(), req->_headers);
std::string buf;
slogger.trace("Request: {} {} {}", op, truncated_content_view(content, buf), req->_headers);
}
auto callback_it = _callbacks.find(op);
if (callback_it == _callbacks.end()) {
@@ -734,7 +455,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
}
co_await client_state.maybe_update_per_service_level_params();
tracing::trace_state_ptr trace_state = maybe_trace_query(client_state, username, op, content, _max_users_query_size_in_trace_output.get());
tracing::trace_state_ptr trace_state = maybe_trace_query(client_state, username, op, content);
tracing::trace(trace_state, "{}", op);
auto user = client_state.user();
@@ -742,9 +463,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
client_state = std::move(client_state), trace_state = std::move(trace_state),
units = std::move(units), req = std::move(req)] () mutable -> future<executor::request_return_type> {
rjson::value json_request = co_await _json_parser.parse(std::move(content));
if (!json_request.IsObject()) {
co_return api_error::validation("Request content must be an object");
}
co_return co_await callback(_executor, client_state, trace_state,
make_service_permit(std::move(units)), std::move(json_request), std::move(req));
};
@@ -785,9 +503,9 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
, _auth_service(auth_service)
, _sl_controller(sl_controller)
, _key_cache(1024, 1min, slogger)
, _max_users_query_size_in_trace_output(1024)
, _enforce_authorization(false)
, _enabled_servers{}
, _pending_requests("alternator::server::pending_requests")
, _pending_requests{}
, _timeout_config(_proxy.data_dictionary().get_config())
, _callbacks{
{"CreateTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value json_request, std::unique_ptr<request> req) {
@@ -866,13 +584,10 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
}
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
utils::updateable_value<bool> enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
_enforce_authorization = std::move(enforce_authorization);
_warn_authorization = std::move(warn_authorization);
_max_concurrent_requests = std::move(max_concurrent_requests);
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
if (!port && !https_port) {
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
" must be specified in order to init an alternator HTTP server instance"));
@@ -882,12 +597,14 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
if (port) {
set_routes(_http_server._routes);
_http_server.set_content_length_limit(server::content_length_limit);
_http_server.set_content_streaming(true);
_http_server.listen(socket_address{addr, *port}).get();
_enabled_servers.push_back(std::ref(_http_server));
}
if (https_port) {
set_routes(_https_server._routes);
_https_server.set_content_length_limit(server::content_length_limit);
_https_server.set_content_streaming(true);
if (this_shard_id() == 0) {
@@ -962,37 +679,6 @@ future<> server::json_parser::stop() {
return std::move(_run_parse_json_thread);
}
// Convert an entry in the server's list of ongoing Alternator requests
// (_ongoing_requests) into a client_data object. This client_data object
// will then be used to produce a row for the "system.clients" virtual table.
client_data server::ongoing_request::make_client_data() const {
client_data cd;
cd.ct = client_type::alternator;
cd.ip = _client_address.addr();
cd.port = _client_address.port();
cd.shard_id = this_shard_id();
cd.connection_stage = client_connection_stage::established;
cd.username = _username;
cd.scheduling_group_name = _scheduling_group.name();
cd.ssl_enabled = _is_https;
// For now, we save the full User-Agent header as the "driver name"
// and keep "driver_version" unset.
cd.driver_name = _user_agent;
// Leave "protocol_version" unset, it has no meaning in Alternator.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
// As reported in issue #9216, we never set these fields in CQL
// either (see cql_server::connection::make_client_data()).
return cd;
}
future<utils::chunked_vector<client_data>> server::get_client_data() {
utils::chunked_vector<client_data> ret;
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
ret.emplace_back(r.make_client_data());
});
co_return ret;
}
const char* api_error::what() const noexcept {
if (_what_string.empty()) {
_what_string = fmt::format("{} {}: {}", std::to_underlying(_http_code), _type, _msg);

View File

@@ -9,7 +9,6 @@
#pragma once
#include "alternator/executor.hh"
#include "utils/scoped_item_list.hh"
#include <seastar/core/future.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/http/httpd.hh>
@@ -21,18 +20,12 @@
#include "utils/updateable_value.hh"
#include <seastar/core/units.hh>
struct client_data;
namespace alternator {
using chunked_content = rjson::chunked_content;
class server : public peering_sharded_service<server> {
// The maximum size of a request body that Alternator will accept,
// in bytes. This is a safety measure to prevent Alternator from
// running out of memory when a client sends a very large request.
// DynamoDB also has the same limit set to 16 MB.
static constexpr size_t request_content_length_limit = 16*MB;
static constexpr size_t content_length_limit = 16*MB;
using alternator_callback = std::function<future<executor::request_return_type>(executor&, executor::client_state&,
tracing::trace_state_ptr, service_permit, rjson::value, std::unique_ptr<http::request>)>;
using alternator_callbacks_map = std::unordered_map<std::string_view, alternator_callback>;
@@ -47,10 +40,8 @@ class server : public peering_sharded_service<server> {
key_cache _key_cache;
utils::updateable_value<bool> _enforce_authorization;
utils::updateable_value<bool> _warn_authorization;
utils::updateable_value<uint64_t> _max_users_query_size_in_trace_output;
utils::small_vector<std::reference_wrapper<seastar::httpd::http_server>, 2> _enabled_servers;
named_gate _pending_requests;
gate _pending_requests;
// In some places we will need a CQL updateable_timeout_config object even
// though it isn't really relevant for Alternator which defines its own
// timeouts separately. We can create this object only once.
@@ -83,31 +74,12 @@ class server : public peering_sharded_service<server> {
};
json_parser _json_parser;
// The server maintains a list of ongoing requests, that are being handled
// by handle_api_request(). It uses this list in get_client_data(), which
// is called when reading the "system.clients" virtual table.
struct ongoing_request {
socket_address _client_address;
sstring _user_agent;
sstring _username;
scheduling_group _scheduling_group;
bool _is_https;
client_data make_client_data() const;
};
utils::scoped_item_list<ongoing_request> _ongoing_requests;
public:
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
utils::updateable_value<bool> enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();
// get_client_data() is called (on each shard separately) when the virtual
// table "system.clients" is read. It is expected to generate a list of
// clients connected to this server (on this shard). This function is
// called by alternator::controller::get_client_data().
future<utils::chunked_vector<client_data>> get_client_data();
private:
void set_routes(seastar::httpd::routes& r);
// If verification succeeds, returns the authenticated user's username

View File

@@ -14,58 +14,28 @@
namespace alternator {
const char* ALTERNATOR_METRICS = "alternator";
static seastar::metrics::histogram estimated_histogram_to_metrics(const utils::estimated_histogram& histogram) {
seastar::metrics::histogram res;
res.buckets.resize(histogram.bucket_offsets.size());
uint64_t cumulative_count = 0;
res.sample_count = histogram._count;
res.sample_sum = histogram._sample_sum;
for (size_t i = 0; i < res.buckets.size(); i++) {
auto& v = res.buckets[i];
v.upper_bound = histogram.bucket_offsets[i];
cumulative_count += histogram.buckets[i];
v.count = cumulative_count;
}
return res;
}
static seastar::metrics::label column_family_label("cf");
static seastar::metrics::label keyspace_label("ks");
static void register_metrics_with_optional_table(seastar::metrics::metric_groups& metrics, const stats& stats, const sstring& ks, const sstring& table) {
stats::stats() : api_operations{} {
// Register the
seastar::metrics::label op("op");
bool has_table = table.length();
std::vector<seastar::metrics::label> aggregate_labels;
std::vector<seastar::metrics::label_instance> labels = {alternator_label};
sstring group_name = (has_table)? "alternator_table" : "alternator";
if (has_table) {
labels.push_back(column_family_label(table));
labels.push_back(keyspace_label(ks));
aggregate_labels.push_back(seastar::metrics::shard_label);
}
metrics.add_group(group_name, {
#define OPERATION(name, CamelCaseName) \
seastar::metrics::make_total_operations("operation", stats.api_operations.name, \
seastar::metrics::description("number of operations via Alternator API"), labels)(basic_level)(op(CamelCaseName)).aggregate(aggregate_labels).set_skip_when_empty(),
#define OPERATION_LATENCY(name, CamelCaseName) \
metrics.add_group(group_name, { \
seastar::metrics::make_histogram("op_latency", \
seastar::metrics::description("Latency histogram of an operation via Alternator API"), labels, [&stats]{return to_metrics_histogram(stats.api_operations.name.histogram());})(op(CamelCaseName))(basic_level).aggregate({seastar::metrics::shard_label}).set_skip_when_empty()}); \
if (!has_table) {\
metrics.add_group("alternator", { \
seastar::metrics::make_summary("op_latency_summary", \
seastar::metrics::description("Latency summary of an operation via Alternator API"), [&stats]{return to_metrics_summary(stats.api_operations.name.summary());})(op(CamelCaseName))(basic_level)(alternator_label).set_skip_when_empty()}); \
}
_metrics.add_group("alternator", {
#define OPERATION(name, CamelCaseName) \
seastar::metrics::make_total_operations("operation", api_operations.name, \
seastar::metrics::description("number of operations via Alternator API"), {op(CamelCaseName), alternator_label, basic_level}).set_skip_when_empty(),
#define OPERATION_LATENCY(name, CamelCaseName) \
seastar::metrics::make_histogram("op_latency", \
seastar::metrics::description("Latency histogram of an operation via Alternator API"), {op(CamelCaseName), alternator_label, basic_level}, [this]{return to_metrics_histogram(api_operations.name.histogram());}).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(), \
seastar::metrics::make_summary("op_latency_summary", \
seastar::metrics::description("Latency summary of an operation via Alternator API"), [this]{return to_metrics_summary(api_operations.name.summary());})(op(CamelCaseName))(basic_level)(alternator_label).set_skip_when_empty(),
OPERATION(batch_get_item, "BatchGetItem")
OPERATION(batch_write_item, "BatchWriteItem")
OPERATION(create_backup, "CreateBackup")
OPERATION(create_global_table, "CreateGlobalTable")
OPERATION(create_table, "CreateTable")
OPERATION(delete_backup, "DeleteBackup")
OPERATION(delete_item, "DeleteItem")
OPERATION(delete_table, "DeleteTable")
OPERATION(describe_backup, "DescribeBackup")
OPERATION(describe_continuous_backups, "DescribeContinuousBackups")
OPERATION(describe_endpoints, "DescribeEndpoints")
@@ -94,117 +64,55 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
OPERATION(update_item, "UpdateItem")
OPERATION(update_table, "UpdateTable")
OPERATION(update_time_to_live, "UpdateTimeToLive")
OPERATION_LATENCY(put_item_latency, "PutItem")
OPERATION_LATENCY(get_item_latency, "GetItem")
OPERATION_LATENCY(delete_item_latency, "DeleteItem")
OPERATION_LATENCY(update_item_latency, "UpdateItem")
OPERATION_LATENCY(batch_write_item_latency, "BatchWriteItem")
OPERATION_LATENCY(batch_get_item_latency, "BatchGetItem")
OPERATION(list_streams, "ListStreams")
OPERATION(describe_stream, "DescribeStream")
OPERATION(get_shard_iterator, "GetShardIterator")
OPERATION(get_records, "GetRecords")
OPERATION_LATENCY(get_records_latency, "GetRecords")
});
OPERATION_LATENCY(put_item_latency, "PutItem")
OPERATION_LATENCY(get_item_latency, "GetItem")
OPERATION_LATENCY(delete_item_latency, "DeleteItem")
OPERATION_LATENCY(update_item_latency, "UpdateItem")
OPERATION_LATENCY(batch_write_item_latency, "BatchWriteItem")
OPERATION_LATENCY(batch_get_item_latency, "BatchGetItem")
OPERATION_LATENCY(get_records_latency, "GetRecords")
if (!has_table) {
// Create and delete operations are not applicable to a per-table metrics
// only register it for the global metrics
metrics.add_group("alternator", {
OPERATION(create_table, "CreateTable")
OPERATION(delete_table, "DeleteTable")
});
}
metrics.add_group(group_name, {
seastar::metrics::make_total_operations("unsupported_operations", stats.unsupported_operations,
seastar::metrics::description("number of unsupported operations via Alternator API"), labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("total_operations", stats.total_operations,
seastar::metrics::description("number of total operations via Alternator API"), labels)(basic_level).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("reads_before_write", stats.reads_before_write,
seastar::metrics::description("number of performed read-before-write operations"), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("write_using_lwt", stats.write_using_lwt,
seastar::metrics::description("number of writes that used LWT"), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("shard_bounce_for_lwt", stats.shard_bounce_for_lwt,
seastar::metrics::description("number writes that had to be bounced from this shard because of LWT requirements"), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("requests_blocked_memory", stats.requests_blocked_memory,
seastar::metrics::description("Counts a number of requests blocked due to memory pressure."), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("requests_shed", stats.requests_shed,
seastar::metrics::description("Counts a number of requests shed due to overload."), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("filtered_rows_read_total", stats.cql_stats.filtered_rows_read_total,
seastar::metrics::description("number of rows read during filtering operations"), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("filtered_rows_matched_total", stats.cql_stats.filtered_rows_matched_total,
seastar::metrics::description("number of rows read and matched during filtering operations"), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_counter("rcu_total", [&stats]{return 0.5 * stats.rcu_half_units_total;},
seastar::metrics::description("total number of consumed read units"), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_counter("wcu_total", stats.wcu_total[stats::wcu_types::PUT_ITEM],
seastar::metrics::description("total number of consumed write units"), labels)(op("PutItem")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_counter("wcu_total", stats.wcu_total[stats::wcu_types::DELETE_ITEM],
seastar::metrics::description("total number of consumed write units"), labels)(op("DeleteItem")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_counter("wcu_total", stats.wcu_total[stats::wcu_types::UPDATE_ITEM],
seastar::metrics::description("total number of consumed write units"), labels)(op("UpdateItem")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_counter("wcu_total", stats.wcu_total[stats::wcu_types::INDEX],
seastar::metrics::description("total number of consumed write units"), labels)(op("Index")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("filtered_rows_dropped_total", [&stats] { return stats.cql_stats.filtered_rows_read_total - stats.cql_stats.filtered_rows_matched_total; },
seastar::metrics::description("number of rows read and dropped during filtering operations"), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"), labels,
stats.api_operations.batch_write_item_batch_total)(op("BatchWriteItem")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"), labels,
stats.api_operations.batch_get_item_batch_total)(op("BatchGetItem")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_histogram("batch_item_count_histogram", seastar::metrics::description("Histogram of the number of items in a batch request"), labels,
[&stats]{ return estimated_histogram_to_metrics(stats.api_operations.batch_get_item_histogram);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("batch_item_count_histogram", seastar::metrics::description("Histogram of the number of items in a batch request"), labels,
[&stats]{ return estimated_histogram_to_metrics(stats.api_operations.batch_write_item_histogram);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.get_item_op_size_kb);})(op("GetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.put_item_op_size_kb);})(op("PutItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.delete_item_op_size_kb);})(op("DeleteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.update_item_op_size_kb);})(op("UpdateItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.batch_get_item_op_size_kb);})(op("BatchGetItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_histogram("operation_size_kb", seastar::metrics::description("Histogram of item sizes involved in a request"), labels,
[&stats]{ return estimated_histogram_to_metrics(stats.operation_sizes.batch_write_item_op_size_kb);})(op("BatchWriteItem")).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
_metrics.add_group("alternator", {
seastar::metrics::make_total_operations("unsupported_operations", unsupported_operations,
seastar::metrics::description("number of unsupported operations via Alternator API"))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("total_operations", total_operations,
seastar::metrics::description("number of total operations via Alternator API"))(basic_level)(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("reads_before_write", reads_before_write,
seastar::metrics::description("number of performed read-before-write operations"))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("write_using_lwt", write_using_lwt,
seastar::metrics::description("number of writes that used LWT"))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("shard_bounce_for_lwt", shard_bounce_for_lwt,
seastar::metrics::description("number writes that had to be bounced from this shard because of LWT requirements"))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("requests_blocked_memory", requests_blocked_memory,
seastar::metrics::description("Counts a number of requests blocked due to memory pressure."))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("requests_shed", requests_shed,
seastar::metrics::description("Counts a number of requests shed due to overload."))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("filtered_rows_read_total", cql_stats.filtered_rows_read_total,
seastar::metrics::description("number of rows read during filtering operations"))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("filtered_rows_matched_total", cql_stats.filtered_rows_matched_total,
seastar::metrics::description("number of rows read and matched during filtering operations")),
seastar::metrics::make_counter("rcu_total", rcu_total,
seastar::metrics::description("total number of consumed read units, counted as half units"))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_counter("wcu_total", wcu_total[wcu_types::PUT_ITEM],
seastar::metrics::description("total number of consumed write units, counted as half units"),{op("PutItem")})(alternator_label).set_skip_when_empty(),
seastar::metrics::make_counter("wcu_total", wcu_total[wcu_types::DELETE_ITEM],
seastar::metrics::description("total number of consumed write units, counted as half units"),{op("DeleteItem")})(alternator_label).set_skip_when_empty(),
seastar::metrics::make_counter("wcu_total", wcu_total[wcu_types::UPDATE_ITEM],
seastar::metrics::description("total number of consumed write units, counted as half units"),{op("UpdateItem")})(alternator_label).set_skip_when_empty(),
seastar::metrics::make_counter("wcu_total", wcu_total[wcu_types::INDEX],
seastar::metrics::description("total number of consumed write units, counted as half units"),{op("Index")})(alternator_label).set_skip_when_empty(),
seastar::metrics::make_total_operations("filtered_rows_dropped_total", [this] { return cql_stats.filtered_rows_read_total - cql_stats.filtered_rows_matched_total; },
seastar::metrics::description("number of rows read and dropped during filtering operations"))(alternator_label).set_skip_when_empty(),
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchWriteItem")},
api_operations.batch_write_item_batch_total)(alternator_label).set_skip_when_empty(),
seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchGetItem")},
api_operations.batch_get_item_batch_total)(alternator_label).set_skip_when_empty(),
});
seastar::metrics::label expression_label("expression");
metrics.add_group(group_name, {
seastar::metrics::make_total_operations("expression_cache_evictions", stats.expression_cache.evictions,
seastar::metrics::description("Counts number of entries evicted from expressions cache"), labels).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("expression_cache_hits", stats.expression_cache.requests[stats::expression_types::UPDATE_EXPRESSION].hits,
seastar::metrics::description("Counts number of hits of cached expressions"), labels)(expression_label("UpdateExpression")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::UPDATE_EXPRESSION].misses,
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("UpdateExpression")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("expression_cache_hits", stats.expression_cache.requests[stats::expression_types::CONDITION_EXPRESSION].hits,
seastar::metrics::description("Counts number of hits of cached expressions"), labels)(expression_label("ConditionExpression")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::CONDITION_EXPRESSION].misses,
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ConditionExpression")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("expression_cache_hits", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].hits,
seastar::metrics::description("Counts number of hits of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty(),
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses,
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty()
});
// Only register the following metrics for the global metrics, not per-table
if (!has_table) {
metrics.add_group("alternator", {
seastar::metrics::make_counter("authentication_failures", stats.authentication_failures,
seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_counter("authorization_failures", stats.authorization_failures,
seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
});
}
}
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) {
register_metrics_with_optional_table(metrics, stats, "", "");
}
table_stats::table_stats(const sstring& ks, const sstring& table) {
_stats = make_lw_shared<stats>();
register_metrics_with_optional_table(_metrics, *_stats, ks, table);
}
}

View File

@@ -12,7 +12,6 @@
#include <seastar/core/metrics_registration.hh>
#include "utils/histogram.hh"
#include "utils/estimated_histogram.hh"
#include "cql3/stats.hh"
namespace alternator {
@@ -22,6 +21,7 @@ namespace alternator {
// visible by the metrics REST API, with the "alternator" prefix.
class stats {
public:
stats();
// Count of DynamoDB API operations by types
struct {
uint64_t batch_get_item = 0;
@@ -75,47 +75,7 @@ public:
utils::timed_rate_moving_average_summary_and_histogram batch_write_item_latency;
utils::timed_rate_moving_average_summary_and_histogram batch_get_item_latency;
utils::timed_rate_moving_average_summary_and_histogram get_records_latency;
utils::estimated_histogram batch_get_item_histogram{22}; // a histogram that covers the range 1 - 100
utils::estimated_histogram batch_write_item_histogram{22}; // a histogram that covers the range 1 - 100
} api_operations;
// Operation size metrics
struct {
// Item size statistics collected per table and aggregated per node.
// Each histogram covers the range 0 - 446. Resolves #25143.
// A size is the retrieved item's size.
utils::estimated_histogram get_item_op_size_kb{30};
// A size is the maximum of the new item's size and the old item's size.
utils::estimated_histogram put_item_op_size_kb{30};
// A size is the deleted item's size. If the deleted item's size is
// unknown (i.e. read-before-write wasn't necessary and it wasn't
// forced by a configuration option), it won't be recorded on the
// histogram.
utils::estimated_histogram delete_item_op_size_kb{30};
// A size is the maximum of existing item's size and the estimated size
// of the update. This will be changed to the maximum of the existing item's
// size and the new item's size in a subsequent PR.
utils::estimated_histogram update_item_op_size_kb{30};
// A size is the sum of the sizes of all items per table. This means
// that a single BatchGetItem / BatchWriteItem updates the histogram
// for each table that it has items in.
// The sizes are the retrieved items' sizes grouped per table.
utils::estimated_histogram batch_get_item_op_size_kb{30};
// The sizes are the the written items' sizes grouped per table.
utils::estimated_histogram batch_write_item_op_size_kb{30};
} operation_sizes;
// Count of authentication and authorization failures, counted if either
// alternator_enforce_authorization or alternator_warn_authorization are
// set to true. If both are false, no authentication or authorization
// checks are performed, so failures are not recognized or counted.
// "authentication" failure means the request was not signed with a valid
// user and key combination. "authorization" failure means the request was
// authenticated to a valid user - but this user did not have permissions
// to perform the operation (considering RBAC settings and the user's
// superuser status).
uint64_t authentication_failures = 0;
uint64_t authorization_failures = 0;
// Miscellaneous event counters
uint64_t total_operations = 0;
uint64_t unsupported_operations = 0;
@@ -124,7 +84,7 @@ public:
uint64_t shard_bounce_for_lwt = 0;
uint64_t requests_blocked_memory = 0;
uint64_t requests_shed = 0;
uint64_t rcu_half_units_total = 0;
uint64_t rcu_total = 0;
// wcu can results from put, update, delete and index
// Index related will be done on top of the operation it comes with
enum wcu_types {
@@ -138,33 +98,10 @@ public:
uint64_t wcu_total[NUM_TYPES] = {0};
// CQL-derived stats
cql3::cql_stats cql_stats;
// Enumeration of expression types only for stats
// if needed it can be extended e.g. per operation
enum expression_types {
UPDATE_EXPRESSION,
CONDITION_EXPRESSION,
PROJECTION_EXPRESSION,
NUM_EXPRESSION_TYPES
};
struct {
struct {
uint64_t hits = 0;
uint64_t misses = 0;
} requests[NUM_EXPRESSION_TYPES];
uint64_t evictions = 0;
} expression_cache;
};
struct table_stats {
table_stats(const sstring& ks, const sstring& table);
private:
// The metric_groups object holds this stat object's metrics registered
// as long as the stats object is alive.
seastar::metrics::metric_groups _metrics;
lw_shared_ptr<stats> _stats;
};
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats);
inline uint64_t bytes_to_kb_ceil(uint64_t bytes) {
return (bytes + 1023) / 1024;
}
}

View File

@@ -13,6 +13,7 @@
#include <seastar/json/formatter.hh>
#include "auth/permission.hh"
#include "db/config.hh"
#include "cdc/log.hh"
@@ -31,7 +32,6 @@
#include "executor.hh"
#include "data_dictionary/data_dictionary.hh"
#include "utils/rjson.hh"
/**
* Base template type to implement rapidjson::internal::TypeHelper<...>:s
@@ -126,7 +126,7 @@ public:
}
};
} // namespace alternator
}
template<typename ValueType>
struct rapidjson::internal::TypeHelper<ValueType, alternator::stream_arn>
@@ -217,7 +217,7 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
rjson::add(ret, "LastEvaluatedStreamArn", *last);
}
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
}
struct shard_id {
@@ -296,7 +296,7 @@ sequence_number::sequence_number(std::string_view v)
}())
{}
} // namespace alternator
}
template<typename ValueType>
struct rapidjson::internal::TypeHelper<ValueType, alternator::shard_id>
@@ -356,7 +356,7 @@ static stream_view_type cdc_options_to_steam_view_type(const cdc::options& opts)
return type;
}
} // namespace alternator
}
template<typename ValueType>
struct rapidjson::internal::TypeHelper<ValueType, alternator::stream_view_type>
@@ -475,10 +475,10 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
} else {
status = "ENABLED";
}
}
}
auto ttl = std::chrono::seconds(opts.ttl());
rjson::add(stream_desc, "StreamStatus", rjson::from_string(status));
stream_view_type type = cdc_options_to_steam_view_type(opts);
@@ -491,7 +491,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
if (!opts.enabled()) {
rjson::add(ret, "StreamDescription", std::move(stream_desc));
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
}
// TODO: label
@@ -617,7 +617,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl
rjson::add(stream_desc, "Shards", std::move(shards));
rjson::add(ret, "StreamDescription", std::move(stream_desc));
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
});
}
@@ -714,7 +714,7 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
auto type = rjson::get<shard_iterator_type>(request, "ShardIteratorType");
auto seq_num = rjson::get_opt<sequence_number>(request, "SequenceNumber");
if (type < shard_iterator_type::TRIM_HORIZON && !seq_num) {
throw api_error::validation("Missing required parameter \"SequenceNumber\"");
}
@@ -724,7 +724,7 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
auto stream_arn = rjson::get<alternator::stream_arn>(request, "StreamArn");
auto db = _proxy.data_dictionary();
schema_ptr schema = nullptr;
std::optional<shard_id> sid;
@@ -770,7 +770,7 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
auto ret = rjson::empty_object();
rjson::add(ret, "ShardIterator", iter);
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
}
struct event_id {
@@ -789,7 +789,7 @@ struct event_id {
return os;
}
};
} // namespace alternator
}
template<typename ValueType>
struct rapidjson::internal::TypeHelper<ValueType, alternator::event_id>
@@ -808,9 +808,6 @@ future<executor::request_return_type> executor::get_records(client_state& client
if (limit < 1) {
throw api_error::validation("Limit must be 1 or more");
}
if (limit > 1000) {
throw api_error::validation("Limit must be less than or equal to 1000");
}
auto db = _proxy.data_dictionary();
schema_ptr schema, base;
@@ -827,7 +824,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
db::consistency_level cl = db::consistency_level::LOCAL_QUORUM;
partition_key pk = iter.shard.id.to_partition_key(*schema);
@@ -871,12 +868,10 @@ future<executor::request_return_type> executor::get_records(client_state& client
std::transform(pks.begin(), pks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
std::transform(cks.begin(), cks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
auto regular_column_start_idx = columns.size();
auto regular_column_filter = std::views::filter([](const column_definition& cdef) { return cdef.name() == op_column_name || cdef.name() == eor_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); });
std::ranges::transform(schema->regular_columns() | regular_column_filter, std::back_inserter(columns), [](auto& c) { return &c; });
auto regular_columns = std::ranges::subrange(columns.begin() + regular_column_start_idx, columns.end())
| std::views::transform(&column_definition::id)
auto regular_columns = schema->regular_columns()
| std::views::filter([](const column_definition& cdef) { return cdef.name() == op_column_name || cdef.name() == eor_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); })
| std::views::transform([&] (const column_definition& cdef) { columns.emplace_back(&cdef); return cdef.id; })
| std::ranges::to<query::column_id_vector>()
;
@@ -927,7 +922,6 @@ future<executor::request_return_type> executor::get_records(client_state& client
std::optional<utils::UUID> timestamp;
auto dynamodb = rjson::empty_object();
auto record = rjson::empty_object();
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();
using op_utype = std::underlying_type_t<cdc::operation>;
@@ -937,10 +931,9 @@ future<executor::request_return_type> executor::get_records(client_state& client
dynamodb = rjson::empty_object();
}
if (!record.ObjectEmpty()) {
rjson::add(record, "awsRegion", rjson::from_string(dc_name));
// TODO: awsRegion?
rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp));
rjson::add(record, "eventSource", "scylladb:alternator");
rjson::add(record, "eventVersion", "1.1");
rjson::push_back(records, std::move(record));
record = rjson::empty_object();
--limit;
@@ -959,7 +952,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
rjson::add(dynamodb, "SequenceNumber", sequence_number(ts));
rjson::add(dynamodb, "StreamViewType", type);
// TODO: SizeBytes
//TODO: SizeInBytes
}
/**
@@ -999,16 +992,6 @@ future<executor::request_return_type> executor::get_records(client_state& client
case cdc::operation::insert:
rjson::add(record, "eventName", "INSERT");
break;
case cdc::operation::service_row_delete:
case cdc::operation::service_partition_delete:
{
auto user_identity = rjson::empty_object();
rjson::add(user_identity, "Type", "Service");
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
rjson::add(record, "userIdentity", std::move(user_identity));
rjson::add(record, "eventName", "REMOVE");
break;
}
default:
rjson::add(record, "eventName", "REMOVE");
break;
@@ -1035,7 +1018,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
// will notice end end of shard and not return NextShardIterator.
rjson::add(ret, "NextShardIterator", next_iter);
_stats.api_operations.get_records_latency.mark(std::chrono::steady_clock::now() - start_time);
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
}
// ugh. figure out if we are and end-of-shard
@@ -1061,19 +1044,21 @@ future<executor::request_return_type> executor::get_records(client_state& client
if (is_big(ret)) {
return make_ready_future<executor::request_return_type>(make_streamed(std::move(ret)));
}
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(ret)));
});
});
}
bool executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) {
void executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) {
auto stream_enabled = rjson::find(stream_specification, "StreamEnabled");
if (!stream_enabled || !stream_enabled->IsBool()) {
throw api_error::validation("StreamSpecification needs boolean StreamEnabled");
}
if (stream_enabled->GetBool()) {
if (!sp.features().alternator_streams) {
auto db = sp.data_dictionary();
if (!db.features().alternator_streams) {
throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster.");
}
@@ -1098,12 +1083,10 @@ bool executor::add_stream_options(const rjson::value& stream_specification, sche
break;
}
builder.with_cdc_options(opts);
return true;
} else {
cdc::options opts;
opts.enabled(false);
builder.with_cdc_options(opts);
return false;
}
}
@@ -1132,4 +1115,4 @@ void executor::supplement_table_stream_info(rjson::value& descr, const schema& s
}
}
} // namespace alternator
}

View File

@@ -17,7 +17,6 @@
#include <seastar/core/lowres_clock.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include "cdc/log.hh"
#include "exceptions/exceptions.hh"
#include "gms/gossiper.hh"
#include "gms/inet_address.hh"
@@ -28,7 +27,7 @@
#include "replica/database.hh"
#include "service/client_state.hh"
#include "service_permit.hh"
#include "mutation/timestamp.hh"
#include "timestamp.hh"
#include "service/storage_proxy.hh"
#include "service/pager/paging_state.hh"
#include "service/pager/query_pagers.hh"
@@ -57,18 +56,18 @@ static logging::logger tlogger("alternator_ttl");
namespace alternator {
// We write the expiration-time attribute enabled on a table in a
// We write the expiration-time attribute enabled on a table using a
// tag TTL_TAG_KEY.
// Currently, the *value* of this tag is simply the name of the attribute,
// and the expiration scanner interprets it as an Alternator attribute name -
// It can refer to a real column or if that doesn't exist, to a member of
// the ":attrs" map column. Although this is designed for Alternator, it may
// be good enough for CQL as well (there, the ":attrs" column won't exist).
extern const sstring TTL_TAG_KEY;
static const sstring TTL_TAG_KEY("system:ttl_attribute");
future<executor::request_return_type> executor::update_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
_stats.api_operations.update_time_to_live++;
if (!_proxy.features().alternator_ttl) {
if (!_proxy.data_dictionary().features().alternator_ttl) {
co_return api_error::unknown_operation("UpdateTimeToLive not yet supported. Experimental support is available if the 'alternator-ttl' experimental feature is enabled on all nodes.");
}
@@ -82,6 +81,11 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
co_return api_error::validation("UpdateTimeToLive requires boolean Enabled");
}
bool enabled = v->GetBool();
// Alternator TTL doesn't yet work when the table uses tablets (#16567)
if (enabled && _proxy.local_db().find_keyspace(schema->ks_name()).get_replication_strategy().uses_tablets()) {
co_return api_error::validation("TTL not yet supported on a table using tablets (issue #16567). "
"Create a table with the tag 'experimental:initial_tablets' set to 'none' to use vnodes.");
}
v = rjson::find(*spec, "AttributeName");
if (!v || !v->IsString()) {
co_return api_error::validation("UpdateTimeToLive requires string AttributeName");
@@ -95,7 +99,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
sstring attribute_name(v->GetString(), v->GetStringLength());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
if (enabled) {
if (tags_map.contains(TTL_TAG_KEY)) {
@@ -119,7 +123,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
// basically identical to the request's
rjson::value response = rjson::empty_object();
rjson::add(response, "TimeToLiveSpecification", std::move(*spec));
co_return rjson::print(std::move(response));
co_return make_jsonable(std::move(response));
}
future<executor::request_return_type> executor::describe_time_to_live(client_state& client_state, service_permit permit, rjson::value request) {
@@ -136,7 +140,7 @@ future<executor::request_return_type> executor::describe_time_to_live(client_sta
}
rjson::value response = rjson::empty_object();
rjson::add(response, "TimeToLiveDescription", std::move(desc));
co_return rjson::print(std::move(response));
co_return make_jsonable(std::move(response));
}
// expiration_service is a sharded service responsible for cleaning up expired
@@ -287,18 +291,13 @@ static future<> expire_item(service::storage_proxy& proxy,
auto ck = clustering_key::from_exploded(exploded_ck);
m.partition().clustered_row(*schema, ck).apply(tombstone(ts, gc_clock::now()));
}
utils::chunked_vector<mutation> mutations;
std::vector<mutation> mutations;
mutations.push_back(std::move(m));
return proxy.mutate(std::move(mutations),
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(), // FIXME - which timeout?
qs.get_trace_state(), qs.get_permit(),
db::allow_per_partition_rate_limit::no,
false,
cdc::per_request_options{
.is_system_originated = true,
}
);
db::allow_per_partition_rate_limit::no);
}
static size_t random_offset(size_t min, size_t max) {
@@ -316,10 +315,8 @@ static size_t random_offset(size_t min, size_t max) {
// this range's primary node is down. For this we need to return not just
// a list of this node's secondary ranges - but also the primary owner of
// each of those ranges.
//
// The function is to be used with vnodes only
static future<std::vector<std::pair<dht::token_range, locator::host_id>>> get_secondary_ranges(
const locator::effective_replication_map* erm,
const locator::effective_replication_map_ptr& erm,
locator::host_id ep) {
const auto& tm = *erm->get_token_metadata_ptr();
const auto& sorted_tokens = tm.sorted_tokens();
@@ -330,7 +327,6 @@ static future<std::vector<std::pair<dht::token_range, locator::host_id>>> get_se
auto prev_tok = sorted_tokens.back();
for (const auto& tok : sorted_tokens) {
co_await coroutine::maybe_yield();
// FIXME: pass is_vnode=true to get_natural_replicas since the token is in tm.sorted_tokens()
host_id_vector_replica_set eps = erm->get_natural_replicas(tok);
if (eps.size() <= 1 || eps[1] != ep) {
prev_tok = tok;
@@ -400,7 +396,7 @@ class ranges_holder_primary {
dht::token_range_vector _token_ranges;
public:
explicit ranges_holder_primary(dht::token_range_vector token_ranges) : _token_ranges(std::move(token_ranges)) {}
static future<ranges_holder_primary> make(const locator::vnode_effective_replication_map* erm, locator::host_id ep) {
static future<ranges_holder_primary> make(const locator::vnode_effective_replication_map_ptr& erm, locator::host_id ep) {
co_return ranges_holder_primary(co_await erm->get_primary_ranges(ep));
}
std::size_t size() const { return _token_ranges.size(); }
@@ -420,7 +416,7 @@ public:
explicit ranges_holder_secondary(std::vector<std::pair<dht::token_range, locator::host_id>> token_ranges, const gms::gossiper& g)
: _token_ranges(std::move(token_ranges))
, _gossiper(g) {}
static future<ranges_holder_secondary> make(const locator::vnode_effective_replication_map* erm, locator::host_id ep, const gms::gossiper& g) {
static future<ranges_holder_secondary> make(const locator::effective_replication_map_ptr& erm, locator::host_id ep, const gms::gossiper& g) {
co_return ranges_holder_secondary(co_await get_secondary_ranges(erm, ep), g);
}
std::size_t size() const { return _token_ranges.size(); }
@@ -433,8 +429,6 @@ public:
}
};
// The token_ranges_owned_by_this_shard class is only used for vnodes, where the vnodes give a partition range for the entire node
// and such range still needs to be divided between the shards.
template<class primary_or_secondary_t>
class token_ranges_owned_by_this_shard {
schema_ptr _s;
@@ -528,7 +522,7 @@ struct scan_ranges_context {
// should be possible (and a must for issue #7751!).
lw_shared_ptr<service::pager::paging_state> paging_state = nullptr;
auto regular_columns =
s->regular_columns() | std::views::transform(&column_definition::id)
s->regular_columns() | std::views::transform([] (const column_definition& cdef) { return cdef.id; })
| std::ranges::to<query::column_id_vector>();
selection = cql3::selection::selection::wildcard(s);
query::partition_slice::option_set opts = selection->get_query_options();
@@ -661,17 +655,6 @@ static future<> scan_table_ranges(
}
}
static future<> scan_tablet(locator::tablet_id tablet, service::storage_proxy& proxy, abort_source& abort_source, named_semaphore& page_sem,
expiration_service::stats& expiration_stats, const scan_ranges_context& scan_ctx, const locator::tablet_map& tablet_map) {
auto tablet_token_range = tablet_map.get_token_range(tablet);
dht::ring_position tablet_start(tablet_token_range.start()->value(), dht::ring_position::token_bound::start),
tablet_end(tablet_token_range.end()->value(), dht::ring_position::token_bound::end);
auto partition_range = dht::partition_range::make(std::move(tablet_start), std::move(tablet_end));
// Note that because of issue #9167 we need to run a separate query on each partition range, and can't pass
// several of them into one partition_range_vector that is passed to scan_table_ranges().
return scan_table_ranges(proxy, scan_ctx, {partition_range}, abort_source, page_sem, expiration_stats);
}
// scan_table() scans, in one table, data "owned" by this shard, looking for
// expired items and deleting them.
// We consider each node to "own" its primary token ranges, i.e., the tokens
@@ -747,69 +730,34 @@ static future<bool> scan_table(
expiration_stats.scan_table++;
// FIXME: need to pace the scan, not do it all at once.
scan_ranges_context scan_ctx{s, proxy, std::move(column_name), std::move(member)};
if (s->table().uses_tablets()) {
locator::effective_replication_map_ptr erm = s->table().get_effective_replication_map();
auto my_host_id = erm->get_topology().my_host_id();
const auto &tablet_map = erm->get_token_metadata().tablets().get_tablet_map(s->id());
for (std::optional tablet = tablet_map.first_tablet(); tablet; tablet = tablet_map.next_tablet(*tablet)) {
auto tablet_primary_replica = tablet_map.get_primary_replica(*tablet, erm->get_topology());
// check if this is the primary replica for the current tablet
if (tablet_primary_replica.host == my_host_id && tablet_primary_replica.shard == this_shard_id()) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);
} else if(erm->get_replication_factor() > 1) {
// Check if this is the secondary replica for the current tablet
// and if the primary replica is down which means we will take over this work.
// If each node only scans its own primary ranges, then when any node is
// down part of the token range will not get scanned. This can be viewed
// as acceptable (when the comes back online, it will resume its scan),
// but as noted in issue #9787, we can allow more prompt expiration
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet); // throws if no secondary replica
if (tablet_secondary_replica.host == my_host_id && tablet_secondary_replica.shard == this_shard_id()) {
if (!gossiper.is_alive(tablet_primary_replica.host)) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);
}
}
}
}
} else { // VNodes
locator::static_effective_replication_map_ptr ermp =
db.real_database().find_keyspace(s->ks_name()).get_static_effective_replication_map();
auto* erm = ermp->maybe_as_vnode_effective_replication_map();
if (!erm) {
on_internal_error(tlogger, format("Keyspace {} is local", s->ks_name()));
}
auto my_host_id = erm->get_topology().my_host_id();
token_ranges_owned_by_this_shard my_ranges(s, co_await ranges_holder_primary::make(erm, my_host_id));
while (std::optional<dht::partition_range> range = my_ranges.next_partition_range()) {
// Note that because of issue #9167 we need to run a separate
// query on each partition range, and can't pass several of
// them into one partition_range_vector.
dht::partition_range_vector partition_ranges;
partition_ranges.push_back(std::move(*range));
// FIXME: if scanning a single range fails, including network errors,
// we fail the entire scan (and rescan from the beginning). Need to
// reconsider this. Saving the scan position might be a good enough
// solution for this problem.
co_await scan_table_ranges(proxy, scan_ctx, std::move(partition_ranges), abort_source, page_sem, expiration_stats);
}
// If each node only scans its own primary ranges, then when any node is
// down part of the token range will not get scanned. This can be viewed
// as acceptable (when the comes back online, it will resume its scan),
// but as noted in issue #9787, we can allow more prompt expiration
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
token_ranges_owned_by_this_shard my_secondary_ranges(s, co_await ranges_holder_secondary::make(erm, my_host_id, gossiper));
while (std::optional<dht::partition_range> range = my_secondary_ranges.next_partition_range()) {
expiration_stats.secondary_ranges_scanned++;
dht::partition_range_vector partition_ranges;
partition_ranges.push_back(std::move(*range));
co_await scan_table_ranges(proxy, scan_ctx, std::move(partition_ranges), abort_source, page_sem, expiration_stats);
}
auto erm = db.real_database().find_keyspace(s->ks_name()).get_vnode_effective_replication_map();
auto my_host_id = erm->get_topology().my_host_id();
token_ranges_owned_by_this_shard my_ranges(s, co_await ranges_holder_primary::make(erm, my_host_id));
while (std::optional<dht::partition_range> range = my_ranges.next_partition_range()) {
// Note that because of issue #9167 we need to run a separate
// query on each partition range, and can't pass several of
// them into one partition_range_vector.
dht::partition_range_vector partition_ranges;
partition_ranges.push_back(std::move(*range));
// FIXME: if scanning a single range fails, including network errors,
// we fail the entire scan (and rescan from the beginning). Need to
// reconsider this. Saving the scan position might be a good enough
// solution for this problem.
co_await scan_table_ranges(proxy, scan_ctx, std::move(partition_ranges), abort_source, page_sem, expiration_stats);
}
// If each node only scans its own primary ranges, then when any node is
// down part of the token range will not get scanned. This can be viewed
// as acceptable (when the comes back online, it will resume its scan),
// but as noted in issue #9787, we can allow more prompt expiration
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
token_ranges_owned_by_this_shard my_secondary_ranges(s, co_await ranges_holder_secondary::make(erm, my_host_id, gossiper));
while (std::optional<dht::partition_range> range = my_secondary_ranges.next_partition_range()) {
expiration_stats.secondary_ranges_scanned++;
dht::partition_range_vector partition_ranges;
partition_ranges.push_back(std::move(*range));
co_await scan_table_ranges(proxy, scan_ctx, std::move(partition_ranges), abort_source, page_sem, expiration_stats);
}
co_return true;
}

View File

@@ -106,8 +106,5 @@ target_link_libraries(api
wasmtime_bindings
absl::headers)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(api REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers api
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -246,24 +246,6 @@
}
}
},
"sstableinfo":{
"id":"sstableinfo",
"description":"Compacted sstable information",
"properties":{
"generation":{
"type": "string",
"description":"Generation of the sstable"
},
"origin":{
"type":"string",
"description":"Origin of the sstable"
},
"size":{
"type":"long",
"description":"Size of the sstable"
}
}
},
"compaction_info" :{
"id": "compaction_info",
"description":"A key value mapping",
@@ -345,10 +327,6 @@
"type":"string",
"description":"The UUID"
},
"shard_id":{
"type":"int",
"description":"The shard id the compaction was executed on"
},
"cf":{
"type":"string",
"description":"The column family name"
@@ -357,17 +335,9 @@
"type":"string",
"description":"The keyspace name"
},
"compaction_type":{
"type":"string",
"description":"Type of compaction"
},
"started_at":{
"type":"long",
"description":"The time compaction started"
},
"compacted_at":{
"type":"long",
"description":"The time compaction completed"
"description":"The time of compaction"
},
"bytes_in":{
"type":"long",
@@ -383,32 +353,6 @@
"type":"row_merged"
},
"description":"The merged rows"
},
"sstables_in": {
"type":"array",
"items":{
"type":"sstableinfo"
},
"description":"List of input sstables for compaction"
},
"sstables_out": {
"type":"array",
"items":{
"type":"sstableinfo"
},
"description":"List of output sstables from compaction"
},
"total_tombstone_purge_attempt":{
"type":"long",
"description":"Total number of tombstone purge attempts"
},
"total_tombstone_purge_failure_due_to_overlapping_with_memtable":{
"type":"long",
"description":"Number of tombstone purge failures due to data overlapping with memtables"
},
"total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable":{
"type":"long",
"description":"Number of tombstone purge failures due to data overlapping with non-compacting sstables"
}
}
}

View File

@@ -136,6 +136,14 @@
"allowMultiple":false,
"type":"string",
"paramType":"path"
},
{
"name":"unsafe",
"description":"Set to True to perform an unsafe assassination",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}

View File

@@ -220,25 +220,6 @@
}
]
},
{
"path":"/storage_service/nodes/excluded",
"operations":[
{
"method":"GET",
"summary":"Retrieve host ids of nodes which are marked as excluded",
"type":"array",
"items":{
"type":"string"
},
"nickname":"get_excluded_nodes",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/storage_service/nodes/joining",
"operations":[
@@ -613,50 +594,6 @@
}
]
},
{
"path": "/storage_service/natural_endpoints/v2/{keyspace}",
"operations": [
{
"method": "GET",
"summary":"This method returns the N endpoints that are responsible for storing the specified key i.e for replication. the endpoint responsible for this key",
"type": "array",
"items": {
"type": "string"
},
"nickname": "get_natural_endpoints_v2",
"produces": [
"application/json"
],
"parameters": [
{
"name": "keyspace",
"description": "The keyspace to query about.",
"required": true,
"allowMultiple": false,
"type": "string",
"paramType": "path"
},
{
"name": "cf",
"description": "Column family name.",
"required": true,
"allowMultiple": false,
"type": "string",
"paramType": "query"
},
{
"name": "key_component",
"description": "Each component of the key for which we need to find the endpoint (e.g. ?key_component=part1&key_component=part2).",
"required": true,
"allowMultiple": true,
"type": "string",
"paramType": "query"
}
]
}
]
},
{
"path":"/storage_service/cdc_streams_check_and_repair",
"operations":[
@@ -961,14 +898,6 @@
"type":"string",
"paramType":"query",
"enum": ["all", "dc", "rack", "node"]
},
{
"name":"primary_replica_only",
"description":"Load the sstables and stream to the primary replica node within the scope, if one is specified. If not, stream to the global primary replica.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
@@ -1055,7 +984,7 @@
]
},
{
"path":"/storage_service/cleanup_all/",
"path":"/storage_service/cleanup_all",
"operations":[
{
"method":"POST",
@@ -1065,30 +994,6 @@
"produces":[
"application/json"
],
"parameters":[
{
"name":"global",
"description":"true if cleanup of entire cluster is requested",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/mark_node_as_clean",
"operations":[
{
"method":"POST",
"summary":"Mark the node as clean. After that the node will not be considered as needing cleanup during automatic cleanup which is triggered by some topology operations",
"type":"void",
"nickname":"reset_cleanup_needed",
"produces":[
"application/json"
],
"parameters":[]
}
]
@@ -1195,14 +1100,6 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name": "drop_unfixable_sstables",
"description": "When set to true, drop unfixable sstables. Applies only to scrub mode SEGREGATE.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}
@@ -1622,30 +1519,6 @@
}
]
},
{
"path":"/storage_service/exclude_node",
"operations":[
{
"method":"POST",
"summary":"Marks the node as permanently down (excluded).",
"type":"void",
"nickname":"exclude_node",
"produces":[
"application/json"
],
"parameters":[
{
"name":"hosts",
"description":"Comma-separated list of host ids to exclude",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/removal_status",
"operations":[
@@ -2271,31 +2144,6 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"skip_cleanup",
"description":"Don't cleanup keys from loaded sstables. Invalid if load_and_stream is true",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"skip_reshape",
"description":"Don't reshape the loaded sstables. Invalid if load_and_stream is true",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"scope",
"description":"Defines the set of nodes to which mutations can be streamed",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query",
"enum": ["all", "dc", "rack", "node"]
}
]
}
@@ -3048,14 +2896,6 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
@@ -3187,73 +3027,6 @@
}
]
},
{
"path":"/storage_service/retrain_dict",
"operations":[
{
"method":"POST",
"summary":"Retrain the SSTable compression dictionary for the target table.",
"type":"void",
"nickname":"retrain_dict",
"produces":[
"application/json"
],
"parameters":[
{
"name":"keyspace",
"description":"Name of the keyspace containing the target table.",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"cf",
"description":"Name of the target table.",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/estimate_compression_ratios",
"operations":[
{
"method":"GET",
"summary":"Compute an estimated compression ratio for SSTables of the given table, for various compression configurations.",
"type":"array",
"items":{
"type":"compression_config_result"
},
"nickname":"estimate_compression_ratios",
"produces":[
"application/json"
],
"parameters":[
{
"name":"keyspace",
"description":"Name of the keyspace containing the target table.",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"cf",
"description":"Name of the target table.",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/raft_topology/reload",
"operations":[
@@ -3296,54 +3069,6 @@
]
}
]
},
{
"path":"/storage_service/raft_topology/cmd_rpc_status",
"operations":[
{
"method":"GET",
"summary":"Get information about currently running topology cmd rpc",
"type":"string",
"nickname":"raft_topology_get_cmd_status",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/storage_service/drop_quarantined_sstables",
"operations":[
{
"method":"POST",
"summary":"Drops all quarantined sstables in all keyspaces or specified keyspace and tables",
"type":"void",
"nickname":"drop_quarantined_sstables",
"produces":[
"application/json"
],
"parameters":[
{
"name":"keyspace",
"description":"The keyspace name to drop quarantined sstables from.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"tables",
"description":"Comma-separated table names to drop quarantined sstables from.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
}
],
"models":{
@@ -3480,11 +3205,11 @@
"properties":{
"start_token":{
"type":"string",
"description":"The range start token (exclusive)"
"description":"The range start token"
},
"end_token":{
"type":"string",
"description":"The range end token (inclusive)"
"description":"The range start token"
},
"endpoints":{
"type":"array",
@@ -3557,7 +3282,7 @@
"version":{
"type":"string",
"enum":[
"ka", "la", "mc", "md", "me", "ms"
"ka", "la", "mc", "md", "me"
],
"description":"SSTable version"
},
@@ -3603,32 +3328,6 @@
"type":"string"
}
}
},
"compression_config_result":{
"id":"compression_config_result",
"description":"Compression ratio estimation result for one config",
"properties":{
"level":{
"type":"long",
"description":"The used value of `compression_level`"
},
"chunk_length_in_kb":{
"type":"long",
"description":"The used value of `chunk_length_in_kb`"
},
"dict":{
"type":"string",
"description":"The used dictionary: `none`, `past` (== current), or `future`"
},
"sstable_compression":{
"type":"string",
"description":"The used compressor name (aka `sstable_compression`)"
},
"ratio":{
"type":"float",
"description":"The resulting compression ratio (estimated on a random sample of files)"
}
}
}
}
}

View File

@@ -447,7 +447,7 @@
"description":"The number of units completed so far"
},
"children_ids":{
"type":"chunked_array",
"type":"array",
"items":{
"type":"task_identity"
},

View File

@@ -42,14 +42,6 @@
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"consider_only_existing_data",
"description":"Set to \"true\" to flush all memtables and force tombstone garbage collection to check only the sstables being compacted (false by default). The memtable, commitlog and other uncompacted sstables will not be checked during tombstone garbage collection.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}

View File

@@ -137,6 +137,14 @@ future<> unset_load_meter(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_load_meter(ctx, r); });
}
future<> set_format_selector(http_context& ctx, db::sstables_format_selector& sel) {
return ctx.http_server.set_routes([&ctx, &sel] (routes& r) { set_format_selector(ctx, r, sel); });
}
future<> unset_format_selector(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_format_selector(ctx, r); });
}
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader) {
return ctx.http_server.set_routes([&ctx, &sst_loader] (routes& r) { set_sstables_loader(ctx, r, sst_loader); });
}
@@ -216,22 +224,15 @@ future<> unset_server_gossip(http_context& ctx) {
});
}
future<> set_server_column_family(http_context& ctx, sharded<replica::database>& db) {
co_await register_api(ctx, "column_family",
"The column family API", [&db] (http_context& ctx, routes& r) {
set_column_family(ctx, r, db);
});
co_await register_api(ctx, "cache_service",
"The cache service API", [&db] (http_context& ctx, routes& r) {
set_cache_service(ctx, db, r);
future<> set_server_column_family(http_context& ctx, sharded<db::system_keyspace>& sys_ks) {
return register_api(ctx, "column_family",
"The column family API", [&sys_ks] (http_context& ctx, routes& r) {
set_column_family(ctx, r, sys_ks);
});
}
future<> unset_server_column_family(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) {
unset_column_family(ctx, r);
unset_cache_service(ctx, r);
});
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_column_family(ctx, r); });
}
future<> set_server_messaging_service(http_context& ctx, sharded<netw::messaging_service>& ms) {
@@ -263,6 +264,15 @@ future<> unset_server_stream_manager(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_stream_manager(ctx, r); });
}
future<> set_server_cache(http_context& ctx) {
return register_api(ctx, "cache_service",
"The cache service API", set_cache_service);
}
future<> unset_server_cache(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_cache_service(ctx, r); });
}
future<> set_hinted_handoff(http_context& ctx, sharded<service::storage_proxy>& proxy, sharded<gms::gossiper>& g) {
return register_api(ctx, "hinted_handoff",
"The hinted handoff API", [&proxy, &g] (http_context& ctx, routes& r) {
@@ -274,7 +284,7 @@ future<> unset_hinted_handoff(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_hinted_handoff(ctx, r); });
}
future<> set_server_compaction_manager(http_context& ctx, sharded<compaction::compaction_manager>& cm) {
future<> set_server_compaction_manager(http_context& ctx, sharded<compaction_manager>& cm) {
return register_api(ctx, "compaction_manager", "The Compaction manager API", [&cm] (http_context& ctx, routes& r) {
set_compaction_manager(ctx, r, cm);
});
@@ -381,5 +391,32 @@ future<> unset_server_raft(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_raft(ctx, r); });
}
void req_params::process(const request& req) {
// Process mandatory parameters
for (auto& [name, ent] : params) {
if (!ent.is_mandatory) {
continue;
}
try {
ent.value = req.get_path_param(name);
} catch (std::out_of_range&) {
throw httpd::bad_param_exception(fmt::format("Mandatory parameter '{}' was not provided", name));
}
}
// Process optional parameters
for (auto& [name, value] : req.query_parameters) {
try {
auto& ent = params.at(name);
if (ent.is_mandatory) {
throw httpd::bad_param_exception(fmt::format("Parameter '{}' is expected to be provided as part of the request url", name));
}
ent.value = value;
} catch (std::out_of_range&) {
throw httpd::bad_param_exception(fmt::format("Unsupported optional parameter '{}'", name));
}
}
}
}

View File

@@ -23,6 +23,17 @@
namespace api {
template<class T>
std::vector<sstring> container_to_vec(const T& container) {
std::vector<sstring> res;
res.reserve(std::size(container));
for (const auto& i : container) {
res.push_back(fmt::to_string(i));
}
return res;
}
template<class T>
std::vector<T> map_to_key_value(const std::map<sstring, sstring>& map) {
std::vector<T> res;
@@ -56,6 +67,17 @@ T map_sum(T&& dest, const S& src) {
return std::move(dest);
}
template <typename MAP>
std::vector<sstring> map_keys(const MAP& map) {
std::vector<sstring> res;
res.reserve(std::size(map));
for (const auto& i : map) {
res.push_back(fmt::to_string(i.first));
}
return res;
}
/**
* General sstring splitting function
*/
@@ -73,7 +95,7 @@ inline std::vector<sstring> split(const sstring& text, const char* separator) {
*
*/
template<class T, class F, class V>
future<json::json_return_type> sum_stats(sharded<T>& d, V F::*f) {
future<json::json_return_type> sum_stats(distributed<T>& d, V F::*f) {
return d.map_reduce0([f](const T& p) {return p.get_stats().*f;}, 0,
std::plus<V>()).then([](V val) {
return make_ready_future<json::json_return_type>(val);
@@ -106,7 +128,7 @@ httpd::utils_json::rate_moving_average_and_histogram timer_to_json(const utils::
}
template<class T, class F>
future<json::json_return_type> sum_histogram_stats(sharded<T>& d, utils::timed_rate_moving_average_and_histogram F::*f) {
future<json::json_return_type> sum_histogram_stats(distributed<T>& d, utils::timed_rate_moving_average_and_histogram F::*f) {
return d.map_reduce0([f](const T& p) {return (p.get_stats().*f).hist;}, utils::ihistogram(),
std::plus<utils::ihistogram>()).then([](const utils::ihistogram& val) {
@@ -115,7 +137,7 @@ future<json::json_return_type> sum_histogram_stats(sharded<T>& d, utils::timed_
}
template<class T, class F>
future<json::json_return_type> sum_timer_stats(sharded<T>& d, utils::timed_rate_moving_average_and_histogram F::*f) {
future<json::json_return_type> sum_timer_stats(distributed<T>& d, utils::timed_rate_moving_average_and_histogram F::*f) {
return d.map_reduce0([f](const T& p) {return (p.get_stats().*f).rate();}, utils::rate_moving_average_and_histogram(),
std::plus<utils::rate_moving_average_and_histogram>()).then([](const utils::rate_moving_average_and_histogram& val) {
@@ -124,7 +146,7 @@ future<json::json_return_type> sum_timer_stats(sharded<T>& d, utils::timed_rate
}
template<class T, class F>
future<json::json_return_type> sum_timer_stats(sharded<T>& d, utils::timed_rate_moving_average_summary_and_histogram F::*f) {
future<json::json_return_type> sum_timer_stats(distributed<T>& d, utils::timed_rate_moving_average_summary_and_histogram F::*f) {
return d.map_reduce0([f](const T& p) {return (p.get_stats().*f).rate();}, utils::rate_moving_average_and_histogram(),
std::plus<utils::rate_moving_average_and_histogram>()).then([](const utils::rate_moving_average_and_histogram& val) {
return make_ready_future<json::json_return_type>(timer_to_json(val));
@@ -230,6 +252,67 @@ public:
operator T() const { return value; }
};
using mandatory = bool_class<struct mandatory_tag>;
class req_params {
public:
struct def {
std::optional<sstring> value;
mandatory is_mandatory = mandatory::no;
def(std::optional<sstring> value_ = std::nullopt, mandatory is_mandatory_ = mandatory::no)
: value(std::move(value_))
, is_mandatory(is_mandatory_)
{ }
def(mandatory is_mandatory_)
: is_mandatory(is_mandatory_)
{ }
};
private:
std::unordered_map<sstring, def> params;
public:
req_params(std::initializer_list<std::pair<sstring, def>> l) {
for (const auto& [name, ent] : l) {
add(std::move(name), std::move(ent));
}
}
void add(sstring name, def ent) {
params.emplace(std::move(name), std::move(ent));
}
void process(const request& req);
const std::optional<sstring>& get(const char* name) const {
return params.at(name).value;
}
template <typename T = sstring>
const std::optional<T> get_as(const char* name) const {
return get(name);
}
template <typename T = sstring>
requires std::same_as<T, bool>
const std::optional<bool> get_as(const char* name) const {
auto value = get(name);
if (!value) {
return std::nullopt;
}
std::transform(value->begin(), value->end(), value->begin(), ::tolower);
if (value == "true" || value == "yes" || value == "1") {
return true;
}
if (value == "false" || value == "no" || value == "0") {
return false;
}
throw boost::bad_lexical_cast{};
}
};
httpd::utils_json::estimated_histogram time_to_json_histogram(const utils::time_estimated_histogram& val);
}

View File

@@ -18,9 +18,7 @@
using request = http::request;
using reply = http::reply;
namespace compaction {
class compaction_manager;
}
namespace service {
@@ -58,6 +56,7 @@ class sstables_format_selector;
namespace view {
class view_builder;
}
class system_keyspace;
}
namespace netw { class messaging_service; }
class repair_service;
@@ -84,9 +83,9 @@ struct http_context {
sstring api_dir;
sstring api_doc;
httpd::http_server_control http_server;
sharded<replica::database>& db;
distributed<replica::database>& db;
http_context(sharded<replica::database>& _db)
http_context(distributed<replica::database>& _db)
: db(_db)
{
}
@@ -117,7 +116,7 @@ future<> set_server_token_metadata(http_context& ctx, sharded<locator::shared_to
future<> unset_server_token_metadata(http_context& ctx);
future<> set_server_gossip(http_context& ctx, sharded<gms::gossiper>& g);
future<> unset_server_gossip(http_context& ctx);
future<> set_server_column_family(http_context& ctx, sharded<replica::database>& db);
future<> set_server_column_family(http_context& ctx, sharded<db::system_keyspace>& sys_ks);
future<> unset_server_column_family(http_context& ctx);
future<> set_server_messaging_service(http_context& ctx, sharded<netw::messaging_service>& ms);
future<> unset_server_messaging_service(http_context& ctx);
@@ -127,7 +126,9 @@ future<> set_server_stream_manager(http_context& ctx, sharded<streaming::stream_
future<> unset_server_stream_manager(http_context& ctx);
future<> set_hinted_handoff(http_context& ctx, sharded<service::storage_proxy>& p, sharded<gms::gossiper>& g);
future<> unset_hinted_handoff(http_context& ctx);
future<> set_server_compaction_manager(http_context& ctx, sharded<compaction::compaction_manager>& cm);
future<> set_server_cache(http_context& ctx);
future<> unset_server_cache(http_context& ctx);
future<> set_server_compaction_manager(http_context& ctx, sharded<compaction_manager>& cm);
future<> unset_server_compaction_manager(http_context& ctx);
future<> set_server_done(http_context& ctx);
future<> set_server_task_manager(http_context& ctx, sharded<tasks::task_manager>& tm, lw_shared_ptr<db::config> cfg, sharded<gms::gossiper>& gossiper);
@@ -140,6 +141,8 @@ future<> set_server_raft(http_context&, sharded<service::raft_group_registry>&);
future<> unset_server_raft(http_context&);
future<> set_load_meter(http_context& ctx, service::load_meter& lm);
future<> unset_load_meter(http_context& ctx);
future<> set_format_selector(http_context& ctx, db::sstables_format_selector& sel);
future<> unset_format_selector(http_context& ctx);
future<> set_server_cql_server_test(http_context& ctx, cql_transport::controller& ctl);
future<> unset_server_cql_server_test(http_context& ctx);
future<> set_server_service_levels(http_context& ctx, cql_transport::controller& ctl, sharded<cql3::query_processor>& qp);

View File

@@ -16,7 +16,7 @@ using namespace json;
using namespace seastar::httpd;
namespace cs = httpd::cache_service_json;
void set_cache_service(http_context& ctx, sharded<replica::database>& db, routes& r) {
void set_cache_service(http_context& ctx, routes& r) {
cs::get_row_cache_save_period_in_seconds.set(r, [](std::unique_ptr<http::request> req) {
// We never save the cache
// Origin uses 0 for never
@@ -204,53 +204,53 @@ void set_cache_service(http_context& ctx, sharded<replica::database>& db, routes
});
});
cs::get_row_hits.set(r, [&db] (std::unique_ptr<http::request> req) {
return map_reduce_cf(db, uint64_t(0), [](const replica::column_family& cf) {
cs::get_row_hits.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, uint64_t(0), [](const replica::column_family& cf) {
return cf.get_row_cache().stats().hits.count();
}, std::plus<uint64_t>());
});
cs::get_row_requests.set(r, [&db] (std::unique_ptr<http::request> req) {
return map_reduce_cf(db, uint64_t(0), [](const replica::column_family& cf) {
cs::get_row_requests.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, uint64_t(0), [](const replica::column_family& cf) {
return cf.get_row_cache().stats().hits.count() + cf.get_row_cache().stats().misses.count();
}, std::plus<uint64_t>());
});
cs::get_row_hit_rate.set(r, [&db] (std::unique_ptr<http::request> req) {
return map_reduce_cf(db, ratio_holder(), [](const replica::column_family& cf) {
cs::get_row_hit_rate.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx, ratio_holder(), [](const replica::column_family& cf) {
return ratio_holder(cf.get_row_cache().stats().hits.count() + cf.get_row_cache().stats().misses.count(),
cf.get_row_cache().stats().hits.count());
}, std::plus<ratio_holder>());
});
cs::get_row_hits_moving_avrage.set(r, [&db] (std::unique_ptr<http::request> req) {
return map_reduce_cf_raw(db, utils::rate_moving_average(), [](const replica::column_family& cf) {
cs::get_row_hits_moving_avrage.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf_raw(ctx, utils::rate_moving_average(), [](const replica::column_family& cf) {
return cf.get_row_cache().stats().hits.rate();
}, std::plus<utils::rate_moving_average>()).then([](const utils::rate_moving_average& m) {
return make_ready_future<json::json_return_type>(meter_to_json(m));
});
});
cs::get_row_requests_moving_avrage.set(r, [&db] (std::unique_ptr<http::request> req) {
return map_reduce_cf_raw(db, utils::rate_moving_average(), [](const replica::column_family& cf) {
cs::get_row_requests_moving_avrage.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf_raw(ctx, utils::rate_moving_average(), [](const replica::column_family& cf) {
return cf.get_row_cache().stats().hits.rate() + cf.get_row_cache().stats().misses.rate();
}, std::plus<utils::rate_moving_average>()).then([](const utils::rate_moving_average& m) {
return make_ready_future<json::json_return_type>(meter_to_json(m));
});
});
cs::get_row_size.set(r, [&db] (std::unique_ptr<http::request> req) {
cs::get_row_size.set(r, [&ctx] (std::unique_ptr<http::request> req) {
// In origin row size is the weighted size.
// We currently do not support weights, so we use raw size in bytes instead
return db.map_reduce0([](replica::database& db) -> uint64_t {
return ctx.db.map_reduce0([](replica::database& db) -> uint64_t {
return db.row_cache_tracker().region().occupancy().used_space();
}, uint64_t(0), std::plus<uint64_t>()).then([](const int64_t& res) {
return make_ready_future<json::json_return_type>(res);
});
});
cs::get_row_entries.set(r, [&db] (std::unique_ptr<http::request> req) {
return db.map_reduce0([](replica::database& db) -> uint64_t {
cs::get_row_entries.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return ctx.db.map_reduce0([](replica::database& db) -> uint64_t {
return db.row_cache_tracker().partitions();
}, uint64_t(0), std::plus<uint64_t>()).then([](const int64_t& res) {
return make_ready_future<json::json_return_type>(res);

View File

@@ -7,20 +7,15 @@
*/
#pragma once
#include <seastar/core/sharded.hh>
namespace seastar::httpd {
class routes;
}
namespace replica {
class database;
}
namespace api {
struct http_context;
void set_cache_service(http_context& ctx, seastar::sharded<replica::database>& db, seastar::httpd::routes& r);
void set_cache_service(http_context& ctx, seastar::httpd::routes& r);
void unset_cache_service(http_context& ctx, seastar::httpd::routes& r);
}

File diff suppressed because it is too large Load Diff

View File

@@ -13,25 +13,25 @@
#include <any>
#include "api/api_init.hh"
namespace db {
class system_keyspace;
}
namespace api {
void set_column_family(http_context& ctx, httpd::routes& r, sharded<replica::database>& db);
void set_column_family(http_context& ctx, httpd::routes& r, sharded<db::system_keyspace>& sys_ks);
void unset_column_family(http_context& ctx, httpd::routes& r);
table_info parse_table_info(const sstring& name, const replica::database& db);
template<class Mapper, class I, class Reducer>
future<I> map_reduce_cf_raw(sharded<replica::database>& db, const sstring& name, I init,
future<I> map_reduce_cf_raw(http_context& ctx, const sstring& name, I init,
Mapper mapper, Reducer reducer) {
auto uuid = parse_table_info(name, db.local()).id;
using mapper_type = std::function<future<std::unique_ptr<std::any>>(replica::database&)>;
auto uuid = parse_table_info(name, ctx.db.local()).id;
using mapper_type = std::function<std::unique_ptr<std::any>(replica::database&)>;
using reducer_type = std::function<std::unique_ptr<std::any>(std::unique_ptr<std::any>, std::unique_ptr<std::any>)>;
return db.map_reduce0(mapper_type([mapper, uuid](replica::database& db) {
return futurize_invoke([mapper, &db, uuid] {
return mapper(db.find_column_family(uuid));
}).then([] (auto result) {
return std::make_unique<std::any>(I(std::move(result)));
});
return ctx.db.map_reduce0(mapper_type([mapper, uuid](replica::database& db) {
return std::make_unique<std::any>(I(mapper(db.find_column_family(uuid))));
}), std::make_unique<std::any>(std::move(init)), reducer_type([reducer = std::move(reducer)] (std::unique_ptr<std::any> a, std::unique_ptr<std::any> b) mutable {
return std::make_unique<std::any>(I(reducer(std::any_cast<I>(std::move(*a)), std::any_cast<I>(std::move(*b)))));
})).then([] (std::unique_ptr<std::any> r) {
@@ -41,30 +41,33 @@ future<I> map_reduce_cf_raw(sharded<replica::database>& db, const sstring& name,
template<class Mapper, class I, class Reducer>
future<json::json_return_type> map_reduce_cf(sharded<replica::database>& db, const sstring& name, I init,
future<json::json_return_type> map_reduce_cf(http_context& ctx, const sstring& name, I init,
Mapper mapper, Reducer reducer) {
return map_reduce_cf_raw(db, name, init, mapper, reducer).then([](const I& res) {
return map_reduce_cf_raw(ctx, name, init, mapper, reducer).then([](const I& res) {
return make_ready_future<json::json_return_type>(res);
});
}
template<class Mapper, class I, class Reducer, class Result>
future<json::json_return_type> map_reduce_cf(sharded<replica::database>& db, const sstring& name, I init,
future<json::json_return_type> map_reduce_cf(http_context& ctx, const sstring& name, I init,
Mapper mapper, Reducer reducer, Result result) {
return map_reduce_cf_raw(db, name, init, mapper, reducer).then([result](const I& res) mutable {
return map_reduce_cf_raw(ctx, name, init, mapper, reducer).then([result](const I& res) mutable {
result = res;
return make_ready_future<json::json_return_type>(result);
});
}
future<json::json_return_type> map_reduce_cf_time_histogram(http_context& ctx, const sstring& name, std::function<utils::time_estimated_histogram(const replica::column_family&)> f);
struct map_reduce_column_families_locally {
std::any init;
std::function<future<std::unique_ptr<std::any>>(replica::column_family&)> mapper;
std::function<std::unique_ptr<std::any>(replica::column_family&)> mapper;
std::function<std::unique_ptr<std::any>(std::unique_ptr<std::any>, std::unique_ptr<std::any>)> reducer;
future<std::unique_ptr<std::any>> operator()(replica::database& db) const {
auto res = seastar::make_lw_shared<std::unique_ptr<std::any>>(std::make_unique<std::any>(init));
return db.get_tables_metadata().for_each_table_gently([res, this] (table_id, seastar::lw_shared_ptr<replica::table> table) -> future<> {
*res = reducer(std::move(*res), co_await mapper(*table.get()));
return db.get_tables_metadata().for_each_table_gently([res, this] (table_id, seastar::lw_shared_ptr<replica::table> table) {
*res = reducer(std::move(*res), mapper(*table.get()));
return make_ready_future();
}).then([res] () {
return std::move(*res);
});
@@ -72,21 +75,17 @@ struct map_reduce_column_families_locally {
};
template<class Mapper, class I, class Reducer>
future<I> map_reduce_cf_raw(sharded<replica::database>& db, I init,
future<I> map_reduce_cf_raw(http_context& ctx, I init,
Mapper mapper, Reducer reducer) {
using mapper_type = std::function<future<std::unique_ptr<std::any>>(replica::column_family&)>;
using mapper_type = std::function<std::unique_ptr<std::any>(replica::column_family&)>;
using reducer_type = std::function<std::unique_ptr<std::any>(std::unique_ptr<std::any>, std::unique_ptr<std::any>)>;
auto wrapped_mapper = mapper_type([mapper = std::move(mapper)] (replica::column_family& cf) mutable {
return futurize_invoke([&cf, mapper] {
return mapper(cf);
}).then([] (auto result) {
return std::make_unique<std::any>(I(std::move(result)));
});
return std::make_unique<std::any>(I(mapper(cf)));
});
auto wrapped_reducer = reducer_type([reducer = std::move(reducer)] (std::unique_ptr<std::any> a, std::unique_ptr<std::any> b) mutable {
return std::make_unique<std::any>(I(reducer(std::any_cast<I>(std::move(*a)), std::any_cast<I>(std::move(*b)))));
});
return db.map_reduce0(map_reduce_column_families_locally{init,
return ctx.db.map_reduce0(map_reduce_column_families_locally{init,
std::move(wrapped_mapper), wrapped_reducer}, std::make_unique<std::any>(init), wrapped_reducer).then([] (std::unique_ptr<std::any> res) {
return std::any_cast<I>(std::move(*res));
});
@@ -94,13 +93,20 @@ future<I> map_reduce_cf_raw(sharded<replica::database>& db, I init,
template<class Mapper, class I, class Reducer>
future<json::json_return_type> map_reduce_cf(sharded<replica::database>& db, I init,
future<json::json_return_type> map_reduce_cf(http_context& ctx, I init,
Mapper mapper, Reducer reducer) {
return map_reduce_cf_raw(db, init, mapper, reducer).then([](const I& res) {
return map_reduce_cf_raw(ctx, init, mapper, reducer).then([](const I& res) {
return make_ready_future<json::json_return_type>(res);
});
}
future<json::json_return_type> get_cf_stats(http_context& ctx, const sstring& name,
int64_t replica::column_family_stats::*f);
future<json::json_return_type> get_cf_stats(http_context& ctx,
int64_t replica::column_family_stats::*f);
std::tuple<sstring, sstring> parse_fully_qualified_cf_name(sstring name);
}

View File

@@ -14,7 +14,6 @@
#include "api/api.hh"
#include "api/api-doc/compaction_manager.json.hh"
#include "api/api-doc/storage_service.json.hh"
#include "db/compaction_history_entry.hh"
#include "db/system_keyspace.hh"
#include "column_family.hh"
#include "unimplemented.hh"
@@ -29,9 +28,9 @@ namespace ss = httpd::storage_service_json;
using namespace json;
using namespace seastar::httpd;
static future<json::json_return_type> get_cm_stats(sharded<compaction::compaction_manager>& cm,
int64_t compaction::compaction_manager::stats::*f) {
return cm.map_reduce0([f](compaction::compaction_manager& cm) {
static future<json::json_return_type> get_cm_stats(sharded<compaction_manager>& cm,
int64_t compaction_manager::stats::*f) {
return cm.map_reduce0([f](compaction_manager& cm) {
return cm.get_stats().*f;
}, int64_t(0), std::plus<int64_t>()).then([](const int64_t& res) {
return make_ready_future<json::json_return_type>(res);
@@ -47,9 +46,9 @@ static std::unordered_map<std::pair<sstring, sstring>, uint64_t, utils::tuple_ha
return std::move(a);
}
void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction::compaction_manager>& cm) {
void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction_manager>& cm) {
cm::get_compactions.set(r, [&cm] (std::unique_ptr<http::request> req) {
return cm.map_reduce0([](compaction::compaction_manager& cm) {
return cm.map_reduce0([](compaction_manager& cm) {
std::vector<cm::summary> summaries;
for (const auto& c : cm.get_compactions()) {
@@ -58,7 +57,7 @@ void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction::co
s.ks = c.ks_name;
s.cf = c.cf_name;
s.unit = "keys";
s.task_type = compaction::compaction_name(c.type);
s.task_type = sstables::compaction_name(c.type);
s.completed = c.total_keys_written;
s.total = c.total_partitions;
summaries.push_back(std::move(s));
@@ -72,9 +71,10 @@ void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction::co
cm::get_pending_tasks_by_table.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return ctx.db.map_reduce0([](replica::database& db) {
return do_with(std::unordered_map<std::pair<sstring, sstring>, uint64_t, utils::tuple_hash>(), [&db](std::unordered_map<std::pair<sstring, sstring>, uint64_t, utils::tuple_hash>& tasks) {
return db.get_tables_metadata().for_each_table_gently([&tasks] (table_id, lw_shared_ptr<replica::table> table) -> future<> {
return db.get_tables_metadata().for_each_table_gently([&tasks] (table_id, lw_shared_ptr<replica::table> table) {
replica::table& cf = *table.get();
tasks[std::make_pair(cf.schema()->ks_name(), cf.schema()->cf_name())] = co_await cf.estimate_pending_compactions();
tasks[std::make_pair(cf.schema()->ks_name(), cf.schema()->cf_name())] = cf.estimate_pending_compactions();
return make_ready_future<>();
}).then([&tasks] {
return std::move(tasks);
});
@@ -103,20 +103,23 @@ void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction::co
cm::stop_compaction.set(r, [&cm] (std::unique_ptr<http::request> req) {
auto type = req->get_query_param("type");
return cm.invoke_on_all([type] (compaction::compaction_manager& cm) {
return cm.invoke_on_all([type] (compaction_manager& cm) {
return cm.stop_compaction(type);
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
cm::stop_keyspace_compaction.set(r, [&ctx, &cm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto [ks_name, tables] = parse_table_infos(ctx, *req, "tables");
cm::stop_keyspace_compaction.set(r, [&ctx] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto ks_name = validate_keyspace(ctx, req);
auto tables = parse_table_infos(ks_name, ctx, req->query_parameters, "tables");
auto type = req->get_query_param("type");
co_await cm.invoke_on_all([&] (compaction::compaction_manager& cm) {
co_await ctx.db.invoke_on_all([&] (replica::database& db) {
auto& cm = db.get_compaction_manager();
return parallel_for_each(tables, [&] (const table_info& ti) {
return cm.stop_compaction(type, [id = ti.id] (const compaction::compaction_group_view* x) {
return x->schema()->id() == id;
auto& t = db.find_column_family(ti.id);
return t.parallel_foreach_table_state([&] (compaction::table_state& ts) {
return cm.stop_compaction(type, &ts);
});
});
});
@@ -124,13 +127,13 @@ void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction::co
});
cm::get_pending_tasks.set(r, [&ctx] (std::unique_ptr<http::request> req) {
return map_reduce_cf(ctx.db, int64_t(0), [](replica::column_family& cf) {
return map_reduce_cf(ctx, int64_t(0), [](replica::column_family& cf) {
return cf.estimate_pending_compactions();
}, std::plus<int64_t>());
});
cm::get_completed_tasks.set(r, [&cm] (std::unique_ptr<http::request> req) {
return get_cm_stats(cm, &compaction::compaction_manager::stats::completed_tasks);
return get_cm_stats(cm, &compaction_manager::stats::completed_tasks);
});
cm::get_total_compactions_completed.set(r, [] (std::unique_ptr<http::request> req) {
@@ -148,7 +151,7 @@ void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction::co
});
cm::get_compaction_history.set(r, [&cm] (std::unique_ptr<http::request> req) {
noncopyable_function<future<>(output_stream<char>&&)> f = [&cm] (output_stream<char>&& out) -> future<> {
std::function<future<>(output_stream<char>&&)> f = [&cm] (output_stream<char>&& out) -> future<> {
auto s = std::move(out);
bool first = true;
std::exception_ptr ex;
@@ -157,11 +160,8 @@ void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction::co
co_await cm.local().get_compaction_history([&s, &first](const db::compaction_history_entry& entry) mutable -> future<> {
cm::history h;
h.id = fmt::to_string(entry.id);
h.shard_id = entry.shard_id;
h.ks = std::move(entry.ks);
h.cf = std::move(entry.cf);
h.compaction_type = entry.compaction_type;
h.started_at = entry.started_at;
h.compacted_at = entry.compacted_at;
h.bytes_in = entry.bytes_in;
h.bytes_out = entry.bytes_out;
@@ -173,24 +173,6 @@ void set_compaction_manager(http_context& ctx, routes& r, sharded<compaction::co
e.value = it.second;
h.rows_merged.push(std::move(e));
}
for (const auto& data : entry.sstables_in) {
httpd::compaction_manager_json::sstableinfo sstable;
sstable.generation = fmt::to_string(data.generation),
sstable.origin = data.origin,
sstable.size = data.size,
h.sstables_in.push(std::move(sstable));
}
for (const auto& data : entry.sstables_out) {
httpd::compaction_manager_json::sstableinfo sstable;
sstable.generation = fmt::to_string(data.generation),
sstable.origin = data.origin,
sstable.size = data.size,
h.sstables_out.push(std::move(sstable));
}
h.total_tombstone_purge_attempt = entry.total_tombstone_purge_attempt;
h.total_tombstone_purge_failure_due_to_overlapping_with_memtable = entry.total_tombstone_purge_failure_due_to_overlapping_with_memtable;
h.total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable = entry.total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable;
if (!first) {
co_await s.write(", ");
}

View File

@@ -13,13 +13,11 @@ namespace seastar::httpd {
class routes;
}
namespace compaction {
class compaction_manager;
}
namespace api {
struct http_context;
void set_compaction_manager(http_context& ctx, seastar::httpd::routes& r, seastar::sharded<compaction::compaction_manager>& cm);
void set_compaction_manager(http_context& ctx, seastar::httpd::routes& r, seastar::sharded<compaction_manager>& cm);
void unset_compaction_manager(http_context& ctx, seastar::httpd::routes& r);
}

View File

@@ -23,6 +23,22 @@ using namespace seastar::httpd;
namespace sp = httpd::storage_proxy_json;
namespace ss = httpd::storage_service_json;
template<class T>
json::json_return_type get_json_return_type(const T& val) {
return json::json_return_type(val);
}
/*
* As commented on db::seed_provider_type is not used
* and probably never will.
*
* Just in case, we will return its name
*/
template<>
json::json_return_type get_json_return_type(const db::seed_provider_type& val) {
return json::json_return_type(val.class_name);
}
std::string_view format_type(std::string_view type) {
if (type == "int") {
return "integer";
@@ -171,7 +187,7 @@ void set_config(std::shared_ptr < api_registry_builder20 > rb, http_context& ctx
});
ss::get_all_data_file_locations.set(r, [&cfg](const_req req) {
return cfg.data_file_directories();
return container_to_vec(cfg.data_file_directories());
});
ss::get_saved_caches_location.set(r, [&cfg](const_req req) {

View File

@@ -21,10 +21,10 @@ namespace hf = httpd::error_injection_json;
void set_error_injection(http_context& ctx, routes& r) {
hf::enable_injection.set(r, [](std::unique_ptr<request> req) -> future<json::json_return_type> {
hf::enable_injection.set(r, [](std::unique_ptr<request> req) {
sstring injection = req->get_path_param("injection");
bool one_shot = req->get_query_param("one_shot") == "True";
auto params = co_await util::read_entire_stream_contiguous(*req->content_stream);
auto params = req->content;
const size_t max_params_size = 1024 * 1024;
if (params.size() > max_params_size) {
@@ -39,11 +39,12 @@ void set_error_injection(http_context& ctx, routes& r) {
: rjson::parse_to_map<utils::error_injection_parameters>(params);
auto& errinj = utils::get_local_injector();
co_await errinj.enable_on_all(injection, one_shot, std::move(parameters));
return errinj.enable_on_all(injection, one_shot, std::move(parameters)).then([] {
return make_ready_future<json::json_return_type>(json::json_void());
});
} catch (const rjson::error& e) {
throw httpd::bad_param_exception(format("Failed to parse injections parameters: {}", e.what()));
}
co_return json::json_void();
});
hf::get_enabled_injections_on_all.set(r, [](std::unique_ptr<request> req) {

View File

@@ -22,10 +22,10 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
return g.container().invoke_on(0, [] (gms::gossiper& g) {
std::vector<fd::endpoint_state> res;
res.reserve(g.num_endpoints());
g.for_each_endpoint_state([&] (const gms::endpoint_state& eps) {
g.for_each_endpoint_state([&] (const gms::inet_address& addr, const gms::endpoint_state& eps) {
fd::endpoint_state val;
val.addrs = fmt::to_string(eps.get_ip());
val.is_alive = g.is_alive(eps.get_host_id());
val.addrs = fmt::to_string(addr);
val.is_alive = g.is_alive(addr);
val.generation = eps.get_heart_beat_state().get_generation().value();
val.version = eps.get_heart_beat_state().get_heart_beat_version().value();
val.update_time = eps.get_update_timestamp().time_since_epoch().count();
@@ -40,9 +40,7 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
}
res.emplace_back(std::move(val));
});
return make_ready_future<json::json_return_type>(json::stream_range_as_array(res, [](const fd::endpoint_state& i){
return i;
}));
return make_ready_future<json::json_return_type>(res);
});
});
@@ -66,15 +64,11 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
fd::get_simple_states.set(r, [&g] (std::unique_ptr<request> req) {
return g.container().invoke_on(0, [] (gms::gossiper& g) {
std::vector<fd::mapper> nodes_status;
nodes_status.reserve(g.num_endpoints());
g.for_each_endpoint_state([&] (const gms::endpoint_state& es) {
fd::mapper val;
val.key = fmt::to_string(es.get_ip());
val.value = g.is_alive(es.get_host_id()) ? "UP" : "DOWN";
nodes_status.emplace_back(std::move(val));
std::map<sstring, sstring> nodes_status;
g.for_each_endpoint_state([&] (const gms::inet_address& node, const gms::endpoint_state&) {
nodes_status.emplace(fmt::to_string(node), g.is_alive(node) ? "UP" : "DOWN");
});
return make_ready_future<json::json_return_type>(std::move(nodes_status));
return make_ready_future<json::json_return_type>(map_to_key_value<fd::mapper>(nodes_status));
});
});
@@ -87,7 +81,7 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
fd::get_endpoint_state.set(r, [&g] (std::unique_ptr<request> req) {
return g.container().invoke_on(0, [req = std::move(req)] (gms::gossiper& g) {
auto state = g.get_endpoint_state_ptr(g.get_host_id(gms::inet_address(req->get_path_param("addr"))));
auto state = g.get_endpoint_state_ptr(gms::inet_address(req->get_path_param("addr")));
if (!state) {
return make_ready_future<json::json_return_type>(format("unknown endpoint {}", req->get_path_param("addr")));
}

View File

@@ -21,45 +21,51 @@ using namespace json;
void set_gossiper(http_context& ctx, routes& r, gms::gossiper& g) {
httpd::gossiper_json::get_down_endpoint.set(r, [&g] (std::unique_ptr<request> req) -> future<json::json_return_type> {
auto res = co_await g.get_unreachable_members_synchronized();
co_return json::json_return_type(res | std::views::transform([] (auto& ep) { return fmt::to_string(ep); }) | std::ranges::to<std::vector>());
co_return json::json_return_type(container_to_vec(res));
});
httpd::gossiper_json::get_live_endpoint.set(r, [&g] (std::unique_ptr<request> req) -> future<json::json_return_type> {
auto res = co_await g.get_live_members_synchronized();
co_return json::json_return_type(res | std::views::transform([] (auto& ep) { return fmt::to_string(ep); }) | std::ranges::to<std::vector>());
httpd::gossiper_json::get_live_endpoint.set(r, [&g] (std::unique_ptr<request> req) {
return g.get_live_members_synchronized().then([] (auto res) {
return make_ready_future<json::json_return_type>(container_to_vec(res));
});
});
httpd::gossiper_json::get_endpoint_downtime.set(r, [&g] (std::unique_ptr<request> req) -> future<json::json_return_type> {
gms::inet_address ep(req->get_path_param("addr"));
// synchronize unreachable_members on all shards
co_await g.get_unreachable_members_synchronized();
co_return g.get_endpoint_downtime(g.get_host_id(ep));
co_return g.get_endpoint_downtime(ep);
});
httpd::gossiper_json::get_current_generation_number.set(r, [&g] (std::unique_ptr<http::request> req) {
gms::inet_address ep(req->get_path_param("addr"));
return g.get_current_generation_number(g.get_host_id(ep)).then([] (gms::generation_type res) {
return g.get_current_generation_number(ep).then([] (gms::generation_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});
httpd::gossiper_json::get_current_heart_beat_version.set(r, [&g] (std::unique_ptr<http::request> req) {
gms::inet_address ep(req->get_path_param("addr"));
return g.get_current_heart_beat_version(g.get_host_id(ep)).then([] (gms::version_type res) {
return g.get_current_heart_beat_version(ep).then([] (gms::version_type res) {
return make_ready_future<json::json_return_type>(res.value());
});
});
httpd::gossiper_json::assassinate_endpoint.set(r, [&g](std::unique_ptr<http::request> req) {
return g.assassinate_endpoint(req->get_path_param("addr")).then([] {
if (req->get_query_param("unsafe") != "True") {
return g.assassinate_endpoint(req->get_path_param("addr")).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
}
return g.unsafe_assassinate_endpoint(req->get_path_param("addr")).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
httpd::gossiper_json::force_remove_endpoint.set(r, [&g](std::unique_ptr<http::request> req) {
gms::inet_address ep(req->get_path_param("addr"));
return g.force_remove_endpoint(g.get_host_id(ep), gms::null_permit_id).then([] () {
return g.force_remove_endpoint(ep, gms::null_permit_id).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

View File

@@ -148,7 +148,7 @@ void set_messaging_service(http_context& ctx, routes& r, sharded<netw::messaging
hf::inject_disconnect.set(r, [&ms] (std::unique_ptr<request> req) -> future<json::json_return_type> {
auto ip = msg_addr(req->get_path_param("ip"));
co_await ms.invoke_on_all([ip] (netw::messaging_service& ms) {
ms.remove_rpc_client(ip, std::nullopt);
ms.remove_rpc_client(ip);
});
co_return json::json_void();
});

View File

@@ -71,7 +71,7 @@ void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_regis
co_return json_void{};
});
r::get_leader_host.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
if (req->get_query_param("group_id").empty()) {
if (!req->query_parameters.contains("group_id")) {
const auto leader_id = co_await raft_gr.invoke_on(0, [] (service::raft_group_registry& raft_gr) {
auto& srv = raft_gr.group0();
return srv.current_leader();
@@ -100,7 +100,7 @@ void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_regis
r::read_barrier.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
auto timeout = get_request_timeout(*req);
if (req->get_query_param("group_id").empty()) {
if (!req->query_parameters.contains("group_id")) {
// Read barrier on group 0 by default
co_await raft_gr.invoke_on(0, [timeout] (service::raft_group_registry& raft_gr) -> future<> {
co_await raft_gr.group0_with_timeouts().read_barrier(nullptr, timeout);
@@ -131,7 +131,7 @@ void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_regis
const auto stepdown_timeout_ticks = dur / service::raft_tick_interval;
auto timeout_dur = raft::logical_clock::duration(stepdown_timeout_ticks);
if (req->get_query_param("group_id").empty()) {
if (!req->query_parameters.contains("group_id")) {
// Stepdown on group 0 by default
co_await raft_gr.invoke_on(0, [timeout_dur] (service::raft_group_registry& raft_gr) {
apilog.info("Triggering stepdown for group0");

View File

@@ -11,7 +11,7 @@
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include <seastar/json/json_elements.hh>
#include "seastar/json/json_elements.hh"
#include "transport/controller.hh"
#include <unordered_map>

View File

@@ -39,7 +39,7 @@ utils::time_estimated_histogram timed_rate_moving_average_summary_merge(utils::t
* @return A future that resolves to the result of the aggregation.
*/
template<typename V, typename Reducer, typename InnerMapper>
future<V> two_dimensional_map_reduce(sharded<service::storage_proxy>& d,
future<V> two_dimensional_map_reduce(distributed<service::storage_proxy>& d,
InnerMapper mapper, Reducer reducer, V initial_value) {
return d.map_reduce0( [mapper, reducer, initial_value] (const service::storage_proxy& sp) {
return map_reduce_scheduling_group_specific<service::storage_proxy_stats::stats>(
@@ -59,7 +59,7 @@ future<V> two_dimensional_map_reduce(sharded<service::storage_proxy>& d,
* @return A future that resolves to the result of the aggregation.
*/
template<typename V, typename Reducer, typename F, typename C>
future<V> two_dimensional_map_reduce(sharded<service::storage_proxy>& d,
future<V> two_dimensional_map_reduce(distributed<service::storage_proxy>& d,
C F::*f, Reducer reducer, V initial_value) {
return two_dimensional_map_reduce(d, [f] (F& stats) -> V {
return stats.*f;
@@ -75,20 +75,20 @@ future<V> two_dimensional_map_reduce(sharded<service::storage_proxy>& d,
*
*/
template<typename V, typename F>
future<json::json_return_type> sum_stats_storage_proxy(sharded<proxy>& d, V F::*f) {
future<json::json_return_type> sum_stats_storage_proxy(distributed<proxy>& d, V F::*f) {
return two_dimensional_map_reduce(d, [f] (F& stats) { return stats.*f; }, std::plus<V>(), V(0)).then([] (V val) {
return make_ready_future<json::json_return_type>(val);
});
}
static future<utils::rate_moving_average> sum_timed_rate(sharded<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
static future<utils::rate_moving_average> sum_timed_rate(distributed<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
return two_dimensional_map_reduce(d, [f] (service::storage_proxy_stats::stats& stats) {
return (stats.*f).rate();
}, std::plus<utils::rate_moving_average>(), utils::rate_moving_average());
}
static future<json::json_return_type> sum_timed_rate_as_obj(sharded<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
static future<json::json_return_type> sum_timed_rate_as_obj(distributed<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
return sum_timed_rate(d, f).then([](const utils::rate_moving_average& val) {
httpd::utils_json::rate_moving_average m;
m = val;
@@ -100,7 +100,7 @@ httpd::utils_json::rate_moving_average_and_histogram get_empty_moving_average()
return timer_to_json(utils::rate_moving_average_and_histogram());
}
static future<json::json_return_type> sum_timed_rate_as_long(sharded<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
static future<json::json_return_type> sum_timed_rate_as_long(distributed<proxy>& d, utils::timed_rate_moving_average service::storage_proxy_stats::stats::*f) {
return sum_timed_rate(d, f).then([](const utils::rate_moving_average& val) {
return make_ready_future<json::json_return_type>(val.count);
});
@@ -152,7 +152,7 @@ static future<json::json_return_type> total_latency(sharded<service::storage_pr
*/
template<typename F>
future<json::json_return_type>
sum_histogram_stats_storage_proxy(sharded<proxy>& d,
sum_histogram_stats_storage_proxy(distributed<proxy>& d,
utils::timed_rate_moving_average_summary_and_histogram F::*f) {
return two_dimensional_map_reduce(d, [f] (service::storage_proxy_stats::stats& stats) {
return (stats.*f).hist;
@@ -172,7 +172,7 @@ sum_histogram_stats_storage_proxy(sharded<proxy>& d,
*/
template<typename F>
future<json::json_return_type>
sum_timer_stats_storage_proxy(sharded<proxy>& d,
sum_timer_stats_storage_proxy(distributed<proxy>& d,
utils::timed_rate_moving_average_summary_and_histogram F::*f) {
return two_dimensional_map_reduce(d, [f] (service::storage_proxy_stats::stats& stats) {

File diff suppressed because it is too large Load Diff

View File

@@ -52,19 +52,17 @@ table_id validate_table(const replica::database& db, sstring ks_name, sstring ta
// containing the description of the respective no_such_column_family error.
// Returns a vector of all table infos given by the parameter, or
// if the parameter is not found or is empty, returns a list of all table infos in the keyspace.
std::vector<table_info> parse_table_infos(const sstring& ks_name, const http_context& ctx, const std::unordered_map<sstring, sstring>& query_params, sstring param_name);
std::vector<table_info> parse_table_infos(const sstring& ks_name, const http_context& ctx, sstring value);
std::pair<sstring, std::vector<table_info>> parse_table_infos(const http_context& ctx, const http::request& req, sstring cf_param_name = "cf");
struct scrub_info {
compaction::compaction_type_options::scrub opts;
sstables::compaction_type_options::scrub opts;
sstring keyspace;
std::vector<sstring> column_families;
sstring snapshot_tag;
};
scrub_info parse_scrub_options(const http_context& ctx, std::unique_ptr<http::request> req);
future<scrub_info> parse_scrub_options(const http_context& ctx, sharded<db::snapshot_ctl>& snap_ctl, std::unique_ptr<http::request> req);
void set_storage_service(http_context& ctx, httpd::routes& r, sharded<service::storage_service>& ss, service::raft_group0_client&);
void unset_storage_service(http_context& ctx, httpd::routes& r);
@@ -82,13 +80,6 @@ void set_snapshot(http_context& ctx, httpd::routes& r, sharded<db::snapshot_ctl>
void unset_snapshot(http_context& ctx, httpd::routes& r);
void set_load_meter(http_context& ctx, httpd::routes& r, service::load_meter& lm);
void unset_load_meter(http_context& ctx, httpd::routes& r);
seastar::future<json::json_return_type> run_toppartitions_query(db::toppartitions_query& q, bool legacy_request = false);
// converts string value of boolean parameter into bool
// maps (case insensitively)
// "true", "yes" and "1" into true
// "false", "no" and "0" into false
// otherwise throws runtime_error
bool validate_bool_x(const sstring& param, bool default_value);
seastar::future<json::json_return_type> run_toppartitions_query(db::toppartitions_query& q, http_context &ctx, bool legacy_request = false);
} // namespace api

View File

@@ -10,7 +10,7 @@
#include "api/api-doc/system.json.hh"
#include "api/api-doc/metrics.json.hh"
#include "replica/database.hh"
#include "sstables/sstables_manager.hh"
#include "db/sstables-format-selector.hh"
#include <rapidjson/document.h>
#include <boost/lexical_cast.hpp>
@@ -54,8 +54,7 @@ void set_system(http_context& ctx, routes& r) {
hm::set_metrics_config.set(r, [](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
rapidjson::Document doc;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
doc.Parse(content.c_str());
doc.Parse(req->content.c_str());
if (!doc.IsArray()) {
throw bad_param_exception("Expected a json array");
}
@@ -88,19 +87,21 @@ void set_system(http_context& ctx, routes& r) {
relabels[i].expr = element["regex"].GetString();
}
}
bool failed = false;
co_await smp::invoke_on_all([&relabels, &failed] {
return metrics::set_relabel_configs(relabels).then([&failed](const metrics::metric_relabeling_result& result) {
if (result.metrics_relabeled_due_to_collision > 0) {
failed = true;
return do_with(std::move(relabels), false, [](const std::vector<seastar::metrics::relabel_config>& relabels, bool& failed) {
return smp::invoke_on_all([&relabels, &failed] {
return metrics::set_relabel_configs(relabels).then([&failed](const metrics::metric_relabeling_result& result) {
if (result.metrics_relabeled_due_to_collision > 0) {
failed = true;
}
return;
});
}).then([&failed](){
if (failed) {
throw bad_param_exception("conflicts found during relabeling");
}
return;
return make_ready_future<json::json_return_type>(seastar::json::json_void());
});
});
if (failed) {
throw bad_param_exception("conflicts found during relabeling");
}
co_return seastar::json::json_void();
});
hs::get_system_uptime.set(r, [](const_req req) {
@@ -183,13 +184,18 @@ void set_system(http_context& ctx, routes& r) {
apilog.info("Profile dumped to {}", profile_dest);
return make_ready_future<json::json_return_type>(json::json_return_type(json::json_void()));
}) ;
}
hs::get_highest_supported_sstable_version.set(r, [&ctx] (std::unique_ptr<request> req) {
return smp::submit_to(0, [&ctx] {
auto format = ctx.db.local().get_user_sstables_manager().get_highest_supported_format();
return make_ready_future<json::json_return_type>(seastar::to_sstring(format));
void set_format_selector(http_context& ctx, routes& r, db::sstables_format_selector& sel) {
hs::get_highest_supported_sstable_version.set(r, [&sel] (std::unique_ptr<request> req) {
return smp::submit_to(0, [&sel] {
return make_ready_future<json::json_return_type>(seastar::to_sstring(sel.selected_format()));
});
});
}
void unset_format_selector(http_context& ctx, routes& r) {
hs::get_highest_supported_sstable_version.unset(r);
}
}

View File

@@ -12,9 +12,14 @@ namespace seastar::httpd {
class routes;
}
namespace db { class sstables_format_selector; }
namespace api {
struct http_context;
void set_system(http_context& ctx, seastar::httpd::routes& r);
void set_format_selector(http_context& ctx, seastar::httpd::routes& r, db::sstables_format_selector& sel);
void unset_format_selector(http_context& ctx, seastar::httpd::routes& r);
}

View File

@@ -6,7 +6,6 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/http/exception.hh>
@@ -35,9 +34,8 @@ static ::tm get_time(db_clock::time_point tp) {
}
tm::task_status make_status(tasks::task_status status, sharded<gms::gossiper>& gossiper) {
chunked_fifo<tm::task_identity> tis;
tis.reserve(status.children.size());
for (const auto& child : status.children) {
std::vector<tm::task_identity> tis{status.children.size()};
std::ranges::transform(status.children, tis.begin(), [&gossiper] (const auto& child) {
tm::task_identity ident;
gms::inet_address addr{};
if (gossiper.local_is_initialized()) {
@@ -45,8 +43,8 @@ tm::task_status make_status(tasks::task_status status, sharded<gms::gossiper>& g
}
ident.task_id = child.task_id.to_sstring();
ident.node = fmt::format("{}", addr);
tis.push_back(std::move(ident));
}
return ident;
});
tm::task_status res{};
res.id = status.task_id.to_sstring();
@@ -107,11 +105,11 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
throw bad_param_exception(fmt::format("{}", std::current_exception()));
}
if (auto param = req->get_query_param("keyspace"); !param.empty()) {
keyspace = param;
if (auto it = req->query_parameters.find("keyspace"); it != req->query_parameters.end()) {
keyspace = it->second;
}
if (auto param = req->get_query_param("table"); !param.empty()) {
table = param;
if (auto it = req->query_parameters.find("table"); it != req->query_parameters.end()) {
table = it->second;
}
return module->get_stats(internal, [keyspace = std::move(keyspace), table = std::move(table)] (std::string& ks, std::string& t) {
@@ -119,7 +117,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
});
});
noncopyable_function<future<>(output_stream<char>&&)> f = [r = std::move(res)] (output_stream<char>&& os) -> future<> {
std::function<future<>(output_stream<char>&&)> f = [r = std::move(res)] (output_stream<char>&& os) -> future<> {
auto s = std::move(os);
std::exception_ptr ex;
try {
@@ -175,8 +173,8 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}};
tasks::task_status status;
std::optional<std::chrono::seconds> timeout = std::nullopt;
if (auto param = req->get_query_param("timeout"); !param.empty()) {
timeout = std::chrono::seconds(boost::lexical_cast<uint32_t>(param));
if (auto it = req->query_parameters.find("timeout"); it != req->query_parameters.end()) {
timeout = std::chrono::seconds(boost::lexical_cast<uint32_t>(it->second));
}
try {
auto task = tasks::task_handler{tm.local(), id};
@@ -196,7 +194,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
auto task = tasks::task_handler{tm.local(), id};
auto res = co_await task.get_status_recursively(true);
noncopyable_function<future<>(output_stream<char>&&)> f = [r = std::move(res), &gossiper] (output_stream<char>&& os) -> future<> {
std::function<future<>(output_stream<char>&&)> f = [r = std::move(res), &gossiper] (output_stream<char>&& os) -> future<> {
auto s = std::move(os);
auto res = std::move(r);
co_await s.write("[");
@@ -217,7 +215,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
tm::get_and_update_ttl.set(r, [&cfg] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
uint32_t ttl = cfg.task_ttl_seconds();
try {
co_await cfg.task_ttl_seconds.set_value_on_all_shards(req->get_query_param("ttl"), utils::config_file::config_source::API);
co_await cfg.task_ttl_seconds.set_value_on_all_shards(req->query_parameters["ttl"], utils::config_file::config_source::API);
} catch (...) {
throw bad_param_exception(fmt::format("{}", std::current_exception()));
}
@@ -232,7 +230,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
tm::get_and_update_user_ttl.set(r, [&cfg] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
uint32_t user_ttl = cfg.user_task_ttl_seconds();
try {
co_await cfg.user_task_ttl_seconds.set_value_on_all_shards(req->get_query_param("user_ttl"), utils::config_file::config_source::API);
co_await cfg.user_task_ttl_seconds.set_value_on_all_shards(req->query_parameters["user_ttl"], utils::config_file::config_source::API);
} catch (...) {
throw bad_param_exception(fmt::format("{}", std::current_exception()));
}

View File

@@ -57,16 +57,20 @@ void set_task_manager_test(http_context& ctx, routes& r, sharded<tasks::task_man
tmt::register_test_task.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
sharded<tasks::task_manager>& tms = tm;
const auto id_param = req->get_query_param("task_id");
auto id = !id_param.empty() ? tasks::task_id{utils::UUID{id_param}} : tasks::task_id::create_null_id();
const auto shard_param = req->get_query_param("shard");
unsigned shard = shard_param.empty() ? 0 : boost::lexical_cast<unsigned>(shard_param);
std::string keyspace = req->get_query_param("keyspace");
std::string table = req->get_query_param("table");
std::string entity = req->get_query_param("entity");
auto it = req->query_parameters.find("task_id");
auto id = it != req->query_parameters.end() ? tasks::task_id{utils::UUID{it->second}} : tasks::task_id::create_null_id();
it = req->query_parameters.find("shard");
unsigned shard = it != req->query_parameters.end() ? boost::lexical_cast<unsigned>(it->second) : 0;
it = req->query_parameters.find("keyspace");
std::string keyspace = it != req->query_parameters.end() ? it->second : "";
it = req->query_parameters.find("table");
std::string table = it != req->query_parameters.end() ? it->second : "";
it = req->query_parameters.find("entity");
std::string entity = it != req->query_parameters.end() ? it->second : "";
it = req->query_parameters.find("parent_id");
tasks::task_info data;
if (auto parent_id = req->get_query_param("parent_id"); !parent_id.empty()) {
data.id = tasks::task_id{utils::UUID{parent_id}};
if (it != req->query_parameters.end()) {
data.id = tasks::task_id{utils::UUID{it->second}};
auto parent_ptr = co_await tasks::task_manager::lookup_task_on_all_shards(tm, data.id);
data.shard = parent_ptr->get_status().shard;
}
@@ -84,7 +88,7 @@ void set_task_manager_test(http_context& ctx, routes& r, sharded<tasks::task_man
});
tmt::unregister_test_task.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto id = tasks::task_id{utils::UUID{req->get_query_param("task_id")}};
auto id = tasks::task_id{utils::UUID{req->query_parameters["task_id"]}};
try {
co_await tasks::task_manager::invoke_on_task(tm, id, [] (tasks::task_manager::task_variant task_v, tasks::virtual_task_hint) -> future<> {
return std::visit(overloaded_functor{
@@ -105,8 +109,9 @@ void set_task_manager_test(http_context& ctx, routes& r, sharded<tasks::task_man
tmt::finish_test_task.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}};
std::string error = req->get_query_param("error");
bool fail = !error.empty();
auto it = req->query_parameters.find("error");
bool fail = it != req->query_parameters.end();
std::string error = fail ? it->second : "";
try {
co_await tasks::task_manager::invoke_on_task(tm, id, [fail, error = std::move(error)] (tasks::task_manager::task_variant task_v, tasks::virtual_task_hint) -> future<> {

View File

@@ -12,7 +12,6 @@
#include "api/api.hh"
#include "api/storage_service.hh"
#include "api/api-doc/tasks.json.hh"
#include "api/api-doc/storage_service.json.hh"
#include "compaction/compaction_manager.hh"
#include "compaction/task_manager_module.hh"
#include "service/storage_service.hh"
@@ -26,163 +25,96 @@ extern logging::logger apilog;
namespace api {
namespace t = httpd::tasks_json;
namespace ss = httpd::storage_service_json;
using namespace json;
using ks_cf_func = std::function<future<json::json_return_type>(http_context&, std::unique_ptr<http::request>, sstring, std::vector<table_info>)>;
static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
return [&ctx, f = std::move(f)](std::unique_ptr<http::request> req) {
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
auto keyspace = validate_keyspace(ctx, req);
auto table_infos = parse_table_infos(keyspace, ctx, req->query_parameters, "cf");
return f(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
};
}
static future<shared_ptr<compaction::major_keyspace_compaction_task_impl>> force_keyspace_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
std::optional<compaction::flush_mode> fmopt;
if (!flush && !consider_only_existing_data) {
fmopt = compaction::flush_mode::skip;
}
return compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
}
static future<shared_ptr<compaction::upgrade_sstables_compaction_task_impl>> upgrade_sstables(http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) {
auto& db = ctx.db;
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
return compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
}
static future<shared_ptr<compaction::cleanup_keyspace_compaction_task_impl>> force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
if (rs.is_local() || !rs.is_vnode_based()) {
auto reason = rs.is_local() ? "require" : "support";
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
co_return nullptr;
}
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
auto msg = "Can not perform cleanup operation when topology changes";
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
co_await coroutine::return_exception(std::runtime_error(msg));
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
co_return co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
{}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
}
void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& snap_ctl) {
t::force_keyspace_compaction_async.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto task = co_await force_keyspace_compaction(ctx, std::move(req));
auto& db = ctx.db;
auto params = req_params({
std::pair("keyspace", mandatory::yes),
std::pair("cf", mandatory::no),
std::pair("flush_memtables", mandatory::no),
});
params.process(*req);
auto keyspace = validate_keyspace(ctx, *params.get("keyspace"));
auto table_infos = parse_table_infos(keyspace, ctx, params.get("cf").value_or(""));
auto flush = params.get_as<bool>("flush_memtables").value_or(true);
apilog.debug("force_keyspace_compaction_async: keyspace={} tables={}, flush={}", keyspace, table_infos, flush);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
std::optional<flush_mode> fmopt;
if (!flush) {
fmopt = flush_mode::skip;
}
auto task = co_await compaction_module.make_and_start_task<major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt);
co_return json::json_return_type(task->get_status().id.to_sstring());
});
ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto task = co_await force_keyspace_compaction(ctx, std::move(req));
co_await task->done();
co_return json_void();
});
t::force_keyspace_cleanup_async.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
tasks::task_id id = tasks::task_id::create_null_id();
auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
if (task) {
id = task->get_status().id;
auto& db = ctx.db;
auto keyspace = validate_keyspace(ctx, req);
auto table_infos = parse_table_infos(keyspace, ctx, req->query_parameters, "cf");
apilog.info("force_keyspace_cleanup_async: keyspace={} tables={}", keyspace, table_infos);
if (!co_await ss.local().is_cleanup_allowed(keyspace)) {
auto msg = "Can not perform cleanup operation when topology changes";
apilog.warn("force_keyspace_cleanup_async: keyspace={} tables={}: {}", keyspace, table_infos, msg);
co_await coroutine::return_exception(std::runtime_error(msg));
}
co_return json::json_return_type(id.to_sstring());
});
ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
if (task) {
co_await task->done();
}
co_return json::json_return_type(0);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<cleanup_keyspace_compaction_task_impl>({}, std::move(keyspace), db, table_infos, flush_mode::all_tables, tasks::is_user_task::yes);
co_return json::json_return_type(task->get_status().id.to_sstring());
});
t::perform_keyspace_offstrategy_compaction_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::offstrategy_keyspace_compaction_task_impl>({}, std::move(keyspace), ctx.db, table_infos, nullptr);
auto task = co_await compaction_module.make_and_start_task<offstrategy_keyspace_compaction_task_impl>({}, std::move(keyspace), ctx.db, table_infos, nullptr);
co_return json::json_return_type(task->get_status().id.to_sstring());
}));
ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
bool res = false;
auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::offstrategy_keyspace_compaction_task_impl>({}, std::move(keyspace), ctx.db, table_infos, &res);
co_await task->done();
co_return json::json_return_type(res);
}));
t::upgrade_sstables_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
co_return json::json_return_type(task->get_status().id.to_sstring());
}));
auto& db = ctx.db;
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
co_await task->done();
co_return json::json_return_type(0);
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
co_return json::json_return_type(task->get_status().id.to_sstring());
}));
t::scrub_async.set(r, [&ctx, &snap_ctl] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto& db = ctx.db;
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
}
auto info = co_await parse_scrub_options(ctx, snap_ctl, std::move(req));
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::scrub_sstables_compaction_task_impl>({}, std::move(info.keyspace), db, std::move(info.column_families), info.opts, nullptr);
auto task = co_await compaction_module.make_and_start_task<scrub_sstables_compaction_task_impl>({}, std::move(info.keyspace), db, std::move(info.column_families), info.opts, nullptr);
co_return json::json_return_type(task->get_status().id.to_sstring());
});
ss::force_compaction.set(r, [&ctx] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto& db = ctx.db;
auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
apilog.info("force_compaction: flush={} consider_only_existing_data={}", flush, consider_only_existing_data);
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
std::optional<compaction::flush_mode> fmopt;
if (!flush && !consider_only_existing_data) {
fmopt = compaction::flush_mode::skip;
}
auto task = co_await compaction_module.make_and_start_task<compaction::global_major_compaction_task_impl>({}, db, fmopt, consider_only_existing_data);
co_await task->done();
co_return json_void();
});
}
void unset_tasks_compaction_module(http_context& ctx, httpd::routes& r) {
t::force_keyspace_compaction_async.unset(r);
ss::force_keyspace_compaction.unset(r);
t::force_keyspace_cleanup_async.unset(r);
ss::force_keyspace_cleanup.unset(r);
t::perform_keyspace_offstrategy_compaction_async.unset(r);
ss::perform_keyspace_offstrategy_compaction.unset(r);
t::upgrade_sstables_async.unset(r);
ss::upgrade_sstables.unset(r);
t::scrub_async.unset(r);
ss::force_compaction.unset(r);
}
}

View File

@@ -54,23 +54,12 @@ void set_token_metadata(http_context& ctx, routes& r, sharded<locator::shared_to
for (const auto host_id: leaving_host_ids) {
eps.insert(g.local().get_address_map().get(host_id));
}
return eps | std::views::transform([] (auto& i) { return fmt::to_string(i); }) | std::ranges::to<std::vector>();
return container_to_vec(eps);
});
ss::get_moving_nodes.set(r, [](const_req req) {
std::unordered_set<sstring> addr;
return addr | std::ranges::to<std::vector>();
});
ss::get_excluded_nodes.set(r, [&tm](const_req req) {
const auto& local_tm = *tm.local().get();
std::vector<sstring> eps;
local_tm.get_topology().for_each_node([&] (auto& node) {
if (node.is_excluded()) {
eps.push_back(node.host_id().to_sstring());
}
});
return eps;
return container_to_vec(addr);
});
ss::get_joining_nodes.set(r, [&tm, &g](const_req req) {
@@ -81,21 +70,15 @@ void set_token_metadata(http_context& ctx, routes& r, sharded<locator::shared_to
for (const auto& [token, host_id]: points) {
eps.insert(g.local().get_address_map().get(host_id));
}
return eps | std::views::transform([] (auto& i) { return fmt::to_string(i); }) | std::ranges::to<std::vector>();
return container_to_vec(eps);
});
ss::get_host_id_map.set(r, [&tm, &g](const_req req) {
if (!g.local().is_enabled()) {
throw std::runtime_error("The gossiper is not ready yet");
}
return tm.local().get()->get_host_ids()
| std::views::transform([&g] (locator::host_id id) {
ss::mapper m;
m.key = fmt::to_string(g.local().get_address_map().get(id));
m.value = fmt::to_string(id);
return m;
})
| std::ranges::to<std::vector<ss::mapper>>();
std::vector<ss::mapper> res;
auto map = tm.local().get()->get_host_ids() |
std::views::transform([&g] (locator::host_id id) { return std::make_pair(g.local().get_address_map().get(id), id); }) |
std::ranges::to<std::unordered_map>();
return map_to_key_value(std::move(map), res);
});
static auto host_or_broadcast = [&tm](const_req req) {
@@ -141,7 +124,6 @@ void unset_token_metadata(http_context& ctx, routes& r) {
ss::get_leaving_nodes.unset(r);
ss::get_moving_nodes.unset(r);
ss::get_joining_nodes.unset(r);
ss::get_excluded_nodes.unset(r);
ss::get_host_id_map.unset(r);
httpd::endpoint_snitch_info_json::get_datacenter.unset(r);
httpd::endpoint_snitch_info_json::get_rack.unset(r);

View File

@@ -5,7 +5,6 @@ target_sources(scylla_audit
PRIVATE
audit.cc
audit_cf_storage_helper.cc
audit_composite_storage_helper.cc
audit_syslog_storage_helper.cc)
target_include_directories(scylla_audit
PUBLIC
@@ -17,7 +16,4 @@ target_link_libraries(scylla_audit
PRIVATE
cql3)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(scylla_audit REUSE_FROM scylla-precompiled-header)
endif()
add_whole_archive(audit scylla_audit)

View File

@@ -13,11 +13,9 @@
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "storage_helper.hh"
#include "audit_cf_storage_helper.hh"
#include "audit_syslog_storage_helper.hh"
#include "audit_composite_storage_helper.hh"
#include "audit.hh"
#include "../db/config.hh"
#include "utils/class_registrator.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
@@ -28,47 +26,6 @@ namespace audit {
logging::logger logger("audit");
static std::set<sstring> parse_audit_modes(const sstring& data) {
std::set<sstring> result;
if (!data.empty()) {
std::vector<sstring> audit_modes;
boost::split(audit_modes, data, boost::is_any_of(","));
if (audit_modes.empty()) {
return {};
}
for (sstring& audit_mode : audit_modes) {
boost::trim(audit_mode);
if (audit_mode == "none") {
return {};
}
if (audit_mode != "table" && audit_mode != "syslog") {
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", audit_mode));
}
result.insert(std::move(audit_mode));
}
}
return result;
}
static std::unique_ptr<storage_helper> create_storage_helper(const std::set<sstring>& audit_modes, cql3::query_processor& qp, service::migration_manager& mm) {
SCYLLA_ASSERT(!audit_modes.empty() && !audit_modes.contains("none"));
std::vector<std::unique_ptr<storage_helper>> helpers;
for (const sstring& audit_mode : audit_modes) {
if (audit_mode == "table") {
helpers.emplace_back(std::make_unique<audit_cf_storage_helper>(qp, mm));
} else if (audit_mode == "syslog") {
helpers.emplace_back(std::make_unique<audit_syslog_storage_helper>(qp, mm));
}
}
SCYLLA_ASSERT(!helpers.empty());
if (helpers.size() == 1) {
return std::move(helpers.front());
}
return std::make_unique<audit_composite_storage_helper>(std::move(helpers));
}
static sstring category_to_string(statement_category category)
{
switch (category) {
@@ -146,9 +103,7 @@ static std::set<sstring> parse_audit_keyspaces(const sstring& data) {
}
audit::audit(locator::shared_token_metadata& token_metadata,
cql3::query_processor& qp,
service::migration_manager& mm,
std::set<sstring>&& audit_modes,
sstring&& storage_helper_name,
std::set<sstring>&& audited_keyspaces,
std::map<sstring, std::set<sstring>>&& audited_tables,
category_set&& audited_categories,
@@ -157,21 +112,28 @@ audit::audit(locator::shared_token_metadata& token_metadata,
, _audited_keyspaces(std::move(audited_keyspaces))
, _audited_tables(std::move(audited_tables))
, _audited_categories(std::move(audited_categories))
, _storage_helper_class_name(std::move(storage_helper_name))
, _cfg(cfg)
, _cfg_keyspaces_observer(cfg.audit_keyspaces.observe([this] (sstring const& new_value){ update_config<std::set<sstring>>(new_value, parse_audit_keyspaces, _audited_keyspaces); }))
, _cfg_tables_observer(cfg.audit_tables.observe([this] (sstring const& new_value){ update_config<std::map<sstring, std::set<sstring>>>(new_value, parse_audit_tables, _audited_tables); }))
, _cfg_categories_observer(cfg.audit_categories.observe([this] (sstring const& new_value){ update_config<category_set>(new_value, parse_audit_categories, _audited_categories); }))
{
_storage_helper_ptr = create_storage_helper(std::move(audit_modes), qp, mm);
}
{ }
audit::~audit() = default;
future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
std::set<sstring> audit_modes = parse_audit_modes(cfg.audit());
if (audit_modes.empty()) {
future<> audit::create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm) {
sstring storage_helper_name;
if (cfg.audit() == "table") {
storage_helper_name = "audit_cf_storage_helper";
} else if (cfg.audit() == "syslog") {
storage_helper_name = "audit_syslog_storage_helper";
} else if (cfg.audit() == "none") {
// Audit is off
logger.info("Audit is disabled");
return make_ready_future<>();
} else {
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", cfg.audit()));
}
category_set audited_categories = parse_audit_categories(cfg.audit_categories());
std::map<sstring, std::set<sstring>> audited_tables = parse_audit_tables(cfg.audit_tables());
@@ -181,20 +143,19 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
cfg.audit(), cfg.audit_categories(), cfg.audit_keyspaces(), cfg.audit_tables());
return audit_instance().start(std::ref(stm),
std::ref(qp),
std::ref(mm),
std::move(audit_modes),
std::move(storage_helper_name),
std::move(audited_keyspaces),
std::move(audited_tables),
std::move(audited_categories),
std::cref(cfg))
.then([&cfg] {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit.start(cfg);
});
std::cref(cfg));
}
future<> audit::start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg, &qp, &mm] (audit& local_audit) {
return local_audit.start(cfg, qp.local(), mm.local());
});
}
@@ -220,7 +181,15 @@ audit_info_ptr audit::create_no_audit_info() {
return audit_info_ptr();
}
future<> audit::start(const db::config& cfg) {
future<> audit::start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm) {
try {
_storage_helper_ptr = create_object<storage_helper>(_storage_helper_class_name, qp, mm);
} catch (no_such_class& e) {
logger.error("Can't create audit storage helper {}: not supported", _storage_helper_class_name);
throw;
} catch (...) {
throw;
}
return _storage_helper_ptr->start(cfg);
}
@@ -240,11 +209,6 @@ future<> audit::log(const audit_info* audit_info, service::query_state& query_st
static const sstring anonymous_username("anonymous");
const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
socket_address client_ip = client_state.get_client_address().addr();
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info->category_string(), cl, error, audit_info->keyspace(),
audit_info->query(), client_ip, audit_info->table(), username);
}
return futurize_invoke(std::mem_fn(&storage_helper::write), _storage_helper_ptr, audit_info, node_ip, client_ip, cl, username, error)
.handle_exception([audit_info, node_ip, client_ip, cl, username, error] (auto ep) {
logger.error("Unexpected exception when writing log with: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {} exception {}",
@@ -255,10 +219,6 @@ future<> audit::log(const audit_info* audit_info, service::query_state& query_st
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}",
node_ip, client_ip, username, error ? "true" : "false");
}
return futurize_invoke(std::mem_fn(&storage_helper::write_login), _storage_helper_ptr, username, node_ip, client_ip, error)
.handle_exception([username, node_ip, client_ip, error] (auto ep) {
logger.error("Unexpected exception when writing login log with: node_ip {} client_ip {} username {} error {} exception {}",

View File

@@ -102,6 +102,7 @@ class audit final : public seastar::async_sharded_service<audit> {
std::map<sstring, std::set<sstring>> _audited_tables;
category_set _audited_categories;
sstring _storage_helper_class_name;
std::unique_ptr<storage_helper> _storage_helper_ptr;
const db::config& _cfg;
@@ -124,20 +125,18 @@ public:
static audit& local_audit_instance() {
return audit_instance().local();
}
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm);
static future<> start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table);
static audit_info_ptr create_no_audit_info();
audit(locator::shared_token_metadata& stm,
cql3::query_processor& qp,
service::migration_manager& mm,
std::set<sstring>&& audit_modes,
audit(locator::shared_token_metadata& stm, sstring&& storage_helper_name,
std::set<sstring>&& audited_keyspaces,
std::map<sstring, std::set<sstring>>&& audited_tables,
category_set&& audited_categories,
const db::config& cfg);
~audit();
future<> start(const db::config& cfg);
future<> start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm);
future<> stop();
future<> shutdown();
bool should_log(const audit_info* audit_info) const;

View File

@@ -11,11 +11,11 @@
#include "cql3/query_processor.hh"
#include "data_dictionary/keyspace_metadata.hh"
#include "utils/UUID_gen.hh"
#include "utils/class_registrator.hh"
#include "cql3/query_options.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "locator/abstract_replication_strategy.hh"
namespace audit {
@@ -64,8 +64,8 @@ future<> audit_cf_storage_helper::migrate_audit_table(service::group0_guard grou
data_dictionary::database db = _qp.db();
cql3::statements::ks_prop_defs old_ks_prop_defs;
auto old_ks_metadata = old_ks_prop_defs.as_ks_metadata_update(
ks->metadata(), *_qp.proxy().get_token_metadata_ptr(), db.features(), db.get_config());
locator::replication_strategy_config_options strategy_opts;
ks->metadata(), *_qp.proxy().get_token_metadata_ptr(), db.features());
std::map<sstring, sstring> strategy_opts;
for (const auto &dc: _qp.proxy().get_token_metadata_ptr()->get_topology().get_datacenters())
strategy_opts[dc] = "3";
@@ -73,7 +73,6 @@ future<> audit_cf_storage_helper::migrate_audit_table(service::group0_guard grou
"org.apache.cassandra.locator.NetworkTopologyStrategy",
strategy_opts,
std::nullopt, // initial_tablets
std::nullopt, // consistency_option
old_ks_metadata->durable_writes(),
old_ks_metadata->get_storage_options(),
old_ks_metadata->tables());
@@ -197,4 +196,7 @@ cql3::query_options audit_cf_storage_helper::make_login_data(socket_address node
return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
using registry = class_registrator<storage_helper, audit_cf_storage_helper, cql3::query_processor&, service::migration_manager&>;
static registry registrator1("audit_cf_storage_helper");
}

View File

@@ -1,68 +0,0 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/loop.hh>
#include <seastar/core/future-util.hh>
#include "audit/audit_composite_storage_helper.hh"
#include "utils/class_registrator.hh"
namespace audit {
audit_composite_storage_helper::audit_composite_storage_helper(std::vector<std::unique_ptr<storage_helper>>&& storage_helpers)
: _storage_helpers(std::move(storage_helpers))
{}
future<> audit_composite_storage_helper::start(const db::config& cfg) {
auto res = seastar::parallel_for_each(
_storage_helpers,
[&cfg] (std::unique_ptr<storage_helper>& h) {
return h->start(cfg);
}
);
return res;
}
future<> audit_composite_storage_helper::stop() {
auto res = seastar::parallel_for_each(
_storage_helpers,
[] (std::unique_ptr<storage_helper>& h) {
return h->stop();
}
);
return res;
}
future<> audit_composite_storage_helper::write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
const sstring& username,
bool error) {
return seastar::parallel_for_each(
_storage_helpers,
[audit_info, node_ip, client_ip, cl, &username, error](std::unique_ptr<storage_helper>& h) {
return h->write(audit_info, node_ip, client_ip, cl, username, error);
}
);
}
future<> audit_composite_storage_helper::write_login(const sstring& username,
socket_address node_ip,
socket_address client_ip,
bool error) {
return seastar::parallel_for_each(
_storage_helpers,
[&username, node_ip, client_ip, error](std::unique_ptr<storage_helper>& h) {
return h->write_login(username, node_ip, client_ip, error);
}
);
}
} // namespace audit

View File

@@ -1,37 +0,0 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "audit/audit.hh"
#include <seastar/core/future.hh>
#include "storage_helper.hh"
namespace audit {
class audit_composite_storage_helper : public storage_helper {
std::vector<std::unique_ptr<storage_helper>> _storage_helpers;
public:
explicit audit_composite_storage_helper(std::vector<std::unique_ptr<storage_helper>>&&);
virtual ~audit_composite_storage_helper() = default;
virtual future<> start(const db::config& cfg) override;
virtual future<> stop() override;
virtual future<> write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
const sstring& username,
bool error) override;
virtual future<> write_login(const sstring& username,
socket_address node_ip,
socket_address client_ip,
bool error) override;
};
} // namespace audit

View File

@@ -21,6 +21,7 @@
#include <fmt/chrono.h>
#include "cql3/query_processor.hh"
#include "utils/class_registrator.hh"
namespace cql3 {
@@ -32,6 +33,20 @@ namespace audit {
namespace {
future<> syslog_send_helper(net::datagram_channel& sender,
const socket_address& address,
const sstring& msg) {
return sender.send(address, net::packet{msg.data(), msg.size()}).handle_exception([address](auto&& exception_ptr) {
auto error_msg = seastar::format(
"Syslog audit backend failed (sending a message to {} resulted in {}).",
address,
exception_ptr
);
logger.error("{}", error_msg);
throw audit_exception(std::move(error_msg));
});
}
static auto syslog_address_helper(const db::config& cfg)
{
return cfg.audit_unix_socket_path.is_set()
@@ -39,40 +54,11 @@ static auto syslog_address_helper(const db::config& cfg)
: unix_domain_addr(_PATH_LOG);
}
static std::string json_escape(std::string_view str) {
std::string result;
result.reserve(str.size() * 1.2);
for (auto c : str) {
if (c == '"' || c == '\\') {
result.push_back('\\');
}
result.push_back(c);
}
return result;
}
}
future<> audit_syslog_storage_helper::syslog_send_helper(const sstring& msg) {
try {
auto lock = co_await get_units(_semaphore, 1, std::chrono::hours(1));
co_await _sender.send(_syslog_address, net::packet{msg.data(), msg.size()});
}
catch (const std::exception& e) {
auto error_msg = seastar::format(
"Syslog audit backend failed (sending a message to {} resulted in {}).",
_syslog_address,
e
);
logger.error("{}", error_msg);
throw audit_exception(std::move(error_msg));
}
}
audit_syslog_storage_helper::audit_syslog_storage_helper(cql3::query_processor& qp, service::migration_manager&) :
_syslog_address(syslog_address_helper(qp.db().get_config())),
_sender(make_unbound_datagram_channel(AF_UNIX)),
_semaphore(1) {
_sender(make_unbound_datagram_channel(AF_UNIX)) {
}
audit_syslog_storage_helper::~audit_syslog_storage_helper() {
@@ -87,10 +73,10 @@ audit_syslog_storage_helper::~audit_syslog_storage_helper() {
*/
future<> audit_syslog_storage_helper::start(const db::config& cfg) {
if (this_shard_id() != 0) {
co_return;
return make_ready_future();
}
co_await syslog_send_helper("Initializing syslog audit backend.");
return syslog_send_helper(_sender, _syslog_address, "Initializing syslog audit backend.");
}
future<> audit_syslog_storage_helper::stop() {
@@ -107,7 +93,7 @@ future<> audit_syslog_storage_helper::write(const audit_info* audit_info,
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
tm time;
localtime_r(&now, &time);
sstring msg = seastar::format(R"(<{}>{:%h %e %T} scylla-audit: node="{}", category="{}", cl="{}", error="{}", keyspace="{}", query="{}", client_ip="{}", table="{}", username="{}")",
sstring msg = seastar::format("<{}>{:%h %e %T} scylla-audit: \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\", \"{}\"",
LOG_NOTICE | LOG_USER,
time,
node_ip,
@@ -115,12 +101,12 @@ future<> audit_syslog_storage_helper::write(const audit_info* audit_info,
cl,
(error ? "true" : "false"),
audit_info->keyspace(),
json_escape(audit_info->query()),
audit_info->query(),
client_ip,
audit_info->table(),
username);
co_await syslog_send_helper(msg);
return syslog_send_helper(_sender, _syslog_address, msg);
}
future<> audit_syslog_storage_helper::write_login(const sstring& username,
@@ -131,15 +117,18 @@ future<> audit_syslog_storage_helper::write_login(const sstring& username,
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
tm time;
localtime_r(&now, &time);
sstring msg = seastar::format(R"(<{}>{:%h %e %T} scylla-audit: node="{}", category="AUTH", cl="", error="{}", keyspace="", query="", client_ip="{}", table="", username="{}")",
sstring msg = seastar::format("<{}>{:%h %e %T} scylla-audit: \"{}\", \"AUTH\", \"\", \"\", \"\", \"\", \"{}\", \"{}\", \"{}\"",
LOG_NOTICE | LOG_USER,
time,
node_ip,
(error ? "true" : "false"),
client_ip,
username);
username,
(error ? "true" : "false"));
co_await syslog_send_helper(msg.c_str());
co_await syslog_send_helper(_sender, _syslog_address, msg.c_str());
}
using registry = class_registrator<storage_helper, audit_syslog_storage_helper, cql3::query_processor&, service::migration_manager&>;
static registry registrator1("audit_syslog_storage_helper");
}

View File

@@ -24,9 +24,6 @@ namespace audit {
class audit_syslog_storage_helper : public storage_helper {
socket_address _syslog_address;
net::datagram_channel _sender;
seastar::semaphore _semaphore;
future<> syslog_send_helper(const sstring& msg);
public:
explicit audit_syslog_storage_helper(cql3::query_processor&, service::migration_manager&);
virtual ~audit_syslog_storage_helper();

View File

@@ -9,7 +9,6 @@ target_sources(scylla_auth
allow_all_authorizer.cc
authenticated_user.cc
authenticator.cc
cache.cc
certificate_authenticator.cc
common.cc
default_authorizer.cc
@@ -45,8 +44,5 @@ target_link_libraries(scylla_auth
add_whole_archive(auth scylla_auth)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(scylla_auth REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers scylla_auth
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -9,7 +9,6 @@
#include "auth/allow_all_authenticator.hh"
#include "service/migration_manager.hh"
#include "utils/alien_worker.hh"
#include "utils/class_registrator.hh"
namespace auth {
@@ -22,8 +21,6 @@ static const class_registrator<
allow_all_authenticator,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -12,9 +12,7 @@
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
class query_processor;
@@ -30,7 +28,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {
}
virtual future<> start() override {

View File

@@ -1,180 +0,0 @@
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "auth/cache.hh"
#include "auth/common.hh"
#include "auth/roles-metadata.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
namespace auth {
logging::logger logger("auth-cache");
cache::cache(cql3::query_processor& qp) noexcept
: _current_version(0)
, _qp(qp) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
auto it = _roles.find(role);
if (it == _roles.end()) {
return {};
}
return it->second;
}
future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& role) const {
auto rec = make_lw_shared<role_record>();
rec->version = _current_version;
auto fetch = [this, &role](const sstring& q) {
return _qp.execute_internal(q, db::consistency_level::LOCAL_ONE,
internal_distributed_query_state(), {role},
cql3::query_processor::cache_internal::yes);
};
// roles
{
static const sstring q = format("SELECT * FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, meta::roles_table::name);
auto rs = co_await fetch(q);
if (!rs->empty()) {
auto& r = rs->one();
rec->is_superuser = r.get_or<bool>("is_superuser", false);
rec->can_login = r.get_or<bool>("can_login", false);
rec->salted_hash = r.get_or<sstring>("salted_hash", "");
if (r.has("member_of")) {
auto mo = r.get_set<sstring>("member_of");
rec->member_of.insert(
std::make_move_iterator(mo.begin()),
std::make_move_iterator(mo.end()));
}
} else {
// role got deleted
co_return nullptr;
}
}
// members
{
static const sstring q = format("SELECT role, member FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_MEMBERS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->members.insert(r.get_as<sstring>("member"));
co_await coroutine::maybe_yield();
}
}
// attributes
{
static const sstring q = format("SELECT role, name, value FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_ATTRIBUTES_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->attributes[r.get_as<sstring>("name")] =
r.get_as<sstring>("value");
co_await coroutine::maybe_yield();
}
}
// permissions
{
static const sstring q = format("SELECT role, resource, permissions FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, PERMISSIONS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
auto resource = r.get_as<sstring>("resource");
auto perms_strings = r.get_set<sstring>("permissions");
std::unordered_set<sstring> perms_set(perms_strings.begin(), perms_strings.end());
auto pset = permissions::from_strings(perms_set);
rec->permissions[std::move(resource)] = std::move(pset);
co_await coroutine::maybe_yield();
}
}
co_return rec;
}
future<> cache::prune_all() noexcept {
for (auto it = _roles.begin(); it != _roles.end(); ) {
if (it->second->version != _current_version) {
_roles.erase(it++);
co_await coroutine::maybe_yield();
} else {
++it;
}
}
co_return;
}
future<> cache::load_all() {
if (legacy_mode(_qp)) {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
++_current_version;
logger.info("Loading all roles");
const uint32_t page_size = 128;
auto loader = [this](const cql3::untyped_result_set::row& r) -> future<stop_iteration> {
const auto name = r.get_as<sstring>("role");
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
}
co_return stop_iteration::no;
};
co_await _qp.query_internal(format("SELECT * FROM {}.{}",
db::system_keyspace::NAME, meta::roles_table::name),
db::consistency_level::LOCAL_ONE, {}, page_size, loader);
co_await prune_all();
for (const auto& [name, role] : _roles) {
co_await distribute_role(name, role);
}
co_await container().invoke_on_others([this](cache& c) -> future<> {
c._current_version = _current_version;
co_await c.prune_all();
});
}
future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
}
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
} else {
_roles.erase(name);
}
co_await distribute_role(name, role);
}
}
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
auto role_ptr = role.get();
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
if (!role_ptr) {
c._roles.erase(name);
return;
}
auto role_copy = make_lw_shared<role_record>(*role_ptr);
c._roles[name] = std::move(role_copy);
});
}
bool cache::includes_table(const table_id& id) noexcept {
return id == db::system_keyspace::roles()->id()
|| id == db::system_keyspace::role_members()->id()
|| id == db::system_keyspace::role_attributes()->id()
|| id == db::system_keyspace::role_permissions()->id();
}
} // namespace auth

View File

@@ -1,61 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <unordered_set>
#include <unordered_map>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <absl/container/flat_hash_map.h>
#include "auth/permission.hh"
#include "auth/common.hh"
namespace cql3 { class query_processor; }
namespace auth {
class cache : public peering_sharded_service<cache> {
public:
using role_name_t = sstring;
using version_tag_t = char;
struct role_record {
bool can_login = false;
bool is_superuser = false;
std::unordered_set<role_name_t> member_of;
std::unordered_set<role_name_t> members;
sstring salted_hash;
std::unordered_map<sstring, sstring> attributes;
std::unordered_map<sstring, permission_set> permissions;
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
static bool includes_table(const table_id&) noexcept;
private:
using roles_map = absl::flat_hash_map<role_name_t, lw_shared_ptr<role_record>>;
roles_map _roles;
version_tag_t _current_version;
cql3::query_processor& _qp;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;
future<> distribute_role(const role_name_t& name, const lw_shared_ptr<role_record> role);
};
} // namespace auth

View File

@@ -33,14 +33,13 @@ static const class_registrator<auth::authenticator
, auth::certificate_authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
, ::service::migration_manager&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();

View File

@@ -10,7 +10,6 @@
#pragma once
#include "auth/authenticator.hh"
#include "utils/alien_worker.hh"
#include <boost/regex_fwd.hpp> // IWYU pragma: keep
namespace cql3 {
@@ -32,7 +31,7 @@ class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
~certificate_authenticator();
future<> start() override;

Some files were not shown because too many files have changed in this diff Show More