Compare commits

..

5 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
8f6296b905 Simplify ungzip implementation per review feedback
- Remove manual gzip header parsing - libdeflate handles all format details
- Rename linearize_chunked_content to build_input_buffer and free chunks as we copy
- Add output chunking to split large decompressed data into 1MB chunks
- Add comment explaining libdeflate's whole-buffer requirement
- Use better initial size heuristic based on compression ratio

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2025-11-19 12:47:02 +00:00
copilot-swe-agent[bot]
4f44a61b3a Add edge case check for length limit in ungzip
- Check if total_decompressed >= length_limit before allocating output buffer
- Prevents allocating a zero-sized buffer when limit is already reached
- Ensures clear error message when limit is exceeded

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2025-11-19 11:50:31 +00:00
copilot-swe-agent[bot]
362491a650 Fix ungzip implementation to properly handle concatenated gzip files
- Removed unused get_gzip_member_size function
- Rely on libdeflate_gzip_decompress to tell us how many input bytes were consumed
- Added check for zero bytes consumed to detect invalid state
- Simplified the logic by removing unnecessary header size tracking

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2025-11-19 11:48:35 +00:00
copilot-swe-agent[bot]
b818331420 Add ungzip function implementation with libdeflate
- Created utils/gzip.hh header with ungzip function declaration
- Created utils/gzip.cc implementation using libdeflate
- Updated utils/CMakeLists.txt to include gzip.cc and link libdeflate
- Created comprehensive test suite in test/boost/gzip_test.cc
- Added gzip_test to test/boost/CMakeLists.txt

The implementation:
- Uses libdeflate for high-performance gzip decompression
- Handles chunked_content input/output (vector of temporary_buffer)
- Supports concatenated gzip files
- Validates gzip headers and detects invalid/truncated/corrupted data
- Enforces size limits to prevent memory exhaustion
- Runs in async context to avoid blocking the reactor

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2025-11-19 11:46:29 +00:00
copilot-swe-agent[bot]
c714159d5c Initial plan 2025-11-19 11:32:38 +00:00
446 changed files with 5873 additions and 12077 deletions

9
.github/CODEOWNERS vendored
View File

@@ -1,5 +1,5 @@
# AUTH
auth/* @nuivall
auth/* @nuivall @ptrsmrn
# CACHE
row_cache* @tgrabiec
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
transport/*
# CQL QUERY LANGUAGE
cql3/* @tgrabiec @nuivall
cql3/* @tgrabiec @nuivall @ptrsmrn
# COUNTERS
counters* @nuivall
tests/counter_test* @nuivall
counters* @nuivall @ptrsmrn
tests/counter_test* @nuivall @ptrsmrn
# DOCS
docs/* @annastuchlik @tzach
@@ -57,6 +57,7 @@ repair/* @tgrabiec @asias
# SCHEMA MANAGEMENT
db/schema_tables* @tgrabiec
db/legacy_schema_migrator* @tgrabiec
service/migration* @tgrabiec
schema* @tgrabiec

View File

@@ -1,86 +0,0 @@
# ScyllaDB Development Instructions
## Project Context
High-performance distributed NoSQL database. Core values: performance, correctness, readability.
## Build System
### Modern Build (configure.py + ninja)
```bash
# Configure (run once per mode, or when switching modes)
./configure.py --mode=<mode> # mode: dev, debug, release, sanitize
# Build everything
ninja <mode>-build # e.g., ninja dev-build
# Build Scylla binary only (sufficient for Python integration tests)
ninja build/<mode>/scylla
# Build specific test
ninja build/<mode>/test/boost/<test_name>
```
## Running Tests
### C++ Unit Tests
```bash
# Run all tests in a file
./test.py --mode=<mode> test/<suite>/<test_name>.cc
# Run a single test case from a file
./test.py --mode=<mode> test/<suite>/<test_name>.cc::<test_case_name>
# Examples
./test.py --mode=dev test/boost/memtable_test.cc
./test.py --mode=dev test/raft/raft_server_test.cc::test_check_abort_on_client_api
```
**Important:**
- Use full path with `.cc` extension (e.g., `test/boost/test_name.cc`, not `boost/test_name`)
- To run a single test case, append `::<test_case_name>` to the file path
- If you encounter permission issues with cgroup metric gathering, add `--no-gather-metrics` flag
**Rebuilding Tests:**
- test.py does NOT automatically rebuild when test source files are modified
- Many tests are part of composite binaries (e.g., `combined_tests` in test/boost contains multiple test files)
- To find which binary contains a test, check `configure.py` in the repository root (primary source) or `test/<suite>/CMakeLists.txt`
- To rebuild a specific test binary: `ninja build/<mode>/test/<suite>/<binary_name>`
- Examples:
- `ninja build/dev/test/boost/combined_tests` (contains group0_voter_calculator_test.cc and others)
- `ninja build/dev/test/raft/replication_test` (standalone Raft test)
### Python Integration Tests
```bash
# Only requires Scylla binary (full build usually not needed)
ninja build/<mode>/scylla
# Run all tests in a file
./test.py --mode=<mode> <test_path>
# Run a single test case from a file
./test.py --mode=<mode> <test_path>::<test_function_name>
# Examples
./test.py --mode=dev alternator/
./test.py --mode=dev cluster/test_raft_voters::test_raft_limited_voters_retain_coordinator
# Optional flags
./test.py --mode=dev cluster/test_raft_no_quorum -v # Verbose output
./test.py --mode=dev cluster/test_raft_no_quorum --repeat 5 # Repeat test 5 times
```
**Important:**
- Use path without `.py` extension (e.g., `cluster/test_raft_no_quorum`, not `cluster/test_raft_no_quorum.py`)
- To run a single test case, append `::<test_function_name>` to the file path
- Add `-v` for verbose output
- Add `--repeat <num>` to repeat a test multiple times
- After modifying C++ source files, only rebuild the Scylla binary for Python tests - building the entire repository is unnecessary
## Code Philosophy
- Performance matters in hot paths (data read/write, inner loops)
- Self-documenting code through clear naming
- Comments explain "why", not "what"
- Prefer standard library over custom implementations
- Strive for simplicity and clarity, add complexity only when clearly justified
- Question requests: don't blindly implement requests - evaluate trade-offs, identify issues, and suggest better alternatives when appropriate
- Consider different approaches, weigh pros and cons, and recommend the best fit for the specific context

View File

@@ -1,115 +0,0 @@
---
applyTo: "**/*.{cc,hh}"
---
# C++ Guidelines
**Important:** Always match the style and conventions of existing code in the file and directory.
## Memory Management
- Prefer stack allocation whenever possible
- Use `std::unique_ptr` by default for dynamic allocations
- `new`/`delete` are forbidden (use RAII)
- Use `seastar::lw_shared_ptr` or `seastar::shared_ptr` for shared ownership within same shard
- Use `seastar::foreign_ptr` for cross-shard sharing
- Avoid `std::shared_ptr` except when interfacing with external C++ APIs
- Avoid raw pointers except for non-owning references or C API interop
## Seastar Asynchronous Programming
- Use `seastar::future<T>` for all async operations
- Prefer coroutines (`co_await`, `co_return`) over `.then()` chains for readability
- Coroutines are preferred over `seastar::do_with()` for managing temporary state
- In hot paths where futures are ready, continuations may be more efficient than coroutines
- Chain futures with `.then()`, don't block with `.get()` (unless in `seastar::thread` context)
- All I/O must be asynchronous (no blocking calls)
- Use `seastar::gate` for shutdown coordination
- Use `seastar::semaphore` for resource limiting (not `std::mutex`)
- Break long loops with `maybe_yield()` to avoid reactor stalls
## Coroutines
```cpp
seastar::future<T> func() {
auto result = co_await async_operation();
co_return result;
}
```
## Error Handling
- Throw exceptions for errors (futures propagate them automatically)
- In data path: avoid exceptions, use `std::expected` (or `boost::outcome`) instead
- Use standard exceptions (`std::runtime_error`, `std::invalid_argument`)
- Database-specific: throw appropriate schema/query exceptions
## Performance
- Pass large objects by `const&` or `&&` (move semantics)
- Use `std::string_view` for non-owning string references
- Avoid copies: prefer move semantics
- Use `utils::chunked_vector` instead of `std::vector` for large allocations (>128KB)
- Minimize dynamic allocations in hot paths
## Database-Specific Types
- Use `schema_ptr` for schema references
- Use `mutation` and `mutation_partition` for data modifications
- Use `partition_key` and `clustering_key` for keys
- Use `api::timestamp_type` for database timestamps
- Use `gc_clock` for garbage collection timing
## Style
- C++23 standard (prefer modern features, especially coroutines)
- Use `auto` when type is obvious from RHS
- Avoid `auto` when it obscures the type
- Use range-based for loops: `for (const auto& item : container)`
- Use standard algorithms when they clearly simplify code (e.g., replacing 10-line loops)
- Avoid chaining multiple algorithms if a straightforward loop is clearer
- Mark functions and variables `const` whenever possible
- Use scoped enums: `enum class` (not unscoped `enum`)
## Headers
- Use `#pragma once`
- Include order: own header, C++ std, Seastar, Boost, project headers
- Forward declare when possible
- Never `using namespace` in headers (exception: `using namespace seastar` is globally available via `seastarx.hh`)
## Documentation
- Public APIs require clear documentation
- Implementation details should be self-evident from code
- Use `///` or Doxygen `/** */` for public documentation, `//` for implementation notes - follow the existing style
## Naming
- `snake_case` for most identifiers (classes, functions, variables, namespaces)
- Template parameters: `CamelCase` (e.g., `template<typename ValueType>`)
- Member variables: prefix with `_` (e.g., `int _count;`)
- Structs (value-only): no `_` prefix on members
- Constants and `constexpr`: `snake_case` (e.g., `static constexpr int max_size = 100;`)
- Files: `.hh` for headers, `.cc` for source
## Formatting
- 4 spaces indentation, never tabs
- Opening braces on same line as control structure (except namespaces)
- Space after keywords: `if (`, `while (`, `return `
- Whitespace around operators matches precedence: `*a + *b` not `* a+* b`
- Line length: keep reasonable (<160 chars), use continuation lines with double indent if needed
- Brace all nested scopes, even single statements
- Minimal patches: only format code you modify, never reformat entire files
## Logging
- Use structured logging with appropriate levels: DEBUG, INFO, WARN, ERROR
- Include context in log messages (e.g., request IDs)
- Never log sensitive data (credentials, PII)
## Forbidden
- `malloc`/`free`
- `printf` family (use logging or fmt)
- Raw pointers for ownership
- `using namespace` in headers
- Blocking operations: `std::sleep`, `std::read`, `std::mutex` (use Seastar equivalents)
- `std::atomic` (reserved for very special circumstances only)
- Macros (use `inline`, `constexpr`, or templates instead)
## Testing
When modifying existing code, follow TDD: create/update test first, then implement.
- Examine existing tests for style and structure
- Use Boost.Test framework
- Use `SEASTAR_THREAD_TEST_CASE` for Seastar asynchronous tests
- Aim for high code coverage, especially for new features and bug fixes
- Maintain bisectability: all tests must pass in every commit. Mark failing tests with `BOOST_FAIL()` or similar, then fix in subsequent commit

View File

@@ -1,51 +0,0 @@
---
applyTo: "**/*.py"
---
# Python Guidelines
**Important:** Match existing code style. Some directories (like `test/cqlpy` and `test/alternator`) prefer simplicity over type hints and docstrings.
## Style
- Follow PEP 8
- Use type hints for function signatures (unless directory style omits them)
- Use f-strings for formatting
- Line length: 160 characters max
- 4 spaces for indentation
## Imports
Order: standard library, third-party, local imports
```python
import os
import sys
import pytest
from cassandra.cluster import Cluster
from test.utils import setup_keyspace
```
Never use `from module import *`
## Documentation
All public functions/classes need docstrings (unless the current directory conventions omit them):
```python
def my_function(arg1: str, arg2: int) -> bool:
"""
Brief summary of function purpose.
Args:
arg1: Description of first argument.
arg2: Description of second argument.
Returns:
Description of return value.
"""
pass
```
## Testing Best Practices
- Maintain bisectability: all tests must pass in every commit
- Mark currently-failing tests with `@pytest.mark.xfail`, unmark when fixed
- Use descriptive names that convey intent
- Docstrings/comments should explain what the test verifies and why, and if it reproduces a specific issue or how it fits into the larger test suite

View File

@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
if is_draft:
labels_to_add.append("conflicts")
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
pr_comment += "Please resolve them and mark this PR as ready for review"
backport_pr.create_issue_comment(pr_comment)
# Apply all labels at once if we have any

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {

View File

@@ -1,34 +0,0 @@
name: Docs / Validate metrics
on:
pull_request:
branches:
- master
- enterprise
paths:
- '**/*.cc'
- 'scripts/metrics-config.yml'
- 'scripts/get_description.py'
- 'docs/_ext/scylladb_metrics.py'
jobs:
validate-metrics:
runs-on: ubuntu-latest
name: Check metrics documentation coverage
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.10'
- name: Install dependencies
run: pip install PyYAML
- name: Validate metrics
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml

View File

@@ -3,13 +3,10 @@ name: Trigger Scylla CI Route
on:
issue_comment:
types: [created]
pull_request_target:
types:
- unlabeled
jobs:
trigger-jenkins:
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI-Route Jenkins Job

View File

@@ -116,7 +116,6 @@ list(APPEND absl_cxx_flags
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
list(APPEND ABSL_GCC_FLAGS ${absl_cxx_flags})
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
list(APPEND absl_cxx_flags "-Wno-deprecated-builtins")
list(APPEND ABSL_LLVM_FLAGS ${absl_cxx_flags})
endif()
set(ABSL_DEFAULT_LINKOPTS
@@ -164,45 +163,7 @@ file(MAKE_DIRECTORY "${scylla_gen_build_dir}")
include(add_version_library)
generate_scylla_version()
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
add_library(scylla-precompiled-header STATIC exported_templates.cc)
target_link_libraries(scylla-precompiled-header PRIVATE
absl::headers
absl::btree
absl::hash
absl::raw_hash_set
Seastar::seastar
Snappy::snappy
systemd
ZLIB::ZLIB
lz4::lz4_static
zstd::zstd_static)
if (Scylla_USE_PRECOMPILED_HEADER)
set(Scylla_USE_PRECOMPILED_HEADER_USE ON)
find_program(DISTCC_EXEC NAMES distcc OPTIONAL)
if (DISTCC_EXEC)
if(DEFINED ENV{DISTCC_HOSTS})
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
message(STATUS "Disabling precompiled header usage because distcc exists and DISTCC_HOSTS is set, assuming you're using distributed compilation.")
else()
file(REAL_PATH "~/.distcc/hosts" DIST_CC_HOSTS_PATH EXPAND_TILDE)
if (EXISTS ${DIST_CC_HOSTS_PATH})
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
message(STATUS "Disabling precompiled header usage because distcc and ~/.distcc/hosts exists, assuming you're using distributed compilation.")
endif()
endif()
endif()
if (Scylla_USE_PRECOMPILED_HEADER_USE)
message(STATUS "Using precompiled header for Scylla - remember to add `sloppiness = pch_defines,time_macros` to ccache.conf, if you're using ccache.")
target_precompile_headers(scylla-precompiled-header PRIVATE "stdafx.hh")
target_compile_definitions(scylla-precompiled-header PRIVATE SCYLLA_USE_PRECOMPILED_HEADER)
endif()
else()
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
endif()
add_library(scylla-main STATIC)
target_sources(scylla-main
PRIVATE
absl-flat_hash_map.cc
@@ -247,7 +208,6 @@ target_link_libraries(scylla-main
ZLIB::ZLIB
lz4::lz4_static
zstd::zstd_static
scylla-precompiled-header
)
option(Scylla_CHECK_HEADERS

View File

@@ -34,8 +34,5 @@ target_link_libraries(alternator
idl
absl::headers)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(alternator REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers alternator
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
if (!comparison_operator.IsString()) {
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
}
std::string op = rjson::to_string(comparison_operator);
std::string op = comparison_operator.GetString();
auto it = ops.find(op);
if (it == ops.end()) {
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
}
if (kv1.name == "S") {
return cmp(rjson::to_string_view(kv1.value),
rjson::to_string_view(kv2.value));
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
}
if (kv1.name == "B") {
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
}
if (kv_v.name == "S") {
return check_BETWEEN(rjson::to_string_view(kv_v.value),
rjson::to_string_view(kv_lb.value),
rjson::to_string_view(kv_ub.value),
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
bounds_from_query);
}
if (kv_v.name == "B") {

View File

@@ -8,8 +8,6 @@
#include "consumed_capacity.hh"
#include "error.hh"
#include "utils/rjson.hh"
#include <fmt/format.h>
namespace alternator {
@@ -34,12 +32,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
if (!return_consumed->IsString()) {
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
}
std::string_view consumed = rjson::to_string_view(*return_consumed);
std::string consumed = return_consumed->GetString();
if (consumed == "INDEXES") {
throw api_error::validation("INDEXES consumed capacity is not supported");
}
if (consumed != "TOTAL") {
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
throw api_error::validation("Unknown consumed capacity "+ consumed);
}
return true;
}

View File

@@ -419,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
if (!table_name_value->IsString()) {
throw api_error::validation("Non-string TableName field in request");
}
std::string table_name = rjson::to_string(*table_name_value);
std::string table_name = table_name_value->GetString();
return table_name;
}
@@ -546,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
// does exist but the index does not (ValidationException).
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
throw api_error::validation(
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
} else {
throw api_error::resource_not_found(
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
@@ -587,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
attribute_name, value));
}
return rjson::to_string(*attribute_value);
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
}
// Convenience function for getting the value of a boolean attribute, or a
@@ -888,7 +888,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
schema_ptr schema = get_table(_proxy, request);
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
rjson::value response = rjson::empty_object();
@@ -989,7 +989,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
std::string table_name = get_table_name(request);
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
tracing::add_alternator_table_name(trace_state, table_name);
tracing::add_table_name(trace_state, keyspace_name, table_name);
auto& p = _proxy.container();
schema_ptr schema = get_table(_proxy, request);
@@ -1008,8 +1008,8 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
throw api_error::resource_not_found(fmt::format("Requested resource not found: Table: {} not found", table_name));
}
auto m = co_await service::prepare_column_family_drop_announcement(p.local(), keyspace_name, table_name, group0_guard.write_timestamp(), service::drop_views::yes);
auto m2 = co_await service::prepare_keyspace_drop_announcement(p.local(), keyspace_name, group0_guard.write_timestamp());
auto m = co_await service::prepare_column_family_drop_announcement(_proxy, keyspace_name, table_name, group0_guard.write_timestamp(), service::drop_views::yes);
auto m2 = co_await service::prepare_keyspace_drop_announcement(_proxy, keyspace_name, group0_guard.write_timestamp());
std::move(m2.begin(), m2.end(), std::back_inserter(m));
@@ -1080,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
}
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == name) {
auto type = attribute_info["AttributeType"].GetString();
data_type dt = parse_key_type(type);
if (computed_column) {
// Computed column for GSI (doesn't choose a real column as-is
@@ -1116,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First element of KeySchema must be an object");
}
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
throw api_error::validation("First key in KeySchema must be a HASH key");
}
v = rjson::find((*key_schema)[0], "AttributeName");
@@ -1124,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First key in KeySchema must have string AttributeName");
}
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
std::string hash_key = rjson::to_string(*v);
std::string hash_key = v->GetString();
std::string range_key;
if (key_schema->Size() == 2) {
if (!(*key_schema)[1].IsObject()) {
throw api_error::validation("Second element of KeySchema must be an object");
}
v = rjson::find((*key_schema)[1], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
throw api_error::validation("Second key in KeySchema must be a RANGE key");
}
v = rjson::find((*key_schema)[1], "AttributeName");
@@ -1583,7 +1583,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
std::unordered_set<std::string> unused_attribute_definitions =
validate_attribute_definitions("", *attribute_definitions);
tracing::add_alternator_table_name(trace_state, table_name);
tracing::add_table_name(trace_state, keyspace_name, table_name);
schema_builder builder(keyspace_name, table_name);
auto [hash_key, range_key] = parse_key_schema(request, "");
@@ -1865,10 +1865,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats, std::move(tablets_mode));
});
}
@@ -1887,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string def_type = type_to_string(def.type);
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
auto type = attribute_info["AttributeType"].GetString();
if (type != def_type) {
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
}
@@ -1930,7 +1930,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
schema_ptr tab = get_table(p.local(), request);
tracing::add_alternator_table_name(gt, tab->cf_name());
tracing::add_table_name(gt, tab->ks_name(), tab->cf_name());
// the ugly but harmless conversion to string_view here is because
// Seastar's sstring is missing a find(std::string_view) :-()
@@ -2223,12 +2223,12 @@ void validate_value(const rjson::value& v, const char* caller) {
// The put_or_delete_item class builds the mutations needed by the PutItem and
// DeleteItem operations - either as stand-alone commands or part of a list
// of commands in BatchWriteItem.
// of commands in BatchWriteItems.
// put_or_delete_item splits each operation into two stages: Constructing the
// object parses and validates the user input (throwing exceptions if there
// are input errors). Later, build() generates the actual mutation, with a
// specified timestamp. This split is needed because of the peculiar needs of
// BatchWriteItem and LWT. BatchWriteItem needs all parsing to happen before
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
@@ -2362,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
@@ -2624,14 +2624,14 @@ std::optional<service::cas_shard> rmw_operation::shard_for_execute(bool needs_re
// Build the return value from the different RMW operations (UpdateItem,
// PutItem, DeleteItem). All these return nothing by default, but can
// optionally return Attributes if requested via the ReturnValues option.
static executor::request_return_type rmw_operation_return(rjson::value&& attributes, const consumed_capacity_counter& consumed_capacity, uint64_t& metric) {
static future<executor::request_return_type> rmw_operation_return(rjson::value&& attributes, const consumed_capacity_counter& consumed_capacity, uint64_t& metric) {
rjson::value ret = rjson::empty_object();
consumed_capacity.add_consumed_capacity_to_response_if_needed(ret);
metric += consumed_capacity.get_consumed_capacity_units();
if (!attributes.IsNull()) {
rjson::add(ret, "Attributes", std::move(attributes));
}
return rjson::print(std::move(ret));
return make_ready_future<executor::request_return_type>(rjson::print(std::move(ret)));
}
static future<std::unique_ptr<rjson::value>> get_previous_item(
@@ -2697,10 +2697,7 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
stats& global_stats,
stats& per_table_stats,
uint64_t& wcu_total) {
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = schema()->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
auto cdc_opts = cdc::per_request_options{};
if (needs_read_before_write) {
if (_write_isolation == write_isolation::FORBID_RMW) {
throw api_error::validation("Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
@@ -2739,13 +2736,13 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
auto read_command = needs_read_before_write ?
previous_item_read_command(proxy, schema(), _ck, selection) :
nullptr;
return proxy.cas(schema(), std::move(*cas_shard), *this, read_command, to_partition_ranges(*schema(), _pk),
return proxy.cas(schema(), std::move(*cas_shard), shared_from_this(), read_command, to_partition_ranges(*schema(), _pk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM, timeout, timeout, true, std::move(cdc_opts)).then([this, read_command, &wcu_total] (bool is_applied) mutable {
if (!is_applied) {
return make_ready_future<executor::request_return_type>(api_error::conditional_check_failed("The conditional request failed", std::move(_return_attributes)));
}
return make_ready_future<executor::request_return_type>(rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total));
return rmw_operation_return(std::move(_return_attributes), _consumed_capacity, wcu_total);
});
}
@@ -2783,10 +2780,10 @@ static void verify_all_are_used(const rjson::value* field,
return;
}
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
if (!used.contains(rjson::to_string(it->name))) {
if (!used.contains(it->name.GetString())) {
throw api_error::validation(
format("{} has spurious '{}', not used in {}",
field_name, rjson::to_string_view(it->name), operation));
field_name, it->name.GetString(), operation));
}
}
}
@@ -2859,7 +2856,7 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
elogger.trace("put_item {}", request);
auto op = make_shared<put_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
@@ -2963,7 +2960,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
auto op = make_shared<delete_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *(op->schema()));
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
@@ -3000,7 +2997,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
}
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
try {
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
} catch(data_dictionary::no_such_column_family&) {
@@ -3026,20 +3023,17 @@ struct primary_key_equal {
};
// This is a cas_request subclass for applying given put_or_delete_items to
// one partition using LWT as part as BatchWriteItem. This is a write-only
// one partition using LWT as part as BatchWriteItems. This is a write-only
// operation, not needing the previous value of the item (the mutation to be
// done is known prior to starting the operation). Nevertheless, we want to
// do this mutation via LWT to ensure that it is serialized with other LWT
// mutations to the same partition.
//
// The std::vector<put_or_delete_item> must remain alive until the
// storage_proxy::cas() future is resolved.
class put_or_delete_item_cas_request : public service::cas_request {
schema_ptr schema;
const std::vector<put_or_delete_item>& _mutation_builders;
std::vector<put_or_delete_item> _mutation_builders;
public:
put_or_delete_item_cas_request(schema_ptr s, const std::vector<put_or_delete_item>& b) :
schema(std::move(s)), _mutation_builders(b) { }
put_or_delete_item_cas_request(schema_ptr s, std::vector<put_or_delete_item>&& b) :
schema(std::move(s)), _mutation_builders(std::move(b)) { }
virtual ~put_or_delete_item_cas_request() = default;
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override {
std::optional<mutation> ret;
@@ -3055,48 +3049,17 @@ public:
}
};
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit)
{
if (!cas_shard.this_shard()) {
_stats.shard_bounce_for_lwt++;
return container().invoke_on(cas_shard.shard(), _ssg,
[cs = client_state.move_to_other_shard(),
&mb = mutation_builders,
&dk,
ks = schema->ks_name(),
cf = schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(executor& self) mutable {
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt), &self]
(service::client_state& client_state) mutable {
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
});
}
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, dht::decorated_key dk, std::vector<put_or_delete_item>&& mutation_builders,
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
auto timeout = executor::default_timeout();
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
auto* op_ptr = op.get();
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
return proxy.cas(schema, std::move(cas_shard), op, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
timeout, timeout, true, std::move(cdc_opts)).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
// does not need to support conditional updates.
}
@@ -3118,11 +3081,13 @@ struct schema_decorated_key_equal {
// FIXME: if we failed writing some of the mutations, need to return a list
// of these failed mutations rather than fail the whole write (issue #5650).
future<> executor::do_batch_write(
static future<> do_batch_write(service::storage_proxy& proxy,
smp_service_group ssg,
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit) {
service_permit permit,
stats& stats) {
if (mutation_builders.empty()) {
return make_ready_future<>();
}
@@ -3139,62 +3104,64 @@ future<> executor::do_batch_write(
utils::chunked_vector<mutation> mutations;
mutations.reserve(mutation_builders.size());
api::timestamp_type now = api::new_timestamp();
bool any_cdc_enabled = false;
for (auto& b : mutation_builders) {
mutations.push_back(b.second.build(b.first, now));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
return proxy.mutate(std::move(mutations),
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(),
trace_state,
std::move(permit),
db::allow_per_partition_rate_limit::yes,
false,
cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
});
cdc::per_request_options{});
} else {
// Do the write via LWT:
// Multiple mutations may be destined for the same partition, adding
// or deleting different items of one partition. Join them together
// because we can do them in one cas() call.
using map_type = std::unordered_map<schema_decorated_key,
std::vector<put_or_delete_item>,
schema_decorated_key_hash,
schema_decorated_key_equal>;
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto&& b : std::move(mutation_builders)) {
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
.schema = b.first,
.dk = dht::decorate_key(*b.first, b.second.pk())
});
std::unordered_map<schema_decorated_key, std::vector<put_or_delete_item>, schema_decorated_key_hash, schema_decorated_key_equal>
key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto& b : mutation_builders) {
auto dk = dht::decorate_key(*b.first, b.second.pk());
auto [it, added] = key_builders.try_emplace(schema_decorated_key{b.first, dk});
it->second.push_back(std::move(b.second));
}
auto* key_builders_ptr = key_builders.get();
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
_stats.write_using_lwt++;
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
stats.write_using_lwt++;
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
auto s = e.first.schema;
if (desired_shard.this_shard()) {
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, std::move(e.second), client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
return proxy.container().invoke_on(desired_shard.shard(), ssg,
[cs = client_state.move_to_other_shard(),
mb = e.second,
dk = e.first.dk,
ks = e.first.schema->ks_name(),
cf = e.first.schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(service::storage_proxy& proxy) mutable {
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt)]
(service::client_state& client_state) mutable {
auto schema = proxy.data_dictionary().find_schema(ks, cf);
static const auto* injection_name = "alternator_executor_batch_write_wait";
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
const auto ks = handler.get("keyspace");
const auto cf = handler.get("table");
const auto shard = std::atoll(handler.get("shard")->data());
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
elogger.info("{}: hit", injection_name);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
elogger.info("{}: continue", injection_name);
}
}).then([&e, desired_shard = std::move(desired_shard),
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
{
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
});
}).finally([key_builders = std::move(key_builders)]{});
// The desired_shard on the original shard remains alive for the duration
// of cas_write on this shard and prevents any tablet operations.
// However, we need a local instance of cas_shard on this shard
// to pass it to sp::cas, so we just create a new one.
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return cas_write(proxy, schema, std::move(cas_shard), dk, std::move(mb), client_state, std::move(trace_state), empty_service_permit());
});
}).finally([desired_shard = std::move(desired_shard)]{});
}
});
}
}
@@ -3237,7 +3204,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
per_table_stats->api_operations.batch_write_item++;
per_table_stats->api_operations.batch_write_item_batch_total += it->value.Size();
per_table_stats->api_operations.batch_write_item_histogram.add(it->value.Size());
tracing::add_alternator_table_name(trace_state, schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
1, primary_key_hash{schema}, primary_key_equal{schema});
@@ -3341,7 +3308,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
_stats.api_operations.batch_write_item_batch_total += total_items;
_stats.api_operations.batch_write_item_histogram.add(total_items);
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
// FIXME: Issue #5650: If we failed writing some of the updates,
// need to return a list of these failed updates in UnprocessedItems
// rather than fail the whole write (issue #5650).
@@ -3386,7 +3353,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
}
rjson::value newv = rjson::empty_object();
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
std::string attr = rjson::to_string(it->name);
std::string attr = it->name.GetString();
auto x = members.find(attr);
if (x != members.end()) {
if (x->second) {
@@ -3606,7 +3573,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
const rjson::value& attributes_to_get = req["AttributesToGet"];
attrs_to_get ret;
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
attribute_path_map_add("AttributesToGet", ret, it->GetString());
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
}
if (ret.empty()) {
@@ -4272,12 +4239,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
// Note that it.key() is the name of the column, *it is the operation
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
const column_definition* cdef = _schema->get_column_definition(column_name);
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
}
std::string action = rjson::to_string((it->value)["Action"]);
std::string action = (it->value)["Action"].GetString();
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -4497,7 +4464,7 @@ future<executor::request_return_type> executor::update_item(client_state& client
elogger.trace("update_item {}", request);
auto op = make_shared<update_item_operation>(*_parsed_expression_cache, _proxy, std::move(request));
tracing::add_alternator_table_name(trace_state, op->schema()->cf_name());
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
@@ -4578,7 +4545,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
schema_ptr schema = get_table(_proxy, request);
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *schema);
per_table_stats->api_operations.get_item++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
rjson::value& query_key = request["Key"];
db::consistency_level cl = get_read_consistency(request);
@@ -4727,7 +4694,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
uint batch_size = 0;
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
table_requests rs(get_table_from_batch_request(_proxy, it));
tracing::add_alternator_table_name(trace_state, rs.schema->cf_name());
tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
rs.cl = get_read_consistency(it->value);
std::unordered_set<std::string> used_attribute_names;
rs.attrs_to_get = ::make_shared<const std::optional<attrs_to_get>>(calculate_attrs_to_get(it->value, *_parsed_expression_cache, used_attribute_names));
@@ -5163,15 +5130,13 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
}
auto pos = paging_state.get_position_in_partition();
if (pos.has_key()) {
// Alternator itself allows at most one column in clustering key, but
// user can use Alternator api to access system tables which might have
// multiple clustering key columns. So we need to handle that case here.
auto cdef_it = schema.clustering_key_columns().begin();
for(const auto &exploded_ck : pos.key().explode()) {
rjson::add_with_string_name(last_evaluated_key, std::string_view(cdef_it->name_as_text()), rjson::empty_object());
rjson::value& key_entry = last_evaluated_key[cdef_it->name_as_text()];
rjson::add_with_string_name(key_entry, type_to_string(cdef_it->type), json_key_column_value(exploded_ck, *cdef_it));
++cdef_it;
auto exploded_ck = pos.key().explode();
auto exploded_ck_it = exploded_ck.begin();
for (const column_definition& cdef : schema.clustering_key_columns()) {
rjson::add_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object());
rjson::value& key_entry = last_evaluated_key[cdef.name_as_text()];
rjson::add_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_ck_it, cdef));
++exploded_ck_it;
}
}
// To avoid possible conflicts (and thus having to reserve these names) we
@@ -5331,7 +5296,6 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
elogger.trace("Scanning {}", request);
auto [schema, table_type] = get_table_or_view(_proxy, request);
tracing::add_alternator_table_name(trace_state, schema->cf_name());
get_stats_from_schema(_proxy, *schema)->api_operations.scan++;
auto segment = get_int_attribute(request, "Segment");
auto total_segments = get_int_attribute(request, "TotalSegments");
@@ -5474,7 +5438,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
std::vector<query::clustering_range> ck_bounds;
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
sstring key = rjson::to_sstring(it->name);
std::string key = it->name.GetString();
const rjson::value& condition = it->value;
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
@@ -5482,13 +5446,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
const column_definition& pk_cdef = schema->partition_key_columns().front();
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
if (key == pk_cdef.name_as_text()) {
if (sstring(key) == pk_cdef.name_as_text()) {
if (!partition_ranges.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
}
if (ck_cdef && key == ck_cdef->name_as_text()) {
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
if (!ck_bounds.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
@@ -5811,7 +5775,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
auto [schema, table_type] = get_table_or_view(_proxy, request);
get_stats_from_schema(_proxy, *schema)->api_operations.query++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
rjson::value* exclusive_start_key = rjson::find(request, "ExclusiveStartKey");
db::consistency_level cl = get_read_consistency(request);
@@ -5889,7 +5853,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
int limit = limit_json ? limit_json->GetInt() : 100;
if (limit < 1 || limit > 100) {
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");

View File

@@ -40,7 +40,6 @@ namespace cql3::selection {
namespace service {
class storage_proxy;
class cas_shard;
}
namespace cdc {
@@ -58,7 +57,6 @@ class schema_builder;
namespace alternator {
class rmw_operation;
class put_or_delete_item;
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
@@ -221,16 +219,6 @@ private:
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
future<> do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit);
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit);
public:
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);

View File

@@ -282,23 +282,15 @@ std::string type_to_string(data_type type) {
return it->second;
}
std::optional<bytes> try_get_key_column_value(const rjson::value& item, const column_definition& column) {
bytes get_key_column_value(const rjson::value& item, const column_definition& column) {
std::string column_name = column.name_as_text();
const rjson::value* key_typed_value = rjson::find(item, column_name);
if (!key_typed_value) {
return std::nullopt;
throw api_error::validation(fmt::format("Key column {} not found", column_name));
}
return get_key_from_typed_value(*key_typed_value, column);
}
bytes get_key_column_value(const rjson::value& item, const column_definition& column) {
auto value = try_get_key_column_value(item, column);
if (!value) {
throw api_error::validation(fmt::format("Key column {} not found", column.name_as_text()));
}
return std::move(*value);
}
// Parses the JSON encoding for a key value, which is a map with a single
// entry whose key is the type and the value is the encoded value.
// If this type does not match the desired "type_str", an api_error::validation
@@ -388,38 +380,20 @@ clustering_key ck_from_json(const rjson::value& item, schema_ptr schema) {
return clustering_key::make_empty();
}
std::vector<bytes> raw_ck;
// Note: it's possible to get more than one clustering column here, as
// Alternator can be used to read scylla internal tables.
// FIXME: this is a loop, but we really allow only one clustering key column.
for (const column_definition& cdef : schema->clustering_key_columns()) {
auto raw_value = get_key_column_value(item, cdef);
bytes raw_value = get_key_column_value(item, cdef);
raw_ck.push_back(std::move(raw_value));
}
return clustering_key::from_exploded(raw_ck);
}
clustering_key_prefix ck_prefix_from_json(const rjson::value& item, schema_ptr schema) {
if (schema->clustering_key_size() == 0) {
return clustering_key_prefix::make_empty();
}
std::vector<bytes> raw_ck;
for (const column_definition& cdef : schema->clustering_key_columns()) {
auto raw_value = try_get_key_column_value(item, cdef);
if (!raw_value) {
break;
}
raw_ck.push_back(std::move(*raw_value));
}
return clustering_key_prefix::from_exploded(raw_ck);
}
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema) {
const bool is_alternator_ks = is_alternator_keyspace(schema->ks_name());
if (is_alternator_ks) {
return position_in_partition::for_key(ck_from_json(item, schema));
auto ck = ck_from_json(item, schema);
if (is_alternator_keyspace(schema->ks_name())) {
return position_in_partition::for_key(std::move(ck));
}
const auto region_item = rjson::find(item, scylla_paging_region);
const auto weight_item = rjson::find(item, scylla_paging_weight);
if (bool(region_item) != bool(weight_item)) {
@@ -439,9 +413,8 @@ position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema)
} else {
throw std::runtime_error(fmt::format("Invalid value for weight: {}", weight_view));
}
return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(ck_prefix_from_json(item, schema)) : std::nullopt);
return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(std::move(ck)) : std::nullopt);
}
auto ck = ck_from_json(item, schema);
if (ck.is_empty()) {
return position_in_partition::for_partition_start();
}
@@ -496,7 +469,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
return {"", nullptr};
}
auto it = v.MemberBegin();
const std::string it_key = rjson::to_string(it->name);
const std::string it_key = it->name.GetString();
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
return {std::move(it_key), nullptr};
}

View File

@@ -13,7 +13,6 @@
#include <seastar/http/function_handlers.hh>
#include <seastar/http/short_streams.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/short_streams.hh>
#include "seastarx.hh"
@@ -33,7 +32,6 @@
#include "utils/aws_sigv4.hh"
#include "client_data.hh"
#include "utils/updateable_value.hh"
#include <zlib.h>
static logging::logger slogger("alternator-server");
@@ -553,106 +551,6 @@ read_entire_stream(input_stream<char>& inp, size_t length_limit) {
co_return ret;
}
// safe_gzip_stream is an exception-safe wrapper for zlib's z_stream.
// The "z_stream" struct is used by zlib to hold state while decompressing a
// stream of data. It allocates memory which must be freed with inflateEnd(),
// which the destructor of this class does.
class safe_gzip_zstream {
z_stream _zs;
public:
safe_gzip_zstream() {
memset(&_zs, 0, sizeof(_zs));
// The strange 16 + WMAX_BITS tells zlib to expect and decode
// a gzip header, not a zlib header.
if (inflateInit2(&_zs, 16 + MAX_WBITS) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
}
~safe_gzip_zstream() {
inflateEnd(&_zs);
}
z_stream* operator->() {
return &_zs;
}
z_stream* get() {
return &_zs;
}
void reset() {
inflateReset(&_zs);
}
};
// ungzip() takes a chunked_content with a gzip-compressed request body,
// uncompresses it, and returns the uncompressed content as a chunked_content.
// If the uncompressed content exceeds length_limit, an error is thrown.
static future<chunked_content>
ungzip(chunked_content&& compressed_body, size_t length_limit) {
chunked_content ret;
// output_buf can be any size - when uncompressing input_buf, it doesn't
// need to fit in a single output_buf, we'll use multiple output_buf for
// a single input_buf if needed.
constexpr size_t OUTPUT_BUF_SIZE = 4096;
temporary_buffer<char> output_buf;
safe_gzip_zstream strm;
bool complete_stream = false; // empty input is not a valid gzip
size_t total_out_bytes = 0;
for (const temporary_buffer<char>& input_buf : compressed_body) {
if (input_buf.empty()) {
continue;
}
complete_stream = false;
strm->next_in = (Bytef*) input_buf.get();
strm->avail_in = (uInt) input_buf.size();
do {
co_await coroutine::maybe_yield();
if (output_buf.empty()) {
output_buf = temporary_buffer<char>(OUTPUT_BUF_SIZE);
}
strm->next_out = (Bytef*) output_buf.get();
strm->avail_out = OUTPUT_BUF_SIZE;
int e = inflate(strm.get(), Z_NO_FLUSH);
size_t out_bytes = OUTPUT_BUF_SIZE - strm->avail_out;
if (out_bytes > 0) {
// If output_buf is nearly full, we save it as-is in ret. But
// if it only has little data, better copy to a small buffer.
if (out_bytes > OUTPUT_BUF_SIZE/2) {
ret.push_back(std::move(output_buf).prefix(out_bytes));
// output_buf is now empty. if this loop finds more input,
// we'll allocate a new output buffer.
} else {
ret.push_back(temporary_buffer<char>(output_buf.get(), out_bytes));
}
total_out_bytes += out_bytes;
if (total_out_bytes > length_limit) {
throw api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", length_limit));
}
}
if (e == Z_STREAM_END) {
// There may be more input after the first gzip stream - in
// either this input_buf or the next one. The additional input
// should be a second concatenated gzip. We need to allow that
// by resetting the gzip stream and continuing the input loop
// until there's no more input.
strm.reset();
if (strm->avail_in == 0) {
complete_stream = true;
break;
}
} else if (e != Z_OK && e != Z_BUF_ERROR) {
// DynamoDB returns an InternalServerError when given a bad
// gzip request body. See test test_broken_gzip_content
throw api_error::internal("Error during gzip decompression of request body");
}
} while (strm->avail_in > 0 || strm->avail_out == 0);
}
if (!complete_stream) {
// The gzip stream was not properly finished with Z_STREAM_END
throw api_error::internal("Truncated gzip in request body");
}
co_return ret;
}
future<executor::request_return_type> server::handle_api_request(std::unique_ptr<request> req) {
_executor._stats.total_operations++;
sstring target = req->get_header("X-Amz-Target");
@@ -690,21 +588,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
units.return_units(mem_estimate - new_mem_estimate);
}
auto username = co_await verify_signature(*req, content);
// If the request is compressed, uncompress it now, after we checked
// the signature (the signature is computed on the compressed content).
// We apply the request_content_length_limit again to the uncompressed
// content - we don't want to allow a tiny compressed request to
// expand to a huge uncompressed request.
sstring content_encoding = req->get_header("Content-Encoding");
if (content_encoding == "gzip") {
content = co_await ungzip(std::move(content), request_content_length_limit);
} else if (!content_encoding.empty()) {
// DynamoDB returns a 500 error for unsupported Content-Encoding.
// I'm not sure if this is the best error code, but let's do it too.
// See the test test_garbage_content_encoding confirming this case.
co_return api_error::internal("Unsupported Content-Encoding");
}
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().

View File

@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
}
sstring attribute_name = rjson::to_sstring(*v);
sstring attribute_name(v->GetString(), v->GetStringLength());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {

View File

@@ -106,8 +106,5 @@ target_link_libraries(api
wasmtime_bindings
absl::headers)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(api REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers api
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -3051,7 +3051,7 @@
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"required":false,
"allowMultiple":false,
"type":"string",

View File

@@ -66,13 +66,6 @@ static future<json::json_return_type> get_cf_stats(sharded<replica::database>&
}, std::plus<int64_t>());
}
static future<json::json_return_type> get_cf_stats(sharded<replica::database>& db,
std::function<int64_t(const replica::column_family_stats&)> f) {
return map_reduce_cf(db, int64_t(0), [f](const replica::column_family& cf) {
return f(cf.get_stats());
}, std::plus<int64_t>());
}
static future<json::json_return_type> for_tables_on_all_shards(sharded<replica::database>& db, std::vector<table_info> tables, std::function<future<>(replica::table&)> set) {
return do_with(std::move(tables), [&db, set] (const std::vector<table_info>& tables) {
return db.invoke_on_all([&tables, set] (replica::database& db) {
@@ -1073,14 +1066,10 @@ void set_column_family(http_context& ctx, routes& r, sharded<replica::database>&
});
ss::get_load.set(r, [&db] (std::unique_ptr<http::request> req) {
return get_cf_stats(db, [](const replica::column_family_stats& stats) {
return stats.live_disk_space_used.on_disk;
});
return get_cf_stats(db, &replica::column_family_stats::live_disk_space_used);
});
ss::get_metrics_load.set(r, [&db] (std::unique_ptr<http::request> req) {
return get_cf_stats(db, [](const replica::column_family_stats& stats) {
return stats.live_disk_space_used.on_disk;
});
return get_cf_stats(db, &replica::column_family_stats::live_disk_space_used);
});
ss::get_keyspaces.set(r, [&db] (const_req req) {

View File

@@ -17,7 +17,4 @@ target_link_libraries(scylla_audit
PRIVATE
cql3)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(scylla_audit REUSE_FROM scylla-precompiled-header)
endif()
add_whole_archive(audit scylla_audit)

View File

@@ -9,7 +9,6 @@ target_sources(scylla_auth
allow_all_authorizer.cc
authenticated_user.cc
authenticator.cc
cache.cc
certificate_authenticator.cc
common.cc
default_authorizer.cc
@@ -45,8 +44,5 @@ target_link_libraries(scylla_auth
add_whole_archive(auth scylla_auth)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(scylla_auth REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers scylla_auth
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -23,7 +23,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -12,7 +12,6 @@
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "utils/alien_worker.hh"
@@ -30,7 +29,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&) {
}
virtual future<> start() override {

View File

@@ -1,180 +0,0 @@
/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "auth/cache.hh"
#include "auth/common.hh"
#include "auth/roles-metadata.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "db/consistency_level_type.hh"
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
namespace auth {
logging::logger logger("auth-cache");
cache::cache(cql3::query_processor& qp) noexcept
: _current_version(0)
, _qp(qp) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
auto it = _roles.find(role);
if (it == _roles.end()) {
return {};
}
return it->second;
}
future<lw_shared_ptr<cache::role_record>> cache::fetch_role(const role_name_t& role) const {
auto rec = make_lw_shared<role_record>();
rec->version = _current_version;
auto fetch = [this, &role](const sstring& q) {
return _qp.execute_internal(q, db::consistency_level::LOCAL_ONE,
internal_distributed_query_state(), {role},
cql3::query_processor::cache_internal::yes);
};
// roles
{
static const sstring q = format("SELECT * FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, meta::roles_table::name);
auto rs = co_await fetch(q);
if (!rs->empty()) {
auto& r = rs->one();
rec->is_superuser = r.get_or<bool>("is_superuser", false);
rec->can_login = r.get_or<bool>("can_login", false);
rec->salted_hash = r.get_or<sstring>("salted_hash", "");
if (r.has("member_of")) {
auto mo = r.get_set<sstring>("member_of");
rec->member_of.insert(
std::make_move_iterator(mo.begin()),
std::make_move_iterator(mo.end()));
}
} else {
// role got deleted
co_return nullptr;
}
}
// members
{
static const sstring q = format("SELECT role, member FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_MEMBERS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->members.insert(r.get_as<sstring>("member"));
co_await coroutine::maybe_yield();
}
}
// attributes
{
static const sstring q = format("SELECT role, name, value FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, ROLE_ATTRIBUTES_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
rec->attributes[r.get_as<sstring>("name")] =
r.get_as<sstring>("value");
co_await coroutine::maybe_yield();
}
}
// permissions
{
static const sstring q = format("SELECT role, resource, permissions FROM {}.{} WHERE role = ?", db::system_keyspace::NAME, PERMISSIONS_CF);
auto rs = co_await fetch(q);
for (const auto& r : *rs) {
auto resource = r.get_as<sstring>("resource");
auto perms_strings = r.get_set<sstring>("permissions");
std::unordered_set<sstring> perms_set(perms_strings.begin(), perms_strings.end());
auto pset = permissions::from_strings(perms_set);
rec->permissions[std::move(resource)] = std::move(pset);
co_await coroutine::maybe_yield();
}
}
co_return rec;
}
future<> cache::prune_all() noexcept {
for (auto it = _roles.begin(); it != _roles.end(); ) {
if (it->second->version != _current_version) {
_roles.erase(it++);
co_await coroutine::maybe_yield();
} else {
++it;
}
}
co_return;
}
future<> cache::load_all() {
if (legacy_mode(_qp)) {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
++_current_version;
logger.info("Loading all roles");
const uint32_t page_size = 128;
auto loader = [this](const cql3::untyped_result_set::row& r) -> future<stop_iteration> {
const auto name = r.get_as<sstring>("role");
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
}
co_return stop_iteration::no;
};
co_await _qp.query_internal(format("SELECT * FROM {}.{}",
db::system_keyspace::NAME, meta::roles_table::name),
db::consistency_level::LOCAL_ONE, {}, page_size, loader);
co_await prune_all();
for (const auto& [name, role] : _roles) {
co_await distribute_role(name, role);
}
co_await container().invoke_on_others([this](cache& c) -> future<> {
c._current_version = _current_version;
co_await c.prune_all();
});
}
future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
}
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);
if (role) {
_roles[name] = role;
} else {
_roles.erase(name);
}
co_await distribute_role(name, role);
}
}
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
auto role_ptr = role.get();
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
if (!role_ptr) {
c._roles.erase(name);
return;
}
auto role_copy = make_lw_shared<role_record>(*role_ptr);
c._roles[name] = std::move(role_copy);
});
}
bool cache::includes_table(const table_id& id) noexcept {
return id == db::system_keyspace::roles()->id()
|| id == db::system_keyspace::role_members()->id()
|| id == db::system_keyspace::role_attributes()->id()
|| id == db::system_keyspace::role_permissions()->id();
}
} // namespace auth

View File

@@ -1,61 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <unordered_set>
#include <unordered_map>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <absl/container/flat_hash_map.h>
#include "auth/permission.hh"
#include "auth/common.hh"
namespace cql3 { class query_processor; }
namespace auth {
class cache : public peering_sharded_service<cache> {
public:
using role_name_t = sstring;
using version_tag_t = char;
struct role_record {
bool can_login = false;
bool is_superuser = false;
std::unordered_set<role_name_t> member_of;
std::unordered_set<role_name_t> members;
sstring salted_hash;
std::unordered_map<sstring, sstring> attributes;
std::unordered_map<sstring, permission_set> permissions;
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
static bool includes_table(const table_id&) noexcept;
private:
using roles_map = absl::flat_hash_map<role_name_t, lw_shared_ptr<role_record>>;
roles_map _roles;
version_tag_t _current_version;
cql3::query_processor& _qp;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;
future<> distribute_role(const role_name_t& name, const lw_shared_ptr<role_record> role);
};
} // namespace auth

View File

@@ -8,7 +8,6 @@
*/
#include "auth/certificate_authenticator.hh"
#include "auth/cache.hh"
#include <boost/regex.hpp>
#include <fmt/ranges.h>
@@ -35,14 +34,13 @@ static const class_registrator<auth::authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();
@@ -77,9 +75,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
}
continue;
} catch (const std::out_of_range&) {
} catch (std::out_of_range&) {
// just fallthrough
} catch (const boost::regex_error&) {
} catch (boost::regex_error&) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
}
}

View File

@@ -26,15 +26,13 @@ class raft_group0_client;
namespace auth {
class cache;
extern const std::string_view certificate_authenticator_name;
class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
~certificate_authenticator();
future<> start() override;

View File

@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
try {
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
} catch (const exceptions::already_exists_exception&) {}
} catch (exceptions::already_exists_exception&) {}
}
}

View File

@@ -48,10 +48,6 @@ extern constinit const std::string_view AUTH_PACKAGE_NAME;
} // namespace meta
constexpr std::string_view PERMISSIONS_CF = "role_permissions";
constexpr std::string_view ROLE_MEMBERS_CF = "role_members";
constexpr std::string_view ROLE_ATTRIBUTES_CF = "role_attributes";
// This is a helper to check whether auth-v2 is on.
bool legacy_mode(cql3::query_processor& qp);

View File

@@ -37,6 +37,7 @@ std::string_view default_authorizer::qualified_java_name() const {
static constexpr std::string_view ROLE_NAME = "role";
static constexpr std::string_view RESOURCE_NAME = "resource";
static constexpr std::string_view PERMISSIONS_NAME = "permissions";
static constexpr std::string_view PERMISSIONS_CF = "role_permissions";
static logging::logger alogger("default_authorizer");
@@ -256,7 +257,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
} else {
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
}
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
}
}
@@ -293,13 +294,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
[resource](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
}
});
});
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
return make_ready_future();
}

View File

@@ -83,18 +83,17 @@ static const class_registrator<
ldap_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(ldap_role_manager_full_name);
::service::migration_manager&> registration(ldap_role_manager_full_name);
ldap_role_manager::ldap_role_manager(
std::string_view query_template, std::string_view target_attr, std::string_view bind_name, std::string_view bind_password,
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
: _std_mgr(qp, rg0c, mm, cache), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
: _std_mgr(qp, rg0c, mm), _group0_client(rg0c), _query_template(query_template), _target_attr(target_attr), _bind_name(bind_name)
, _bind_password(bind_password)
, _connection_factory(bind(std::mem_fn(&ldap_role_manager::reconnect), std::ref(*this))) {
}
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache)
ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm)
: ldap_role_manager(
qp.db().get_config().ldap_url_template(),
qp.db().get_config().ldap_attr_role(),
@@ -102,8 +101,7 @@ ldap_role_manager::ldap_role_manager(cql3::query_processor& qp, ::service::raft_
qp.db().get_config().ldap_bind_passwd(),
qp,
rg0c,
mm,
cache) {
mm) {
}
std::string_view ldap_role_manager::qualified_java_name() const noexcept {

View File

@@ -14,7 +14,6 @@
#include "ent/ldap/ldap_connection.hh"
#include "standard_role_manager.hh"
#include "auth/cache.hh"
namespace auth {
@@ -44,13 +43,12 @@ class ldap_role_manager : public role_manager {
std::string_view bind_password, ///< LDAP bind credentials.
cql3::query_processor& qp, ///< Passed to standard_role_manager.
::service::raft_group0_client& rg0c, ///< Passed to standard_role_manager.
::service::migration_manager& mm, ///< Passed to standard_role_manager.
cache& cache ///< Passed to standard_role_manager.
::service::migration_manager& mm ///< Passed to standard_role_manager.
);
/// Retrieves LDAP configuration entries from qp and invokes the other constructor. Required by
/// class_registrator<role_manager>.
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm, cache& cache);
ldap_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& rg0c, ::service::migration_manager& mm);
/// Thrown when query-template parsing fails.
struct url_error : public std::runtime_error {

View File

@@ -11,7 +11,6 @@
#include <seastar/core/future.hh>
#include <stdexcept>
#include <string_view>
#include "auth/cache.hh"
#include "cql3/description.hh"
#include "utils/class_registrator.hh"
@@ -24,8 +23,7 @@ static const class_registrator<
maintenance_socket_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration(sstring{maintenance_socket_role_manager_name});
::service::migration_manager&> registration(sstring{maintenance_socket_role_manager_name});
std::string_view maintenance_socket_role_manager::qualified_java_name() const noexcept {

View File

@@ -8,7 +8,6 @@
#pragma once
#include "auth/cache.hh"
#include "auth/resource.hh"
#include "auth/role_manager.hh"
#include <seastar/core/future.hh>
@@ -30,7 +29,7 @@ extern const std::string_view maintenance_socket_role_manager_name;
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
class maintenance_socket_role_manager final : public role_manager {
public:
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {}
maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {}
virtual std::string_view qualified_java_name() const noexcept override;

View File

@@ -49,7 +49,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
@@ -64,11 +63,10 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
password_authenticator::~password_authenticator() {
}
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
, _hashing_worker(hashing_worker)
@@ -317,33 +315,24 @@ future<authenticated_user> password_authenticator::authenticate(
const sstring password = credentials.at(PASSWORD_KEY);
try {
std::optional<sstring> salted_hash;
if (legacy_mode(_qp)) {
salted_hash = co_await get_password_hash(username);
if (!salted_hash) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
} else {
auto role = _cache.get(username);
if (!role || role->salted_hash.empty()) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
salted_hash = role->salted_hash;
const std::optional<sstring> salted_hash = co_await get_password_hash(username);
if (!salted_hash) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash = std::move(salted_hash)]{
return passwords::check(password, *salted_hash);
});
if (!password_match) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
co_return username;
} catch (const std::system_error &) {
} catch (std::system_error &) {
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.what()));
} catch (const exceptions::authentication_exception& e) {
} catch (exceptions::authentication_exception& e) {
std::throw_with_nested(e);
} catch (const exceptions::unavailable_exception& e) {
} catch (exceptions::unavailable_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
} catch (...) {
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));

View File

@@ -16,7 +16,6 @@
#include "db/consistency_level_type.hh"
#include "auth/authenticator.hh"
#include "auth/passwords.hh"
#include "auth/cache.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
@@ -42,7 +41,6 @@ class password_authenticator : public authenticator {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
future<> _stopped;
abort_source _as;
std::string _superuser; // default superuser name from the config (may or may not be present in roles table)
@@ -55,7 +53,7 @@ public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
~password_authenticator();

View File

@@ -35,10 +35,9 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -11,7 +11,6 @@
#pragma once
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
@@ -30,7 +29,7 @@ namespace auth {
class saslauthd_authenticator : public authenticator {
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
public:
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, utils::alien_worker&);
future<> start() override;

View File

@@ -17,7 +17,6 @@
#include <chrono>
#include <seastar/core/future-util.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
@@ -158,7 +157,6 @@ static future<> validate_role_exists(const service& ser, std::string_view role_n
service::service(
utils::loading_cache_config c,
cache& cache,
cql3::query_processor& qp,
::service::raft_group0_client& g0,
::service::migration_notifier& mn,
@@ -168,7 +166,6 @@ service::service(
maintenance_socket_enabled used_by_maintenance_socket)
: _loading_cache_config(std::move(c))
, _permissions_cache(nullptr)
, _cache(cache)
, _qp(qp)
, _group0_client(g0)
, _mnotifier(mn)
@@ -191,17 +188,15 @@ service::service(
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache,
utils::alien_worker& hashing_worker)
: service(
std::move(c),
cache,
qp,
g0,
mn,
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm),
used_by_maintenance_socket) {
}
@@ -226,7 +221,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
try {
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
} catch (const ::service::group0_concurrent_modification&) {
} catch (::service::group0_concurrent_modification&) {
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
}
}
@@ -237,9 +232,6 @@ future<> service::start(::service::migration_manager& mm, db::system_keyspace& s
auto auth_version = co_await sys_ks.get_auth_version();
// version is set in query processor to be easily available in various places we call auth::legacy_mode check.
_qp.auth_version = auth_version;
if (this_shard_id() == 0) {
co_await _cache.load_all();
}
if (!_used_by_maintenance_socket) {
// this legacy keyspace is only used by cqlsh
// it's needed when executing `list roles` or `list users`

View File

@@ -21,7 +21,6 @@
#include "auth/authorizer.hh"
#include "auth/permission.hh"
#include "auth/permissions_cache.hh"
#include "auth/cache.hh"
#include "auth/role_manager.hh"
#include "auth/common.hh"
#include "cql3/description.hh"
@@ -78,7 +77,6 @@ public:
class service final : public seastar::peering_sharded_service<service> {
utils::loading_cache_config _loading_cache_config;
std::unique_ptr<permissions_cache> _permissions_cache;
cache& _cache;
cql3::query_processor& _qp;
@@ -109,7 +107,6 @@ class service final : public seastar::peering_sharded_service<service> {
public:
service(
utils::loading_cache_config,
cache& cache,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_notifier&,
@@ -131,7 +128,6 @@ public:
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&,
utils::alien_worker&);
future<> start(::service::migration_manager&, db::system_keyspace&);

View File

@@ -41,6 +41,21 @@
namespace auth {
namespace meta {
namespace role_members_table {
constexpr std::string_view name{"role_members" , 12};
}
namespace role_attributes_table {
constexpr std::string_view name{"role_attributes", 15};
}
}
static logging::logger log("standard_role_manager");
@@ -49,8 +64,7 @@ static const class_registrator<
standard_role_manager,
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.CassandraRoleManager");
::service::migration_manager&> registration("org.apache.cassandra.auth.CassandraRoleManager");
struct record final {
sstring name;
@@ -107,11 +121,10 @@ static bool has_can_login(const cql3::untyped_result_set_row& row) {
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob_unfragmented("can_login")).is_null());
}
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
{}
@@ -123,7 +136,7 @@ std::string_view standard_role_manager::qualified_java_name() const noexcept {
const resource_set& standard_role_manager::protected_resources() const {
static const resource_set resources({
make_data_resource(meta::legacy::AUTH_KS, meta::roles_table::name),
make_data_resource(meta::legacy::AUTH_KS, ROLE_MEMBERS_CF)});
make_data_resource(meta::legacy::AUTH_KS, meta::role_members_table::name)});
return resources;
}
@@ -147,7 +160,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
" PRIMARY KEY (role, member)"
")",
meta::legacy::AUTH_KS,
ROLE_MEMBERS_CF);
meta::role_members_table::name);
static const sstring create_role_attributes_query = seastar::format(
"CREATE TABLE {}.{} ("
" role text,"
@@ -156,7 +169,7 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
" PRIMARY KEY(role, name)"
")",
meta::legacy::AUTH_KS,
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
return when_all_succeed(
create_legacy_metadata_table_if_missing(
meta::roles_table::name,
@@ -164,12 +177,12 @@ future<> standard_role_manager::create_legacy_metadata_tables_if_missing() const
create_roles_query,
_migration_manager),
create_legacy_metadata_table_if_missing(
ROLE_MEMBERS_CF,
meta::role_members_table::name,
_qp,
create_role_members_query,
_migration_manager),
create_legacy_metadata_table_if_missing(
ROLE_ATTRIBUTES_CF,
meta::role_attributes_table::name,
_qp,
create_role_attributes_query,
_migration_manager)).discard_result();
@@ -192,7 +205,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
log.info("Created default superuser role '{}'.", _superuser);
} catch (const exceptions::unavailable_exception& e) {
} catch(const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were not ready; will retry");
throw e;
}
@@ -416,7 +429,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
const auto revoke_from_members = [this, role_name, &mc] () -> future<> {
const sstring query = seastar::format("SELECT member FROM {}.{} WHERE role = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
const auto members = co_await _qp.execute_internal(
query,
consistency_for_role(role_name),
@@ -448,7 +461,7 @@ future<> standard_role_manager::drop(std::string_view role_name, ::service::grou
const auto remove_attributes_of = [this, role_name, &mc] () -> future<> {
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name)},
cql3::query_processor::cache_internal::yes).discard_result();
@@ -504,7 +517,7 @@ standard_role_manager::legacy_modify_membership(
case membership_change::add: {
const sstring insert_query = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
co_return co_await _qp.execute_internal(
insert_query,
consistency_for_role(role_name),
@@ -516,7 +529,7 @@ standard_role_manager::legacy_modify_membership(
case membership_change::remove: {
const sstring delete_query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
co_return co_await _qp.execute_internal(
delete_query,
consistency_for_role(role_name),
@@ -554,12 +567,12 @@ standard_role_manager::modify_membership(
case membership_change::add:
modify_role_members = seastar::format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
break;
case membership_change::remove:
modify_role_members = seastar::format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
break;
default:
on_internal_error(log, format("unknown membership_change value: {}", int(ch)));
@@ -653,7 +666,7 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
const sstring query = seastar::format("SELECT * FROM {}.{}",
get_auth_ks_name(_qp),
ROLE_MEMBERS_CF);
meta::role_members_table::name);
const auto results = co_await _qp.execute_internal(
query,
@@ -718,21 +731,15 @@ future<bool> standard_role_manager::is_superuser(std::string_view role_name) {
}
future<bool> standard_role_manager::can_login(std::string_view role_name) {
if (legacy_mode(_qp)) {
const auto r = co_await require_record(_qp, role_name);
co_return r.can_login;
}
auto role = _cache.get(sstring(role_name));
if (!role) {
throw nonexistant_role(role_name);
}
co_return role->can_login;
return require_record(_qp, role_name).then([](record r) {
return r.can_login;
});
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
if (!result_set->empty()) {
const cql3::untyped_result_set_row &row = result_set->one();
@@ -763,7 +770,7 @@ future<> standard_role_manager::set_attribute(std::string_view role_name, std::s
}
const sstring query = seastar::format("INSERT INTO {}.{} (role, name, value) VALUES (?, ?, ?)",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, cql3::query_processor::cache_internal::yes).discard_result();
} else {
@@ -778,7 +785,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
}
const sstring query = seastar::format("DELETE FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
ROLE_ATTRIBUTES_CF);
meta::role_attributes_table::name);
if (legacy_mode(_qp)) {
co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).discard_result();
} else {

View File

@@ -10,7 +10,6 @@
#include "auth/common.hh"
#include "auth/role_manager.hh"
#include "auth/cache.hh"
#include <string_view>
@@ -37,14 +36,13 @@ class standard_role_manager final : public role_manager {
cql3::query_processor& _qp;
::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
cache& _cache;
future<> _stopped;
abort_source _as;
std::string _superuser;
shared_promise<> _superuser_created_promise;
public:
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
virtual std::string_view qualified_java_name() const noexcept override;

View File

@@ -13,7 +13,6 @@
#include "auth/authorizer.hh"
#include "auth/default_authorizer.hh"
#include "auth/password_authenticator.hh"
#include "auth/cache.hh"
#include "auth/permission.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
@@ -38,8 +37,8 @@ class transitional_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, hashing_worker)) {
}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
@@ -81,7 +80,7 @@ public:
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -126,7 +125,7 @@ public:
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
_complete = true;
return {};
}
@@ -141,7 +140,7 @@ public:
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -241,7 +240,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&,
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<

View File

@@ -15,7 +15,6 @@
#include <cmath>
#include "seastarx.hh"
#include "backlog_controller_fwd.hh"
// Simple proportional controller to adjust shares for processes for which a backlog can be clearly
// defined.
@@ -129,21 +128,11 @@ public:
static constexpr unsigned normalization_factor = 30;
static constexpr float disable_backlog = std::numeric_limits<double>::infinity();
static constexpr float backlog_disabled(float backlog) { return std::isinf(backlog); }
static inline const std::vector<backlog_controller::control_point> default_control_points = {
backlog_controller::control_point{0.0, 50}, {1.5, 100}, {normalization_factor, default_compaction_maximum_shares}};
compaction_controller(backlog_controller::scheduling_group sg, float static_shares, std::optional<float> max_shares,
std::chrono::milliseconds interval, std::function<float()> current_backlog)
compaction_controller(backlog_controller::scheduling_group sg, float static_shares, std::chrono::milliseconds interval, std::function<float()> current_backlog)
: backlog_controller(std::move(sg), std::move(interval),
default_control_points,
std::vector<backlog_controller::control_point>({{0.0, 50}, {1.5, 100} , {normalization_factor, 1000}}),
std::move(current_backlog),
static_shares
)
{
if (max_shares) {
set_max_shares(*max_shares);
}
}
// Updates the maximum output value for control points.
void set_max_shares(float max_shares);
{}
};

View File

@@ -1,13 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <cstdint>
static constexpr uint64_t default_compaction_maximum_shares = 1000;

View File

@@ -17,8 +17,5 @@ target_link_libraries(cdc
PRIVATE
replica)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(cdc REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers cdc
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -25,7 +25,6 @@
#include "locator/abstract_replication_strategy.hh"
#include "locator/topology.hh"
#include "replica/database.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "gms/feature_service.hh"
#include "schema/schema.hh"
@@ -587,9 +586,11 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
}
static void set_default_properties_log_table(schema_builder& b, const schema& s,
const replica::database& db, const keyspace_metadata& ksm)
static schema_ptr create_log_schema(const schema& s, const replica::database& db,
const keyspace_metadata& ksm, api::timestamp_type timestamp, std::optional<table_id> uuid, schema_ptr old)
{
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.with_partitioner(cdc::cdc_partitioner::classname);
b.set_compaction_strategy(compaction::compaction_strategy_type::time_window);
b.set_comment(fmt::format("CDC log for {}.{}", s.ks_name(), s.cf_name()));
auto ttl_seconds = s.cdc_options().ttl();
@@ -615,22 +616,13 @@ static void set_default_properties_log_table(schema_builder& b, const schema& s,
std::to_string(std::max(1, window_seconds / 2))},
});
}
b.set_caching_options(caching_options::get_disabled_caching_options());
auto rs = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, db.get_token_metadata(), false));
b.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
}
static void add_columns_to_cdc_log(schema_builder& b, const schema& s,
const api::timestamp_type timestamp, const schema_ptr old)
{
b.with_column(log_meta_column_name_bytes("stream_id"), bytes_type, column_kind::partition_key);
b.with_column(log_meta_column_name_bytes("time"), timeuuid_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
b.set_caching_options(caching_options::get_disabled_caching_options());
auto validate_new_column = [&] (const sstring& name) {
// When dropping a column from a CDC log table, we set the drop timestamp to be
@@ -700,28 +692,15 @@ static void add_columns_to_cdc_log(schema_builder& b, const schema& s,
add_columns(s.clustering_key_columns());
add_columns(s.static_columns(), true);
add_columns(s.regular_columns(), true);
}
static schema_ptr create_log_schema(const schema& s, const replica::database& db,
const keyspace_metadata& ksm, api::timestamp_type timestamp, std::optional<table_id> uuid, schema_ptr old)
{
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.with_partitioner(cdc::cdc_partitioner::classname);
if (old) {
// If the user reattaches the log table, do not change its properties.
b.set_properties(old->get_properties());
} else {
set_default_properties_log_table(b, s, db, ksm);
}
add_columns_to_cdc_log(b, s, timestamp, old);
if (uuid) {
b.set_uuid(*uuid);
}
auto rs = generate_replication_strategy(ksm, db.get_token_metadata().get_topology());
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(*rs, db.get_token_metadata()));
b.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
/**
* #10473 - if we are redefining the log table, we need to ensure any dropped
* columns are registered in "dropped_columns" table, otherwise clients will not
@@ -952,6 +931,9 @@ static managed_bytes merge(const abstract_type& type, const managed_bytes_opt& p
throw std::runtime_error(format("cdc merge: unknown type {}", type.name()));
}
using cell_map = std::unordered_map<const column_definition*, managed_bytes_opt>;
using row_states_map = std::unordered_map<clustering_key, cell_map, clustering_key::hashing, clustering_key::equality>;
static managed_bytes_opt get_col_from_row_state(const cell_map* state, const column_definition& cdef) {
if (state) {
if (auto it = state->find(&cdef); it != state->end()) {
@@ -961,12 +943,7 @@ static managed_bytes_opt get_col_from_row_state(const cell_map* state, const col
return std::nullopt;
}
cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck) {
auto it = row_states.find(ck);
return it == row_states.end() ? nullptr : &it->second;
}
const cell_map* get_row_state(const row_states_map& row_states, const clustering_key& ck) {
static cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck) {
auto it = row_states.find(ck);
return it == row_states.end() ? nullptr : &it->second;
}
@@ -1436,8 +1413,6 @@ struct process_change_visitor {
row_states_map& _clustering_row_states;
cell_map& _static_row_state;
const bool _is_update = false;
const bool _generate_delta_values = true;
void static_row_cells(auto&& visit_row_cells) {
@@ -1461,13 +1436,12 @@ struct process_change_visitor {
struct clustering_row_cells_visitor : public process_row_visitor {
operation _cdc_op = operation::update;
operation _marker_op = operation::insert;
using process_row_visitor::process_row_visitor;
void marker(const row_marker& rm) {
_ttl_column = get_ttl(rm);
_cdc_op = _marker_op;
_cdc_op = operation::insert;
}
};
@@ -1475,9 +1449,6 @@ struct process_change_visitor {
log_ck, _touched_parts, _builder,
_enable_updating_state, &ckey, get_row_state(_clustering_row_states, ckey),
_clustering_row_states, _generate_delta_values);
if (_is_update && _request_options.alternator) {
v._marker_op = operation::update;
}
visit_row_cells(v);
if (_enable_updating_state) {
@@ -1631,11 +1602,6 @@ private:
row_states_map _clustering_row_states;
cell_map _static_row_state;
// True if the mutated row existed before applying the mutation. In other
// words, if the preimage is enabled and it isn't empty (otherwise, we
// assume that the row is non-existent). Used for Alternator Streams (see
// #6918).
bool _is_update = false;
const bool _uses_tablets;
@@ -1762,7 +1728,6 @@ public:
._enable_updating_state = _enable_updating_state,
._clustering_row_states = _clustering_row_states,
._static_row_state = _static_row_state,
._is_update = _is_update,
._generate_delta_values = generate_delta_values(_builder->base_schema())
};
cdc::inspect_mutation(m, v);
@@ -1773,10 +1738,6 @@ public:
_builder->end_record();
}
const row_states_map& clustering_row_states() const override {
return _clustering_row_states;
}
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called on should not be used anymore.
std::tuple<utils::chunked_vector<mutation>, stats::part_type_set> finish() && {
@@ -1900,7 +1861,6 @@ public:
_static_row_state[&c] = std::move(*maybe_cell_view);
}
}
_is_update = true;
}
if (static_only) {
@@ -1988,7 +1948,6 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
return make_ready_future<>();
}
const bool alternator_increased_compatibility = options.alternator && options.alternator_streams_increased_compatibility;
transformer trans(_ctxt, s, m.decorated_key(), options);
auto f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(nullptr);
@@ -1996,7 +1955,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
// Preimage has been fetched by upper layers.
tracing::trace(tr_state, "CDC: Using a prefetched preimage");
f = make_ready_future<lw_shared_ptr<cql3::untyped_result_set>>(options.preimage);
} else if (s->cdc_options().preimage() || s->cdc_options().postimage() || alternator_increased_compatibility) {
} else if (s->cdc_options().preimage() || s->cdc_options().postimage()) {
// Note: further improvement here would be to coalesce the pre-image selects into one
// if a batch contains several modifications to the same table. Otoh, batch is rare(?)
// so this is premature.
@@ -2013,7 +1972,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key());
}
return f.then([alternator_increased_compatibility, trans = std::move(trans), &mutations, idx, tr_state, &details, &options] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
return f.then([trans = std::move(trans), &mutations, idx, tr_state, &details] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
auto& m = mutations[idx];
auto& s = m.schema();
@@ -2028,13 +1987,13 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
details.had_preimage |= preimage;
details.had_postimage |= postimage;
tracing::trace(tr_state, "CDC: Generating log mutations for {}", m.decorated_key());
if (should_split(m, options)) {
if (should_split(m)) {
tracing::trace(tr_state, "CDC: Splitting {}", m.decorated_key());
details.was_split = true;
process_changes_with_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
process_changes_with_splitting(m, trans, preimage, postimage);
} else {
tracing::trace(tr_state, "CDC: No need to split {}", m.decorated_key());
process_changes_without_splitting(m, trans, preimage, postimage, alternator_increased_compatibility);
process_changes_without_splitting(m, trans, preimage, postimage);
}
auto [log_mut, touched_parts] = std::move(trans).finish();
const int generated_count = log_mut.size();

View File

@@ -52,9 +52,6 @@ class database;
namespace cdc {
using cell_map = std::unordered_map<const column_definition*, managed_bytes_opt>;
using row_states_map = std::unordered_map<clustering_key, cell_map, clustering_key::hashing, clustering_key::equality>;
// cdc log table operation
enum class operation : int8_t {
// note: these values will eventually be read by a third party, probably not privvy to this
@@ -76,14 +73,6 @@ struct per_request_options {
// Scylla. Currently, only TTL expiration implementation for Alternator
// uses this.
const bool is_system_originated = false;
// True if this mutation was emitted by Alternator.
const bool alternator = false;
// Sacrifice performance for the sake of better compatibility with DynamoDB
// Streams. It's important for correctness that
// alternator_streams_increased_compatibility config flag be read once per
// request, because it's live-updateable. As a result, the flag may change
// between reads.
const bool alternator_streams_increased_compatibility = false;
};
struct operation_result_tracker;
@@ -153,7 +142,4 @@ bool is_cdc_metacolumn_name(const sstring& name);
utils::UUID generate_timeuuid(api::timestamp_type t);
cell_map* get_row_state(row_states_map& row_states, const clustering_key& ck);
const cell_map* get_row_state(const row_states_map& row_states, const clustering_key& ck);
} // namespace cdc

View File

@@ -6,28 +6,15 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "bytes.hh"
#include "bytes_fwd.hh"
#include "mutation/atomic_cell.hh"
#include "mutation/atomic_cell_or_collection.hh"
#include "mutation/collection_mutation.hh"
#include "mutation/mutation.hh"
#include "mutation/tombstone.hh"
#include "schema/schema.hh"
#include "seastar/core/sstring.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "types/user.hh"
#include "split.hh"
#include "log.hh"
#include "change_visitor.hh"
#include "utils/managed_bytes.hh"
#include <string_view>
#include <unordered_map>
extern logging::logger cdc_log;
struct atomic_column_update {
column_id id;
@@ -503,8 +490,6 @@ struct should_split_visitor {
// Otherwise we store the change's ttl.
std::optional<gc_clock::duration> _ttl = std::nullopt;
virtual ~should_split_visitor() = default;
inline bool finished() const { return _result; }
inline void stop() { _result = true; }
@@ -527,7 +512,7 @@ struct should_split_visitor {
void collection_tombstone(const tombstone& t) { visit(t.timestamp + 1); }
virtual void live_collection_cell(bytes_view, const atomic_cell_view& cell) {
void live_collection_cell(bytes_view, const atomic_cell_view& cell) {
if (_had_row_marker) {
// nonatomic updates cannot be expressed with an INSERT.
return stop();
@@ -537,7 +522,7 @@ struct should_split_visitor {
void dead_collection_cell(bytes_view, const atomic_cell_view& cell) { visit(cell); }
void collection_column(const column_definition&, auto&& visit_collection) { visit_collection(*this); }
virtual void marker(const row_marker& rm) {
void marker(const row_marker& rm) {
_had_row_marker = true;
visit(rm.timestamp(), get_ttl(rm));
}
@@ -578,29 +563,7 @@ struct should_split_visitor {
}
};
// This is the same as the above, but it doesn't split a row marker away from
// an update. As a result, updates that create an item appear as a single log
// row.
class alternator_should_split_visitor : public should_split_visitor {
public:
~alternator_should_split_visitor() override = default;
void live_collection_cell(bytes_view, const atomic_cell_view& cell) override {
visit(cell.timestamp());
}
void marker(const row_marker& rm) override {
visit(rm.timestamp());
}
};
bool should_split(const mutation& m, const per_request_options& options) {
if (options.alternator) {
alternator_should_split_visitor v;
cdc::inspect_mutation(m, v);
return v._result || v._ts == api::missing_timestamp;
}
bool should_split(const mutation& m) {
should_split_visitor v;
cdc::inspect_mutation(m, v);
@@ -610,109 +573,8 @@ bool should_split(const mutation& m, const per_request_options& options) {
|| v._ts == api::missing_timestamp;
}
// Returns true if the row state and the atomic and nonatomic entries represent
// an equivalent item.
static bool entries_match_row_state(const schema_ptr& base_schema, const cell_map& row_state, const std::vector<atomic_column_update>& atomic_entries,
std::vector<nonatomic_column_update>& nonatomic_entries) {
for (const auto& update : atomic_entries) {
const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id);
const auto it = row_state.find(&cdef);
if (it == row_state.end()) {
return false;
}
if (to_managed_bytes_opt(update.cell.value().linearize()) != it->second) {
return false;
}
}
if (nonatomic_entries.empty()) {
return true;
}
for (const auto& update : nonatomic_entries) {
const column_definition& cdef = base_schema->column_at(column_kind::regular_column, update.id);
const auto it = row_state.find(&cdef);
if (it == row_state.end()) {
return false;
}
// The only collection used by Alternator is a non-frozen map.
auto current_raw_map = cdef.type->deserialize(*it->second);
map_type_impl::native_type current_values = value_cast<map_type_impl::native_type>(current_raw_map);
if (current_values.size() != update.cells.size()) {
return false;
}
std::unordered_map<sstring_view, bytes> current_values_map;
for (const auto& entry : current_values) {
const auto attr_name = std::string_view(value_cast<sstring>(entry.first));
current_values_map[attr_name] = value_cast<bytes>(entry.second);
}
for (const auto& [key, value] : update.cells) {
const auto key_str = to_string_view(key);
if (!value.is_live()) {
if (current_values_map.contains(key_str)) {
return false;
}
} else if (current_values_map[key_str] != value.value().linearize()) {
return false;
}
}
}
return true;
}
bool should_skip(batch& changes, const mutation& base_mutation, change_processor& processor) {
const schema_ptr& base_schema = base_mutation.schema();
// Alternator doesn't use static updates and clustered range deletions.
if (!changes.static_updates.empty() || !changes.clustered_range_deletions.empty()) {
return false;
}
for (clustered_row_insert& u : changes.clustered_inserts) {
const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key);
if (!row_state) {
return false;
}
if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) {
return false;
}
}
for (clustered_row_update& u : changes.clustered_updates) {
const cell_map* row_state = get_row_state(processor.clustering_row_states(), u.key);
if (!row_state) {
return false;
}
if (!entries_match_row_state(base_schema, *row_state, u.atomic_entries, u.nonatomic_entries)) {
return false;
}
}
// Skip only if the row being deleted does not exist (i.e. the deletion is a no-op).
for (const auto& row_deletion : changes.clustered_row_deletions) {
if (processor.clustering_row_states().contains(row_deletion.key)) {
return false;
}
}
// Don't skip if the item exists.
//
// Increased DynamoDB Streams compatibility guarantees that single-item
// operations will read the item and store it in the clustering row states.
// If it is not found there, we may skip CDC. This is safe as long as the
// assumptions of this operation's write isolation are not violated.
if (changes.partition_deletions && processor.clustering_row_states().contains(clustering_key::make_empty())) {
return false;
}
cdc_log.trace("Skipping CDC log for mutation {}", base_mutation);
return true;
}
void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor,
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) {
bool enable_preimage, bool enable_postimage) {
const auto base_schema = base_mutation.schema();
auto changes = extract_changes(base_mutation);
auto pk = base_mutation.key();
@@ -724,6 +586,9 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
const auto last_timestamp = changes.rbegin()->first;
for (auto& [change_ts, btch] : changes) {
const bool is_last = change_ts == last_timestamp;
processor.begin_timestamp(change_ts, is_last);
clustered_column_set affected_clustered_columns_per_row{clustering_key::less_compare(*base_schema)};
one_kind_column_set affected_static_columns{base_schema->static_columns_count()};
@@ -732,12 +597,6 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
affected_clustered_columns_per_row = btch.get_affected_clustered_columns_per_row(*base_mutation.schema());
}
if (alternator_strict_compatibility && should_skip(btch, base_mutation, processor)) {
continue;
}
const bool is_last = change_ts == last_timestamp;
processor.begin_timestamp(change_ts, is_last);
if (enable_preimage) {
if (affected_static_columns.count() > 0) {
processor.produce_preimage(nullptr, affected_static_columns);
@@ -825,13 +684,7 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
}
void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor,
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility) {
if (alternator_strict_compatibility) {
auto changes = extract_changes(base_mutation);
if (should_skip(changes.begin()->second, base_mutation, processor)) {
return;
}
}
bool enable_preimage, bool enable_postimage) {
auto ts = find_timestamp(base_mutation);
processor.begin_timestamp(ts, true);

View File

@@ -9,7 +9,6 @@
#pragma once
#include <boost/dynamic_bitset.hpp> // IWYU pragma: keep
#include "cdc/log.hh"
#include "replica/database_fwd.hh"
#include "mutation/timestamp.hh"
@@ -66,14 +65,12 @@ public:
// Tells processor we have reached end of record - last part
// of a given timestamp batch
virtual void end_record() = 0;
virtual const row_states_map& clustering_row_states() const = 0;
};
bool should_split(const mutation& base_mutation, const per_request_options& options);
bool should_split(const mutation& base_mutation);
void process_changes_with_splitting(const mutation& base_mutation, change_processor& processor,
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility);
bool enable_preimage, bool enable_postimage);
void process_changes_without_splitting(const mutation& base_mutation, change_processor& processor,
bool enable_preimage, bool enable_postimage, bool alternator_strict_compatibility);
bool enable_preimage, bool enable_postimage);
}

View File

@@ -21,8 +21,5 @@ target_link_libraries(compaction
mutation_writer
replica)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(compaction REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers compaction
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -867,8 +867,8 @@ auto fmt::formatter<compaction::compaction_task_executor>::format(const compacti
namespace compaction {
inline compaction_controller make_compaction_controller(const compaction_manager::scheduling_group& csg, uint64_t static_shares, std::optional<float> max_shares, std::function<double()> fn) {
return compaction_controller(csg, static_shares, max_shares, 250ms, std::move(fn));
inline compaction_controller make_compaction_controller(const compaction_manager::scheduling_group& csg, uint64_t static_shares, std::function<double()> fn) {
return compaction_controller(csg, static_shares, 250ms, std::move(fn));
}
compaction::compaction_state::~compaction_state() {
@@ -1014,7 +1014,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
, _sys_ks("compaction_manager::system_keyspace")
, _cfg(std::move(cfg))
, _compaction_submission_timer(compaction_sg(), compaction_submission_callback())
, _compaction_controller(make_compaction_controller(compaction_sg(), static_shares(), _cfg.max_shares.get(), [this] () -> float {
, _compaction_controller(make_compaction_controller(compaction_sg(), static_shares(), [this] () -> float {
_last_backlog = backlog();
auto b = _last_backlog / available_memory();
// This means we are using an unimplemented strategy
@@ -1033,10 +1033,6 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
, _throughput_updater(serialized_action([this] { return update_throughput(throughput_mbs()); }))
, _update_compaction_static_shares_action([this] { return update_static_shares(static_shares()); })
, _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer()))
, _compaction_max_shares_observer(_cfg.max_shares.observe([this] (const float& max_shares) {
cmlog.info("Updating max shares to {}", max_shares);
_compaction_controller.set_max_shares(max_shares);
}))
, _strategy_control(std::make_unique<strategy_control>(*this))
, _tombstone_gc_state(_shared_tombstone_gc_state) {
tm.register_module(_task_manager_module->get_name(), _task_manager_module);
@@ -1055,12 +1051,11 @@ compaction_manager::compaction_manager(tasks::task_manager& tm)
, _sys_ks("compaction_manager::system_keyspace")
, _cfg(config{ .available_memory = 1 })
, _compaction_submission_timer(compaction_sg(), compaction_submission_callback())
, _compaction_controller(make_compaction_controller(compaction_sg(), 1, std::nullopt, [] () -> float { return 1.0; }))
, _compaction_controller(make_compaction_controller(compaction_sg(), 1, [] () -> float { return 1.0; }))
, _backlog_manager(_compaction_controller)
, _throughput_updater(serialized_action([this] { return update_throughput(throughput_mbs()); }))
, _update_compaction_static_shares_action([] { return make_ready_future<>(); })
, _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer()))
, _compaction_max_shares_observer(_cfg.max_shares.observe([] (const float& max_shares) {}))
, _strategy_control(std::make_unique<strategy_control>(*this))
, _tombstone_gc_state(_shared_tombstone_gc_state) {
tm.register_module(_task_manager_module->get_name(), _task_manager_module);

View File

@@ -80,7 +80,6 @@ public:
scheduling_group maintenance_sched_group;
size_t available_memory = 0;
utils::updateable_value<float> static_shares = utils::updateable_value<float>(0);
utils::updateable_value<float> max_shares = utils::updateable_value<float>(0);
utils::updateable_value<uint32_t> throughput_mb_per_sec = utils::updateable_value<uint32_t>(0);
std::chrono::seconds flush_all_tables_before_major = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::days(1));
};
@@ -160,7 +159,6 @@ private:
std::optional<utils::observer<uint32_t>> _throughput_option_observer;
serialized_action _update_compaction_static_shares_action;
utils::observer<float> _compaction_static_shares_observer;
utils::observer<float> _compaction_max_shares_observer;
uint64_t _validation_errors = 0;
class strategy_control;
@@ -293,10 +291,6 @@ public:
return _cfg.static_shares.get();
}
float max_shares() const noexcept {
return _cfg.max_shares.get();
}
uint32_t throughput_mbs() const noexcept {
return _cfg.throughput_mb_per_sec.get();
}

View File

@@ -227,7 +227,7 @@ future<> run_table_tasks(replica::database& db, std::vector<table_tasks_info> ta
// Tables will be kept in descending order.
std::ranges::sort(table_tasks, std::greater<>(), [&] (const table_tasks_info& tti) {
try {
return db.find_column_family(tti.ti.id).get_stats().live_disk_space_used.on_disk;
return db.find_column_family(tti.ti.id).get_stats().live_disk_space_used;
} catch (const replica::no_such_column_family& e) {
return int64_t(-1);
}
@@ -281,7 +281,7 @@ future<> run_keyspace_tasks(replica::database& db, std::vector<keyspace_tasks_in
try {
return std::accumulate(kti.table_infos.begin(), kti.table_infos.end(), int64_t(0), [&] (int64_t sum, const table_info& t) {
try {
sum += db.find_column_family(t.id).get_stats().live_disk_space_used.on_disk;
sum += db.find_column_family(t.id).get_stats().live_disk_space_used;
} catch (const replica::no_such_column_family&) {
// ignore
}

View File

@@ -888,18 +888,9 @@ rf_rack_valid_keyspaces: false
#
# Vector Store options
#
# HTTP and HTTPS schemes are supported. Port number is mandatory.
# If both `vector_store_primary_uri` and `vector_store_secondary_uri` are unset or empty, vector search is disabled.
#
# A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.
# A comma-separated list of URIs for the vector store using DNS name. Only HTTP schema is supported. Port number is mandatory.
# Default is empty, which means that the vector store is not used.
# vector_store_primary_uri: http://vector-store.dns.name:{port}
#
# A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.
# vector_store_secondary_uri: http://vector-store.dns.name:{port}
#
# Options for encrypted connections to the vector store. These options are used for HTTPS URIs in vector_store_primary_uri and vector_store_secondary_uri.
# vector_store_encryption_options:
# truststore: <not set, use system trust>
#
# io-streaming rate limiting

View File

@@ -445,7 +445,6 @@ ldap_tests = set([
scylla_tests = set([
'test/boost/combined_tests',
'test/boost/UUID_test',
'test/boost/url_parse_test',
'test/boost/advanced_rpc_compressor_test',
'test/boost/allocation_strategy_test',
'test/boost/alternator_unit_test',
@@ -647,28 +646,6 @@ vector_search_tests = set([
'test/vector_search/client_test'
])
vector_search_validator_bin = 'vector-search-validator/bin/vector-search-validator'
vector_search_validator_deps = set([
'test/vector_search_validator/build-validator',
'test/vector_search_validator/Cargo.toml',
'test/vector_search_validator/crates/validator/Cargo.toml',
'test/vector_search_validator/crates/validator/src/main.rs',
'test/vector_search_validator/crates/validator-scylla/Cargo.toml',
'test/vector_search_validator/crates/validator-scylla/src/lib.rs',
'test/vector_search_validator/crates/validator-scylla/src/cql.rs',
])
vector_store_bin = 'vector-search-validator/bin/vector-store'
vector_store_deps = set([
'test/vector_search_validator/build-env',
'test/vector_search_validator/build-vector-store',
])
vector_search_validator_bins = set([
vector_search_validator_bin,
vector_store_bin,
])
wasms = set([
'wasm/return_input.wat',
'wasm/test_complex_null_values.wat',
@@ -702,7 +679,7 @@ other = set([
'iotune',
])
all_artifacts = apps | cpp_apps | tests | other | wasms | vector_search_validator_bins
all_artifacts = apps | cpp_apps | tests | other | wasms
arg_parser = argparse.ArgumentParser('Configure scylla', add_help=False, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
arg_parser.add_argument('--out', dest='buildfile', action='store', default='build.ninja',
@@ -786,7 +763,6 @@ arg_parser.add_argument('--use-cmake', action=argparse.BooleanOptionalAction, de
arg_parser.add_argument('--coverage', action = 'store_true', help = 'Compile scylla with coverage instrumentation')
arg_parser.add_argument('--build-dir', action='store', default='build',
help='Build directory path')
arg_parser.add_argument('--disable-precompiled-header', action='store_true', default=False, help='Disable precompiled header for scylla binary')
arg_parser.add_argument('-h', '--help', action='store_true', help='show this help message and exit')
args = arg_parser.parse_args()
if args.help:
@@ -1062,6 +1038,7 @@ scylla_core = (['message/messaging_service.cc',
'db/hints/resource_manager.cc',
'db/hints/sync_point.cc',
'db/large_data_handler.cc',
'db/legacy_schema_migrator.cc',
'db/marshal/type_parser.cc',
'db/per_partition_rate_limit_options.cc',
'db/rate_limiter.cc',
@@ -1195,7 +1172,6 @@ scylla_core = (['message/messaging_service.cc',
'auth/allow_all_authorizer.cc',
'auth/authenticated_user.cc',
'auth/authenticator.cc',
'auth/cache.cc',
'auth/common.cc',
'auth/default_authorizer.cc',
'auth/resource.cc',
@@ -1292,8 +1268,7 @@ scylla_core = (['message/messaging_service.cc',
'vector_search/vector_store_client.cc',
'vector_search/dns.cc',
'vector_search/client.cc',
'vector_search/clients.cc',
'vector_search/truststore.cc'
'vector_search/clients.cc'
] + [Antlr3Grammar('cql3/Cql.g')] \
+ scylla_raft_core
)
@@ -1604,7 +1579,6 @@ deps['test/boost/combined_tests'] += [
'test/boost/query_processor_test.cc',
'test/boost/reader_concurrency_semaphore_test.cc',
'test/boost/repair_test.cc',
'test/boost/replicator_test.cc',
'test/boost/restrictions_test.cc',
'test/boost/role_manager_test.cc',
'test/boost/row_cache_test.cc',
@@ -1647,7 +1621,6 @@ deps['test/boost/bytes_ostream_test'] = [
]
deps['test/boost/input_stream_test'] = ['test/boost/input_stream_test.cc']
deps['test/boost/UUID_test'] = ['clocks-impl.cc', 'utils/UUID_gen.cc', 'test/boost/UUID_test.cc', 'utils/uuid.cc', 'utils/dynamic_bitset.cc', 'utils/hashers.cc', 'utils/on_internal_error.cc']
deps['test/boost/url_parse_test'] = ['utils/http.cc', 'test/boost/url_parse_test.cc', ]
deps['test/boost/murmur_hash_test'] = ['bytes.cc', 'utils/murmur_hash.cc', 'test/boost/murmur_hash_test.cc']
deps['test/boost/allocation_strategy_test'] = ['test/boost/allocation_strategy_test.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc', 'utils/labels.cc']
deps['test/boost/log_heap_test'] = ['test/boost/log_heap_test.cc']
@@ -2212,15 +2185,7 @@ if os.path.exists(kmipc_lib):
user_cflags += f' -I{kmipc_dir}/include -DHAVE_KMIP'
def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
cxxflags = [
# we need this flag for correct precompiled header handling in connection with ccache (or similar)
# `git` tools don't preserve timestamps, so when using ccache it might be possible to add pch to ccache
# and then later (after for example rebase) get `stdafx.hh` with different timestamp, but the same content.
# this will tell ccache to bring pch from its cache. Later on clang will check if timestamps match and complain.
# Adding `-fpch-validate-input-files-content` tells clang to check content of stdafx.hh if timestamps don't match.
# The flag seems to be present in gcc as well.
"" if args.disable_precompiled_header else '-fpch-validate-input-files-content'
]
cxxflags = []
optimization_level = mode_config['optimization-level']
cxxflags.append(f'-O{optimization_level}')
@@ -2285,7 +2250,6 @@ def write_build_file(f,
scylla_version,
scylla_release,
args):
use_precompiled_header = not args.disable_precompiled_header
warnings = get_warning_options(args.cxx)
rustc_target = pick_rustc_target('wasm32-wasi', 'wasm32-wasip1')
f.write(textwrap.dedent('''\
@@ -2392,10 +2356,7 @@ def write_build_file(f,
for mode in build_modes:
modeval = modes[mode]
seastar_lib_ext = 'so' if modeval['build_seastar_shared_libs'] else 'a'
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
seastar_testing_dep = f'$builddir/{mode}/seastar/libseastar_testing.{seastar_lib_ext}'
abseil_dep = ' '.join(f'$builddir/{mode}/abseil/{lib}' for lib in abseil_libs)
fmt_lib = 'fmt'
f.write(textwrap.dedent('''\
cxx_ld_flags_{mode} = {cxx_ld_flags}
@@ -2408,14 +2369,6 @@ def write_build_file(f,
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags_{mode} $cxxflags $obj_cxxflags -c -o $out $in
description = CXX $out
depfile = $out.d
rule cxx_build_precompiled_header.{mode}
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags_{mode} $cxxflags $obj_cxxflags -c -o $out $in -Winvalid-pch -fpch-instantiate-templates -Xclang -emit-pch -DSCYLLA_USE_PRECOMPILED_HEADER
description = CXX-PRECOMPILED-HEADER $out
depfile = $out.d
rule cxx_with_pch.{mode}
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags_{mode} $cxxflags $obj_cxxflags -c -o $out $in -Winvalid-pch -Xclang -include-pch -Xclang $builddir/{mode}/stdafx.hh.pch
description = CXX $out
depfile = $out.d
rule link.{mode}
command = $cxx $ld_flags_{mode} $ldflags -o $out $in $libs $libs_{mode}
description = LINK $out
@@ -2449,7 +2402,7 @@ def write_build_file(f,
$builddir/{mode}/gen/${{stem}}Parser.cpp
description = ANTLR3 $in
rule checkhh.{mode}
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags $cxxflags_{mode} $obj_cxxflags --include $in -c -o $out $builddir/{mode}/gen/empty.cc -USCYLLA_USE_PRECOMPILED_HEADER
command = $cxx -MD -MT $out -MF $out.d {seastar_cflags} $cxxflags $cxxflags_{mode} $obj_cxxflags --include $in -c -o $out $builddir/{mode}/gen/empty.cc
description = CHECKHH $in
depfile = $out.d
rule test.{mode}
@@ -2463,11 +2416,10 @@ def write_build_file(f,
description = RUST_LIB $out
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, **modeval))
f.write(
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
'build {mode}-build: phony {artifacts} {wasms}\n'.format(
mode=mode,
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms - vector_search_validator_bins)]),
artifacts=str.join(' ', ['$builddir/' + mode + '/' + x for x in sorted(build_artifacts - wasms)]),
wasms = str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & wasms)]),
vector_search_validator_bins=str.join(' ', ['$builddir/' + x for x in sorted(build_artifacts & vector_search_validator_bins)]),
)
)
if profile_recipe := modes[mode].get('profile_recipe'):
@@ -2476,7 +2428,6 @@ def write_build_file(f,
include_dist_target = f'dist-{mode}' if args.enable_dist is None or args.enable_dist else ''
f.write(f'build {mode}: phony {include_cxx_target} {include_dist_target}\n')
compiles = {}
compiles_with_pch = set()
swaggers = set()
serializers = {}
ragels = {}
@@ -2491,16 +2442,16 @@ def write_build_file(f,
# object code. And we enable LTO when linking the main Scylla executable, while disable
# it when linking anything else.
seastar_lib_ext = 'so' if modeval['build_seastar_shared_libs'] else 'a'
for binary in sorted(build_artifacts):
if modeval['is_profile'] and binary != "scylla":
# Just to avoid clutter in build.ninja
continue
profile_dep = modes[mode].get('profile_target', "")
if binary in other or binary in wasms or binary in vector_search_validator_bins:
if binary in other or binary in wasms:
continue
srcs = deps[binary]
# 'scylla'
objs = ['$builddir/' + mode + '/' + src.replace('.cc', '.o')
for src in srcs
if src.endswith('.cc')]
@@ -2536,6 +2487,9 @@ def write_build_file(f,
continue
do_lto = modes[mode]['has_lto'] and binary in lto_binaries
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
seastar_testing_dep = f'$builddir/{mode}/seastar/libseastar_testing.{seastar_lib_ext}'
abseil_dep = ' '.join(f'$builddir/{mode}/abseil/{lib}' for lib in abseil_libs)
seastar_testing_libs = f'$seastar_testing_libs_{mode}'
local_libs = f'$seastar_libs_{mode} $libs'
@@ -2545,7 +2499,6 @@ def write_build_file(f,
local_libs += ' -flto=thin -ffat-lto-objects'
else:
local_libs += ' -fno-lto'
use_pch = use_precompiled_header and binary == 'scylla'
if binary in tests:
if binary in pure_boost_tests:
local_libs += ' ' + maybe_static(args.staticboost, '-lboost_unit_test_framework')
@@ -2574,8 +2527,6 @@ def write_build_file(f,
if src.endswith('.cc'):
obj = '$builddir/' + mode + '/' + src.replace('.cc', '.o')
compiles[obj] = src
if use_pch:
compiles_with_pch.add(obj)
elif src.endswith('.idl.hh'):
hh = '$builddir/' + mode + '/gen/' + src.replace('.idl.hh', '.dist.hh')
serializers[hh] = src
@@ -2608,11 +2559,10 @@ def write_build_file(f,
)
f.write(
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms} {vector_search_validator_bins} \n'.format(
'build {mode}-test: test.{mode} {test_executables} $builddir/{mode}/scylla {wasms}\n'.format(
mode=mode,
test_executables=' '.join(['$builddir/{}/{}'.format(mode, binary) for binary in sorted(tests)]),
wasms=' '.join([f'$builddir/{binary}' for binary in sorted(wasms)]),
vector_search_validator_bins=' '.join([f'$builddir/{binary}' for binary in sorted(vector_search_validator_bins)]),
)
)
f.write(
@@ -2655,9 +2605,7 @@ def write_build_file(f,
src = compiles[obj]
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
abseil_dep = ' '.join(f'$builddir/{mode}/abseil/{lib}' for lib in abseil_libs)
pch_dep = f'$builddir/{mode}/stdafx.hh.pch' if obj in compiles_with_pch else ''
cxx_cmd = 'cxx_with_pch' if obj in compiles_with_pch else 'cxx'
f.write(f'build {obj}: {cxx_cmd}.{mode} {src} | {profile_dep} {seastar_dep} {abseil_dep} {gen_headers_dep} {pch_dep}\n')
f.write(f'build {obj}: cxx.{mode} {src} | {profile_dep} || {seastar_dep} {abseil_dep} {gen_headers_dep}\n')
if src in modeval['per_src_extra_cxxflags']:
f.write(' cxxflags = {seastar_cflags} $cxxflags $cxxflags_{mode} {extra_cxxflags}\n'.format(mode=mode, extra_cxxflags=modeval["per_src_extra_cxxflags"][src], **modeval))
for swagger in swaggers:
@@ -2718,8 +2666,6 @@ def write_build_file(f,
f.write(' target = {lib}\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
f.write(f'build $builddir/{mode}/stdafx.hh.pch: cxx_build_precompiled_header.{mode} stdafx.hh | {profile_dep} {seastar_dep} {abseil_dep} {gen_headers_dep} {pch_dep}\n')
f.write('build $builddir/{mode}/seastar/apps/iotune/iotune: ninja $builddir/{mode}/seastar/build.ninja | $builddir/{mode}/seastar/libseastar.{seastar_lib_ext}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
@@ -2783,19 +2729,6 @@ def write_build_file(f,
'build compiler-training: phony {}\n'.format(' '.join(['{mode}-compiler-training'.format(mode=mode) for mode in default_modes]))
)
f.write(textwrap.dedent(f'''\
rule build-vector-search-validator
command = test/vector_search_validator/build-validator $builddir
rule build-vector-store
command = test/vector_search_validator/build-vector-store $builddir
'''))
f.write(
'build $builddir/{vector_search_validator_bin}: build-vector-search-validator {}\n'.format(' '.join([dep for dep in sorted(vector_search_validator_deps)]), vector_search_validator_bin=vector_search_validator_bin)
)
f.write(
'build $builddir/{vector_store_bin}: build-vector-store {}\n'.format(' '.join([dep for dep in sorted(vector_store_deps)]), vector_store_bin=vector_store_bin)
)
f.write(textwrap.dedent(f'''\
build dist-unified-tar: phony {' '.join([f'$builddir/{mode}/dist/tar/{scylla_product}-unified-{scylla_version}-{scylla_release}.{arch}.tar.gz' for mode in default_modes])}
build dist-unified: phony dist-unified-tar
@@ -3009,7 +2942,7 @@ def configure_using_cmake(args):
'CMAKE_DEFAULT_CONFIGS': selected_configs,
'CMAKE_C_COMPILER': args.cc,
'CMAKE_CXX_COMPILER': args.cxx,
'CMAKE_CXX_FLAGS': args.user_cflags + ("" if args.disable_precompiled_header else " -fpch-validate-input-files-content"),
'CMAKE_CXX_FLAGS': args.user_cflags,
'CMAKE_EXE_LINKER_FLAGS': args.user_ldflags,
'CMAKE_EXPORT_COMPILE_COMMANDS': 'ON',
'Scylla_CHECK_HEADERS': 'ON',
@@ -3018,7 +2951,6 @@ def configure_using_cmake(args):
'Scylla_TEST_REPEAT': args.test_repeat,
'Scylla_ENABLE_LTO': 'ON' if args.lto else 'OFF',
'Scylla_WITH_DEBUG_INFO' : 'ON' if args.debuginfo else 'OFF',
'Scylla_USE_PRECOMPILED_HEADER': 'OFF' if args.disable_precompiled_header else 'ON',
}
if args.date_stamp:
settings['Scylla_DATE_STAMP'] = args.date_stamp

View File

@@ -138,8 +138,5 @@ target_link_libraries(cql3
lang
transport)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(cql3 REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers cql3
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -575,15 +575,6 @@ usingTimeoutServiceLevelClauseObjective[std::unique_ptr<cql3::attributes::raw>&
| serviceLevel sl_name=serviceLevelOrRoleName { attrs->service_level = std::move(sl_name); }
;
usingTimeoutConcurrencyClause[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_USING usingTimeoutConcurrencyClauseObjective[attrs] ( K_AND usingTimeoutConcurrencyClauseObjective[attrs] )*
;
usingTimeoutConcurrencyClauseObjective[std::unique_ptr<cql3::attributes::raw>& attrs]
: K_TIMEOUT to=term { attrs->timeout = std::move(to); }
| K_CONCURRENCY c=term { attrs->concurrency = std::move(c); }
;
/**
* UPDATE <CF>
* USING TIMESTAMP <long>
@@ -675,7 +666,7 @@ pruneMaterializedViewStatement returns [std::unique_ptr<raw::select_statement> e
auto attrs = std::make_unique<cql3::attributes::raw>();
expression wclause = conjunction{};
}
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingTimeoutConcurrencyClause[attrs] )?
: K_PRUNE K_MATERIALIZED K_VIEW cf=columnFamilyName (K_WHERE w=whereClause { wclause = std::move(w); } )? ( usingClause[attrs] )?
{
auto params = make_lw_shared<raw::select_statement::parameters>(std::move(orderings), is_distinct, allow_filtering, statement_subtype, bypass_cache);
return std::make_unique<raw::select_statement>(std::move(cf), std::move(params),
@@ -1569,10 +1560,6 @@ serviceLevelOrRoleName returns [sstring name]
| t=QUOTED_NAME { $name = sstring($t.text); }
| k=unreserved_keyword { $name = k;
std::transform($name.begin(), $name.end(), $name.begin(), ::tolower);}
// The literal `default` will not be parsed by any of the previous
// rules, so we need to cover it manually. Needed by CREATE SERVICE
// LEVEL and ATTACH SERVICE LEVEL.
| t=K_DEFAULT { $name = sstring("default"); }
| QMARK {add_recognition_error("Bind variables cannot be used for service levels or role names");}
;
@@ -2379,7 +2366,6 @@ K_LIKE: L I K E;
K_TIMEOUT: T I M E O U T;
K_PRUNE: P R U N E;
K_CONCURRENCY: C O N C U R R E N C Y;
K_EXECUTE: E X E C U T E;

View File

@@ -20,21 +20,19 @@
namespace cql3 {
std::unique_ptr<attributes> attributes::none() {
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}, {}}};
return std::unique_ptr<attributes>{new attributes{{}, {}, {}, {}}};
}
attributes::attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency)
std::optional<sstring> service_level)
: _timestamp_unset_guard(timestamp)
, _timestamp{std::move(timestamp)}
, _time_to_live_unset_guard(time_to_live)
, _time_to_live{std::move(time_to_live)}
, _timeout{std::move(timeout)}
, _service_level(std::move(service_level))
, _concurrency{std::move(concurrency)}
{ }
bool attributes::is_timestamp_set() const {
@@ -53,10 +51,6 @@ bool attributes::is_service_level_set() const {
return bool(_service_level);
}
bool attributes::is_concurrency_set() const {
return bool(_concurrency);
}
int64_t attributes::get_timestamp(int64_t now, const query_options& options) {
if (!_timestamp.has_value() || _timestamp_unset_guard.is_unset(options)) {
return now;
@@ -129,27 +123,6 @@ qos::service_level_options attributes::get_service_level(qos::service_level_cont
return sl_controller.get_service_level(sl_name).slo;
}
std::optional<int32_t> attributes::get_concurrency(const query_options& options) const {
if (!_concurrency.has_value()) {
return std::nullopt;
}
cql3::raw_value concurrency_raw = expr::evaluate(*_concurrency, options);
if (concurrency_raw.is_null()) {
throw exceptions::invalid_request_exception("Invalid null value of concurrency");
}
int32_t concurrency;
try {
concurrency = concurrency_raw.view().validate_and_deserialize<int32_t>(*int32_type);
} catch (marshal_exception& e) {
throw exceptions::invalid_request_exception("Invalid concurrency value");
}
if (concurrency <= 0) {
throw exceptions::invalid_request_exception("Concurrency must be a positive integer");
}
return concurrency;
}
void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timestamp.has_value()) {
expr::fill_prepare_context(*_timestamp, ctx);
@@ -160,13 +133,10 @@ void attributes::fill_prepare_context(prepare_context& ctx) {
if (_timeout.has_value()) {
expr::fill_prepare_context(*_timeout, ctx);
}
if (_concurrency.has_value()) {
expr::fill_prepare_context(*_concurrency, ctx);
}
}
std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const {
std::optional<expr::expression> ts, ttl, to, conc;
std::optional<expr::expression> ts, ttl, to;
if (timestamp.has_value()) {
ts = prepare_expression(*timestamp, db, ks_name, nullptr, timestamp_receiver(ks_name, cf_name));
@@ -183,12 +153,7 @@ std::unique_ptr<attributes> attributes::raw::prepare(data_dictionary::database d
verify_no_aggregate_functions(*timeout, "USING clause");
}
if (concurrency.has_value()) {
conc = prepare_expression(*concurrency, db, ks_name, nullptr, concurrency_receiver(ks_name, cf_name));
verify_no_aggregate_functions(*concurrency, "USING clause");
}
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level), std::move(conc)}};
return std::unique_ptr<attributes>{new attributes{std::move(ts), std::move(ttl), std::move(to), std::move(service_level)}};
}
lw_shared_ptr<column_specification> attributes::raw::timestamp_receiver(const sstring& ks_name, const sstring& cf_name) const {
@@ -203,8 +168,4 @@ lw_shared_ptr<column_specification> attributes::raw::timeout_receiver(const sstr
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[timeout]", true), duration_type);
}
lw_shared_ptr<column_specification> attributes::raw::concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const {
return make_lw_shared<column_specification>(ks_name, cf_name, ::make_shared<column_identifier>("[concurrency]", true), data_type_for<int32_t>());
}
}

View File

@@ -36,15 +36,13 @@ private:
std::optional<cql3::expr::expression> _time_to_live;
std::optional<cql3::expr::expression> _timeout;
std::optional<sstring> _service_level;
std::optional<cql3::expr::expression> _concurrency;
public:
static std::unique_ptr<attributes> none();
private:
attributes(std::optional<cql3::expr::expression>&& timestamp,
std::optional<cql3::expr::expression>&& time_to_live,
std::optional<cql3::expr::expression>&& timeout,
std::optional<sstring> service_level,
std::optional<cql3::expr::expression>&& concurrency);
std::optional<sstring> service_level);
public:
bool is_timestamp_set() const;
@@ -54,8 +52,6 @@ public:
bool is_service_level_set() const;
bool is_concurrency_set() const;
int64_t get_timestamp(int64_t now, const query_options& options);
std::optional<int32_t> get_time_to_live(const query_options& options);
@@ -64,8 +60,6 @@ public:
qos::service_level_options get_service_level(qos::service_level_controller& sl_controller) const;
std::optional<int32_t> get_concurrency(const query_options& options) const;
void fill_prepare_context(prepare_context& ctx);
class raw final {
@@ -74,7 +68,6 @@ public:
std::optional<cql3::expr::expression> time_to_live;
std::optional<cql3::expr::expression> timeout;
std::optional<sstring> service_level;
std::optional<cql3::expr::expression> concurrency;
std::unique_ptr<attributes> prepare(data_dictionary::database db, const sstring& ks_name, const sstring& cf_name) const;
private:
@@ -83,8 +76,6 @@ public:
lw_shared_ptr<column_specification> time_to_live_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> timeout_receiver(const sstring& ks_name, const sstring& cf_name) const;
lw_shared_ptr<column_specification> concurrency_receiver(const sstring& ks_name, const sstring& cf_name) const;
};
};

View File

@@ -37,12 +37,6 @@ future<::shared_ptr<cql_transport::messages::result_message>>
alter_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &, std::optional<service::group0_guard> guard) const {
if (_service_level == qos::service_level_controller::default_service_level_name) {
sstring reason = seastar::format("The default service level, {}, cannot be altered",
qos::service_level_controller::default_service_level_name);
throw exceptions::invalid_request_exception(std::move(reason));
}
service::group0_batch mc{std::move(guard)};
validate_shares_option(qp, _slo);
qos::service_level& sl = state.get_service_level_controller().get_service_level(_service_level);

View File

@@ -422,14 +422,7 @@ std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_sche
throw exceptions::invalid_request_exception(format("The synchronous_updates option is only applicable to materialized views, not to base tables"));
}
if (is_cdc_log_table) {
auto gc_opts = _properties->get_tombstone_gc_options(schema_extensions);
if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on CDC log tables.");
}
}
_properties->apply_to_builder(cfm, std::move(schema_extensions), db, keyspace(), !is_cdc_log_table);
_properties->apply_to_builder(cfm, std::move(schema_extensions), db, keyspace());
}
break;

View File

@@ -55,29 +55,8 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
auto schema_extensions = _properties->make_schema_extensions(db.extensions());
_properties->validate(db, keyspace(), schema_extensions);
bool is_colocated = [&] {
if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
return false;
}
auto base_schema = db.find_schema(schema->view_info()->base_id());
if (!base_schema) {
return false;
}
return std::ranges::equal(
schema->partition_key_columns(),
base_schema->partition_key_columns(),
[](const column_definition& a, const column_definition& b) { return a.name() == b.name(); });
}();
if (is_colocated) {
auto gc_opts = _properties->get_tombstone_gc_options(schema_extensions);
if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables.");
}
}
auto builder = schema_builder(schema);
_properties->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);
_properties->apply_to_builder(builder, std::move(schema_extensions), db, keyspace());
if (builder.get_gc_grace_seconds() == 0) {
throw exceptions::invalid_request_exception(

View File

@@ -43,14 +43,6 @@ attach_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &,
std::optional<service::group0_guard> guard) const {
if (_service_level == qos::service_level_controller::default_service_level_name) {
sstring reason = seastar::format("The default service level, {}, cannot be "
"attached to a role. If you want to detach an attached service level, "
"use the DETACH SERVICE LEVEL statement",
qos::service_level_controller::default_service_level_name);
throw exceptions::invalid_request_exception(std::move(reason));
}
auto sli = co_await state.get_service_level_controller().get_distributed_service_level(_service_level);
if (sli.empty()) {
throw qos::nonexistant_service_level_exception(_service_level);

View File

@@ -331,7 +331,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
if (!cl_for_paxos) [[unlikely]] {
return make_exception_future<shared_ptr<cql_transport::messages::result_message>>(std::move(cl_for_paxos).assume_error());
}
std::unique_ptr<cas_request> request;
seastar::shared_ptr<cas_request> request;
schema_ptr schema;
db::timeout_clock::time_point now = db::timeout_clock::now();
@@ -354,9 +354,9 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
if (keys.empty()) {
continue;
}
if (!request) {
if (request.get() == nullptr) {
schema = statement.s;
request = std::make_unique<cas_request>(schema, std::move(keys));
request = seastar::make_shared<cas_request>(schema, std::move(keys));
} else if (keys.size() != 1 || keys.front().equal(request->key().front(), dht::ring_position_comparator(*schema)) == false) {
throw exceptions::invalid_request_exception("BATCH with conditions cannot span multiple partitions");
}
@@ -366,7 +366,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
request->add_row_update(statement, std::move(ranges), std::move(json_cache), statement_options);
}
if (!request) {
if (request.get() == nullptr) {
throw exceptions::invalid_request_exception(format("Unrestricted partition key in a conditional BATCH"));
}
@@ -377,10 +377,9 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
);
}
auto* request_ptr = request.get();
return qp.proxy().cas(schema, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
return qp.proxy().cas(schema, std::move(cas_shard), request, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request = std::move(request)] (bool is_applied) {
std::move(cl_for_paxos).assume_value(), cl_for_learn, batch_timeout, cas_timeout).then([this, request] (bool is_applied) {
return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
});
}

View File

@@ -293,7 +293,7 @@ std::optional<db::tablet_options::map_type> cf_prop_defs::get_tablet_options() c
return std::nullopt;
}
void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name, bool supports_repair) const {
void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name) const {
if (has_property(KW_COMMENT)) {
builder.set_comment(get_string(KW_COMMENT, ""));
}
@@ -379,7 +379,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
}
// Set default tombstone_gc mode.
if (!schema_extensions.contains(tombstone_gc_extension::NAME)) {
auto ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(db, ks_name, supports_repair));
auto ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(db, ks_name));
schema_extensions.emplace(tombstone_gc_extension::NAME, std::move(ext));
}
builder.set_extensions(std::move(schema_extensions));

View File

@@ -110,7 +110,7 @@ public:
bool get_synchronous_updates_flag() const;
std::optional<db::tablet_options::map_type> get_tablet_options() const;
void apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name, bool supports_repair) const;
void apply_to_builder(schema_builder& builder, schema::extensions_map schema_extensions, const data_dictionary::database& db, sstring ks_name) const;
void validate_minimum_int(const sstring& field, int32_t minimum_value, int32_t default_value) const;
};

View File

@@ -201,14 +201,7 @@ view_ptr create_index_statement::create_view_for_index(const schema_ptr schema,
"";
builder.with_view_info(schema, false, where_clause);
bool is_colocated = [&] {
if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
return false;
}
return im.local();
}();
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(db, schema->ks_name(), !is_colocated));
auto tombstone_gc_ext = seastar::make_shared<tombstone_gc_extension>(get_default_tombstone_gc_mode(db, schema->ks_name()));
builder.add_extension(tombstone_gc_extension::NAME, std::move(tombstone_gc_ext));
// A local secondary index should be backed by a *synchronous* view,
@@ -279,15 +272,11 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
}
// Regular secondary indexes require rf-rack-validity.
// Custom indexes need to validate this property themselves, if they need it.
if (!_properties || !_properties->custom_class) {
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
try {
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}
validate_for_local_index(*schema);
@@ -303,7 +292,7 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
throw exceptions::invalid_request_exception(format("Non-supported custom class \'{}\' provided", *(_properties->custom_class)));
}
auto custom_index = (*custom_index_factory)();
custom_index->validate(*schema, *_properties, targets, db.features(), db);
custom_index->validate(*schema, *_properties, targets, db.features());
_properties->index_version = custom_index->index_version(*schema);
}

View File

@@ -45,12 +45,6 @@ create_service_level_statement::execute(query_processor& qp,
throw exceptions::invalid_request_exception("Names starting with '$' are reserved for internal tenants. Use a different name.");
}
if (_service_level == qos::service_level_controller::default_service_level_name) {
sstring reason = seastar::format("The default service level, {}, already exists "
"and cannot be created", qos::service_level_controller::default_service_level_name);
throw exceptions::invalid_request_exception(std::move(reason));
}
service::group0_batch mc{std::move(guard)};
validate_shares_option(qp, _slo);

View File

@@ -128,7 +128,7 @@ void create_table_statement::apply_properties_to(schema_builder& builder, const
builder.set_compressor_params(db.get_config().sstable_compression_user_table_options());
}
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace(), true);
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace());
}
void create_table_statement::add_column_metadata_from_aliases(schema_builder& builder, std::vector<bytes> aliases, const std::vector<data_type>& types, column_kind kind) const

View File

@@ -373,30 +373,7 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
db::view::create_virtual_column(builder, def->name(), def->type);
}
}
bool is_colocated = [&] {
if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
return false;
}
if (target_partition_keys.size() != schema->partition_key_columns().size()) {
return false;
}
for (size_t i = 0; i < target_partition_keys.size(); ++i) {
if (target_partition_keys[i] != &schema->partition_key_columns()[i]) {
return false;
}
}
return true;
}();
if (is_colocated) {
auto gc_opts = _properties.properties()->get_tombstone_gc_options(schema_extensions);
if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables.");
}
}
_properties.properties()->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);
_properties.properties()->apply_to_builder(builder, std::move(schema_extensions), db, keyspace());
if (builder.default_time_to_live().count() > 0) {
throw exceptions::invalid_request_exception(

View File

@@ -34,11 +34,6 @@ drop_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &,
std::optional<service::group0_guard> guard) const {
if (_service_level == qos::service_level_controller::default_service_level_name) {
sstring reason = seastar::format("The default service level, {}, cannot be dropped",
qos::service_level_controller::default_service_level_name);
throw exceptions::invalid_request_exception(std::move(reason));
}
service::group0_batch mc{std::move(guard)};
auto& sl = state.get_service_level_controller();
co_await sl.drop_distributed_service_level(_service_level, _if_exists, mc);

View File

@@ -8,7 +8,6 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "seastar/core/format.hh"
#include "seastar/core/sstring.hh"
#include "utils/assert.hh"
#include "cql3/statements/ks_prop_defs.hh"
@@ -114,17 +113,6 @@ static locator::replication_strategy_config_options prepare_options(
return options;
}
if (uses_tablets) {
for (const auto& opt: old_options) {
if (opt.first == ks_prop_defs::REPLICATION_FACTOR_KEY) {
on_internal_error(logger, format("prepare_options: old_options contains invalid key '{}'", ks_prop_defs::REPLICATION_FACTOR_KEY));
}
if (!options.contains(opt.first)) {
throw exceptions::configuration_exception(fmt::format("Attempted to implicitly drop replicas in datacenter {}. If this is the desired behavior, set replication factor to 0 in {} explicitly.", opt.first, opt.first));
}
}
}
// For users' convenience, expand the 'replication_factor' option into a replication factor for each DC.
// If the user simply switches from another strategy without providing any options,
// but the other strategy used the 'replication_factor' option, it will also be expanded.

View File

@@ -401,8 +401,7 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
type.is_update() ? "update" : "deletion"));
}
auto request = std::make_unique<cas_request>(s, std::move(keys));
auto* request_ptr = request.get();
auto request = seastar::make_shared<cas_request>(s, std::move(keys));
// cas_request can be used for batches as well single statements; Here we have just a single
// modification in the list of CAS commands, since we're handling single-statement execution.
request->add_row_update(*this, std::move(ranges), std::move(json_cache), options);
@@ -428,9 +427,9 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
tablet_info = erm->check_locality(token);
}
return qp.proxy().cas(s, std::move(cas_shard), *request_ptr, request->read_command(qp), request->key(),
return qp.proxy().cas(s, std::move(cas_shard), request, request->read_command(qp), request->key(),
{read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request = std::move(request), tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
std::move(cl_for_paxos).assume_value(), cl_for_learn, statement_timeout, cas_timeout).then([this, request, tablet_replicas = std::move(tablet_info->tablet_replicas), token_range = tablet_info->token_range] (bool is_applied) {
auto result = request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
result->add_tablet_info(tablet_replicas, token_range);
return result;

View File

@@ -21,7 +21,7 @@ namespace cql3 {
namespace statements {
static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges, std::vector<query::clustering_range> clustering_bounds, view_ptr view,
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration, size_t concurrency) {
service::storage_proxy& proxy, service::query_state& state, const query_options& options, cql_stats& stats, db::timeout_clock::duration timeout_duration) {
auto key_columns = std::ranges::to<std::vector<const column_definition*>>(
view->all_columns()
| std::views::filter([] (const column_definition& cdef) { return cdef.is_primary_key(); })
@@ -35,7 +35,7 @@ static future<> delete_ghost_rows(dht::partition_range_vector partition_ranges,
tracing::trace(state.get_trace_state(), "Deleting ghost rows from partition ranges {}", partition_ranges);
auto p = service::pager::query_pagers::ghost_row_deleting_pager(schema_ptr(view), selection, state,
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration, concurrency);
options, std::move(command), std::move(partition_ranges), stats, proxy, timeout_duration);
int32_t page_size = std::max(options.get_page_size(), 1000);
auto now = gc_clock::now();
@@ -62,8 +62,7 @@ future<::shared_ptr<cql_transport::messages::result_message>> prune_materialized
auto timeout_duration = get_timeout(state.get_client_state(), options);
dht::partition_range_vector key_ranges = _restrictions->get_partition_key_ranges(options);
std::vector<query::clustering_range> clustering_bounds = _restrictions->get_clustering_bounds(options);
size_t concurrency = _attrs->is_concurrency_set() ? _attrs->get_concurrency(options).value() : 1;
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration, concurrency).then([] {
return delete_ghost_rows(std::move(key_ranges), std::move(clustering_bounds), view_ptr(_schema), qp.proxy(), state, options, _stats, timeout_duration).then([] {
return make_ready_future<::shared_ptr<cql_transport::messages::result_message>>(::make_shared<cql_transport::messages::result_message::void_message>());
});
}

View File

@@ -2031,16 +2031,14 @@ future<shared_ptr<cql_transport::messages::result_message>> vector_indexed_table
fmt::format("Use of ANN OF in an ORDER BY clause requires a LIMIT that is not greater than {}. LIMIT was {}", max_ann_query_limit, limit)));
}
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
auto aoe = abort_on_expiry(timeout);
auto pkeys = co_await qp.vector_store_client().ann(
_schema->ks_name(), _index.metadata().name(), _schema, get_ann_ordering_vector(options), limit, aoe.abort_source());
auto as = abort_source();
auto pkeys = co_await qp.vector_store_client().ann(_schema->ks_name(), _index.metadata().name(), _schema, get_ann_ordering_vector(options), limit, as);
if (!pkeys.has_value()) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception(std::visit(vector_search::vector_store_client::ann_error_visitor{}, pkeys.error())));
}
co_return co_await query_base_table(qp, state, options, pkeys.value(), timeout);
co_return co_await query_base_table(qp, state, options, pkeys.value());
});
auto page_size = options.get_page_size();
@@ -2075,10 +2073,10 @@ std::vector<float> vector_indexed_table_select_statement::get_ann_ordering_vecto
return util::to_vector<float>(values);
}
future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::query_base_table(query_processor& qp,
service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys,
lowres_clock::time_point timeout) const {
future<::shared_ptr<cql_transport::messages::result_message>> vector_indexed_table_select_statement::query_base_table(
query_processor& qp, service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys) const {
auto command = prepare_command_for_base_query(qp, state, options);
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
// For tables without clustering columns, we can optimize by querying
// partition ranges instead of individual primary keys, since the

View File

@@ -389,8 +389,8 @@ private:
std::vector<float> get_ann_ordering_vector(const query_options& options) const;
future<::shared_ptr<cql_transport::messages::result_message>> query_base_table(query_processor& qp, service::query_state& state,
const query_options& options, const std::vector<vector_search::primary_key>& pkeys, lowres_clock::time_point timeout) const;
future<::shared_ptr<cql_transport::messages::result_message>> query_base_table(
query_processor& qp, service::query_state& state, const query_options& options, const std::vector<vector_search::primary_key>& pkeys) const;
future<::shared_ptr<cql_transport::messages::result_message>> query_base_table(query_processor& qp, service::query_state& state,
const query_options& options, lw_shared_ptr<query::read_command> command, lowres_clock::time_point timeout,

View File

@@ -12,8 +12,5 @@ target_link_libraries(data_dictionary
Seastar::seastar
xxHash::xxhash)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(data_dictionary REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers data_dictionary
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -10,6 +10,7 @@ target_sources(db
schema_applier.cc
schema_tables.cc
cql_type_parser.cc
legacy_schema_migrator.cc
commitlog/commitlog.cc
commitlog/commitlog_replayer.cc
commitlog/commitlog_entry.cc
@@ -59,8 +60,5 @@ target_link_libraries(db
data_dictionary
cql3)
if (Scylla_USE_PRECOMPILED_HEADER_USE)
target_precompile_headers(db REUSE_FROM scylla-precompiled-header)
endif()
check_headers(check-headers db
GLOB_RECURSE ${CMAKE_CURRENT_SOURCE_DIR}/*.hh)

View File

@@ -3461,15 +3461,12 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
clogger.debug("Read {} bytes of data ({}, {})", size, pos, rem);
while (rem < size) {
const auto initial_size = initial.size_bytes();
if (eof) {
auto reason = fmt::format("unexpected EOF, pos={}, rem={}, size={}, alignment={}, initial_size={}",
pos, rem, size, alignment, initial_size);
auto reason = fmt::format("unexpected EOF, rem={}, size={}", rem, size);
throw segment_truncation(std::move(reason), block_boundry);
}
auto block_size = alignment - initial_size;
auto block_size = alignment - initial.size_bytes();
// using a stream is perhaps not 100% effective, but we need to
// potentially address data in pages smaller than the current
// disk/fs we are reading from can handle (but please no).
@@ -3477,9 +3474,8 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
if (tmp.size_bytes() == 0) {
eof = true;
auto reason = fmt::format("read 0 bytes, while tried to read {} bytes. "
"pos={}, rem={}, size={}, alignment={}, initial_size={}",
block_size, pos, rem, size, alignment, initial_size);
auto reason = fmt::format("read 0 bytes, while tried to read {} bytes. rem={}, size={}",
block_size, rem, size);
throw segment_truncation(std::move(reason), block_boundry);
}
@@ -3515,13 +3511,13 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
auto checksum = crc.checksum();
if (check != checksum) {
auto reason = fmt::format("checksums do not match: {:x} vs. {:x}. pos={}, rem={}, size={}, alignment={}, initial_size={}",
check, checksum, pos, rem, size, alignment, initial_size);
auto reason = fmt::format("checksums do not match: {:x} vs. {:x}. rem={}, size={}",
check, checksum, rem, size);
throw segment_data_corruption_error(std::move(reason), alignment);
}
if (id != this->id) {
auto reason = fmt::format("IDs do not match: {} vs. {}. pos={}, rem={}, size={}, alignment={}, initial_size={}",
id, this->id, pos, rem, size, alignment, initial_size);
auto reason = fmt::format("IDs do not match: {} vs. {}. rem={}, size={}",
id, this->id, rem, size);
throw segment_truncation(std::move(reason), pos + rem);
}
}
@@ -3630,10 +3626,6 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
auto old = pos;
pos = next_pos(off);
clogger.trace("Pos {} -> {} ({})", old, pos, off);
// #24346 check eof status whenever we move file pos.
if (pos >= file_size) {
eof = true;
}
}
future<> read_entry() {

View File

@@ -36,7 +36,6 @@
#include "sstables/compressor.hh"
#include "utils/log.hh"
#include "service/tablet_allocator_fwd.hh"
#include "backlog_controller_fwd.hh"
#include "utils/config_file_impl.hh"
#include "exceptions/exceptions.hh"
#include <seastar/core/metrics_api.hh>
@@ -631,8 +630,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"If set to higher than 0, ignore the controller's output and set the memtable shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_static_shares(this, "compaction_static_shares", liveness::LiveUpdate, value_status::Used, 0,
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_max_shares(this, "compaction_max_shares", liveness::LiveUpdate, value_status::Used, default_compaction_maximum_shares,
"Set the maximum shares of regular compaction to the specific value. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity.")
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold.")
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
@@ -1038,9 +1035,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Controls whether traffic between nodes is compressed. The valid values are:\n"
"* all: All traffic is compressed.\n"
"* dc : Traffic between data centers is compressed.\n"
"* rack : Traffic between racks is compressed.\n"
"* none : No compression.",
{"all", "dc", "rack", "none"})
{"all", "dc", "none"})
, internode_compression_zstd_max_cpu_fraction(this, "internode_compression_zstd_max_cpu_fraction", liveness::LiveUpdate, value_status::Used, 0.000,
"ZSTD compression of RPC will consume at most this fraction of each internode_compression_zstd_quota_refresh_period_ms time slice.\n"
"If you wish to try out zstd for RPC compression, 0.05 is a reasonable starting point.")
@@ -1172,17 +1168,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"* default_weight: (Default: 1 **) How many requests are handled during each turn of the RoundRobin.\n"
"* weights: (Default: Keyspace: 1) Takes a list of keyspaces. It sets how many requests are handled during each turn of the RoundRobin, based on the request_scheduler_id.")
/**
* @Group Vector search settings
* @GroupDescription Settings for configuring and tuning vector search functionality.
*/
, vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of primary vector store node URIs. These nodes are preferred for vector search operations.")
, vector_store_secondary_uri(this, "vector_store_secondary_uri", liveness::LiveUpdate, value_status::Used, "",
"A comma-separated list of secondary vector store node URIs. These nodes are used as a fallback when all primary nodes are unavailable, and are typically located in a different availability zone for high availability.")
, vector_store_encryption_options(this, "vector_store_encryption_options", value_status::Used, {},
"Options for encrypted connections to the vector store. These options are used for HTTPS URIs in `vector_store_primary_uri` and `vector_store_secondary_uri`. The available options are:\n"
"* truststore: (Default: <not set, use system truststore>) Location of the truststore containing the trusted certificate for authenticating remote servers.")
/**
* @Group Security properties
* @GroupDescription Server and client security settings.
*/
@@ -1444,11 +1429,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.")
, alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.")
, alternator_streams_increased_compatibility(this, "alternator_streams_increased_compatibility", liveness::LiveUpdate, value_status::Used, false,
"Increases compatibility with DynamoDB Streams at the cost of performance. "
"If enabled, Alternator compares the existing item with the new one during "
"data-modifying operations to determine which event type should be emitted. "
"This penalty is incurred only for tables with Alternator Streams enabled.")
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000,
"The server-side timeout for completing Alternator API requests.")
, alternator_ttl_period_in_seconds(this, "alternator_ttl_period_in_seconds", value_status::Used,
@@ -1470,6 +1450,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
, vector_store_primary_uri(this, "vector_store_primary_uri", liveness::LiveUpdate, value_status::Used, "", "A comma-separated list of vector store node URIs. If not set, vector search is disabled.")
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")

View File

@@ -189,7 +189,6 @@ public:
named_value<bool> auto_adjust_flush_quota;
named_value<float> memtable_flush_static_shares;
named_value<float> compaction_static_shares;
named_value<float> compaction_max_shares;
named_value<bool> compaction_enforce_min_threshold;
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
named_value<sstring> cluster_name;
@@ -344,9 +343,6 @@ public:
named_value<sstring> request_scheduler;
named_value<sstring> request_scheduler_id;
named_value<string_map> request_scheduler_options;
named_value<sstring> vector_store_primary_uri;
named_value<sstring> vector_store_secondary_uri;
named_value<string_map> vector_store_encryption_options;
named_value<sstring> authenticator;
named_value<sstring> internode_authenticator;
named_value<sstring> authorizer;
@@ -465,7 +461,6 @@ public:
named_value<bool> alternator_warn_authorization;
named_value<sstring> alternator_write_isolation;
named_value<uint32_t> alternator_streams_time_window_s;
named_value<bool> alternator_streams_increased_compatibility;
named_value<uint32_t> alternator_timeout_in_ms;
named_value<double> alternator_ttl_period_in_seconds;
named_value<sstring> alternator_describe_endpoints;
@@ -474,6 +469,8 @@ public:
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
named_value<sstring> vector_store_primary_uri;
named_value<bool> abort_on_ebadf;
named_value<bool> sanitizer_report_backtrace;

View File

@@ -0,0 +1,602 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
// Since Scylla 2.0, we use system tables whose schemas were introduced in
// Cassandra 3. If Scylla boots to find a data directory with system tables
// with older schemas - produced by pre-2.0 Scylla or by pre-3.0 Cassandra,
// we need to migrate these old tables to the new format.
//
// We provide here a function, db::legacy_schema_migrator::migrate(),
// for a one-time migration from old to new system tables. The function
// reads old system tables, write them back in the new format, and finally
// delete the old system tables. Scylla's main should call this function and
// wait for the returned future, before starting to serve the database.
#include <boost/iterator/filter_iterator.hpp>
#include <seastar/core/future-util.hh>
#include <seastar/util/log.hh>
#include <map>
#include <unordered_set>
#include <chrono>
#include "replica/database.hh"
#include "legacy_schema_migrator.hh"
#include "system_keyspace.hh"
#include "schema_tables.hh"
#include "schema/schema_builder.hh"
#include "service/storage_proxy.hh"
#include "utils/rjson.hh"
#include "cql3/query_processor.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/util.hh"
#include "cql3/statements/property_definitions.hh"
static seastar::logger mlogger("legacy_schema_migrator");
namespace db {
namespace legacy_schema_migrator {
// local data carriers
class migrator {
public:
static const std::unordered_set<sstring> legacy_schema_tables;
migrator(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp)
: _sp(sp), _db(db), _sys_ks(sys_ks), _qp(qp) {
}
migrator(migrator&&) = default;
typedef db_clock::time_point time_point;
// TODO: we don't support triggers.
// this is a placeholder.
struct trigger {
time_point timestamp;
sstring name;
std::unordered_map<sstring, sstring> options;
};
struct table {
time_point timestamp;
schema_ptr metadata;
std::vector<trigger> triggers;
};
struct type {
time_point timestamp;
user_type metadata;
};
struct function {
time_point timestamp;
sstring ks_name;
sstring fn_name;
std::vector<sstring> arg_names;
std::vector<sstring> arg_types;
sstring return_type;
bool called_on_null_input;
sstring language;
sstring body;
};
struct aggregate {
time_point timestamp;
sstring ks_name;
sstring fn_name;
std::vector<sstring> arg_names;
std::vector<sstring> arg_types;
sstring return_type;
sstring final_func;
sstring initcond;
sstring state_func;
sstring state_type;
};
struct keyspace {
time_point timestamp;
sstring name;
bool durable_writes;
std::map<sstring, sstring> replication_params;
std::vector<table> tables;
std::vector<type> types;
std::vector<function> functions;
std::vector<aggregate> aggregates;
};
class unsupported_feature : public std::runtime_error {
public:
using runtime_error::runtime_error;
};
static sstring fmt_query(const char* fmt, const char* table) {
return fmt::format(fmt::runtime(fmt), db::system_keyspace::NAME, table);
}
typedef ::shared_ptr<cql3::untyped_result_set> result_set_type;
typedef const cql3::untyped_result_set::row row_type;
future<> read_table(keyspace& dst, sstring cf_name, time_point timestamp) {
auto fmt = "SELECT * FROM {}.{} WHERE keyspace_name = ? AND columnfamily_name = ?";
auto tq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNFAMILIES);
auto cq = fmt_query(fmt, db::system_keyspace::legacy::COLUMNS);
auto zq = fmt_query(fmt, db::system_keyspace::legacy::TRIGGERS);
typedef std::tuple<future<result_set_type>, future<result_set_type>, future<result_set_type>, future<db::schema_tables::legacy::schema_mutations>> result_tuple;
return when_all(_qp.execute_internal(tq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
_qp.execute_internal(cq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
_qp.execute_internal(zq, { dst.name, cf_name }, cql3::query_processor::cache_internal::yes),
db::schema_tables::legacy::read_table_mutations(_sp, dst.name, cf_name, db::system_keyspace::legacy::column_families()))
.then([&dst, cf_name, timestamp](result_tuple&& t) {
result_set_type tables = std::get<0>(t).get();
result_set_type columns = std::get<1>(t).get();
result_set_type triggers = std::get<2>(t).get();
db::schema_tables::legacy::schema_mutations sm = std::get<3>(t).get();
row_type& td = tables->one();
auto ks_name = td.get_as<sstring>("keyspace_name");
auto cf_name = td.get_as<sstring>("columnfamily_name");
auto id = table_id(td.get_or("cf_id", generate_legacy_id(ks_name, cf_name).uuid()));
schema_builder builder(dst.name, cf_name, id);
builder.with_version(sm.digest());
cf_type cf = sstring_to_cf_type(td.get_or("type", sstring("standard")));
if (cf == cf_type::super) {
fail(unimplemented::cause::SUPER);
}
auto comparator = td.get_as<sstring>("comparator");
bool is_compound = cell_comparator::check_compound(comparator);
builder.set_is_compound(is_compound);
cell_comparator::read_collections(builder, comparator);
bool filter_sparse = false;
data_type default_validator = {};
if (td.has("default_validator")) {
default_validator = db::schema_tables::parse_type(td.get_as<sstring>("default_validator"));
if (default_validator->is_counter()) {
builder.set_is_counter(true);
}
builder.set_default_validation_class(default_validator);
}
/*
* Determine whether or not the table is *really* dense
* We cannot trust is_dense value of true (see CASSANDRA-11502, that fixed the issue for 2.2 only, and not retroactively),
* but we can trust is_dense value of false.
*/
auto is_dense = td.get_opt<bool>("is_dense");
if (!is_dense || *is_dense) {
is_dense = [&] {
/*
* As said above, this method is only here because we need to deal with thrift upgrades.
* Once a CF has been "upgraded", i.e. we've rebuilt and save its CQL3 metadata at least once,
* then we'll have saved the "is_dense" value and will be good to go.
*
* But non-upgraded thrift CF (and pre-7744 CF) will have no value for "is_dense", so we need
* to infer that information without relying on it in that case. And for the most part this is
* easy, a CF that has at least one REGULAR definition is not dense. But the subtlety is that not
* having a REGULAR definition may not mean dense because of CQL3 definitions that have only the
* PRIMARY KEY defined.
*
* So we need to recognize those special case CQL3 table with only a primary key. If we have some
* clustering columns, we're fine as said above. So the only problem is that we cannot decide for
* sure if a CF without REGULAR columns nor CLUSTERING_COLUMN definition is meant to be dense, or if it
* has been created in CQL3 by say:
* CREATE TABLE test (k int PRIMARY KEY)
* in which case it should not be dense. However, we can limit our margin of error by assuming we are
* in the latter case only if the comparator is exactly CompositeType(UTF8Type).
*/
std::optional<column_id> max_cl_idx;
const cql3::untyped_result_set::row * regular = nullptr;
for (auto& row : *columns) {
auto kind_str = row.get_as<sstring>("type");
if (kind_str == "compact_value") {
continue;
}
auto kind = db::schema_tables::deserialize_kind(kind_str);
if (kind == column_kind::regular_column) {
if (regular != nullptr) {
return false;
}
regular = &row;
continue;
}
if (kind == column_kind::clustering_key) {
max_cl_idx = std::max(column_id(row.get_or("component_index", 0)), max_cl_idx.value_or(column_id()));
}
}
auto is_cql3_only_pk_comparator = [](const sstring& comparator) {
if (!cell_comparator::check_compound(comparator)) {
return false;
}
// CMH. We don't have composites, nor a parser for it. This is a simple way of c
// checking the same.
auto comma = comparator.find(',');
if (comma != sstring::npos) {
return false;
}
auto off = comparator.find('(');
auto end = comparator.find(')');
return comparator.compare(off, end - off, utf8_type->name()) == 0;
};
if (max_cl_idx) {
auto n = std::count(comparator.begin(), comparator.end(), ','); // num comp - 1
return *max_cl_idx == n;
}
if (regular) {
return false;
}
return !is_cql3_only_pk_comparator(comparator);
}();
// now, if switched to sparse, remove redundant compact_value column and the last clustering column,
// directly copying CASSANDRA-11502 logic. See CASSANDRA-11315.
filter_sparse = !*is_dense;
}
builder.set_is_dense(*is_dense);
auto is_cql = !*is_dense && is_compound;
auto is_static_compact = !*is_dense && !is_compound;
// org.apache.cassandra.schema.LegacySchemaMigrator#isEmptyCompactValueColumn
auto is_empty_compact_value = [](const cql3::untyped_result_set::row& column_row) {
auto kind_str = column_row.get_as<sstring>("type");
// Cassandra only checks for "compact_value", but Scylla generates "regular" instead (#2586)
return (kind_str == "compact_value" || kind_str == "regular")
&& column_row.get_as<sstring>("column_name").empty();
};
for (auto& row : *columns) {
auto kind_str = row.get_as<sstring>("type");
auto kind = db::schema_tables::deserialize_kind(kind_str);
auto component_index = kind > column_kind::clustering_key ? 0 : column_id(row.get_or("component_index", 0));
auto name = row.get_or<sstring>("column_name", sstring());
auto validator = db::schema_tables::parse_type(row.get_as<sstring>("validator"));
if (is_empty_compact_value(row)) {
continue;
}
if (filter_sparse) {
if (kind_str == "compact_value") {
continue;
}
if (kind == column_kind::clustering_key) {
if (cf == cf_type::super && component_index != 0) {
continue;
}
if (cf != cf_type::super && !is_compound) {
continue;
}
}
}
std::optional<index_metadata_kind> index_kind;
sstring index_name;
index_options_map options;
if (row.has("index_type")) {
index_kind = schema_tables::deserialize_index_kind(row.get_as<sstring>("index_type"));
}
if (row.has("index_name")) {
index_name = row.get_as<sstring>("index_name");
}
if (row.has("index_options")) {
sstring index_options_str = row.get_as<sstring>("index_options");
options = rjson::parse_to_map<index_options_map>(std::string_view(index_options_str));
sstring type;
auto i = options.find("index_keys");
if (i != options.end()) {
options.erase(i);
type = "KEYS";
}
i = options.find("index_keys_and_values");
if (i != options.end()) {
options.erase(i);
type = "KEYS_AND_VALUES";
}
if (type.empty()) {
if (validator->is_collection() && validator->is_multi_cell()) {
type = "FULL";
} else {
type = "VALUES";
}
}
auto column = cql3::util::maybe_quote(name);
options["target"] = validator->is_collection()
? type + "(" + column + ")"
: column;
}
if (index_kind) {
// Origin assumes index_name is always set, so let's do the same
builder.with_index(index_metadata(index_name, options, *index_kind, index_metadata::is_local_index::no));
}
data_type column_name_type = [&] {
if (is_static_compact && kind == column_kind::regular_column) {
return db::schema_tables::parse_type(comparator);
}
return utf8_type;
}();
auto column_name = [&] {
try {
return column_name_type->from_string(name);
} catch (marshal_exception&) {
// #2597: Scylla < 2.0 writes names in serialized form, try to recover
column_name_type->validate(to_bytes_view(name));
return to_bytes(name);
}
}();
builder.with_column_ordered(column_definition(std::move(column_name), std::move(validator), kind, component_index));
}
if (is_static_compact) {
builder.set_regular_column_name_type(db::schema_tables::parse_type(comparator));
}
if (td.has("gc_grace_seconds")) {
builder.set_gc_grace_seconds(td.get_as<int32_t>("gc_grace_seconds"));
}
if (td.has("min_compaction_threshold")) {
builder.set_min_compaction_threshold(td.get_as<int32_t>("min_compaction_threshold"));
}
if (td.has("max_compaction_threshold")) {
builder.set_max_compaction_threshold(td.get_as<int32_t>("max_compaction_threshold"));
}
if (td.has("comment")) {
builder.set_comment(td.get_as<sstring>("comment"));
}
if (td.has("memtable_flush_period_in_ms")) {
builder.set_memtable_flush_period(td.get_as<int32_t>("memtable_flush_period_in_ms"));
}
if (td.has("caching")) {
builder.set_caching_options(caching_options::from_sstring(td.get_as<sstring>("caching")));
}
if (td.has("default_time_to_live")) {
builder.set_default_time_to_live(gc_clock::duration(td.get_as<int32_t>("default_time_to_live")));
}
if (td.has("speculative_retry")) {
builder.set_speculative_retry(td.get_as<sstring>("speculative_retry"));
}
if (td.has("compaction_strategy_class")) {
auto strategy = td.get_as<sstring>("compaction_strategy_class");
try {
builder.set_compaction_strategy(compaction::compaction_strategy::type(strategy));
} catch (const exceptions::configuration_exception& e) {
// If compaction strategy class isn't supported, fallback to incremental.
mlogger.warn("Falling back to incremental compaction strategy after the problem: {}", e.what());
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
}
}
if (td.has("compaction_strategy_options")) {
sstring strategy_options_str = td.get_as<sstring>("compaction_strategy_options");
builder.set_compaction_strategy_options(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options_str)));
}
auto comp_param = td.get_as<sstring>("compression_parameters");
compression_parameters cp(rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(comp_param)));
builder.set_compressor_params(cp);
if (td.has("min_index_interval")) {
builder.set_min_index_interval(td.get_as<int32_t>("min_index_interval"));
} else if (td.has("index_interval")) { // compatibility
builder.set_min_index_interval(td.get_as<int32_t>("index_interval"));
}
if (td.has("max_index_interval")) {
builder.set_max_index_interval(td.get_as<int32_t>("max_index_interval"));
}
if (td.has("bloom_filter_fp_chance")) {
builder.set_bloom_filter_fp_chance(td.get_as<double>("bloom_filter_fp_chance"));
} else {
builder.set_bloom_filter_fp_chance(builder.get_bloom_filter_fp_chance());
}
if (td.has("dropped_columns")) {
auto map = td.get_map<sstring, int64_t>("dropped_columns");
for (auto&& e : map) {
builder.without_column(e.first, api::timestamp_type(e.second));
};
}
// ignore version. we're transient
if (!triggers->empty()) {
throw unsupported_feature("triggers");
}
dst.tables.emplace_back(table{timestamp, builder.build() });
});
}
future<> read_tables(keyspace& dst) {
auto query = fmt_query("SELECT columnfamily_name, writeTime(type) AS timestamp FROM {}.{} WHERE keyspace_name = ?",
db::system_keyspace::legacy::COLUMNFAMILIES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
return parallel_for_each(*result, [this, &dst](row_type& row) {
return read_table(dst, row.get_as<sstring>("columnfamily_name"), row.get_as<time_point>("timestamp"));
}).finally([result] {});
});
}
future<time_point> read_type_timestamp(keyspace& dst, sstring type_name) {
// TODO: Unfortunately there is not a single REGULAR column in system.schema_usertypes, so annoyingly we cannot
// use the writeTime() CQL function, and must resort to a lower level.
// Origin digs up the actual cells of target partition and gets timestamp from there.
// We should do the same, but g-dam that's messy. Lets give back dung value for now.
return make_ready_future<time_point>(dst.timestamp);
}
future<> read_types(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::USERTYPES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([this, &dst](result_set_type result) {
return parallel_for_each(*result, [this, &dst](row_type& row) {
auto name = row.get_blob_unfragmented("type_name");
auto columns = row.get_list<bytes>("field_names");
auto types = row.get_list<sstring>("field_types");
std::vector<data_type> field_types;
for (auto&& value : types) {
field_types.emplace_back(db::schema_tables::parse_type(value));
}
auto ut = user_type_impl::get_instance(dst.name, name, columns, field_types, false);
return read_type_timestamp(dst, value_cast<sstring>(utf8_type->deserialize(name))).then([ut = std::move(ut), &dst](time_point timestamp) {
dst.types.emplace_back(type{timestamp, ut});
});
}).finally([result] {});
});
}
future<> read_functions(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::FUNCTIONS);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
if (!result->empty()) {
throw unsupported_feature("functions");
}
});
}
future<> read_aggregates(keyspace& dst) {
auto query = fmt_query("SELECT * FROM {}.{} WHERE keyspace_name = ?", db::system_keyspace::legacy::AGGREGATES);
return _qp.execute_internal(query, {dst.name}, cql3::query_processor::cache_internal::yes).then([](result_set_type result) {
if (!result->empty()) {
throw unsupported_feature("aggregates");
}
});
}
future<keyspace> read_keyspace(sstring ks_name, bool durable_writes, sstring strategy_class, sstring strategy_options, time_point timestamp) {
auto map = rjson::parse_to_map<std::map<sstring, sstring>>(std::string_view(strategy_options));
map.emplace("class", std::move(strategy_class));
auto ks = ::make_lw_shared<keyspace>(keyspace{timestamp, std::move(ks_name), durable_writes, std::move(map) });
return read_tables(*ks).then([this, ks] {
//Collection<Type> types = readTypes(keyspaceName);
return read_types(*ks);
}).then([this, ks] {
return read_functions(*ks);
}).then([this, ks] {
return read_aggregates(*ks);
}).then([ks] {
return make_ready_future<keyspace>(std::move(*ks));
});
}
future<> read_all_keyspaces() {
static auto ks_filter = [](row_type& row) {
auto ks_name = row.get_as<sstring>("keyspace_name");
return ks_name != db::system_keyspace::NAME && ks_name != db::schema_tables::v3::NAME;
};
auto query = fmt_query("SELECT keyspace_name, durable_writes, strategy_options, strategy_class, writeTime(durable_writes) AS timestamp FROM {}.{}",
db::system_keyspace::legacy::KEYSPACES);
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([this](result_set_type result) {
auto i = boost::make_filter_iterator(ks_filter, result->begin(), result->end());
auto e = boost::make_filter_iterator(ks_filter, result->end(), result->end());
return parallel_for_each(i, e, [this](row_type& row) {
return read_keyspace(row.get_as<sstring>("keyspace_name")
, row.get_as<bool>("durable_writes")
, row.get_as<sstring>("strategy_class")
, row.get_as<sstring>("strategy_options")
, row.get_as<db_clock::time_point>("timestamp")
).then([this](keyspace ks) {
_keyspaces.emplace_back(std::move(ks));
});
}).finally([result] {});
});
}
future<> drop_legacy_tables() {
mlogger.info("Dropping legacy schema tables");
auto with_snapshot = !_keyspaces.empty();
for (const sstring& cfname : legacy_schema_tables) {
co_await replica::database::legacy_drop_table_on_all_shards(_db, _sys_ks, db::system_keyspace::NAME, cfname, with_snapshot);
}
}
future<> store_keyspaces_in_new_schema_tables() {
mlogger.info("Moving {} keyspaces from legacy schema tables to the new schema keyspace ({})",
_keyspaces.size(), db::schema_tables::v3::NAME);
utils::chunked_vector<mutation> mutations;
for (auto& ks : _keyspaces) {
auto ksm = ::make_lw_shared<keyspace_metadata>(ks.name
, ks.replication_params["class"] // TODO, make ksm like c3?
, cql3::statements::property_definitions::to_extended_map(ks.replication_params)
, std::nullopt
, std::nullopt
, ks.durable_writes);
// we want separate time stamps for tables/types, so cannot bulk them into the ksm.
for (auto&& m : db::schema_tables::make_create_keyspace_mutations(schema_features::full(), ksm, ks.timestamp.time_since_epoch().count(), false)) {
mutations.emplace_back(std::move(m));
}
for (auto& t : ks.tables) {
db::schema_tables::add_table_or_view_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), true, mutations);
}
for (auto& t : ks.types) {
db::schema_tables::add_type_to_schema_mutation(t.metadata, t.timestamp.time_since_epoch().count(), mutations);
}
}
return _qp.proxy().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
}
future<> flush_schemas() {
auto& db = _qp.db().real_database().container();
return replica::database::flush_tables_on_all_shards(db, db::schema_tables::all_table_infos(schema_features::full()));
}
future<> migrate() {
return read_all_keyspaces().then([this]() {
// write metadata to the new schema tables
return store_keyspaces_in_new_schema_tables()
.then(std::bind(&migrator::flush_schemas, this))
.then(std::bind(&migrator::drop_legacy_tables, this))
.then([] { mlogger.info("Completed migration of legacy schema tables"); });
});
}
sharded<service::storage_proxy>& _sp;
sharded<replica::database>& _db;
sharded<db::system_keyspace>& _sys_ks;
cql3::query_processor& _qp;
std::vector<keyspace> _keyspaces;
};
const std::unordered_set<sstring> migrator::legacy_schema_tables = {
db::system_keyspace::legacy::KEYSPACES,
db::system_keyspace::legacy::COLUMNFAMILIES,
db::system_keyspace::legacy::COLUMNS,
db::system_keyspace::legacy::TRIGGERS,
db::system_keyspace::legacy::USERTYPES,
db::system_keyspace::legacy::FUNCTIONS,
db::system_keyspace::legacy::AGGREGATES,
};
}
}
future<>
db::legacy_schema_migrator::migrate(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp) {
return do_with(migrator(sp, db, sys_ks, qp), std::bind(&migrator::migrate, std::placeholders::_1));
}

View File

@@ -0,0 +1,37 @@
/*
* Modified by ScyllaDB
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "seastarx.hh"
namespace replica {
class database;
}
namespace cql3 {
class query_processor;
}
namespace service {
class storage_proxy;
}
namespace db {
class system_keyspace;
namespace legacy_schema_migrator {
future<> migrate(sharded<service::storage_proxy>&, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor&);
}
}

View File

@@ -542,7 +542,6 @@ public:
// Returns the range tombstone for the key range adjacent to the cursor's position from the side of smaller keys.
// Excludes the range for the row itself. That information is returned by range_tombstone_for_row().
// It's possible that range_tombstone() is empty and range_tombstone_for_row() is not empty.
// Note that this is different from the meaning of rows_entry::range_tombstone(), which includes the row itself.
tombstone range_tombstone() const { return _range_tombstone; }
// Can be called when cursor is pointing at a row.

View File

@@ -1287,15 +1287,6 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker,
, _partitions(dht::raw_token_less_comparator{})
, _underlying(src())
, _snapshot_source(std::move(src))
, _update_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.update {}.{}", _schema->ks_name(), _schema->cf_name());
}))
, _populate_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.populate {}.{}", _schema->ks_name(), _schema->cf_name());
}))
, _read_section(abstract_formatter([this] (fmt::context& ctx) {
fmt::format_to(ctx.out(), "cache.read {}.{}", _schema->ks_name(), _schema->cf_name());
}))
{
try {
with_allocator(_tracker.allocator(), [this, cont] {

View File

@@ -404,7 +404,10 @@ const std::unordered_set<table_id>& schema_tables_holding_schema_mutations() {
computed_columns(),
dropped_columns(),
indexes(),
scylla_tables()}) {
scylla_tables(),
db::system_keyspace::legacy::column_families(),
db::system_keyspace::legacy::columns(),
db::system_keyspace::legacy::triggers()}) {
SCYLLA_ASSERT(s->clustering_key_size() > 0);
auto&& first_column_name = s->clustering_column_at(0).name_as_text();
SCYLLA_ASSERT(first_column_name == "table_name"
@@ -2837,6 +2840,26 @@ void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view
}
namespace legacy {
table_schema_version schema_mutations::digest() const {
md5_hasher h;
const db::schema_features no_features;
db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, no_features);
db::schema_tables::feed_hash_for_schema_digest(h, _columns, no_features);
return table_schema_version(utils::UUID_gen::get_name_UUID(h.finalize()));
}
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s)
{
mutation cf_m = co_await read_schema_partition_for_table(proxy, s, keyspace_name, table_name);
mutation col_m = co_await read_schema_partition_for_table(proxy, db::system_keyspace::legacy::columns(), keyspace_name, table_name);
co_return schema_mutations{std::move(cf_m), std::move(col_m)};
}
} // namespace legacy
static auto GET_COLUMN_MAPPING_QUERY = format("SELECT column_name, clustering_order, column_name_bytes, kind, position, type FROM system.{} WHERE cf_id = ? AND schema_version = ?",
db::schema_tables::SCYLLA_TABLE_SCHEMA_HISTORY);

View File

@@ -155,6 +155,24 @@ schema_ptr scylla_table_schema_history();
const std::unordered_set<table_id>& schema_tables_holding_schema_mutations();
}
namespace legacy {
class schema_mutations {
mutation _columnfamilies;
mutation _columns;
public:
schema_mutations(mutation columnfamilies, mutation columns)
: _columnfamilies(std::move(columnfamilies))
, _columns(std::move(columns))
{ }
table_schema_version digest() const;
};
future<schema_mutations> read_table_mutations(sharded<service::storage_proxy>& proxy,
sstring keyspace_name, sstring table_name, schema_ptr s);
}
struct qualified_name {
sstring keyspace_name;
sstring table_name;

View File

@@ -766,6 +766,9 @@ schema_ptr system_keyspace::size_estimates() {
"partitions larger than specified threshold"
);
builder.set_gc_grace_seconds(0);
// FIXME re-enable caching for this and the other two
// system.large_* tables once
// https://github.com/scylladb/scylla/issues/3288 is fixed
builder.set_caching_options(caching_options::get_disabled_caching_options());
builder.with_hash_version();
return builder.build(schema_builder::compact_storage::no);
@@ -847,6 +850,8 @@ schema_ptr system_keyspace::corrupt_data() {
return corrupt_data;
}
static constexpr auto schema_gc_grace = std::chrono::duration_cast<std::chrono::seconds>(days(7)).count();
/*static*/ schema_ptr system_keyspace::scylla_local() {
static thread_local auto scylla_local = [] {
schema_builder builder(generate_legacy_id(NAME, SCYLLA_LOCAL), NAME, SCYLLA_LOCAL,
@@ -1358,6 +1363,289 @@ schema_ptr system_keyspace::role_permissions() {
return schema;
}
schema_ptr system_keyspace::legacy::hints() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS,
// partition key
{{"target_id", uuid_type}},
// clustering key
{{"hint_id", timeuuid_type}, {"message_version", int32_type}},
// regular columns
{{"mutation", bytes_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* hints awaiting delivery"
);
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
builder.set_compaction_strategy_options({{"enabled", "false"}});
builder.with(schema_builder::compact_storage::yes);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::batchlog() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, BATCHLOG), NAME, BATCHLOG,
// partition key
{{"id", uuid_type}},
// clustering key
{},
// regular columns
{{"data", bytes_type}, {"version", int32_type}, {"written_at", timestamp_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* batchlog entries"
);
builder.set_gc_grace_seconds(0);
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
builder.set_compaction_strategy_options({{"min_threshold", "2"}});
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::keyspaces() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, KEYSPACES), NAME, KEYSPACES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{},
// regular columns
{
{"durable_writes", boolean_type},
{"strategy_class", utf8_type},
{"strategy_options", utf8_type}
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* keyspace definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::yes);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::column_families() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, COLUMNFAMILIES), NAME, COLUMNFAMILIES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}},
// regular columns
{
{"bloom_filter_fp_chance", double_type},
{"caching", utf8_type},
{"cf_id", uuid_type},
{"comment", utf8_type},
{"compaction_strategy_class", utf8_type},
{"compaction_strategy_options", utf8_type},
{"comparator", utf8_type},
{"compression_parameters", utf8_type},
{"default_time_to_live", int32_type},
{"default_validator", utf8_type},
{"dropped_columns", map_type_impl::get_instance(utf8_type, long_type, true)},
{"gc_grace_seconds", int32_type},
{"is_dense", boolean_type},
{"key_validator", utf8_type},
{"max_compaction_threshold", int32_type},
{"max_index_interval", int32_type},
{"memtable_flush_period_in_ms", int32_type},
{"min_compaction_threshold", int32_type},
{"min_index_interval", int32_type},
{"speculative_retry", utf8_type},
{"subcomparator", utf8_type},
{"type", utf8_type},
// The following 4 columns are only present up until 2.1.8 tables
{"key_aliases", utf8_type},
{"value_alias", utf8_type},
{"column_aliases", utf8_type},
{"index_interval", int32_type},},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* table definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::columns() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, COLUMNS), NAME, COLUMNS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}, {"column_name", utf8_type}},
// regular columns
{
{"component_index", int32_type},
{"index_name", utf8_type},
{"index_options", utf8_type},
{"index_type", utf8_type},
{"type", utf8_type},
{"validator", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"column definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::triggers() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, TRIGGERS), NAME, TRIGGERS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"columnfamily_name", utf8_type}, {"trigger_name", utf8_type}},
// regular columns
{
{"trigger_options", map_type_impl::get_instance(utf8_type, utf8_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"trigger definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::usertypes() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, USERTYPES), NAME, USERTYPES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"type_name", utf8_type}},
// regular columns
{
{"field_names", list_type_impl::get_instance(utf8_type, true)},
{"field_types", list_type_impl::get_instance(utf8_type, true)},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"user defined type definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::functions() {
/**
* Note: we have our own "legacy" version of this table (in schema_tables),
* but it is (afaik) not used, and differs slightly from the origin one.
* This is based on the origin schema, since we're more likely to encounter
* installations of that to migrate, rather than our own (if we dont use the table).
*/
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, FUNCTIONS), NAME, FUNCTIONS,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"function_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
// regular columns
{
{"argument_names", list_type_impl::get_instance(utf8_type, true)},
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
{"body", utf8_type},
{"language", utf8_type},
{"return_type", utf8_type},
{"called_on_null_input", boolean_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* user defined type definitions"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::legacy::aggregates() {
static thread_local auto schema = [] {
schema_builder builder(generate_legacy_id(NAME, AGGREGATES), NAME, AGGREGATES,
// partition key
{{"keyspace_name", utf8_type}},
// clustering key
{{"aggregate_name", utf8_type},{"signature", list_type_impl::get_instance(utf8_type, false)}},
// regular columns
{
{"argument_types", list_type_impl::get_instance(utf8_type, true)},
{"final_func", utf8_type},
{"initcond", bytes_type},
{"return_type", utf8_type},
{"state_func", utf8_type},
{"state_type", utf8_type},
},
// static columns
{},
// regular column name type
utf8_type,
// comment
"*DEPRECATED* user defined aggregate definition"
);
builder.set_gc_grace_seconds(schema_gc_grace);
builder.with(schema_builder::compact_storage::no);
builder.with_hash_version();
return builder.build();
}();
return schema;
}
schema_ptr system_keyspace::dicts() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, DICTS);
@@ -1379,7 +1667,7 @@ schema_ptr system_keyspace::view_building_tasks() {
.with_column("key", utf8_type, column_kind::partition_key)
.with_column("id", timeuuid_type, column_kind::clustering_key)
.with_column("type", utf8_type)
.with_column("aborted", boolean_type)
.with_column("state", utf8_type)
.with_column("base_id", uuid_type)
.with_column("view_id", uuid_type)
.with_column("last_token", long_type)
@@ -2330,6 +2618,13 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
r.insert(r.end(), {sstables_registry()});
}
// legacy schema
r.insert(r.end(), {
// TODO: once we migrate hints/batchlog and add converter
// legacy::hints(), legacy::batchlog(),
legacy::keyspaces(), legacy::column_families(),
legacy::columns(), legacy::triggers(), legacy::usertypes(),
legacy::functions(), legacy::aggregates(), });
return r;
}
@@ -2767,14 +3062,14 @@ future<mutation> system_keyspace::make_remove_view_build_status_on_host_mutation
static constexpr auto VIEW_BUILDING_KEY = "view_building";
future<db::view::building_tasks> system_keyspace::get_view_building_tasks() {
static const sstring query = format("SELECT id, type, aborted, base_id, view_id, last_token, host_id, shard FROM {}.{} WHERE key = '{}'", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
static const sstring query = format("SELECT id, type, state, base_id, view_id, last_token, host_id, shard FROM {}.{} WHERE key = '{}'", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
using namespace db::view;
building_tasks tasks;
co_await _qp.query_internal(query, [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
auto id = row.get_as<utils::UUID>("id");
auto type = task_type_from_string(row.get_as<sstring>("type"));
auto aborted = row.get_as<bool>("aborted");
auto state = task_state_from_string(row.get_as<sstring>("state"));
auto base_id = table_id(row.get_as<utils::UUID>("base_id"));
auto view_id = row.get_opt<utils::UUID>("view_id").transform([] (const utils::UUID& uuid) { return table_id(uuid); });
auto last_token = dht::token::from_int64(row.get_as<int64_t>("last_token"));
@@ -2782,7 +3077,7 @@ future<db::view::building_tasks> system_keyspace::get_view_building_tasks() {
auto shard = unsigned(row.get_as<int32_t>("shard"));
locator::tablet_replica replica{host_id, shard};
view_building_task task{id, type, aborted, base_id, view_id, replica, last_token};
view_building_task task{id, type, state, base_id, view_id, replica, last_token};
switch (type) {
case db::view::view_building_task::task_type::build_range:
@@ -2801,7 +3096,7 @@ future<db::view::building_tasks> system_keyspace::get_view_building_tasks() {
}
future<mutation> system_keyspace::make_view_building_task_mutation(api::timestamp_type ts, const db::view::view_building_task& task) {
static const sstring stmt = format("INSERT INTO {}.{}(key, id, type, aborted, base_id, view_id, last_token, host_id, shard) VALUES ('{}', ?, ?, ?, ?, ?, ?, ?, ?)", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
static const sstring stmt = format("INSERT INTO {}.{}(key, id, type, state, base_id, view_id, last_token, host_id, shard) VALUES ('{}', ?, ?, ?, ?, ?, ?, ?, ?)", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
using namespace db::view;
data_value_or_unset view_id = unset_value{};
@@ -2812,7 +3107,7 @@ future<mutation> system_keyspace::make_view_building_task_mutation(api::timestam
view_id = data_value(task.view_id->uuid());
}
auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {
task.id, task_type_to_sstring(task.type), task.aborted,
task.id, task_type_to_sstring(task.type), task_state_to_sstring(task.state),
task.base_id.uuid(), view_id, dht::token::to_int64(task.last_token),
task.replica.host.uuid(), int32_t(task.replica.shard)
});
@@ -2822,6 +3117,18 @@ future<mutation> system_keyspace::make_view_building_task_mutation(api::timestam
co_return std::move(muts[0]);
}
future<mutation> system_keyspace::make_update_view_building_task_state_mutation(api::timestamp_type ts, utils::UUID id, db::view::view_building_task::task_state state) {
static const sstring stmt = format("UPDATE {}.{} SET state = ? WHERE key = '{}' AND id = ?", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);
auto muts = co_await _qp.get_mutations_internal(stmt, internal_system_query_state(), ts, {
task_state_to_sstring(state), id
});
if (muts.size() != 1) {
on_internal_error(slogger, fmt::format("expected 1 mutation got {}", muts.size()));
}
co_return std::move(muts[0]);
}
future<mutation> system_keyspace::make_remove_view_building_task_mutation(api::timestamp_type ts, utils::UUID id) {
static const sstring stmt = format("DELETE FROM {}.{} WHERE key = '{}' AND id = ?", NAME, VIEW_BUILDING_TASKS, VIEW_BUILDING_KEY);

View File

@@ -241,6 +241,28 @@ public:
static schema_ptr cdc_local();
};
struct legacy {
static constexpr auto HINTS = "hints";
static constexpr auto BATCHLOG = "batchlog";
static constexpr auto KEYSPACES = "schema_keyspaces";
static constexpr auto COLUMNFAMILIES = "schema_columnfamilies";
static constexpr auto COLUMNS = "schema_columns";
static constexpr auto TRIGGERS = "schema_triggers";
static constexpr auto USERTYPES = "schema_usertypes";
static constexpr auto FUNCTIONS = "schema_functions";
static constexpr auto AGGREGATES = "schema_aggregates";
static schema_ptr keyspaces();
static schema_ptr column_families();
static schema_ptr columns();
static schema_ptr triggers();
static schema_ptr usertypes();
static schema_ptr functions();
static schema_ptr aggregates();
static schema_ptr hints();
static schema_ptr batchlog();
};
// Partition estimates for a given range of tokens.
struct range_estimates {
schema_ptr schema;
@@ -554,6 +576,7 @@ public:
// system.view_building_tasks
future<db::view::building_tasks> get_view_building_tasks();
future<mutation> make_view_building_task_mutation(api::timestamp_type ts, const db::view::view_building_task& task);
future<mutation> make_update_view_building_task_state_mutation(api::timestamp_type ts, utils::UUID id, db::view::view_building_task::task_state state);
future<mutation> make_remove_view_building_task_mutation(api::timestamp_type ts, utils::UUID id);
// system.scylla_local, view_building_processing_base key

View File

@@ -9,8 +9,6 @@
#include "query/query-result-reader.hh"
#include "replica/database_fwd.hh"
#include "db/timeout_clock.hh"
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
namespace service {
class storage_proxy;
@@ -27,14 +25,8 @@ class delete_ghost_rows_visitor {
replica::table& _view_table;
schema_ptr _base_schema;
std::optional<partition_key> _view_pk;
db::timeout_semaphore _concurrency_semaphore;
seastar::gate _gate;
std::exception_ptr& _ex;
public:
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex);
delete_ghost_rows_visitor(delete_ghost_rows_visitor&&) = default;
~delete_ghost_rows_visitor() noexcept;
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration);
void add_value(const column_definition& def, query::result_row_view::iterator_type& i) {
}
@@ -53,9 +45,6 @@ public:
uint32_t accept_partition_end(const query::result_row_view& static_row) {
return 0;
}
private:
future<> do_accept_new_row(partition_key pk, clustering_key ck);
};
} //namespace db::view

View File

@@ -1744,115 +1744,6 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
&& std::ranges::contains(shards, this_shard_id());
}
static endpoints_to_update get_view_natural_endpoint_vnodes(
locator::host_id me,
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
locator::endpoint_dc_rack my_location,
const locator::network_topology_strategy* network_topology,
replica::cf_stats& cf_stats) {
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector base_endpoints, view_endpoints;
auto& my_datacenter = my_location.dc;
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (!network_topology || node.get().dc() == my_datacenter) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
view_endpoints.push_back(view_node);
}
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
}
static std::optional<locator::host_id> get_unpaired_view_endpoint(
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
replica::cf_stats& cf_stats) {
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
for (auto&& base_node : base_nodes) {
if (base_dc_racks.contains(base_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
base_node.get().dc(), base_node.get().rack());
return std::nullopt;
}
base_dc_racks.insert(base_node.get().dc_rack());
}
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
for (auto&& view_node : view_nodes) {
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
view_node.get().dc(), view_node.get().rack());
return std::nullopt;
}
// Track unpaired replicas in both sets
if (base_dc_racks.contains(view_node.get().dc_rack())) {
paired_view_dc_racks.insert(view_node.get().dc_rack());
} else {
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
}
}
if (unpaired_view_dc_rack_replicas.size() > 0) {
// There are view replicas that can't be paired with any base replica
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
if (unpaired_view_dc_rack_replicas.size() > 0) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
unpaired_view_dc_rack_replicas | std::views::values);
}
return extra_replica;
}
return std::nullopt;
}
// Calculate the node ("natural endpoint") to which this node should send
// a view update.
//
@@ -1865,19 +1756,29 @@ static std::optional<locator::host_id> get_unpaired_view_endpoint(
// of this function is to find, assuming that this node is one of the base
// replicas for a given partition, the paired view replica.
//
// When using vnodes, we have an optimization called "self-pairing" - if a single
// node is both a base replica and a view replica for a write, the pairing is
// modified so that this node sends the update to itself and this node is removed
// from the lists of nodes paired by index. This self-pairing optimization can
// cause the pairing to change after view ranges are moved between nodes.
// In the past, we used an optimization called "self-pairing" that if a single
// node was both a base replica and a view replica for a write, the pairing is
// modified so that this node would send the update to itself. This self-
// pairing optimization could cause the pairing to change after view ranges
// are moved between nodes, so currently we only use it if
// use_legacy_self_pairing is set to true. When using tablets - where range
// movements are common - it is strongly recommended to set it to false.
//
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
// we pair only nodes in the same datacenter.
//
// If the table uses tablets, then pairing is rack-aware. In this case, in each
// rack where we have a base replica there is also one replica of each view tablet.
// Therefore, the base replicas are naturally paired with the view replicas that
// are in the same rack.
// When use_legacy_self_pairing is enabled, if one of the base replicas
// also happens to be a view replica, it is paired with itself
// (with the other nodes paired by order in the list
// after taking this node out).
//
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
// and the replication factor in the node's datacenter is a multiple of the number
// of racks in the datacenter, then pairing is rack-aware. In this case,
// all racks have the same number of replicas, and those are never migrated
// outside their racks. Therefore, the base replicas are naturally paired with the
// view replicas that are in the same rack, based on the ordinal position.
// Note that typically, there is a single replica per rack and pairing is trivial.
//
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
@@ -1905,12 +1806,19 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_tablets,
bool use_legacy_self_pairing,
bool use_tablets_rack_aware_view_pairing,
replica::cf_stats& cf_stats) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
auto& my_location = topology.get_location(me);
auto& my_datacenter = my_location.dc;
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
bool simple_rack_aware_pairing = false;
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector orig_base_endpoints, orig_view_endpoints;
node_vector base_endpoints, view_endpoints;
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
if (auto* np = topology.find_node(ep)) {
@@ -1921,7 +1829,6 @@ endpoints_to_update get_view_natural_endpoint(
// We need to use get_replicas() for pairing to be stable in case base or view tablet
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
return resolve(topology, ep, false);
}) | std::ranges::to<node_vector>();
@@ -1945,43 +1852,231 @@ endpoints_to_update get_view_natural_endpoint(
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
auto leaving_base = it->get().host_id();
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, replication_strategy, base_token,
view_token, use_tablets, cf_stats);
view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
}
}
}
if (!use_tablets) {
return get_view_natural_endpoint_vnodes(
me,
base_nodes,
view_nodes,
my_location,
network_topology,
cf_stats);
std::function<bool(const locator::node&)> is_candidate;
if (network_topology) {
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
} else {
is_candidate = [&] (const locator::node&) { return true; };
}
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (is_candidate(node)) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
std::optional<locator::host_id> paired_replica;
for (auto&& view_node : view_nodes) {
if (view_node.get().dc_rack() == my_location) {
paired_replica = view_node.get().host_id();
break;
if (use_legacy_self_pairing) {
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (is_candidate(view_node)) {
view_endpoints.push_back(view_node);
}
}
} else {
for (auto&& view_node : view_nodes) {
process_candidate(view_endpoints, view_node);
}
}
if (paired_replica && base_nodes.size() == view_nodes.size()) {
// We don't need to find any extra replicas, so we can return early
return {.natural_endpoint = paired_replica};
// Try optimizing for simple rack-aware pairing
// If the numbers of base and view replica differ, that means an RF change is taking place
// and we can't use simple rack-aware pairing.
if (rack_aware_pairing && base_endpoints.size() == view_endpoints.size()) {
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
// Simple rack-aware pairing is possible when the datacenter replication factor
// is a multiple of the number of racks in the datacenter.
if (dc_rf % racks.size() == 0) {
simple_rack_aware_pairing = true;
size_t rack_rf = dc_rf / racks.size();
// If any rack doesn't have enough nodes to satisfy the per-rack rf
// simple rack-aware pairing is disabled.
for (const auto& [rack, nodes] : racks) {
if (nodes.size() < rack_rf) {
simple_rack_aware_pairing = false;
break;
}
}
}
if (dc_rf != base_endpoints.size()) {
// If the datacenter replication factor is not equal to the number of base replicas,
// we're in progress of a RF change and we can't use simple rack-aware pairing.
simple_rack_aware_pairing = false;
}
if (simple_rack_aware_pairing) {
std::erase_if(base_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
std::erase_if(view_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
}
}
if (!paired_replica) {
// We couldn't find any view replica in our rack
orig_base_endpoints = base_endpoints;
orig_view_endpoints = view_endpoints;
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
// Use best-match, for the minimum number of base and view replicas in each rack,
// and ordinal match for the rest.
std::optional<std::reference_wrapper<const locator::node>> paired_replica;
if (rack_aware_pairing && !simple_rack_aware_pairing) {
struct indexed_replica {
size_t idx;
std::reference_wrapper<const locator::node> node;
};
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
// First, index all replicas by rack
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
size_t idx = 0;
for (const auto& r: replicas) {
racks[r.get().rack()].emplace_back(idx++, r);
}
};
index_replica_set(base_racks, base_endpoints);
index_replica_set(view_racks, view_endpoints);
// Try optimistically pairing `me` first
const auto& my_base_replicas = base_racks[my_location.rack];
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
if (base_it == my_base_replicas.end()) {
return {};
}
const auto& my_view_replicas = view_racks[my_location.rack];
size_t idx = base_it - my_base_replicas.begin();
if (idx < my_view_replicas.size()) {
if (orig_view_endpoints.size() <= orig_base_endpoints.size()) {
return {.natural_endpoint = my_view_replicas[idx].node.get().host_id()};
} else {
// If the number of view replicas is larger than the number of base replicas,
// we need to find the unpaired view replica, so we can't return yet.
paired_replica = my_view_replicas[idx].node;
}
}
// Collect all unpaired base and view replicas,
// where the number of replicas in the base rack is different than the respective view rack
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
for (const auto& [rack, base_replicas] : base_racks) {
const auto& view_replicas = view_racks[rack];
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
unpaired_base_replicas.emplace_back(base_replicas[i]);
}
}
for (const auto& [rack, view_replicas] : view_racks) {
const auto& base_replicas = base_racks[rack];
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
unpaired_view_replicas.emplace_back(view_replicas[i]);
}
}
// Sort by the original ordinality, and copy the sorted results
// back into {base,view}_endpoints, for backward compatible processing below.
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
base_endpoints.clear();
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
view_endpoints.clear();
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (!paired_replica && base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
std::optional<std::reference_wrapper<const locator::node>> no_pairing_replica;
if (!paired_replica && idx >= view_endpoints.size()) {
// There are fewer view replicas than base replicas
// FIXME: This might still happen when reducing replication factor with tablets,
// see https://github.com/scylladb/scylladb/issues/21492
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
me,
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
} else if (base_endpoints.size() < view_endpoints.size()) {
// There are fewer base replicas than view replicas.
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_endpoints.back();
if (base_endpoints.size() < view_endpoints.size() - 1) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
std::span(view_endpoints.begin() + base_endpoints.size(), view_endpoints.end() - 1) | std::views::transform(std::mem_fn(&locator::node::host_id)));
}
}
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
return {.natural_endpoint = paired_replica,
.endpoint_with_no_pairing = no_pairing_replica};
if (!paired_replica) {
paired_replica = view_endpoints[idx];
}
if (!no_pairing_replica && base_nodes.size() < view_nodes.size()) {
// This can happen when the view replica with no pairing is in another DC.
// We need to send an update to it if there are no base replicas in that DC yet,
// as it won't receive updates otherwise.
std::unordered_set<sstring> dcs_with_base_replicas;
for (const auto& base_node : base_nodes) {
dcs_with_base_replicas.insert(base_node.get().dc());
}
for (const auto& view_node : view_nodes) {
if (!dcs_with_base_replicas.contains(view_node.get().dc())) {
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_node;
break;
}
}
}
// https://github.com/scylladb/scylladb/issues/19439
// With tablets, a node being replaced might transition to "left" state
// but still be kept as a replica.
// As of writing this hints are not prepared to handle nodes that are left
// but are still replicas. Therefore, there is no other sensible option
// right now but to give up attempt to send the update or write a hint
// to the paired, permanently down replica.
// We use the same workaround for the extra replica.
auto return_host_id_if_not_left = [] (const auto& replica) -> std::optional<locator::host_id> {
if (!replica) {
return std::nullopt;
}
const auto& node = replica->get();
if (!node.left()) {
return node.host_id();
} else {
return std::nullopt;
}
};
return {.natural_endpoint = return_host_id_if_not_left(paired_replica),
.endpoint_with_no_pairing = return_host_id_if_not_left(no_pairing_replica)};
}
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
@@ -2041,6 +2136,12 @@ future<> view_update_generator::mutate_MV(
{
auto& ks = _db.find_keyspace(base->ks_name());
auto& replication = ks.get_replication_strategy();
// We set legacy self-pairing for old vnode-based tables (for backward
// compatibility), and unset it for tablets - where range movements
// are more frequent and backward compatibility is less important.
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
// on a view, like we have the synchronous_updates_flag.
bool use_legacy_self_pairing = !ks.uses_tablets();
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
auto get_erm = [&] (table_id id) {
auto it = erms.find(id);
@@ -2053,6 +2154,10 @@ future<> view_update_generator::mutate_MV(
for (const auto& mut : view_updates) {
(void)get_erm(mut.s->id());
}
// Enable rack-aware view updates pairing for tablets
// when the cluster feature is enabled so that all replicas agree
// on the pairing algorithm.
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
auto me = base_ermp->get_topology().my_host_id();
static constexpr size_t max_concurrent_updates = 128;
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
@@ -2060,7 +2165,7 @@ future<> view_update_generator::mutate_MV(
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto view_ermp = erms.at(mut.s->id());
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
ks.uses_tablets(), cf_stats);
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
if (no_pairing_endpoint) {
@@ -3492,7 +3597,7 @@ view_updating_consumer::view_updating_consumer(view_update_generator& gen, schem
})
{ }
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration, size_t concurrency, std::exception_ptr& ex)
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration)
: _proxy(proxy)
, _state(state)
, _timeout_duration(timeout_duration)
@@ -3500,20 +3605,8 @@ delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& pro
, _view_table(_proxy.get_db().local().find_column_family(view))
, _base_schema(_proxy.get_db().local().find_schema(_view->view_info()->base_id()))
, _view_pk()
, _concurrency_semaphore(concurrency)
, _ex(ex)
{}
delete_ghost_rows_visitor::~delete_ghost_rows_visitor() noexcept {
try {
_gate.close().get();
} catch (...) {
// Closing the gate should never throw, but if it does anyway, capture the exception.
_ex = std::current_exception();
}
}
void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, uint32_t row_count) {
SCYLLA_ASSERT(thread::running_in_thread());
_view_pk = key;
@@ -3521,18 +3614,7 @@ void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, u
// Assumes running in seastar::thread
void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row) {
auto units = get_units(_concurrency_semaphore, 1).get();
(void)seastar::try_with_gate(_gate, [this, pk = _view_pk.value(), units = std::move(units), ck] () mutable {
return do_accept_new_row(std::move(pk), std::move(ck)).then_wrapped([this, units = std::move(units)] (future<>&& f) mutable {
if (f.failed()) {
_ex = f.get_exception();
}
});
});
}
future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clustering_key ck) {
auto view_exploded_pk = pk.explode();
auto view_exploded_pk = _view_pk->explode();
auto view_exploded_ck = ck.explode();
std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
@@ -3567,17 +3649,17 @@ future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clusteri
_proxy.get_max_result_size(partition_slice), query::tombstone_limit(_proxy.get_tombstone_limit()));
auto timeout = db::timeout_clock::now() + _timeout_duration;
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
auto base_qr = co_await _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts);
auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get();
query::result& result = *base_qr.query_result;
auto delete_ghost_row = [&]() -> future<> {
mutation m(_view, pk);
auto delete_ghost_row = [&]() {
mutation m(_view, *_view_pk);
auto& row = m.partition().clustered_row(*_view, ck);
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
timeout = db::timeout_clock::now() + _timeout_duration;
return _proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no);
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit(), db::allow_per_partition_rate_limit::no).get();
};
if (result.row_count().value_or(0) == 0) {
co_await delete_ghost_row();
delete_ghost_row();
} else if (!view_key_cols_not_in_base_key.empty()) {
if (result.row_count().value_or(0) != 1) {
on_internal_error(vlogger, format("Got multiple base rows corresponding to a single view row when pruning {}.{}", _view->ks_name(), _view->cf_name()));
@@ -3587,7 +3669,7 @@ future<> delete_ghost_rows_visitor::do_accept_new_row(partition_key pk, clusteri
for (const auto& [col_def, col_val] : view_key_cols_not_in_base_key) {
const data_value* base_val = base_row.get_data_value(col_def->name_as_text());
if (!base_val || base_val->is_null() || col_val != base_val->serialize_nonnull()) {
co_await delete_ghost_row();
delete_ghost_row();
break;
}
}

View File

@@ -305,7 +305,8 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_tablets,
bool use_legacy_self_pairing,
bool use_tablets_basic_rack_aware_view_pairing,
replica::cf_stats& cf_stats);
/// Verify that the provided keyspace is eligible for storing materialized views.

View File

@@ -104,8 +104,6 @@ future<> view_building_coordinator::run() {
_vb_sm.event.broadcast();
});
auto finished_tasks_gc_fiber = finished_task_gc_fiber();
while (!_as.abort_requested()) {
co_await utils::get_local_injector().inject("view_building_coordinator_pause_main_loop", utils::wait_for_message(std::chrono::minutes(2)));
if (utils::get_local_injector().enter("view_building_coordinator_skip_main_loop")) {
@@ -123,7 +121,12 @@ future<> view_building_coordinator::run() {
continue;
}
co_await work_on_view_building(std::move(*guard_opt));
auto started_new_work = co_await work_on_view_building(std::move(*guard_opt));
if (started_new_work) {
// If any tasks were started, do another iteration, so the coordinator can attach itself to the tasks (via RPC)
vbc_logger.debug("view building coordinator started new tasks, do next iteration without waiting for event");
continue;
}
co_await await_event();
} catch (...) {
handle_coordinator_error(std::current_exception());
@@ -139,66 +142,6 @@ future<> view_building_coordinator::run() {
}
}
}
co_await std::move(finished_tasks_gc_fiber);
}
future<> view_building_coordinator::finished_task_gc_fiber() {
static auto task_gc_interval = 200ms;
while (!_as.abort_requested()) {
try {
co_await clean_finished_tasks();
co_await sleep_abortable(task_gc_interval, _as);
} catch (abort_requested_exception&) {
vbc_logger.debug("view_building_coordinator::finished_task_gc_fiber got abort_requested_exception");
} catch (service::group0_concurrent_modification&) {
vbc_logger.info("view_building_coordinator::finished_task_gc_fiber got group0_concurrent_modification");
} catch (raft::request_aborted&) {
vbc_logger.debug("view_building_coordinator::finished_task_gc_fiber got raft::request_aborted");
} catch (service::term_changed_error&) {
vbc_logger.debug("view_building_coordinator::finished_task_gc_fiber notices term change {} -> {}", _term, _raft.get_current_term());
} catch (raft::commit_status_unknown&) {
vbc_logger.warn("view_building_coordinator::finished_task_gc_fiber got raft::commit_status_unknown");
} catch (...) {
vbc_logger.error("view_building_coordinator::finished_task_gc_fiber got error: {}", std::current_exception());
}
}
}
future<> view_building_coordinator::clean_finished_tasks() {
// Avoid acquiring a group0 operation if there are no tasks.
if (_finished_tasks.empty()) {
co_return;
}
auto guard = co_await start_operation();
auto lock = co_await get_unique_lock(_mutex);
if (!_vb_sm.building_state.currently_processed_base_table || std::ranges::all_of(_finished_tasks, [] (auto& e) { return e.second.empty(); })) {
co_return;
}
view_building_task_mutation_builder builder(guard.write_timestamp());
for (auto& [replica, tasks]: _finished_tasks) {
for (auto& task_id: tasks) {
// The task might be aborted in the meantime. In this case we cannot remove it because we need it to create a new task.
//
// TODO: When we're aborting a view building task (for instance due to tablet migration),
// we can look if we already finished it (check if it's in `_finished_tasks`).
// If yes, we can just remove it instead of aborting it.
auto task_opt = _vb_sm.building_state.get_task(*_vb_sm.building_state.currently_processed_base_table, replica, task_id);
if (task_opt && !task_opt->get().aborted) {
builder.del_task(task_id);
vbc_logger.debug("Removing finished task with ID: {}", task_id);
}
}
}
co_await commit_mutations(std::move(guard), {builder.build()}, "remove finished view building tasks");
for (auto& [_, tasks_set]: _finished_tasks) {
tasks_set.clear();
}
}
future<std::optional<service::group0_guard>> view_building_coordinator::update_state(service::group0_guard guard) {
@@ -358,16 +301,18 @@ future<> view_building_coordinator::update_views_statuses(const service::group0_
}
}
future<> view_building_coordinator::work_on_view_building(service::group0_guard guard) {
future<bool> view_building_coordinator::work_on_view_building(service::group0_guard guard) {
if (!_vb_sm.building_state.currently_processed_base_table) {
vbc_logger.debug("No base table is selected, nothing to do.");
co_return;
co_return false;
}
// Acquire unique lock of `_finished_tasks` to ensure each replica has its own entry in it
// and to select tasks for them.
auto lock = co_await get_unique_lock(_mutex);
utils::chunked_vector<mutation> muts;
std::unordered_set<locator::tablet_replica> _remote_work_keys_to_erase;
for (auto& replica: get_replicas_with_tasks()) {
// Check whether the coordinator already waits for the remote work on the replica to be finished.
// If so: check if the work is done and and remove the shared_future, skip this replica otherwise.
bool skip_work_on_this_replica = false;
if (_remote_work.contains(replica)) {
if (!_remote_work[replica].available()) {
vbc_logger.debug("Replica {} is still doing work", replica);
@@ -375,7 +320,21 @@ future<> view_building_coordinator::work_on_view_building(service::group0_guard
}
auto remote_results_opt = co_await _remote_work[replica].get_future();
_remote_work.erase(replica);
if (remote_results_opt) {
auto results_muts = co_await update_state_after_work_is_done(guard, replica, std::move(*remote_results_opt));
muts.insert(muts.end(), std::make_move_iterator(results_muts.begin()), std::make_move_iterator(results_muts.end()));
// If the replica successfully finished its work, we need to commit mutations generated above before selecting next task
skip_work_on_this_replica = !results_muts.empty();
}
// If there were no mutations for this replica, we can just remove the entry from `_remote_work` map
// and start new work in the same iteration.
// Otherwise, the entry needs to be removed after the mutations are committed successfully.
if (skip_work_on_this_replica) {
_remote_work_keys_to_erase.insert(replica);
} else {
_remote_work.erase(replica);
}
}
const bool ignore_gossiper = utils::get_local_injector().enter("view_building_coordinator_ignore_gossiper");
@@ -384,16 +343,31 @@ future<> view_building_coordinator::work_on_view_building(service::group0_guard
continue;
}
if (!_finished_tasks.contains(replica)) {
_finished_tasks.insert({replica, {}});
if (skip_work_on_this_replica) {
continue;
}
if (auto todo_ids = select_tasks_for_replica(replica); !todo_ids.empty()) {
start_remote_worker(replica, std::move(todo_ids));
if (auto already_started_ids = _vb_sm.building_state.get_started_tasks(*_vb_sm.building_state.currently_processed_base_table, replica); !already_started_ids.empty()) {
// If the replica has any task in `STARTED` state, attach the coordinator to the work.
attach_to_started_tasks(replica, std::move(already_started_ids));
} else if (auto todo_ids = select_tasks_for_replica(replica); !todo_ids.empty()) {
// If the replica has no started tasks and there are tasks to do, mark them as started.
// The coordinator will attach itself to the work in next iteration.
auto new_mutations = co_await start_tasks(guard, std::move(todo_ids));
muts.insert(muts.end(), std::make_move_iterator(new_mutations.begin()), std::make_move_iterator(new_mutations.end()));
} else {
vbc_logger.debug("Nothing to do for replica {}", replica);
}
}
if (!muts.empty()) {
co_await commit_mutations(std::move(guard), std::move(muts), "start view building tasks");
for (auto& key: _remote_work_keys_to_erase) {
_remote_work.erase(key);
}
co_return true;
}
co_return false;
}
std::set<locator::tablet_replica> view_building_coordinator::get_replicas_with_tasks() {
@@ -416,7 +390,7 @@ std::vector<utils::UUID> view_building_coordinator::select_tasks_for_replica(loc
// Select only building tasks and return theirs ids
auto filter_building_tasks = [] (const std::vector<view_building_task>& tasks) -> std::vector<utils::UUID> {
return tasks | std::views::filter([] (const view_building_task& t) {
return t.type == view_building_task::task_type::build_range && !t.aborted;
return t.type == view_building_task::task_type::build_range && t.state == view_building_task::task_state::idle;
}) | std::views::transform([] (const view_building_task& t) {
return t.id;
}) | std::ranges::to<std::vector>();
@@ -430,29 +404,7 @@ std::vector<utils::UUID> view_building_coordinator::select_tasks_for_replica(loc
}
auto& tablet_map = _db.get_token_metadata().tablets().get_tablet_map(*_vb_sm.building_state.currently_processed_base_table);
auto tasks_by_last_token = _vb_sm.building_state.collect_tasks_by_last_token(*_vb_sm.building_state.currently_processed_base_table, replica);
// Remove completed tasks in `_finished_tasks` from `tasks_by_last_token`
auto it = tasks_by_last_token.begin();
while (it != tasks_by_last_token.end()) {
auto task_it = it->second.begin();
while (task_it != it->second.end()) {
if (_finished_tasks.at(replica).contains(task_it->id)) {
task_it = it->second.erase(task_it);
} else {
++task_it;
}
}
// Remove the entry from `tasks_by_last_token` if its vector is empty
if (it->second.empty()) {
it = tasks_by_last_token.erase(it);
} else {
++it;
}
}
for (auto& [token, tasks]: tasks_by_last_token) {
for (auto& [token, tasks]: _vb_sm.building_state.collect_tasks_by_last_token(*_vb_sm.building_state.currently_processed_base_table, replica)) {
auto tid = tablet_map.get_tablet_id(token);
if (tablet_map.get_tablet_transition_info(tid)) {
vbc_logger.debug("Tablet {} on replica {} is in transition.", tid, replica);
@@ -464,7 +416,7 @@ std::vector<utils::UUID> view_building_coordinator::select_tasks_for_replica(loc
return building_tasks;
} else {
return tasks | std::views::filter([] (const view_building_task& t) {
return !t.aborted;
return t.state == view_building_task::task_state::idle;
}) | std::views::transform([] (const view_building_task& t) {
return t.id;
}) | std::ranges::to<std::vector>();
@@ -474,21 +426,32 @@ std::vector<utils::UUID> view_building_coordinator::select_tasks_for_replica(loc
return {};
}
void view_building_coordinator::start_remote_worker(const locator::tablet_replica& replica, std::vector<utils::UUID> tasks) {
future<utils::chunked_vector<mutation>> view_building_coordinator::start_tasks(const service::group0_guard& guard, std::vector<utils::UUID> tasks) {
vbc_logger.info("Starting tasks {}", tasks);
utils::chunked_vector<mutation> muts;
for (auto& t: tasks) {
auto mut = co_await _sys_ks.make_update_view_building_task_state_mutation(guard.write_timestamp(), t, view_building_task::task_state::started);
muts.push_back(std::move(mut));
}
co_return muts;
}
void view_building_coordinator::attach_to_started_tasks(const locator::tablet_replica& replica, std::vector<utils::UUID> tasks) {
vbc_logger.debug("Attaching to started tasks {} on replica {}", tasks, replica);
shared_future<std::optional<std::vector<utils::UUID>>> work = work_on_tasks(replica, std::move(tasks));
shared_future<std::optional<remote_work_results>> work = work_on_tasks(replica, std::move(tasks));
_remote_work.insert({replica, std::move(work)});
}
future<std::optional<std::vector<utils::UUID>>> view_building_coordinator::work_on_tasks(locator::tablet_replica replica, std::vector<utils::UUID> tasks) {
future<std::optional<view_building_coordinator::remote_work_results>> view_building_coordinator::work_on_tasks(locator::tablet_replica replica, std::vector<utils::UUID> tasks) {
constexpr auto backoff_duration = std::chrono::seconds(1);
static thread_local logger::rate_limit rate_limit{backoff_duration};
std::vector<utils::UUID> remote_results;
std::vector<view_task_result> remote_results;
bool rpc_failed = false;
try {
remote_results = co_await ser::view_rpc_verbs::send_work_on_view_building_tasks(&_messaging, replica.host, _as, _term, replica.shard, tasks);
remote_results = co_await ser::view_rpc_verbs::send_work_on_view_building_tasks(&_messaging, replica.host, _as, tasks);
} catch (...) {
vbc_logger.log(log_level::warn, rate_limit, "Work on tasks {} on replica {}, failed with error: {}",
tasks, replica, std::current_exception());
@@ -501,14 +464,44 @@ future<std::optional<std::vector<utils::UUID>>> view_building_coordinator::work_
co_return std::nullopt;
}
// In `view_building_coordinator::work_on_view_building()` we made sure that,
// each replica has its own entry in the `_finished_tasks`, so now we can just take a shared lock
// and insert its of finished tasks to this replica bucket as there is at most one instance of this method for each replica.
auto lock = co_await get_shared_lock(_mutex);
_finished_tasks.at(replica).insert_range(remote_results);
if (tasks.size() != remote_results.size()) {
on_internal_error(vbc_logger, fmt::format("Number of tasks ({}) and results ({}) do not match for replica {}", tasks.size(), remote_results.size(), replica));
}
remote_work_results results;
for (size_t i = 0; i < tasks.size(); ++i) {
results.push_back({tasks[i], remote_results[i]});
}
_vb_sm.event.broadcast();
co_return remote_results;
co_return results;
}
// Mark finished task as done (remove them from the table).
// Retry failed tasks if possible (if failed tasks wasn't aborted).
future<utils::chunked_vector<mutation>> view_building_coordinator::update_state_after_work_is_done(const service::group0_guard& guard, const locator::tablet_replica& replica, view_building_coordinator::remote_work_results results) {
vbc_logger.debug("Got results from replica {}: {}", replica, results);
utils::chunked_vector<mutation> muts;
for (auto& result: results) {
vbc_logger.info("Task {} was finished with result: {}", result.first, result.second);
if (!_vb_sm.building_state.currently_processed_base_table) {
continue;
}
// A task can be aborted by deleting it or by setting its state to `ABORTED`.
// If the task was aborted by changing the state,
// we shouldn't remove it here because it might be needed
// to generate updated after tablet operation (migration/resize)
// is finished.
auto task_opt = _vb_sm.building_state.get_task(*_vb_sm.building_state.currently_processed_base_table, replica, result.first);
if (task_opt && task_opt->get().state != view_building_task::task_state::aborted) {
// Otherwise, the task was completed successfully and we can remove it.
auto delete_mut = co_await _sys_ks.make_remove_view_building_task_mutation(guard.write_timestamp(), result.first);
muts.push_back(std::move(delete_mut));
}
}
co_return muts;
}
future<> view_building_coordinator::stop() {
@@ -538,7 +531,7 @@ void view_building_coordinator::generate_tablet_migration_updates(utils::chunked
auto create_task_copy_on_pending_replica = [&] (const view_building_task& task) {
auto new_id = builder.new_id();
builder.set_type(new_id, task.type)
.set_aborted(new_id, false)
.set_state(new_id, view_building_task::task_state::idle)
.set_base_id(new_id, task.base_id)
.set_last_token(new_id, task.last_token)
.set_replica(new_id, *trinfo.pending_replica);
@@ -606,7 +599,7 @@ void view_building_coordinator::generate_tablet_resize_updates(utils::chunked_ve
auto create_task_copy = [&] (const view_building_task& task, dht::token last_token) -> utils::UUID {
auto new_id = builder.new_id();
builder.set_type(new_id, task.type)
.set_aborted(new_id, false)
.set_state(new_id, view_building_task::task_state::idle)
.set_base_id(new_id, task.base_id)
.set_last_token(new_id, last_token)
.set_replica(new_id, task.replica);
@@ -675,7 +668,7 @@ void view_building_coordinator::abort_tasks(utils::chunked_vector<canonical_muta
auto abort_task_map = [&] (const task_map& task_map) {
for (auto& [id, _]: task_map) {
vbc_logger.debug("Aborting task {}", id);
builder.set_aborted(id, true);
builder.set_state(id, view_building_task::task_state::aborted);
}
};
@@ -705,7 +698,7 @@ void abort_view_building_tasks(const view_building_state_machine& vb_sm,
for (auto& [id, task]: task_map) {
if (task.last_token == last_token) {
vbc_logger.debug("Aborting task {}", id);
builder.set_aborted(id, true);
builder.set_state(id, view_building_task::task_state::aborted);
}
}
};
@@ -721,10 +714,10 @@ void abort_view_building_tasks(const view_building_state_machine& vb_sm,
static void rollback_task_map(view_building_task_mutation_builder& builder, const task_map& task_map) {
for (auto& [id, task]: task_map) {
if (task.aborted) {
if (task.state == view_building_task::task_state::aborted) {
auto new_id = builder.new_id();
builder.set_type(new_id, task.type)
.set_aborted(new_id, false)
.set_state(new_id, view_building_task::task_state::idle)
.set_base_id(new_id, task.base_id)
.set_last_token(new_id, task.last_token)
.set_replica(new_id, task.replica);

View File

@@ -54,9 +54,9 @@ class view_building_coordinator : public service::endpoint_lifecycle_subscriber
const raft::term_t _term;
abort_source& _as;
std::unordered_map<locator::tablet_replica, shared_future<std::optional<std::vector<utils::UUID>>>> _remote_work;
shared_mutex _mutex; // guards `_finished_tasks` field
std::unordered_map<locator::tablet_replica, std::unordered_set<utils::UUID>> _finished_tasks;
using remote_work_results = std::vector<std::pair<utils::UUID, db::view::view_task_result>>;
std::unordered_map<locator::tablet_replica, shared_future<std::optional<remote_work_results>>> _remote_work;
public:
view_building_coordinator(replica::database& db, raft::server& raft, service::raft_group0& group0,
@@ -86,11 +86,9 @@ private:
future<> commit_mutations(service::group0_guard guard, utils::chunked_vector<mutation> mutations, std::string_view description);
void handle_coordinator_error(std::exception_ptr eptr);
future<> finished_task_gc_fiber();
future<> clean_finished_tasks();
future<std::optional<service::group0_guard>> update_state(service::group0_guard guard);
future<> work_on_view_building(service::group0_guard guard);
// Returns if any new tasks were started
future<bool> work_on_view_building(service::group0_guard guard);
future<> mark_view_build_status_started(const service::group0_guard& guard, table_id view_id, utils::chunked_vector<mutation>& out);
future<> mark_all_remaining_view_build_statuses_started(const service::group0_guard& guard, table_id base_id, utils::chunked_vector<mutation>& out);
@@ -99,8 +97,10 @@ private:
std::set<locator::tablet_replica> get_replicas_with_tasks();
std::vector<utils::UUID> select_tasks_for_replica(locator::tablet_replica replica);
void start_remote_worker(const locator::tablet_replica& replica, std::vector<utils::UUID> tasks);
future<std::optional<std::vector<utils::UUID>>> work_on_tasks(locator::tablet_replica replica, std::vector<utils::UUID> tasks);
future<utils::chunked_vector<mutation>> start_tasks(const service::group0_guard& guard, std::vector<utils::UUID> tasks);
void attach_to_started_tasks(const locator::tablet_replica& replica, std::vector<utils::UUID> tasks);
future<std::optional<remote_work_results>> work_on_tasks(locator::tablet_replica replica, std::vector<utils::UUID> tasks);
future<utils::chunked_vector<mutation>> update_state_after_work_is_done(const service::group0_guard& guard, const locator::tablet_replica& replica, remote_work_results results);
};
void abort_view_building_tasks(const db::view::view_building_state_machine& vb_sm,

View File

@@ -13,10 +13,10 @@ namespace db {
namespace view {
view_building_task::view_building_task(utils::UUID id, task_type type, bool aborted, table_id base_id, std::optional<table_id> view_id, locator::tablet_replica replica, dht::token last_token)
view_building_task::view_building_task(utils::UUID id, task_type type, task_state state, table_id base_id, std::optional<table_id> view_id, locator::tablet_replica replica, dht::token last_token)
: id(id)
, type(type)
, aborted(aborted)
, state(state)
, base_id(base_id)
, view_id(view_id)
, replica(replica)
@@ -49,6 +49,30 @@ seastar::sstring task_type_to_sstring(view_building_task::task_type type) {
}
}
view_building_task::task_state task_state_from_string(std::string_view str) {
if (str == "IDLE") {
return view_building_task::task_state::idle;
}
if (str == "STARTED") {
return view_building_task::task_state::started;
}
if (str == "ABORTED") {
return view_building_task::task_state::aborted;
}
throw std::runtime_error(fmt::format("Unknown view building task state: {}", str));
}
seastar::sstring task_state_to_sstring(view_building_task::task_state state) {
switch (state) {
case view_building_task::task_state::idle:
return "IDLE";
case view_building_task::task_state::started:
return "STARTED";
case view_building_task::task_state::aborted:
return "ABORTED";
}
}
std::optional<std::reference_wrapper<const view_building_task>> view_building_state::get_task(table_id base_id, locator::tablet_replica replica, utils::UUID id) const {
if (!tasks_state.contains(base_id) || !tasks_state.at(base_id).contains(replica)) {
return {};
@@ -127,6 +151,46 @@ std::map<dht::token, std::vector<view_building_task>> view_building_state::colle
return tasks;
}
// Returns all tasks for `_vb_sm.building_state.currently_processed_base_table` and `replica` with `STARTED` state.
std::vector<utils::UUID> view_building_state::get_started_tasks(table_id base_table_id, locator::tablet_replica replica) const {
if (!tasks_state.contains(base_table_id) || !tasks_state.at(base_table_id).contains(replica)) {
// No tasks for this replica
return {};
}
std::vector<view_building_task> tasks;
auto& replica_tasks = tasks_state.at(base_table_id).at(replica);
for (auto& [_, view_tasks]: replica_tasks.view_tasks) {
for (auto& [_, task]: view_tasks) {
if (task.state == view_building_task::task_state::started) {
tasks.push_back(task);
}
}
}
for (auto& [_, task]: replica_tasks.staging_tasks) {
if (task.state == view_building_task::task_state::started) {
tasks.push_back(task);
}
}
// All collected tasks should have the same: type, base_id and last_token,
// so they can be executed in the same view_building_worker::batch.
#ifdef SEASTAR_DEBUG
if (!tasks.empty()) {
auto& task = tasks.front();
for (auto& t: tasks) {
SCYLLA_ASSERT(task.type == t.type);
SCYLLA_ASSERT(task.base_id == t.base_id);
SCYLLA_ASSERT(task.last_token == t.last_token);
}
}
#endif
return tasks | std::views::transform([] (const view_building_task& t) {
return t.id;
}) | std::ranges::to<std::vector>();
}
}
}

View File

@@ -39,17 +39,28 @@ struct view_building_task {
process_staging,
};
// When a task is created, it starts with `IDLE` state.
// Then, the view building coordinator will decide to do the task and it will
// set the state to `STARTED`.
// When a task is finished the entry is removed.
//
// If a task is in progress when a tablet operation (migration/resize) starts,
// the task's state is set to `ABORTED`.
enum class task_state {
idle,
started,
aborted,
};
utils::UUID id;
task_type type;
bool aborted;
task_state state;
table_id base_id;
std::optional<table_id> view_id; // nullopt when task_type is `process_staging`
locator::tablet_replica replica;
dht::token last_token;
view_building_task(utils::UUID id, task_type type, bool aborted,
view_building_task(utils::UUID id, task_type type, task_state state,
table_id base_id, std::optional<table_id> view_id,
locator::tablet_replica replica, dht::token last_token);
};
@@ -81,6 +92,7 @@ struct view_building_state {
std::vector<std::reference_wrapper<const view_building_task>> get_tasks_for_host(table_id base_id, locator::host_id host) const;
std::map<dht::token, std::vector<view_building_task>> collect_tasks_by_last_token(table_id base_table_id) const;
std::map<dht::token, std::vector<view_building_task>> collect_tasks_by_last_token(table_id base_table_id, const locator::tablet_replica& replica) const;
std::vector<utils::UUID> get_started_tasks(table_id base_table_id, locator::tablet_replica replica) const;
};
// Represents global state of tablet-based views.
@@ -101,8 +113,18 @@ struct view_building_state_machine {
condition_variable event;
};
struct view_task_result {
enum class command_status: uint8_t {
success = 0,
abort = 1,
};
db::view::view_task_result::command_status status;
};
view_building_task::task_type task_type_from_string(std::string_view str);
seastar::sstring task_type_to_sstring(view_building_task::task_type type);
view_building_task::task_state task_state_from_string(std::string_view str);
seastar::sstring task_state_to_sstring(view_building_task::task_state state);
} // namespace view_building
@@ -114,11 +136,17 @@ template <> struct fmt::formatter<db::view::view_building_task::task_type> : fmt
}
};
template <> struct fmt::formatter<db::view::view_building_task::task_state> : fmt::formatter<string_view> {
auto format(db::view::view_building_task::task_state state, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}", db::view::task_state_to_sstring(state));
}
};
template <> struct fmt::formatter<db::view::view_building_task> : fmt::formatter<string_view> {
auto format(db::view::view_building_task task, fmt::format_context& ctx) const {
auto view_id = task.view_id ? fmt::to_string(*task.view_id) : "nullopt";
return fmt::format_to(ctx.out(), "view_building_task{{type: {}, aborted: {}, base_id: {}, view_id: {}, last_token: {}}}",
task.type, task.aborted, task.base_id, view_id, task.last_token);
return fmt::format_to(ctx.out(), "view_building_task{{type: {}, state: {}, base_id: {}, view_id: {}, last_token: {}}}",
task.type, task.state, task.base_id, view_id, task.last_token);
}
};
@@ -133,3 +161,18 @@ template <> struct fmt::formatter<db::view::replica_tasks> : fmt::formatter<stri
return fmt::format_to(ctx.out(), "{{view_tasks: {}, staging_tasks: {}}}", replica_tasks.view_tasks, replica_tasks.staging_tasks);
}
};
template <> struct fmt::formatter<db::view::view_task_result> : fmt::formatter<string_view> {
auto format(db::view::view_task_result result, fmt::format_context& ctx) const {
std::string_view res;
switch (result.status) {
case db::view::view_task_result::command_status::success:
res = "success";
break;
case db::view::view_task_result::command_status::abort:
res = "abort";
break;
}
return format_to(ctx.out(), "{}", res);
}
};

View File

@@ -25,8 +25,8 @@ view_building_task_mutation_builder& view_building_task_mutation_builder::set_ty
_m.set_clustered_cell(get_ck(id), "type", data_value(task_type_to_sstring(type)), _ts);
return *this;
}
view_building_task_mutation_builder& view_building_task_mutation_builder::set_aborted(utils::UUID id, bool aborted) {
_m.set_clustered_cell(get_ck(id), "aborted", data_value(aborted), _ts);
view_building_task_mutation_builder& view_building_task_mutation_builder::set_state(utils::UUID id, db::view::view_building_task::task_state state) {
_m.set_clustered_cell(get_ck(id), "state", data_value(task_state_to_sstring(state)), _ts);
return *this;
}
view_building_task_mutation_builder& view_building_task_mutation_builder::set_base_id(utils::UUID id, table_id base_id) {

View File

@@ -32,7 +32,7 @@ public:
static utils::UUID new_id();
view_building_task_mutation_builder& set_type(utils::UUID id, db::view::view_building_task::task_type type);
view_building_task_mutation_builder& set_aborted(utils::UUID id, bool aborted);
view_building_task_mutation_builder& set_state(utils::UUID id, db::view::view_building_task::task_state state);
view_building_task_mutation_builder& set_base_id(utils::UUID id, table_id base_id);
view_building_task_mutation_builder& set_view_id(utils::UUID id, table_id view_id);
view_building_task_mutation_builder& set_last_token(utils::UUID id, dht::token last_token);

Some files were not shown because too many files have changed in this diff Show More