Compare commits
2 Commits: copilot/en...copilot/up

| Author | SHA1 | Date |
|---|---|---|
| | c222ea8b2f | |
| | 9c66f52e46 | |

.github/copilot-instructions.md (vendored, 11 changes)
@@ -84,14 +84,3 @@ ninja build/<mode>/scylla
- Strive for simplicity and clarity, add complexity only when clearly justified
- Question requests: don't blindly implement requests - evaluate trade-offs, identify issues, and suggest better alternatives when appropriate
- Consider different approaches, weigh pros and cons, and recommend the best fit for the specific context

## Test Philosophy
- Performance matters. Tests should run as quickly as possible. Sleeps in the code are highly discouraged and should be avoided, to reduce run time and flakiness.
- Stability matters. Tests should be stable. New tests should be executed 100 times at least to ensure they pass 100 out of 100 times. (use --repeat 100 --max-failures 1 when running it)
- Unit tests should ideally test one thing and one thing only.
- Tests for bug fixes should run before the fix - and show the failure and after the fix - and show they now pass.
- Tests for bug fixes should have in their comments which bug fixes (GitHub or JIRA issue) they test.
- Tests in debug are always slower, so if needed, reduce number of iterations, rows, data used, cycles, etc. in debug mode.
- Tests should strive to be repeatable, and not use random input that will make their results unpredictable.
- Tests should consume as little resources as possible. Prefer running tests on a single node if it is sufficient, for example.
.github/workflows/call_jira_status_in_progress.yml (vendored, new file, 12 lines)

@@ -0,0 +1,12 @@
name: Call Jira Status In Progress

on:
  pull_request_target:
    types: [opened]

jobs:
  call-jira-status-in-progress:
    uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_progress.yml@main
    secrets:
      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
.github/workflows/call_jira_status_in_review.yml (vendored, new file, 12 lines)

@@ -0,0 +1,12 @@
name: Call Jira Status In Review

on:
  pull_request_target:
    types: [ready_for_review, review_requested]

jobs:
  call-jira-status-in-review:
    uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
    secrets:
      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
.github/workflows/call_jira_status_ready_for_merge.yml (vendored, new file, 12 lines)

@@ -0,0 +1,12 @@
name: Call Jira Status Ready For Merge

on:
  pull_request_target:
    types: [labeled]

jobs:
  call-jira-status-update:
    uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_ready_for_merge.yml@main
    secrets:
      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
.github/workflows/call_jira_sync.yml (vendored, 41 changes)

@@ -1,41 +0,0 @@
name: Sync Jira Based on PR Events

on:
  pull_request_target:
    types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]

permissions:
  contents: read
  pull-requests: write
  issues: write

jobs:
  jira-sync-pr-opened:
    if: github.event.action == 'opened'
    uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
    secrets:
      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

  jira-sync-in-review:
    if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
    uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
    secrets:
      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

  jira-sync-add-label:
    if: github.event.action == 'labeled'
    uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
    secrets:
      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

  jira-status-remove-label:
    if: github.event.action == 'unlabeled'
    uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
    secrets:
      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

  jira-status-pr-closed:
    if: github.event.action == 'closed'
    uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
    secrets:
      caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
.github/workflows/docs-pages.yaml (vendored, 2 changes)

@@ -18,8 +18,6 @@ on:

jobs:
  release:
    permissions:
      contents: write
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
.github/workflows/docs-pr.yaml (vendored, 3 changes)

@@ -2,9 +2,6 @@ name: "Docs / Build PR"
# For more information,
# see https://sphinx-theme.scylladb.com/stable/deployment/production.html#available-workflows

permissions:
  contents: read

env:
  FLAG: ${{ github.repository == 'scylladb/scylla-enterprise' && 'enterprise' || 'opensource' }}
.github/workflows/docs-validate-metrics.yml (vendored, 3 changes)

@@ -1,8 +1,5 @@
name: Docs / Validate metrics

permissions:
  contents: read

on:
  pull_request:
    branches:
.github/workflows/read-toolchain.yaml (vendored, 2 changes)

@@ -10,8 +10,6 @@ on:
jobs:
  read-toolchain:
    runs-on: ubuntu-latest
    permissions:
      contents: read
    outputs:
      image: ${{ steps.read.outputs.image }}
    steps:
@@ -18,7 +18,6 @@ target_sources(alternator
  consumed_capacity.cc
  ttl.cc
  parsed_expression_cache.cc
  http_compression.cc
  ${cql_grammar_srcs})
target_include_directories(alternator
  PUBLIC
@@ -105,23 +105,11 @@ future<> controller::start_server() {
        alternator_port = _config.alternator_port();
        _listen_addresses.push_back({addr, *alternator_port});
    }
    std::optional<uint16_t> alternator_port_proxy_protocol;
    if (_config.alternator_port_proxy_protocol()) {
        alternator_port_proxy_protocol = _config.alternator_port_proxy_protocol();
        _listen_addresses.push_back({addr, *alternator_port_proxy_protocol});
    }
    std::optional<uint16_t> alternator_https_port;
    std::optional<uint16_t> alternator_https_port_proxy_protocol;
    std::optional<tls::credentials_builder> creds;
    if (_config.alternator_https_port() || _config.alternator_https_port_proxy_protocol()) {
        if (_config.alternator_https_port()) {
            alternator_https_port = _config.alternator_https_port();
            _listen_addresses.push_back({addr, *alternator_https_port});
        }
        if (_config.alternator_https_port_proxy_protocol()) {
            alternator_https_port_proxy_protocol = _config.alternator_https_port_proxy_protocol();
            _listen_addresses.push_back({addr, *alternator_https_port_proxy_protocol});
        }
    if (_config.alternator_https_port()) {
        alternator_https_port = _config.alternator_https_port();
        _listen_addresses.push_back({addr, *alternator_https_port});
        creds.emplace();
        auto opts = _config.alternator_encryption_options();
        if (opts.empty()) {
@@ -147,29 +135,20 @@ future<> controller::start_server() {
        }
    }
    _server.invoke_on_all(
        [this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds = std::move(creds)] (server& server) mutable {
            return server.init(addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol, creds,
    [this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
        return server.init(addr, alternator_port, alternator_https_port, creds,
            _config.alternator_enforce_authorization,
            _config.alternator_warn_authorization,
            _config.alternator_max_users_query_size_in_trace_output,
            &_memory_limiter.local().get_semaphore(),
            _config.max_concurrent_requests_per_shard);
    }).handle_exception([this, addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] (std::exception_ptr ep) {
        logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}, proxy-protocol port {}, TLS proxy-protocol port {}: {}",
            addr,
            alternator_port ? std::to_string(*alternator_port) : "OFF",
            alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
            alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
            alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF",
            ep);
    }).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {
        logger.error("Failed to set up Alternator HTTP server on {} port {}, TLS port {}: {}",
            addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF", ep);
        return stop_server().then([ep = std::move(ep)] { return make_exception_future<>(ep); });
    }).then([addr, alternator_port, alternator_https_port, alternator_port_proxy_protocol, alternator_https_port_proxy_protocol] {
        logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}, proxy-protocol port {}, TLS proxy-protocol port {}",
            addr,
            alternator_port ? std::to_string(*alternator_port) : "OFF",
            alternator_https_port ? std::to_string(*alternator_https_port) : "OFF",
            alternator_port_proxy_protocol ? std::to_string(*alternator_port_proxy_protocol) : "OFF",
            alternator_https_port_proxy_protocol ? std::to_string(*alternator_https_port_proxy_protocol) : "OFF");
    }).then([addr, alternator_port, alternator_https_port] {
        logger.info("Alternator server listening on {}, HTTP port {}, HTTPS port {}",
            addr, alternator_port ? std::to_string(*alternator_port) : "OFF", alternator_https_port ? std::to_string(*alternator_https_port) : "OFF");
    }).get();
    });
}
@@ -5986,11 +5986,6 @@ future<executor::request_return_type> executor::list_tables(client_state& client
    _stats.api_operations.list_tables++;
    elogger.trace("Listing tables {}", request);

    co_await utils::get_local_injector().inject("alternator_list_tables", [] (auto& handler) -> future<> {
        handler.set("waiting", true);
        co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
    });

    rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
    rjson::value* limit_json = rjson::find(request, "Limit");
    std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
@@ -1,301 +0,0 @@
/*
 * Copyright 2025-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "alternator/http_compression.hh"
#include "alternator/server.hh"
#include <seastar/coroutine/maybe_yield.hh>
#include <zlib.h>

static logging::logger slogger("alternator-http-compression");

namespace alternator {

static constexpr size_t compressed_buffer_size = 1024;
class zlib_compressor {
    z_stream _zs;
    temporary_buffer<char> _output_buf;
    noncopyable_function<future<>(temporary_buffer<char>&&)> _write_func;
public:
    zlib_compressor(bool gzip, int compression_level, noncopyable_function<future<>(temporary_buffer<char>&&)> write_func)
        : _write_func(std::move(write_func)) {
        memset(&_zs, 0, sizeof(_zs));
        if (deflateInit2(&_zs, std::clamp(compression_level, Z_NO_COMPRESSION, Z_BEST_COMPRESSION), Z_DEFLATED,
                (gzip ? 16 : 0) + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
            // Should only happen if memory allocation fails
            throw std::bad_alloc();
        }
    }
    ~zlib_compressor() {
        deflateEnd(&_zs);
    }
    future<> close() {
        return compress(nullptr, 0, true);
    }

    future<> compress(const char* buf, size_t len, bool is_last_chunk = false) {
        _zs.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(buf));
        _zs.avail_in = (uInt) len;
        int mode = is_last_chunk ? Z_FINISH : Z_NO_FLUSH;
        while (_zs.avail_in > 0 || is_last_chunk) {
            co_await coroutine::maybe_yield();
            if (_output_buf.empty()) {
                if (is_last_chunk) {
                    uint32_t max_buffer_size = 0;
                    deflatePending(&_zs, &max_buffer_size, nullptr);
                    max_buffer_size += deflateBound(&_zs, _zs.avail_in) + 1;
                    _output_buf = temporary_buffer<char>(std::min(compressed_buffer_size, (size_t) max_buffer_size));
                } else {
                    _output_buf = temporary_buffer<char>(compressed_buffer_size);
                }
                _zs.next_out = reinterpret_cast<unsigned char*>(_output_buf.get_write());
                _zs.avail_out = compressed_buffer_size;
            }
            int e = deflate(&_zs, mode);
            if (e < Z_OK) {
                throw api_error::internal("Error during compression of response body");
            }
            if (e == Z_STREAM_END || _zs.avail_out < compressed_buffer_size / 4) {
                _output_buf.trim(compressed_buffer_size - _zs.avail_out);
                co_await _write_func(std::move(_output_buf));
                if (e == Z_STREAM_END) {
                    break;
                }
            }
        }
    }
};
// Helper string_view functions for parsing Accept-Encoding header
struct case_insensitive_cmp_sv {
    bool operator()(std::string_view s1, std::string_view s2) const {
        return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(),
            [](char a, char b) { return ::tolower(a) == ::tolower(b); });
    }
};
static inline std::string_view trim_left(std::string_view sv) {
    while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.front())))
        sv.remove_prefix(1);
    return sv;
}
static inline std::string_view trim_right(std::string_view sv) {
    while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.back())))
        sv.remove_suffix(1);
    return sv;
}
static inline std::string_view trim(std::string_view sv) {
    return trim_left(trim_right(sv));
}

inline std::vector<std::string_view> split(std::string_view text, char separator) {
    std::vector<std::string_view> tokens;
    if (text == "") {
        return tokens;
    }

    while (true) {
        auto pos = text.find_first_of(separator);
        if (pos != std::string_view::npos) {
            tokens.emplace_back(text.data(), pos);
            text.remove_prefix(pos + 1);
        } else {
            tokens.emplace_back(text);
            break;
        }
    }
    return tokens;
}
constexpr response_compressor::compression_type response_compressor::get_compression_type(std::string_view encoding) {
    for (size_t i = 0; i < static_cast<size_t>(compression_type::count); ++i) {
        if (case_insensitive_cmp_sv{}(encoding, compression_names[i])) {
            return static_cast<compression_type>(i);
        }
    }
    return compression_type::unknown;
}

response_compressor::compression_type response_compressor::find_compression(std::string_view accept_encoding, size_t response_size) {
    std::optional<float> ct_q[static_cast<size_t>(compression_type::count)];
    ct_q[static_cast<size_t>(compression_type::none)] = std::numeric_limits<float>::min(); // enabled, but lowest priority
    compression_type selected_ct = compression_type::none;

    std::vector<std::string_view> entries = split(accept_encoding, ',');
    for (auto& e : entries) {
        std::vector<std::string_view> params = split(e, ';');
        if (params.size() == 0) {
            continue;
        }
        compression_type ct = get_compression_type(trim(params[0]));
        if (ct == compression_type::unknown) {
            continue; // ignore unknown encoding types
        }
        if (ct_q[static_cast<size_t>(ct)].has_value() && ct_q[static_cast<size_t>(ct)] != 0.0f) {
            continue; // already processed this encoding
        }
        if (response_size < _threshold[static_cast<size_t>(ct)]) {
            continue; // below threshold treat as unknown
        }
        for (size_t i = 1; i < params.size(); ++i) { // find "q=" parameter
            auto pos = params[i].find("q=");
            if (pos == std::string_view::npos) {
                continue;
            }
            std::string_view param = params[i].substr(pos + 2);
            param = trim(param);
            // parse quality value
            float q_value = 1.0f;
            auto [ptr, ec] = std::from_chars(param.data(), param.data() + param.size(), q_value);
            if (ec != std::errc() || ptr != param.data() + param.size()) {
                continue;
            }
            if (q_value < 0.0) {
                q_value = 0.0;
            } else if (q_value > 1.0) {
                q_value = 1.0;
            }
            ct_q[static_cast<size_t>(ct)] = q_value;
            break; // we parsed quality value
        }
        if (!ct_q[static_cast<size_t>(ct)].has_value()) {
            ct_q[static_cast<size_t>(ct)] = 1.0f; // default quality value
        }
        // keep the highest encoding (in the order, unless 'any')
        if (selected_ct == compression_type::any) {
            if (ct_q[static_cast<size_t>(ct)] >= ct_q[static_cast<size_t>(selected_ct)]) {
                selected_ct = ct;
            }
        } else {
            if (ct_q[static_cast<size_t>(ct)] > ct_q[static_cast<size_t>(selected_ct)]) {
                selected_ct = ct;
            }
        }
    }
    if (selected_ct == compression_type::any) {
        // select any not mentioned or highest quality
        selected_ct = compression_type::none;
        for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
            if (!ct_q[i].has_value()) {
                return static_cast<compression_type>(i);
            }
            if (ct_q[i] > ct_q[static_cast<size_t>(selected_ct)]) {
                selected_ct = static_cast<compression_type>(i);
            }
        }
    }
    return selected_ct;
}
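find_compression above weights each Accept-Encoding entry by its quality value (`q=`), defaulting to 1.0 and clamping to [0, 1]. The parsing core can be sketched standalone; this is an illustrative helper of ours, not the ScyllaDB API, and unlike find_compression it simply falls back to the default quality on malformed input:

```cpp
#include <charconv>
#include <string_view>

// Parse the quality value from one Accept-Encoding parameter such as "q=0.5".
// Returns 1.0 (the RFC 9110 default) when no valid q= parameter is present,
// and clamps the result to [0, 1] as find_compression does.
static float parse_qvalue(std::string_view param) {
    auto pos = param.find("q=");
    if (pos == std::string_view::npos) {
        return 1.0f;
    }
    std::string_view v = param.substr(pos + 2);
    float q = 1.0f;
    auto [ptr, ec] = std::from_chars(v.data(), v.data() + v.size(), q);
    if (ec != std::errc() || ptr != v.data() + v.size()) {
        return 1.0f; // malformed: fall back to the default
    }
    if (q < 0.0f) q = 0.0f;
    if (q > 1.0f) q = 1.0f;
    return q;
}
```

Note that floating-point `std::from_chars` requires a recent standard library (e.g. GCC 11+).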
static future<chunked_content> compress(response_compressor::compression_type ct, const db::config& cfg, std::string str) {
    chunked_content compressed;
    auto write = [&compressed](temporary_buffer<char>&& buf) -> future<> {
        compressed.push_back(std::move(buf));
        return make_ready_future<>();
    };
    zlib_compressor compressor(ct != response_compressor::compression_type::deflate,
        cfg.alternator_response_gzip_compression_level(), std::move(write));
    co_await compressor.compress(str.data(), str.size(), true);
    co_return compressed;
}

static sstring flatten(chunked_content&& cc) {
    size_t total_size = 0;
    for (const auto& chunk : cc) {
        total_size += chunk.size();
    }
    sstring result = sstring{ sstring::initialized_later{}, total_size };
    size_t offset = 0;
    for (const auto& chunk : cc) {
        std::copy(chunk.begin(), chunk.end(), result.begin() + offset);
        offset += chunk.size();
    }
    return result;
}
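flatten() above sizes the destination string up front and copies each chunk in turn, avoiding repeated reallocation. The same two-pass shape with standard containers (our names, standing in for chunked_content and sstring):

```cpp
#include <string>
#include <vector>

// Join a sequence of chunks into one contiguous string. Reserving the exact
// total first means a single allocation, the same shape as flatten() above.
static std::string flatten_chunks(const std::vector<std::string>& chunks) {
    size_t total = 0;
    for (const auto& c : chunks) {
        total += c.size();
    }
    std::string out;
    out.reserve(total);
    for (const auto& c : chunks) {
        out += c;
    }
    return out;
}
```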
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, std::string&& response_body) {
    response_compressor::compression_type ct = find_compression(accept_encoding, response_body.size());
    if (ct != response_compressor::compression_type::none) {
        rep->add_header("Content-Encoding", get_encoding_name(ct));
        rep->set_content_type(content_type);
        return compress(ct, cfg, std::move(response_body)).then([rep = std::move(rep)] (chunked_content compressed) mutable {
            rep->_content = flatten(std::move(compressed));
            return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
        });
    } else {
        // Note that despite the move, there is a copy here -
        // as str is std::string and rep->_content is sstring.
        rep->_content = std::move(response_body);
        rep->set_content_type(content_type);
    }
    return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
template<typename Compressor>
class compressed_data_sink_impl : public data_sink_impl {
    output_stream<char> _out;
    Compressor _compressor;
public:
    template<typename... Args>
    compressed_data_sink_impl(output_stream<char>&& out, Args&&... args)
        : _out(std::move(out)), _compressor(std::forward<Args>(args)..., [this](temporary_buffer<char>&& buf) {
            return _out.write(std::move(buf));
        }) { }

    future<> put(std::span<temporary_buffer<char>> data) override {
        return data_sink_impl::fallback_put(data, [this] (temporary_buffer<char>&& buf) {
            return do_put(std::move(buf));
        });
    }

private:
    future<> do_put(temporary_buffer<char> buf) {
        co_return co_await _compressor.compress(buf.get(), buf.size());
    }
    future<> close() override {
        return _compressor.close().then([this] {
            return _out.close();
        });
    }
};
executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) {
    return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream<char>&& out) mutable -> future<> {
        output_stream_options opts;
        opts.trim_to_size = true;
        std::unique_ptr<data_sink_impl> data_sink_impl;
        switch (ct) {
        case response_compressor::compression_type::gzip:
            data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), true, level);
            break;
        case response_compressor::compression_type::deflate:
            data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), false, level);
            break;
        case response_compressor::compression_type::none:
        case response_compressor::compression_type::any:
        case response_compressor::compression_type::unknown:
            on_internal_error(slogger, "Compression not selected");
        default:
            on_internal_error(slogger, "Unsupported compression type for data sink");
        }
        return bw(output_stream<char>(data_sink(std::move(data_sink_impl)), compressed_buffer_size, opts));
    };
}

future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) {
    response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits<size_t>::max());
    if (ct != response_compressor::compression_type::none) {
        rep->add_header("Content-Encoding", get_encoding_name(ct));
        rep->write_body(content_type, compress(ct, cfg, std::move(body_writer)));
    } else {
        rep->write_body(content_type, std::move(body_writer));
    }
    return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}

} // namespace alternator
@@ -1,91 +0,0 @@
/*
 * Copyright 2025-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

#include "alternator/executor.hh"
#include <seastar/http/httpd.hh>
#include "db/config.hh"

namespace alternator {

class response_compressor {
public:
    enum class compression_type {
        gzip,
        deflate,
        compressions_count,
        any = compressions_count,
        none,
        count,
        unknown = count
    };
    static constexpr std::string_view compression_names[] = {
        "gzip",
        "deflate",
        "*",
        "identity"
    };

    static sstring get_encoding_name(compression_type ct) {
        return sstring(compression_names[static_cast<size_t>(ct)]);
    }
    static constexpr compression_type get_compression_type(std::string_view encoding);

    sstring get_accepted_encoding(const http::request& req) {
        if (get_threshold() == 0) {
            return "";
        }
        return req.get_header("Accept-Encoding");
    }
    compression_type find_compression(std::string_view accept_encoding, size_t response_size);

    response_compressor(const db::config& cfg)
        : cfg(cfg)
        , _gzip_level_observer(
            cfg.alternator_response_gzip_compression_level.observe([this](int v) {
                update_threshold();
            }))
        , _gzip_threshold_observer(
            cfg.alternator_response_compression_threshold_in_bytes.observe([this](uint32_t v) {
                update_threshold();
            }))
    {
        update_threshold();
    }
    response_compressor(const response_compressor& rhs) : response_compressor(rhs.cfg) {}

private:
    const db::config& cfg;
    utils::observable<int>::observer _gzip_level_observer;
    utils::observable<uint32_t>::observer _gzip_threshold_observer;
    uint32_t _threshold[static_cast<size_t>(compression_type::count)];

    size_t get_threshold() { return _threshold[static_cast<size_t>(compression_type::any)]; }
    void update_threshold() {
        _threshold[static_cast<size_t>(compression_type::none)] = std::numeric_limits<uint32_t>::max();
        _threshold[static_cast<size_t>(compression_type::any)] = std::numeric_limits<uint32_t>::max();
        uint32_t gzip = cfg.alternator_response_gzip_compression_level() <= 0 ? std::numeric_limits<uint32_t>::max()
            : cfg.alternator_response_compression_threshold_in_bytes();
        _threshold[static_cast<size_t>(compression_type::gzip)] = gzip;
        _threshold[static_cast<size_t>(compression_type::deflate)] = gzip;
        for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
            if (_threshold[i] < _threshold[static_cast<size_t>(compression_type::any)]) {
                _threshold[static_cast<size_t>(compression_type::any)] = _threshold[i];
            }
        }
    }

public:
    future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
        sstring accept_encoding, const char* content_type, std::string&& response_body);
    future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
        sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer);
};

}
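The compression_type enum above layers sentinel aliases over a plain index: the first `compressions_count` values are real codecs, `any` ("*") and `none` ("identity") extend the name table, and `unknown` aliases `count`. That lets one array indexed by the enum cover every lookup case. A standalone sketch of the same layout trick (simplified to exact, case-sensitive matching):

```cpp
#include <cstddef>
#include <string_view>

// Mirror of the enum layout used by response_compressor: gzip=0, deflate=1,
// any=2 ("*"), none=3 ("identity"), count=4, unknown aliases count.
enum class compression_type {
    gzip,
    deflate,
    compressions_count,
    any = compressions_count,
    none,
    count,
    unknown = count
};

// One name per enum value below `count`; the indices line up by construction.
constexpr std::string_view compression_names[] = { "gzip", "deflate", "*", "identity" };

constexpr compression_type lookup(std::string_view encoding) {
    for (size_t i = 0; i < static_cast<size_t>(compression_type::count); ++i) {
        if (encoding == compression_names[i]) {
            return static_cast<compression_type>(i);
        }
    }
    return compression_type::unknown;
}
```

The payoff is that arrays such as `_threshold[]` can be sized by `count` and indexed directly with `static_cast<size_t>(ct)` for both real codecs and the "*"/"identity" sentinels.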
@@ -34,7 +34,6 @@
#include "client_data.hh"
#include "utils/updateable_value.hh"
#include <zlib.h>
#include "alternator/http_compression.hh"

static logging::logger slogger("alternator-server");
@@ -112,12 +111,9 @@ class api_handler : public handler_base {
    // type applies to all replies, both success and error.
    static constexpr const char* REPLY_CONTENT_TYPE = "application/x-amz-json-1.0";
public:
    api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle,
            const db::config& config) : _response_compressor(config), _f_handle(
    api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
        [this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
            sstring accept_encoding = _response_compressor.get_accepted_encoding(*req);
            return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped(
                [this, rep = std::move(rep), accept_encoding=std::move(accept_encoding)](future<executor::request_return_type> resf) mutable {
            return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
                if (resf.failed()) {
                    // Exceptions of type api_error are wrapped as JSON and
                    // returned to the client as expected. Other types of
@@ -137,20 +133,22 @@ public:
                    return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
                }
                auto res = resf.get();
                return std::visit(overloaded_functor {
                std::visit(overloaded_functor {
                    [&] (std::string&& str) {
                        return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
                            REPLY_CONTENT_TYPE, std::move(str));
                        // Note that despite the move, there is a copy here -
                        // as str is std::string and rep->_content is sstring.
                        rep->_content = std::move(str);
                        rep->set_content_type(REPLY_CONTENT_TYPE);
                    },
                    [&] (executor::body_writer&& body_writer) {
                        return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
                            REPLY_CONTENT_TYPE, std::move(body_writer));
                        rep->write_body(REPLY_CONTENT_TYPE, std::move(body_writer));
                    },
                    [&] (const api_error& err) {
                        generate_error_reply(*rep, err);
                        return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
                    }
                }, std::move(res));

                return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
            });
        }) { }

@@ -179,7 +177,6 @@ protected:
        slogger.trace("api_handler error case: {}", rep._content);
    }

    response_compressor _response_compressor;
    future_handler_function _f_handle;
};
@@ -374,40 +371,13 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
    for (const auto& header : signed_headers) {
        signed_headers_map.emplace(header, std::string_view());
    }
    std::vector<std::string> modified_values;
    for (auto& header : req._headers) {
        std::string header_str;
        header_str.resize(header.first.size());
        std::transform(header.first.begin(), header.first.end(), header_str.begin(), ::tolower);
        auto it = signed_headers_map.find(header_str);
        if (it != signed_headers_map.end()) {
            // replace multiple spaces in the header value header.second with
            // a single space, as required by AWS SigV4 header canonization.
            // If we modify the value, we need to save it in modified_values
            // to keep it alive.
            std::string value;
            value.reserve(header.second.size());
            bool prev_space = false;
            bool modified = false;
            for (char ch : header.second) {
                if (ch == ' ') {
                    if (!prev_space) {
                        value += ch;
                        prev_space = true;
                    } else {
                        modified = true; // skip a space
                    }
                } else {
                    value += ch;
                    prev_space = false;
                }
            }
            if (modified) {
                modified_values.emplace_back(std::move(value));
                it->second = std::string_view(modified_values.back());
            } else {
                it->second = std::string_view(header.second);
            }
            it->second = std::string_view(header.second);
        }
    }
@@ -420,7 +390,6 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
datestamp = std::move(datestamp),
signed_headers_str = std::move(signed_headers_str),
signed_headers_map = std::move(signed_headers_map),
modified_values = std::move(modified_values),
region = std::move(region),
service = std::move(service),
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
@@ -591,11 +560,11 @@ read_entire_stream(input_stream<char>& inp, size_t length_limit) {
class safe_gzip_zstream {
z_stream _zs;
public:
// If gzip is true, decode a gzip header (for "Content-Encoding: gzip").
// Otherwise, a zlib header (for "Content-Encoding: deflate").
safe_gzip_zstream(bool gzip = true) {
safe_gzip_zstream() {
memset(&_zs, 0, sizeof(_zs));
if (inflateInit2(&_zs, gzip ? 16 + MAX_WBITS : MAX_WBITS) != Z_OK) {
// The strange 16 + MAX_WBITS tells zlib to expect and decode
// a gzip header, not a zlib header.
if (inflateInit2(&_zs, 16 + MAX_WBITS) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
@@ -614,21 +583,19 @@ public:
}
};

// ungzip() takes a chunked_content of a compressed request body, and returns
// the uncompressed content as a chunked_content. If gzip is true, we expect a
// gzip header (for "Content-Encoding: gzip"), if gzip is false, we expect a
// zlib header (for "Content-Encoding: deflate").
// ungzip() takes a chunked_content with a gzip-compressed request body,
// uncompresses it, and returns the uncompressed content as a chunked_content.
// If the uncompressed content exceeds length_limit, an error is thrown.
static future<chunked_content>
ungzip(chunked_content&& compressed_body, size_t length_limit, bool gzip = true) {
ungzip(chunked_content&& compressed_body, size_t length_limit) {
chunked_content ret;
// output_buf can be any size - when uncompressing input_buf, it doesn't
// need to fit in a single output_buf, we'll use multiple output_buf for
// a single input_buf if needed.
constexpr size_t OUTPUT_BUF_SIZE = 4096;
temporary_buffer<char> output_buf;
safe_gzip_zstream strm(gzip);
bool complete_stream = false; // empty input is not a valid gzip/deflate
safe_gzip_zstream strm;
bool complete_stream = false; // empty input is not a valid gzip
size_t total_out_bytes = 0;
for (const temporary_buffer<char>& input_buf : compressed_body) {
if (input_buf.empty()) {
@@ -731,8 +698,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
sstring content_encoding = req->get_header("Content-Encoding");
if (content_encoding == "gzip") {
content = co_await ungzip(std::move(content), request_content_length_limit);
} else if (content_encoding == "deflate") {
content = co_await ungzip(std::move(content), request_content_length_limit, false);
} else if (!content_encoding.empty()) {
// DynamoDB returns a 500 error for unsupported Content-Encoding.
// I'm not sure if this is the best error code, but let's do it too.
@@ -793,7 +758,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
void server::set_routes(routes& r) {
api_handler* req_handler = new api_handler([this] (std::unique_ptr<request> req) mutable {
return handle_api_request(std::move(req));
}, _proxy.data_dictionary().get_config());
});

r.put(operation_type::POST, "/", req_handler);
r.put(operation_type::GET, "/", new health_handler(_pending_requests));
@@ -904,9 +869,7 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
} {
}

future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
std::optional<tls::credentials_builder> creds,
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
@@ -914,28 +877,20 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
_warn_authorization = std::move(warn_authorization);
_max_concurrent_requests = std::move(max_concurrent_requests);
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
if (!port && !https_port && !port_proxy_protocol && !https_port_proxy_protocol) {
if (!port && !https_port) {
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"
" must be specified in order to init an alternator HTTP server instance"));
}
return seastar::async([this, addr, port, https_port, port_proxy_protocol, https_port_proxy_protocol, creds] {
return seastar::async([this, addr, port, https_port, creds] {
_executor.start().get();

if (port || port_proxy_protocol) {
if (port) {
set_routes(_http_server._routes);
_http_server.set_content_streaming(true);
if (port) {
_http_server.listen(socket_address{addr, *port}).get();
}
if (port_proxy_protocol) {
listen_options lo;
lo.reuse_address = true;
lo.proxy_protocol = true;
_http_server.listen(socket_address{addr, *port_proxy_protocol}, lo).get();
}
_http_server.listen(socket_address{addr, *port}).get();
_enabled_servers.push_back(std::ref(_http_server));
}
if (https_port || https_port_proxy_protocol) {
if (https_port) {
set_routes(_https_server._routes);
_https_server.set_content_streaming(true);

@@ -955,15 +910,7 @@ future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std:
} else {
_credentials = creds->build_server_credentials();
}
if (https_port) {
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
}
if (https_port_proxy_protocol) {
listen_options lo;
lo.reuse_address = true;
lo.proxy_protocol = true;
_https_server.listen(socket_address{addr, *https_port_proxy_protocol}, lo, _credentials).get();
}
_https_server.listen(socket_address{addr, *https_port}, _credentials).get();
_enabled_servers.push_back(std::ref(_https_server));
}
});
@@ -1036,8 +983,9 @@ client_data server::ongoing_request::make_client_data() const {
// and keep "driver_version" unset.
cd.driver_name = _user_agent;
// Leave "protocol_version" unset, it has no meaning in Alternator.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset for Alternator.
// Note: CQL sets ssl_protocol and ssl_cipher_suite via generic_server::connection base class.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
// As reported in issue #9216, we never set these fields in CQL
// either (see cql_server::connection::make_client_data()).
return cd;
}


@@ -100,9 +100,7 @@ class server : public peering_sharded_service<server> {
public:
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);

future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port,
std::optional<uint16_t> port_proxy_protocol, std::optional<uint16_t> https_port_proxy_protocol,
std::optional<tls::credentials_builder> creds,
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();

@@ -9,7 +9,6 @@
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/http/exception.hh>

#include "task_manager.hh"
@@ -265,7 +264,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
if (id) {
module->unregister_task(id);
}
co_await coroutine::maybe_yield();
co_await maybe_yield();
}
});
co_return json_void();

@@ -76,14 +76,11 @@ sstring generate_salt(RandomNumberEngine& g, scheme scheme) {

///
/// Hash a password combined with an implementation-specific salt string.
/// Deprecated in favor of `hash_with_salt_async`. This function is still used
/// when generating password hashes for storage to ensure that
/// `hash_with_salt` and `hash_with_salt_async` produce identical results,
/// preserving backward compatibility.
/// Deprecated in favor of `hash_with_salt_async`.
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
sstring hash_with_salt(const sstring& pass, const sstring& salt);
[[deprecated("Use hash_with_salt_async instead")]] sstring hash_with_salt(const sstring& pass, const sstring& salt);

///
/// Async version of `hash_with_salt` that returns a future.

@@ -204,7 +204,7 @@ future<topology_description> topology_description::clone_async() const {

for (const auto& entry : _entries) {
vec.push_back(entry);
co_await coroutine::maybe_yield();
co_await seastar::maybe_yield();
}

co_return topology_description{std::move(vec)};

@@ -15,7 +15,7 @@
#include "mutation/tombstone.hh"
#include "schema/schema.hh"

#include <seastar/core/sstring.hh>
#include "seastar/core/sstring.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "types/user.hh"

72
configure.py
@@ -391,9 +391,9 @@ def find_compiler(name):
return None


def resolve_compilers_for_compiler_cache(args, compiler_cache):
def resolve_compilers_for_sccache(args, compiler_cache):
"""
When using a compiler cache, resolve compiler paths to avoid ccache directories.
When using sccache, resolve compiler paths to avoid ccache directories.

This prevents double-caching when ccache symlinks are in PATH.

@@ -401,7 +401,7 @@ def resolve_compilers_for_compiler_cache(args, compiler_cache):
args: The argument namespace with cc and cxx attributes.
compiler_cache: Path to the compiler cache binary, or None.
"""
if not compiler_cache:
if not compiler_cache or 'sccache' not in compiler_cache:
return
if not os.path.isabs(args.cxx):
real_cxx = find_compiler(args.cxx)
@@ -1034,7 +1034,6 @@ scylla_core = (['message/messaging_service.cc',
'cql3/functions/aggregate_fcts.cc',
'cql3/functions/castas_fcts.cc',
'cql3/functions/error_injection_fcts.cc',
'cql3/functions/vector_similarity_fcts.cc',
'cql3/statements/cf_prop_defs.cc',
'cql3/statements/cf_statement.cc',
'cql3/statements/authentication_statement.cc',
@@ -1092,7 +1091,6 @@ scylla_core = (['message/messaging_service.cc',
'cql3/statements/list_service_level_attachments_statement.cc',
'cql3/statements/list_effective_service_level_statement.cc',
'cql3/statements/describe_statement.cc',
'cql3/statements/view_prop_defs.cc',
'cql3/update_parameters.cc',
'cql3/util.cc',
'cql3/ut_name.cc',
@@ -1455,7 +1453,6 @@ alternator = [
'alternator/auth.cc',
'alternator/streams.cc',
'alternator/ttl.cc',
'alternator/http_compression.cc'
]

idls = ['idl/gossip_digest.idl.hh',
@@ -2806,35 +2803,38 @@ def write_build_file(f,

seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
seastar_testing_dep = f'$builddir/{mode}/seastar/libseastar_testing.{seastar_lib_ext}'
f.write(f'build {seastar_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n')
f.write('build {seastar_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = seastar\n')
f.write(f'build {seastar_testing_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n')
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = seastar\n'.format(**locals()))
f.write('build {seastar_testing_dep}: ninja $builddir/{mode}/seastar/build.ninja | always {profile_dep}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = seastar_testing\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = seastar_testing\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))

for lib in abseil_libs:
f.write(f'build $builddir/{mode}/abseil/{lib}: ninja $builddir/{mode}/abseil/build.ninja | always {profile_dep}\n')
f.write(f' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/abseil\n')
f.write(f' target = {lib}\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write('build $builddir/{mode}/abseil/{lib}: ninja $builddir/{mode}/abseil/build.ninja | always {profile_dep}\n'.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(' subdir = $builddir/{mode}/abseil\n'.format(**locals()))
f.write(' target = {lib}\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))

f.write(f'build $builddir/{mode}/stdafx.hh.pch: cxx_build_precompiled_header.{mode} stdafx.hh | {profile_dep} {seastar_dep} {abseil_dep} {gen_headers_dep} {pch_dep}\n')

f.write(f'build $builddir/{mode}/seastar/apps/iotune/iotune: ninja $builddir/{mode}/seastar/build.ninja | $builddir/{mode}/seastar/libseastar.{seastar_lib_ext}\n')
f.write('build $builddir/{mode}/seastar/apps/iotune/iotune: ninja $builddir/{mode}/seastar/build.ninja | $builddir/{mode}/seastar/libseastar.{seastar_lib_ext}\n'
.format(**locals()))
f.write(' pool = submodule_pool\n')
f.write(f' subdir = $builddir/{mode}/seastar\n')
f.write(' target = iotune\n')
f.write(f' profile_dep = {profile_dep}\n')
f.write(textwrap.dedent(f'''\
f.write(' subdir = $builddir/{mode}/seastar\n'.format(**locals()))
f.write(' target = iotune\n'.format(**locals()))
f.write(' profile_dep = {profile_dep}\n'.format(**locals()))
f.write(textwrap.dedent('''\
build $builddir/{mode}/iotune: copy $builddir/{mode}/seastar/apps/iotune/iotune
build $builddir/{mode}/iotune.stripped: strip $builddir/{mode}/iotune
build $builddir/{mode}/iotune.debug: phony $builddir/{mode}/iotune.stripped
'''))
''').format(**locals()))
if args.dist_only:
include_scylla_and_iotune = ''
include_scylla_and_iotune_stripped = ''
@@ -2843,16 +2843,16 @@ def write_build_file(f,
include_scylla_and_iotune = f'$builddir/{mode}/scylla $builddir/{mode}/iotune $builddir/{mode}/patchelf'
include_scylla_and_iotune_stripped = f'$builddir/{mode}/scylla.stripped $builddir/{mode}/iotune.stripped $builddir/{mode}/patchelf.stripped'
include_scylla_and_iotune_debug = f'$builddir/{mode}/scylla.debug $builddir/{mode}/iotune.debug'
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz: package {include_scylla_and_iotune} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz: stripped_package {include_scylla_and_iotune_stripped} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.stripped | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-debuginfo-{scylla_version}-{scylla_release}.{arch}.tar.gz: debuginfo_package {include_scylla_and_iotune_debug} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.debug | always\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
f.write(f'build $builddir/{mode}/dist/tar/{scylla_product}-{arch}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz: package {include_scylla_and_iotune} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz: stripped_package {include_scylla_and_iotune_stripped} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.stripped | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-debuginfo-{scylla_version}-{scylla_release}.{arch}.tar.gz: debuginfo_package {include_scylla_and_iotune_debug} $builddir/SCYLLA-RELEASE-FILE $builddir/SCYLLA-VERSION-FILE $builddir/debian/debian $builddir/node_exporter/node_exporter.debug | always\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))
f.write('build $builddir/{mode}/dist/tar/{scylla_product}-{arch}-package.tar.gz: copy $builddir/{mode}/dist/tar/{scylla_product}-{scylla_version}-{scylla_release}.{arch}.tar.gz\n'.format(**locals()))
f.write(' mode = {mode}\n'.format(**locals()))

f.write(f'build $builddir/dist/{mode}/redhat: rpmbuild $builddir/{mode}/dist/tar/{scylla_product}-unstripped-{scylla_version}-{scylla_release}.{arch}.tar.gz\n')
f.write(f' mode = {mode}\n')
@@ -3026,7 +3026,7 @@ def create_build_system(args):
os.makedirs(outdir, exist_ok=True)

compiler_cache = find_compiler_cache(args.compiler_cache)
resolve_compilers_for_compiler_cache(args, compiler_cache)
resolve_compilers_for_sccache(args, compiler_cache)

scylla_product, scylla_version, scylla_release = generate_version(args.date_stamp)

@@ -3113,7 +3113,7 @@ def configure_using_cmake(args):
in selected_modes)

compiler_cache = find_compiler_cache(args.compiler_cache)
resolve_compilers_for_compiler_cache(args, compiler_cache)
resolve_compilers_for_sccache(args, compiler_cache)

settings = {
'CMAKE_CONFIGURATION_TYPES': selected_configs,

@@ -47,7 +47,6 @@ target_sources(cql3
functions/aggregate_fcts.cc
functions/castas_fcts.cc
functions/error_injection_fcts.cc
functions/vector_similarity_fcts.cc
statements/cf_prop_defs.cc
statements/cf_statement.cc
statements/authentication_statement.cc
@@ -105,7 +104,6 @@ target_sources(cql3
statements/list_service_level_attachments_statement.cc
statements/list_effective_service_level_statement.cc
statements/describe_statement.cc
statements/view_prop_defs.cc
update_parameters.cc
util.cc
ut_name.cc

50
cql3/Cql.g
@@ -431,7 +431,6 @@ unaliasedSelector returns [uexpression tmp]
| K_TTL '(' c=cident ')' { tmp = column_mutation_attribute{column_mutation_attribute::attribute_kind::ttl,
unresolved_identifier{std::move(c)}}; }
| f=functionName args=selectionFunctionArgs { tmp = function_call{std::move(f), std::move(args)}; }
| f=similarityFunctionName args=vectorSimilarityArgs { tmp = function_call{std::move(f), std::move(args)}; }
| K_CAST '(' arg=unaliasedSelector K_AS t=native_type ')' { tmp = cast{.style = cast::cast_style::sql, .arg = std::move(arg), .type = std::move(t)}; }
)
( '.' fi=cident { tmp = field_selection{std::move(tmp), std::move(fi)}; }
@@ -446,18 +445,6 @@ selectionFunctionArgs returns [std::vector<expression> a]
')'
;

vectorSimilarityArgs returns [std::vector<expression> a]
: '(' ')'
| '(' v1=vectorSimilarityArg { a.push_back(std::move(v1)); }
( ',' vn=vectorSimilarityArg { a.push_back(std::move(vn)); } )*
')'
;

vectorSimilarityArg returns [uexpression a]
: s=unaliasedSelector { a = std::move(s); }
| v=value { a = std::move(v); }
;

countArgument
: '*'
| i=INTEGER { if (i->getText() != "1") {
@@ -898,10 +885,6 @@ pkDef[cql3::statements::create_table_statement::raw_statement& expr]
| '(' k1=ident { l.push_back(k1); } ( ',' kn=ident { l.push_back(kn); } )* ')' { $expr.add_key_aliases(l); }
;

cfamProperties[cql3::statements::cf_properties& expr]
: cfamProperty[expr] (K_AND cfamProperty[expr])*
;

cfamProperty[cql3::statements::cf_properties& expr]
: property[*$expr.properties()]
| K_COMPACT K_STORAGE { $expr.set_compact_storage(); }
@@ -939,22 +922,16 @@ typeColumns[create_type_statement& expr]
*/
createIndexStatement returns [std::unique_ptr<create_index_statement> expr]
@init {
auto idx_props = make_shared<index_specific_prop_defs>();
auto props = index_prop_defs();
auto props = make_shared<index_prop_defs>();
bool if_not_exists = false;
auto name = ::make_shared<cql3::index_name>();
std::vector<::shared_ptr<index_target::raw>> targets;
}
: K_CREATE (K_CUSTOM { idx_props->is_custom = true; })? K_INDEX (K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
: K_CREATE (K_CUSTOM { props->is_custom = true; })? K_INDEX (K_IF K_NOT K_EXISTS { if_not_exists = true; } )?
(idxName[*name])? K_ON cf=columnFamilyName '(' (target1=indexIdent { targets.emplace_back(target1); } (',' target2=indexIdent { targets.emplace_back(target2); } )*)? ')'
(K_USING cls=STRING_LITERAL { idx_props->custom_class = sstring{$cls.text}; })?
(K_WITH cfamProperties[props])?
{
props.extract_index_specific_properties_to(*idx_props);
view_prop_defs view_props = std::move(props).into_view_prop_defs();

$expr = std::make_unique<create_index_statement>(cf, name, targets, std::move(idx_props), std::move(view_props), if_not_exists);
}
(K_USING cls=STRING_LITERAL { props->custom_class = sstring{$cls.text}; })?
(K_WITH properties[*props])?
{ $expr = std::make_unique<create_index_statement>(cf, name, targets, props, if_not_exists); }
;

indexIdent returns [::shared_ptr<index_target::raw> id]
@@ -1102,9 +1079,9 @@ alterTypeStatement returns [std::unique_ptr<alter_type_statement> expr]
*/
alterViewStatement returns [std::unique_ptr<alter_view_statement> expr]
@init {
auto props = cql3::statements::view_prop_defs();
auto props = cql3::statements::cf_prop_defs();
}
: K_ALTER K_MATERIALIZED K_VIEW cf=columnFamilyName K_WITH properties[*props.properties()]
: K_ALTER K_MATERIALIZED K_VIEW cf=columnFamilyName K_WITH properties[props]
{
$expr = std::make_unique<alter_view_statement>(std::move(cf), std::move(props));
}
@@ -1706,10 +1683,6 @@ functionName returns [cql3::functions::function_name s]
: (ks=keyspaceName '.')? f=allowedFunctionName { $s.keyspace = std::move(ks); $s.name = std::move(f); }
;

similarityFunctionName returns [cql3::functions::function_name s]
: f=allowedSimilarityFunctionName { $s = cql3::functions::function_name::native_function(std::move(f)); }
;

allowedFunctionName returns [sstring s]
: f=IDENT { $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
| f=QUOTED_NAME { $s = $f.text; }
@@ -1718,11 +1691,6 @@ allowedFunctionName returns [sstring s]
| K_COUNT { $s = "count"; }
;

allowedSimilarityFunctionName returns [sstring s]
: f=(K_SIMILARITY_COSINE | K_SIMILARITY_EUCLIDEAN | K_SIMILARITY_DOT_PRODUCT)
{ $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
;

functionArgs returns [std::vector<expression> a]
: '(' ')'
| '(' t1=term { a.push_back(std::move(t1)); }
@@ -2419,10 +2387,6 @@ K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;

K_VECTOR_SEARCH_INDEXING: V E C T O R '_' S E A R C H '_' I N D E X I N G;

K_SIMILARITY_EUCLIDEAN: S I M I L A R I T Y '_' E U C L I D E A N;
K_SIMILARITY_COSINE: S I M I L A R I T Y '_' C O S I N E;
K_SIMILARITY_DOT_PRODUCT: S I M I L A R I T Y '_' D O T '_' P R O D U C T;

// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');

@@ -25,11 +25,6 @@ public:
NOT_ASSIGNABLE,
};

struct vector_test_result {
test_result result;
std::optional<size_t> dimension_opt;
};

static bool is_assignable(test_result tr) {
return tr != test_result::NOT_ASSIGNABLE;
}
@@ -49,8 +44,6 @@ public:
*/
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const = 0;

virtual vector_test_result test_assignment_any_size_float_vector() const = 0;

virtual std::optional<data_type> assignment_testable_type_opt() const = 0;

// for error reporting

@@ -1434,112 +1434,6 @@ test_assignment(const expression& expr, data_dictionary::database db, const sstr
}, expr);
}

template <cql3_type::kind... Kinds>
assignment_testable::vector_test_result
test_assignment_any_size_float_vector(const expression& expr) {
using test_result = assignment_testable::vector_test_result;
const test_result NOT_ASSIGNABLE = {assignment_testable::test_result::NOT_ASSIGNABLE, std::nullopt};
const test_result WEAKLY_ASSIGNABLE = {assignment_testable::test_result::WEAKLY_ASSIGNABLE, std::nullopt};
auto is_float_or_bind = [] (const expression& e) {
return expr::visit(overloaded_functor{
[] (const bind_variable&) {
return true;
},
[] (const untyped_constant& uc) {
return uc.partial_type == untyped_constant::type_class::floating_point
|| uc.partial_type == untyped_constant::type_class::integer;
},
[] (const constant& value) {
auto kind = value.type->as_cql3_type().get_kind();
return cql3_type::kind_enum_set::frozen<Kinds...>().contains(kind);
},
[] (const auto&) {
return false;
},
}, e);
};
auto validate_assignment = [&] (const data_type& dt) -> test_result {
auto vt = dynamic_pointer_cast<const vector_type_impl>(dt->underlying_type());
if (!vt) {
return NOT_ASSIGNABLE;
}
auto elem_kind = vt->get_elements_type()->as_cql3_type().get_kind();
if (cql3_type::kind_enum_set::frozen<Kinds...>().contains(elem_kind)) {
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, vt->get_dimension()};
}
return NOT_ASSIGNABLE;
};
return expr::visit(overloaded_functor{
[&] (const constant& value) -> test_result {
return validate_assignment(value.type);
},
[&] (const binary_operator&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const conjunction&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const column_value& col_val) -> test_result {
return validate_assignment(col_val.col->type);
},
[&] (const subscript&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const unresolved_identifier& ui) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const column_mutation_attribute& cma) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const function_call& fc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const cast& c) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const field_selection& fs) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const bind_variable& bv) -> test_result {
return WEAKLY_ASSIGNABLE;
},
[&] (const untyped_constant& uc) -> test_result {
return uc.partial_type == untyped_constant::type_class::null
? WEAKLY_ASSIGNABLE
: NOT_ASSIGNABLE;
},
[&] (const tuple_constructor& tc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const collection_constructor& c) -> test_result {
switch (c.style) {
case collection_constructor::style_type::list_or_vector: {
if(std::ranges::all_of(c.elements, is_float_or_bind)) {
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, c.elements.size()};
}
return NOT_ASSIGNABLE;
}
case collection_constructor::style_type::set: return NOT_ASSIGNABLE;
case collection_constructor::style_type::map: return NOT_ASSIGNABLE;
case collection_constructor::style_type::vector:
on_internal_error(expr_logger, "vector style type found in test_assignment, should have been introduced post-prepare");
}
on_internal_error(expr_logger, fmt::format("unexpected collection_constructor style {}", static_cast<unsigned>(c.style)));
},
[&] (const usertype_constructor& uc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const temporary& t) -> test_result {
return NOT_ASSIGNABLE;
},
}, expr);
}

assignment_testable::vector_test_result
test_assignment_any_size_float_vector(const expression& expr) {
return test_assignment_any_size_float_vector<cql3_type::kind::FLOAT, cql3_type::kind::DOUBLE>(expr);
}

expression
prepare_expression(const expression& expr, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
    auto e_opt = try_prepare_expression(expr, db, keyspace, schema_opt, std::move(receiver));
@@ -1573,9 +1467,6 @@ public:
    virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const override {
        return expr::test_assignment(_e, db, keyspace, schema_opt, receiver);
    }
    virtual vector_test_result test_assignment_any_size_float_vector() const override {
        return expr::test_assignment_any_size_float_vector(_e);
    }
    virtual sstring assignment_testable_source_context() const override {
        return fmt::format("{}", _e);
    }

@@ -16,7 +16,6 @@
#include "cql3/functions/user_function.hh"
#include "cql3/functions/user_aggregate.hh"
#include "cql3/functions/uuid_fcts.hh"
#include "cql3/functions/vector_similarity_fcts.hh"
#include "data_dictionary/data_dictionary.hh"
#include "as_json_function.hh"
#include "cql3/prepare_context.hh"
@@ -399,14 +398,6 @@ functions::get(data_dictionary::database db,
        }
    });

    const auto func_name = name.has_keyspace() ? name : name.as_native_function();
    if (SIMILARITY_FUNCTIONS.contains(func_name)) {
        auto arg_types = retrieve_vector_arg_types(func_name, provided_args);
        auto fun = ::make_shared<vector_similarity_fct>(func_name.name, arg_types);
        validate_types(db, keyspace, schema.get(), fun, provided_args, receiver_ks, receiver_cf);
        return fun;
    }

    if (name.has_keyspace()
            ? name == TOKEN_FUNCTION_NAME
            : name.name == TOKEN_FUNCTION_NAME.name) {
@@ -1,150 +0,0 @@
/*
 * Copyright (C) 2025-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "vector_similarity_fcts.hh"
#include "types/types.hh"
#include "types/vector.hh"
#include "exceptions/exceptions.hh"

namespace cql3 {
namespace functions {
namespace {

// The computations of similarity scores match the exact formulas of Cassandra's (jVector's) implementation to ensure compatibility.
// There exist tests checking the compliance of the results.
// Reference:
// https://github.com/datastax/jvector/blob/f967f1c9249035b63b55a566fac7d4dc38380349/jvector-base/src/main/java/io/github/jbellis/jvector/vector/VectorSimilarityFunction.java#L36-L69

// You should only use this function if you need to preserve the original vectors and cannot normalize
// them in advance.
float compute_cosine_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
    double dot_product = 0.0;
    double squared_norm_a = 0.0;
    double squared_norm_b = 0.0;

    for (size_t i = 0; i < v1.size(); ++i) {
        double a = value_cast<float>(v1[i]);
        double b = value_cast<float>(v2[i]);

        dot_product += a * b;
        squared_norm_a += a * a;
        squared_norm_b += b * b;
    }

    if (squared_norm_a == 0 || squared_norm_b == 0) {
        throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
    }

    // The cosine similarity is in the range [-1, 1].
    // It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
    // for consistency with other similarity functions.
    return (1 + (dot_product / (std::sqrt(squared_norm_a * squared_norm_b)))) / 2;
}

float compute_euclidean_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
    double sum = 0.0;

    for (size_t i = 0; i < v1.size(); ++i) {
        double a = value_cast<float>(v1[i]);
        double b = value_cast<float>(v2[i]);

        double diff = a - b;
        sum += diff * diff;
    }

    // The squared Euclidean (L2) distance is in the range [0, inf).
    // It is mapped to a similarity score in the range (0, 1] (0 -> 1, inf -> 0)
    // for consistency with other similarity functions.
    return (1 / (1 + sum));
}

// Assumes that both vectors are L2-normalized.
// This similarity is intended as an optimized way to perform the cosine similarity calculation.
float compute_dot_product_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
    double dot_product = 0.0;

    for (size_t i = 0; i < v1.size(); ++i) {
        double a = value_cast<float>(v1[i]);
        double b = value_cast<float>(v2[i]);
        dot_product += a * b;
    }

    // The dot product is in the range [-1, 1] for L2-normalized vectors.
    // It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
    // for consistency with other similarity functions.
    return ((1 + dot_product) / 2);
}

} // namespace

thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS = {
    {SIMILARITY_COSINE_FUNCTION_NAME, compute_cosine_similarity},
    {SIMILARITY_EUCLIDEAN_FUNCTION_NAME, compute_euclidean_similarity},
    {SIMILARITY_DOT_PRODUCT_FUNCTION_NAME, compute_dot_product_similarity},
};

std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args) {
    if (provided_args.size() != 2) {
        throw exceptions::invalid_request_exception(fmt::format("Invalid number of arguments for function {}(vector<float, n>, vector<float, n>)", name));
    }

    auto [first_result, first_dim_opt] = provided_args[0]->test_assignment_any_size_float_vector();
    auto [second_result, second_dim_opt] = provided_args[1]->test_assignment_any_size_float_vector();

    auto invalid_type_error_message = [&name](const shared_ptr<assignment_testable>& arg) {
        auto type = arg->assignment_testable_type_opt();
        const auto& source_context = arg->assignment_testable_source_context();
        if (type) {
            return fmt::format("Function {} requires a float vector argument, but found {} of type {}", name, source_context, type.value()->cql3_type_name());
        } else {
            return fmt::format("Function {} requires a float vector argument, but found {}", name, source_context);
        }
    };

    if (!is_assignable(first_result)) {
        throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[0]));
    }
    if (!is_assignable(second_result)) {
        throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[1]));
    }

    if (!first_dim_opt && !second_dim_opt) {
        throw exceptions::invalid_request_exception(fmt::format("Cannot infer type of argument {} for function {}(vector<float, n>, vector<float, n>)",
                provided_args[0]->assignment_testable_source_context(), name));
    }
    if (first_dim_opt && second_dim_opt) {
        if (*first_dim_opt != *second_dim_opt) {
            throw exceptions::invalid_request_exception(fmt::format(
                "All arguments must have the same vector dimensions, but found vector<float, {}> and vector<float, {}>", *first_dim_opt, *second_dim_opt));
        }
    }

    size_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
    auto type = vector_type_impl::get_instance(float_type, dimension);
    return {type, type};
}

bytes_opt vector_similarity_fct::execute(std::span<const bytes_opt> parameters) {
    if (std::any_of(parameters.begin(), parameters.end(), [](const auto& param) {
        return !param;
    })) {
        return std::nullopt;
    }

    const auto& type = arg_types()[0];
    data_value v1 = type->deserialize(*parameters[0]);
    data_value v2 = type->deserialize(*parameters[1]);
    const auto& v1_elements = value_cast<std::vector<data_value>>(v1);
    const auto& v2_elements = value_cast<std::vector<data_value>>(v2);

    float result = SIMILARITY_FUNCTIONS.at(_name)(v1_elements, v2_elements);
    return float_type->decompose(result);
}

} // namespace functions
} // namespace cql3
@@ -1,37 +0,0 @@
/*
 * Copyright (C) 2025-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

#include "native_scalar_function.hh"
#include "cql3/assignment_testable.hh"
#include "cql3/functions/function_name.hh"

namespace cql3 {
namespace functions {

static const function_name SIMILARITY_COSINE_FUNCTION_NAME = function_name::native_function("similarity_cosine");
static const function_name SIMILARITY_EUCLIDEAN_FUNCTION_NAME = function_name::native_function("similarity_euclidean");
static const function_name SIMILARITY_DOT_PRODUCT_FUNCTION_NAME = function_name::native_function("similarity_dot_product");

using similarity_function_t = float (*)(const std::vector<data_value>&, const std::vector<data_value>&);
extern thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS;

std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args);

class vector_similarity_fct : public native_scalar_function {
public:
    vector_similarity_fct(const sstring& name, const std::vector<data_type>& arg_types)
        : native_scalar_function(name, float_type, arg_types) {
    }

    virtual bytes_opt execute(std::span<const bytes_opt> parameters) override;
};

} // namespace functions
} // namespace cql3
@@ -14,7 +14,6 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/try_future.hh>

#include "service/storage_proxy.hh"
#include "service/migration_manager.hh"
@@ -994,7 +993,7 @@ query_processor::execute_with_params(
    auto opts = make_internal_options(p, values, cl);
    auto statement = p->statement;

    auto msg = co_await coroutine::try_future(execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params));
    auto msg = co_await execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params);
    co_return ::make_shared<untyped_result_set>(msg);
}

@@ -1004,7 +1003,7 @@ query_processor::do_execute_with_params(
        shared_ptr<cql_statement> statement,
        const query_options& options, std::optional<service::group0_guard> guard) {
    statement->validate(*this, service::client_state::for_internal_calls());
    co_return co_await coroutine::try_future(statement->execute(*this, query_state, options, std::move(guard)));
    co_return co_await statement->execute(*this, query_state, options, std::move(guard));
}
@@ -32,7 +32,7 @@ bool
selectable_processes_selection(const expr::expression& selectable) {
    return expr::visit(overloaded_functor{
        [&] (const expr::constant&) -> bool {
            return true;
            on_internal_error(slogger, "no way to express SELECT constant in the grammar yet");
        },
        [&] (const expr::conjunction& conj) -> bool {
            on_internal_error(slogger, "no way to express 'SELECT a AND b' in the grammar yet");
@@ -19,7 +19,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "mutation/canonical_mutation.hh"
#include "prepared_statement.hh"
#include <seastar/coroutine/exception.hh>
#include "seastar/coroutine/exception.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
@@ -206,9 +206,8 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
        locator::replication_strategy_params(ks_md_update->strategy_options(), ks_md_update->initial_tablets(), ks_md_update->consistency_option()),
        topo);

    // If RF-rack-validity must be enforced for the keyspace according to `enforce_rf_rack_validity_for_keyspace`,
    // it's forbidden to perform a schema change that would lead to an RF-rack-invalid keyspace.
    // Verify that this change does not.
    // If `rf_rack_valid_keyspaces` is enabled, it's forbidden to perform a schema change that
    // would lead to an RF-rack-valid keyspace. Verify that this change does not.
    // For more context, see: scylladb/scylladb#23071.
    try {
        // There are two things to note here:
@@ -226,13 +225,13 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
        // disturb it (see scylladb/scylladb#23345), but we ignore that.
        locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
    } catch (const std::exception& e) {
        if (replica::database::enforce_rf_rack_validity_for_keyspace(qp.db().get_config(), *ks_md)) {
        if (qp.db().get_config().rf_rack_valid_keyspaces()) {
            // There's no guarantee what the type of the exception will be, so we need to
            // wrap it manually here in a type that can be passed to the user.
            throw exceptions::invalid_request_exception(e.what());
        } else {
            // Even when RF-rack-validity is not enforced for the keyspace, we'd
            // like to inform the user that the keyspace they're altering will not
            // Even when the configuration option `rf_rack_valid_keyspaces` is set to false,
            // we'd like to inform the user that the keyspace they're altering will not
            // satisfy the restriction after the change--but just as a warning.
            // For more context, see issue: scylladb/scylladb#23330.
            warnings.push_back(seastar::format(
@@ -11,7 +11,6 @@
#include <seastar/core/coroutine.hh>
#include "cql3/statements/alter_view_statement.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/statements/view_prop_defs.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "validation.hh"
@@ -23,7 +22,7 @@ namespace cql3 {

namespace statements {

alter_view_statement::alter_view_statement(cf_name view_name, std::optional<view_prop_defs> properties)
alter_view_statement::alter_view_statement(cf_name view_name, std::optional<cf_prop_defs> properties)
    : schema_altering_statement{std::move(view_name)}
    , _properties{std::move(properties)}
{
@@ -53,8 +52,8 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
        throw exceptions::invalid_request_exception("ALTER MATERIALIZED VIEW WITH invoked, but no parameters found");
    }

    auto schema_extensions = _properties->properties()->make_schema_extensions(db.extensions());
    _properties->validate_raw(view_prop_defs::op_type::alter, db, keyspace(), schema_extensions);
    auto schema_extensions = _properties->make_schema_extensions(db.extensions());
    _properties->validate(db, keyspace(), schema_extensions);

    bool is_colocated = [&] {
        if (!db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
@@ -71,15 +70,28 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
    }();

    if (is_colocated) {
        auto gc_opts = _properties->properties()->get_tombstone_gc_options(schema_extensions);
        auto gc_opts = _properties->get_tombstone_gc_options(schema_extensions);
        if (gc_opts && gc_opts->mode() == tombstone_gc_mode::repair) {
            throw exceptions::invalid_request_exception("The 'repair' mode for tombstone_gc is not allowed on co-located materialized view tables.");
        }
    }

    auto builder = schema_builder(schema);
    _properties->apply_to_builder(view_prop_defs::op_type::alter, builder, std::move(schema_extensions),
            db, keyspace(), is_colocated);
    _properties->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);

    if (builder.get_gc_grace_seconds() == 0) {
        throw exceptions::invalid_request_exception(
            "Cannot alter gc_grace_seconds of a materialized view to 0, since this "
            "value is used to TTL undelivered updates. Setting gc_grace_seconds too "
            "low might cause undelivered updates to expire before being replayed.");
    }

    if (builder.default_time_to_live().count() > 0) {
        throw exceptions::invalid_request_exception(
            "Cannot set or alter default_time_to_live for a materialized view. "
            "Data in a materialized view always expire at the same time than "
            "the corresponding data in the parent table.");
    }

    return view_ptr(builder.build());
}
@@ -12,8 +12,8 @@

#include <seastar/core/shared_ptr.hh>

#include "cql3/statements/view_prop_defs.hh"
#include "data_dictionary/data_dictionary.hh"
#include "cql3/statements/cf_prop_defs.hh"
#include "cql3/statements/schema_altering_statement.hh"

namespace cql3 {
@@ -26,10 +26,10 @@ namespace statements {

/** An <code>ALTER MATERIALIZED VIEW</code> parsed from a CQL query statement. */
class alter_view_statement : public schema_altering_statement {
private:
    std::optional<view_prop_defs> _properties;
    std::optional<cf_prop_defs> _properties;
    view_ptr prepare_view(data_dictionary::database db) const;
public:
    alter_view_statement(cf_name view_name, std::optional<view_prop_defs> properties);
    alter_view_statement(cf_name view_name, std::optional<cf_prop_defs> properties);

    virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
@@ -19,8 +19,7 @@ namespace statements {

/**
 * Class for common statement properties.
 */
class cf_properties {
protected:
class cf_properties final {
    const ::shared_ptr<cf_prop_defs> _properties = ::make_shared<cf_prop_defs>();
    bool _use_compact_storage = false;
    std::vector<std::pair<::shared_ptr<column_identifier>, bool>> _defined_ordering; // Insertion ordering is important
@@ -14,7 +14,6 @@
#include "db/view/view.hh"
#include "exceptions/exceptions.hh"
#include "index/vector_index.hh"
#include "locator/token_metadata_fwd.hh"
#include "prepared_statement.hh"
#include "replica/database.hh"
#include "types/types.hh"
@@ -219,24 +218,18 @@ view_ptr create_index_statement::create_view_for_index(const schema_ptr schema,
        std::map<sstring, sstring> tags_map = {{db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY, "true"}};
        builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
    }

    const schema::extensions_map exts = _view_properties.properties()->make_schema_extensions(db.extensions());
    _view_properties.apply_to_builder(view_prop_defs::op_type::create, builder, exts, db, keyspace(), is_colocated);

    return view_ptr{builder.build()};
}

create_index_statement::create_index_statement(cf_name name,
        ::shared_ptr<index_name> index_name,
        std::vector<::shared_ptr<index_target::raw>> raw_targets,
        ::shared_ptr<index_specific_prop_defs> idx_properties,
        view_prop_defs view_properties,
        ::shared_ptr<index_prop_defs> properties,
        bool if_not_exists)
    : schema_altering_statement(name)
    , _index_name(index_name->get_idx())
    , _raw_targets(raw_targets)
    , _idx_properties(std::move(idx_properties))
    , _view_properties(std::move(view_properties))
    , _properties(properties)
    , _if_not_exists(if_not_exists)
{
}
@@ -259,53 +252,14 @@ static sstring target_type_name(index_target::target_type type) {
void
create_index_statement::validate(query_processor& qp, const service::client_state& state) const
{
    if (_raw_targets.empty() && !_idx_properties->is_custom) {
    if (_raw_targets.empty() && !_properties->is_custom) {
        throw exceptions::invalid_request_exception("Only CUSTOM indexes can be created without specifying a target column");
    }

    _idx_properties->validate();

    // FIXME: This is ugly and can be improved.
    const bool is_vector_index = _idx_properties->custom_class && *_idx_properties->custom_class == "vector_index";
    const bool uses_view_properties = _view_properties.properties()->count() > 0
        || _view_properties.use_compact_storage()
        || _view_properties.defined_ordering().size() > 0;

    if (is_vector_index && uses_view_properties) {
        throw exceptions::invalid_request_exception("You cannot use view properties with a vector index");
    }

    const schema::extensions_map exts = _view_properties.properties()->make_schema_extensions(qp.db().extensions());
    _view_properties.validate_raw(view_prop_defs::op_type::create, qp.db(), keyspace(), exts);

    // These keywords are still accepted by other schema entities, but they don't have effect on them.
    // Since indexes are not bound by any backward compatibility contract in this regard, let's forbid these.
    static sstring obsolete_keywords[] = {
        "index_interval",
        "replicate_on_write",
        "populate_io_cache_on_flush",
        "read_repair_chance",
        "dclocal_read_repair_chance",
    };

    for (const sstring& keyword : obsolete_keywords) {
        if (_view_properties.properties()->has_property(keyword)) {
            // We use the same type of exception and the same error message as would be thrown for
            // an invalid property via `_view_properties.validate_raw`.
            throw exceptions::syntax_exception(seastar::format("Unknown property '{}'", keyword));
        }
    }

    // FIXME: This is a temporary limitation as it might deserve more attention.
    if (!_view_properties.defined_ordering().empty()) {
        throw exceptions::invalid_request_exception("Indexes do not allow for specifying the clustering order");
    }
    _properties->validate();
}

std::pair<std::vector<::shared_ptr<index_target>>, cql3::cql_warnings_vec>
create_index_statement::validate_while_executing(data_dictionary::database db, locator::token_metadata_ptr tmptr) const {
    cql3::cql_warnings_vec warnings;

std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_executing(data_dictionary::database db) const {
    auto schema = validation::validate_column_family(db, keyspace(), column_family());

    if (schema->is_counter()) {
@@ -327,22 +281,13 @@ create_index_statement::validate_while_executing(data_dictionary::database db, l

    // Regular secondary indexes require rf-rack-validity.
    // Custom indexes need to validate this property themselves, if they need it.
    if (!_idx_properties || !_idx_properties->custom_class) {
    if (!_properties || !_properties->custom_class) {
        try {
            db::view::validate_view_keyspace(db, keyspace(), tmptr);
            db::view::validate_view_keyspace(db, keyspace());
        } catch (const std::exception& e) {
            // The type of the thrown exception is not specified, so we need to wrap it here.
            throw exceptions::invalid_request_exception(e.what());
        }

        if (db.find_keyspace(keyspace()).uses_tablets()) {
            warnings.emplace_back(
                "Creating an index in a keyspace that uses tablets requires "
                "the keyspace to remain RF-rack-valid while the index exists. "
                "Some operations will be restricted to enforce this: altering the keyspace's replication "
                "factor, adding a node in a new rack, and removing or decommissioning a node that would "
                "eliminate a rack.");
        }
    }

    validate_for_local_index(*schema);
@@ -352,14 +297,14 @@ create_index_statement::validate_while_executing(data_dictionary::database db, l
        targets.emplace_back(raw_target->prepare(*schema));
    }

    if (_idx_properties && _idx_properties->custom_class) {
        auto custom_index_factory = secondary_index::secondary_index_manager::get_custom_class_factory(*_idx_properties->custom_class);
    if (_properties && _properties->custom_class) {
        auto custom_index_factory = secondary_index::secondary_index_manager::get_custom_class_factory(*_properties->custom_class);
        if (!custom_index_factory) {
            throw exceptions::invalid_request_exception(format("Non-supported custom class \'{}\' provided", *_idx_properties->custom_class));
            throw exceptions::invalid_request_exception(format("Non-supported custom class \'{}\' provided", *(_properties->custom_class)));
        }
        auto custom_index = (*custom_index_factory)();
        custom_index->validate(*schema, *_idx_properties, targets, db.features(), db);
        _idx_properties->index_version = custom_index->index_version(*schema);
        custom_index->validate(*schema, *_properties, targets, db.features(), db);
        _properties->index_version = custom_index->index_version(*schema);
    }

    if (targets.size() > 1) {
@@ -439,7 +384,7 @@ create_index_statement::validate_while_executing(data_dictionary::database db, l
        }
    }

    return std::make_pair(std::move(targets), std::move(warnings));
    return targets;
}

void create_index_statement::validate_for_local_index(const schema& schema) const {
@@ -578,7 +523,7 @@ void create_index_statement::validate_target_column_is_map_if_index_involves_key

void create_index_statement::validate_targets_for_multi_column_index(std::vector<::shared_ptr<index_target>> targets) const
{
    if (!_idx_properties->is_custom) {
    if (!_properties->is_custom) {
        if (targets.size() > 2 || (targets.size() == 2 && std::holds_alternative<index_target::single_column>(targets.front()->value))) {
            throw exceptions::invalid_request_exception("Only CUSTOM indexes support multiple columns");
        }
@@ -592,9 +537,8 @@ void create_index_statement::validate_targets_for_multi_column_index(std::vector
    }
}

std::pair<std::optional<create_index_statement::base_schema_with_new_index>, cql3::cql_warnings_vec>
create_index_statement::build_index_schema(data_dictionary::database db, locator::token_metadata_ptr tmptr) const {
    auto [targets, warnings] = validate_while_executing(db, tmptr);
std::optional<create_index_statement::base_schema_with_new_index> create_index_statement::build_index_schema(data_dictionary::database db) const {
    auto targets = validate_while_executing(db);

    auto schema = db.find_schema(keyspace(), column_family());

@@ -610,8 +554,8 @@ create_index_statement::build_index_schema(data_dictionary::database db, locator
    }
    index_metadata_kind kind;
    index_options_map index_options;
    if (_idx_properties->custom_class) {
        index_options = _idx_properties->get_options();
    if (_properties->custom_class) {
        index_options = _properties->get_options();
        kind = index_metadata_kind::custom;
    } else {
        kind = schema->is_compound() ? index_metadata_kind::composites : index_metadata_kind::keys;
@@ -620,17 +564,17 @@ create_index_statement::build_index_schema(data_dictionary::database db, locator
    auto existing_index = schema->find_index_noname(index);
    if (existing_index) {
        if (_if_not_exists) {
            return std::make_pair(std::nullopt, std::move(warnings));
            return {};
        } else {
            throw exceptions::invalid_request_exception(
                format("Index {} is a duplicate of existing index {}", index.name(), existing_index.value().name()));
        }
    }
    bool existing_vector_index = _idx_properties->custom_class && _idx_properties->custom_class == "vector_index" && secondary_index::vector_index::has_vector_index_on_column(*schema, targets[0]->column_name());
    bool custom_index_with_same_name = _idx_properties->custom_class && db.existing_index_names(keyspace()).contains(_index_name);
    bool existing_vector_index = _properties->custom_class && _properties->custom_class == "vector_index" && secondary_index::vector_index::has_vector_index_on_column(*schema, targets[0]->column_name());
    bool custom_index_with_same_name = _properties->custom_class && db.existing_index_names(keyspace()).contains(_index_name);
    if (existing_vector_index || custom_index_with_same_name) {
        if (_if_not_exists) {
            return std::make_pair(std::nullopt, std::move(warnings));
            return {};
        } else {
            throw exceptions::invalid_request_exception("There exists a duplicate custom index");
        }
@@ -646,13 +590,13 @@ create_index_statement::build_index_schema(data_dictionary::database db, locator
    schema_builder builder{schema};
    builder.with_index(index);

    return std::make_pair(base_schema_with_new_index{builder.build(), index}, std::move(warnings));
    return base_schema_with_new_index{builder.build(), index};
}

future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chunked_vector<mutation>, cql3::cql_warnings_vec>>
create_index_statement::prepare_schema_mutations(query_processor& qp, const query_options&, api::timestamp_type ts) const {
    using namespace cql_transport;
    auto [res, warnings] = build_index_schema(qp.db(), qp.proxy().get_token_metadata_ptr());
    auto res = build_index_schema(qp.db());

    ::shared_ptr<event::schema_change> ret;
    utils::chunked_vector<mutation> muts;
@@ -682,7 +626,7 @@ create_index_statement::prepare_schema_mutations(query_processor& qp, const quer
        column_family());
    }

    co_return std::make_tuple(std::move(ret), std::move(muts), std::move(warnings));
    co_return std::make_tuple(std::move(ret), std::move(muts), std::vector<sstring>());
}

std::unique_ptr<cql3::statements::prepared_statement>
@@ -10,8 +10,6 @@

#pragma once

#include "cql3/statements/index_prop_defs.hh"
#include "cql3/statements/view_prop_defs.hh"
#include "schema_altering_statement.hh"
#include "index_target.hh"

@@ -29,25 +27,20 @@ class index_name;

namespace statements {

class index_specific_prop_defs;
class index_prop_defs;

/** A <code>CREATE INDEX</code> statement parsed from a CQL query. */
class create_index_statement : public schema_altering_statement {
const sstring _index_name;
const std::vector<::shared_ptr<index_target::raw>> _raw_targets;

// Options specific to this index.
const ::shared_ptr<index_specific_prop_defs> _idx_properties;
// Options corresponding to the underlying materialized view.
const view_prop_defs _view_properties;

const ::shared_ptr<index_prop_defs> _properties;
const bool _if_not_exists;
cql_stats* _cql_stats = nullptr;

public:
create_index_statement(cf_name name, ::shared_ptr<index_name> index_name,
std::vector<::shared_ptr<index_target::raw>> raw_targets,
::shared_ptr<index_specific_prop_defs> idx_properties, view_prop_defs view_properties, bool if_not_exists);
::shared_ptr<index_prop_defs> properties, bool if_not_exists);

future<> check_access(query_processor& qp, const service::client_state& state) const override;
void validate(query_processor&, const service::client_state& state) const override;
@@ -60,7 +53,7 @@ public:
schema_ptr schema;
index_metadata index;
};
std::pair<std::optional<base_schema_with_new_index>, cql3::cql_warnings_vec> build_index_schema(data_dictionary::database db, locator::token_metadata_ptr tmptr) const;
std::optional<base_schema_with_new_index> build_index_schema(data_dictionary::database db) const;
view_ptr create_view_for_index(const schema_ptr, const index_metadata& im, const data_dictionary::database&) const;
private:
void validate_for_local_index(const schema& schema) const;
@@ -76,7 +69,7 @@ private:
const sstring& name,
index_metadata_kind kind,
const index_options_map& options);
std::pair<std::vector<::shared_ptr<index_target>>, cql3::cql_warnings_vec> validate_while_executing(data_dictionary::database db, locator::token_metadata_ptr tmptr) const;
std::vector<::shared_ptr<index_target>> validate_while_executing(data_dictionary::database db) const;
};

}
@@ -116,21 +116,21 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}

// If RF-rack-validity must be enforced for the keyspace according to `enforce_rf_rack_validity_for_keyspace`,
// it's forbidden to create an RF-rack-invalid keyspace. Verify that it's RF-rack-valid.
// If `rf_rack_valid_keyspaces` is enabled, it's forbidden to create an RF-rack-invalid keyspace.
// Verify that it's RF-rack-valid.
// For more context, see: scylladb/scylladb#23071.
try {
// We hold a group0_guard, so it's correct to check this here.
// The topology or schema cannot change while we're performing this query.
locator::assert_rf_rack_valid_keyspace(_name, tmptr, *rs);
} catch (const std::exception& e) {
if (replica::database::enforce_rf_rack_validity_for_keyspace(cfg, *ksm)) {
if (cfg.rf_rack_valid_keyspaces()) {
// There's no guarantee what the type of the exception will be, so we need to
// wrap it manually here in a type that can be passed to the user.
throw exceptions::invalid_request_exception(e.what());
} else {
// Even when RF-rack-validity is not enforced for the keyspace, we'd
// like to inform the user that the keyspace they're creating does not
// Even when the configuration option `rf_rack_valid_keyspaces` is set to false,
// we'd like to inform the user that the keyspace they're creating does not
// satisfy the restriction--but just as a warning.
// For more context, see issue: scylladb/scylladb#23330.
warnings.push_back(seastar::format(
@@ -8,7 +8,6 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/

#include "cql3/statements/view_prop_defs.hh"
#include "exceptions/exceptions.hh"
#include "utils/assert.hh"
#include <unordered_set>
@@ -106,7 +105,7 @@ static bool validate_primary_key(
return new_non_pk_column;
}

std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(data_dictionary::database db, locator::token_metadata_ptr tmptr) const {
std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(data_dictionary::database db) const {
// We need to make sure that:
// - materialized view name is valid
// - primary key includes all columns in base table's primary key
@@ -120,7 +119,15 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
cql3::cql_warnings_vec warnings;

auto schema_extensions = _properties.properties()->make_schema_extensions(db.extensions());
_properties.validate_raw(view_prop_defs::op_type::create, db, keyspace(), schema_extensions);
_properties.validate(db, keyspace(), schema_extensions);

if (_properties.use_compact_storage()) {
throw exceptions::invalid_request_exception(format("Cannot use 'COMPACT STORAGE' when defining a materialized view"));
}

if (_properties.properties()->get_cdc_options(schema_extensions)) {
throw exceptions::invalid_request_exception("Cannot enable CDC for a materialized view");
}

// View and base tables must be in the same keyspace, to ensure that RF
// is the same (because we assign a view replica to each base replica).
@@ -146,21 +153,12 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
schema_ptr schema = validation::validate_column_family(db, _base_name.get_keyspace(), _base_name.get_column_family());

try {
db::view::validate_view_keyspace(db, keyspace(), tmptr);
db::view::validate_view_keyspace(db, keyspace());
} catch (const std::exception& e) {
// The type of the thrown exception is not specified, so we need to wrap it here.
throw exceptions::invalid_request_exception(e.what());
}

if (db.find_keyspace(keyspace()).uses_tablets()) {
warnings.emplace_back(
"Creating a materialized view in a keyspaces that uses tablets requires "
"the keyspace to remain RF-rack-valid while the materialized view exists. "
"Some operations will be restricted to enforce this: altering the keyspace's replication "
"factor, adding a node in a new rack, and removing or decommissioning a node that would "
"eliminate a rack.");
}

if (schema->is_counter()) {
throw exceptions::invalid_request_exception(format("Materialized views are not supported on counter tables"));
}
@@ -343,7 +341,16 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
warnings.emplace_back(std::move(warning_text));
}

schema_builder builder{keyspace(), column_family()};
const auto maybe_id = _properties.properties()->get_id();
if (maybe_id && db.try_find_table(*maybe_id)) {
const auto schema_ptr = db.find_schema(*maybe_id);
const auto& ks_name = schema_ptr->ks_name();
const auto& cf_name = schema_ptr->cf_name();

throw exceptions::invalid_request_exception(seastar::format("Table with ID {} already exists: {}.{}", *maybe_id, ks_name, cf_name));
}

schema_builder builder{keyspace(), column_family(), maybe_id};
auto add_columns = [this, &builder] (std::vector<const column_definition*>& defs, column_kind kind) mutable {
for (auto* def : defs) {
auto&& type = _properties.get_reversable_type(*def->column_specification->name, def->type);
@@ -389,8 +396,14 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
}
}

_properties.apply_to_builder(view_prop_defs::op_type::create, builder, std::move(schema_extensions),
db, keyspace(), is_colocated);
_properties.properties()->apply_to_builder(builder, std::move(schema_extensions), db, keyspace(), !is_colocated);

if (builder.default_time_to_live().count() > 0) {
throw exceptions::invalid_request_exception(
"Cannot set or alter default_time_to_live for a materialized view. "
"Data in a materialized view always expire at the same time than "
"the corresponding data in the parent table.");
}

auto where_clause_text = util::relations_to_where_clause(_where_clause);
builder.with_view_info(schema, included.empty(), std::move(where_clause_text));
@@ -401,7 +414,7 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chunked_vector<mutation>, cql3::cql_warnings_vec>>
create_view_statement::prepare_schema_mutations(query_processor& qp, const query_options&, api::timestamp_type ts) const {
utils::chunked_vector<mutation> m;
auto [definition, warnings] = prepare_view(qp.db(), qp.proxy().get_token_metadata_ptr());
auto [definition, warnings] = prepare_view(qp.db());
try {
m = co_await service::prepare_new_view_announcement(qp.proxy(), std::move(definition), ts);
} catch (const exceptions::already_exists_exception& e) {
@@ -7,9 +7,9 @@
#pragma once

#include "cql3/statements/schema_altering_statement.hh"
#include "cql3/statements/cf_properties.hh"
#include "cql3/cf_name.hh"
#include "cql3/expr/expression.hh"
#include "cql3/statements/view_prop_defs.hh"

#include <seastar/core/shared_ptr.hh>

@@ -35,7 +35,7 @@ private:
expr::expression _where_clause;
std::vector<::shared_ptr<cql3::column_identifier::raw>> _partition_keys;
std::vector<::shared_ptr<cql3::column_identifier::raw>> _clustering_keys;
view_prop_defs _properties;
cf_properties _properties;
bool _if_not_exists;

public:
@@ -48,7 +48,7 @@ public:
std::vector<::shared_ptr<cql3::column_identifier::raw>> clustering_keys,
bool if_not_exists);

std::pair<view_ptr, cql3::cql_warnings_vec> prepare_view(data_dictionary::database db, locator::token_metadata_ptr tmptr) const;
std::pair<view_ptr, cql3::cql_warnings_vec> prepare_view(data_dictionary::database db) const;

auto& properties() {
return _properties;
@@ -710,12 +710,11 @@ std::vector<lw_shared_ptr<column_specification>> listing_describe_statement::get

future<std::vector<std::vector<managed_bytes_opt>>> listing_describe_statement::describe(cql3::query_processor& qp, const service::client_state& client_state) const {
auto db = qp.db();
auto raw_ks = client_state.get_raw_keyspace();

std::vector<sstring> keyspaces;
// For most describe statements we should limit the results to the USEd
// keyspace (client_state.get_raw_keyspace()), if any. However for DESC
// KEYSPACES we must list all keyspaces, not just the USEd one.
if (_element != element_type::keyspace && !client_state.get_raw_keyspace().empty()) {
keyspaces.push_back(client_state.get_raw_keyspace());
if (!raw_ks.empty()) {
keyspaces.push_back(raw_ks);
} else {
keyspaces = db.get_all_keyspaces();
std::ranges::sort(keyspaces);
@@ -11,7 +11,6 @@
#include <set>
#include <seastar/core/format.hh>
#include "index_prop_defs.hh"
#include "cql3/statements/view_prop_defs.hh"
#include "index/secondary_index.hh"
#include "exceptions/exceptions.hh"

@@ -22,9 +21,7 @@ static void check_system_option_specified(const index_options_map& options, cons
}
}

namespace cql3::statements {

void index_specific_prop_defs::validate() const {
void cql3::statements::index_prop_defs::validate() const {
static std::set<sstring> keywords({ sstring(KW_OPTIONS) });

property_definitions::validate(keywords);
@@ -43,13 +40,13 @@ void index_specific_prop_defs::validate() const {
}

index_options_map
index_specific_prop_defs::get_raw_options() const {
cql3::statements::index_prop_defs::get_raw_options() const {
auto options = get_map(KW_OPTIONS);
return !options ? std::unordered_map<sstring, sstring>() : std::unordered_map<sstring, sstring>(options->begin(), options->end());
}

index_options_map
index_specific_prop_defs::get_options() const {
cql3::statements::index_prop_defs::get_options() const {
auto options = get_raw_options();
options.emplace(db::index::secondary_index::custom_class_option_name, *custom_class);
if (index_version.has_value()) {
@@ -57,25 +54,3 @@ index_specific_prop_defs::get_options() const {
}
return options;
}

void index_prop_defs::extract_index_specific_properties_to(index_specific_prop_defs& target) {
if (properties()->has_property(index_specific_prop_defs::KW_OPTIONS)) {
auto value = properties()->extract_property(index_specific_prop_defs::KW_OPTIONS);

std::visit([&target] <typename T> (T&& val) {
target.add_property(index_specific_prop_defs::KW_OPTIONS, std::forward<T>(val));
}, std::move(value));
}
}

view_prop_defs index_prop_defs::into_view_prop_defs() && {
if (properties()->has_property(index_specific_prop_defs::KW_OPTIONS)) {
utils::on_internal_error(seastar::format(
"Precondition has been violated. The property '{}' is still present", index_specific_prop_defs::KW_OPTIONS));
}

view_prop_defs result = std::move(static_cast<view_prop_defs&>(*this));
return result;
}

} // namespace cql3::statements

@@ -10,7 +10,6 @@

#pragma once

#include "cql3/statements/view_prop_defs.hh"
#include "property_definitions.hh"
#include <seastar/core/sstring.hh>
#include "schema/schema_fwd.hh"
@@ -24,7 +23,7 @@ namespace cql3 {

namespace statements {

class index_specific_prop_defs : public property_definitions {
class index_prop_defs : public property_definitions {
public:
static constexpr auto KW_OPTIONS = "options";

@@ -38,19 +37,6 @@ public:
index_options_map get_options() const;
};

struct index_prop_defs : public view_prop_defs {
/// Extract all of the index-specific properties to `target`.
///
/// If there's a property at an index-specific key, and if `target` already has
/// a value at that key, that value will be replaced.
void extract_index_specific_properties_to(index_specific_prop_defs& target);

/// Turns this object into an object of type `view_prop_defs`, as if moved.
///
/// Precondition: the object MUST NOT contain any index-specific property.
view_prop_defs into_view_prop_defs() &&;
};

}
}
@@ -8,8 +8,8 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/

#include <seastar/core/format.hh>
#include <seastar/core/sstring.hh>
#include "seastar/core/format.hh"
#include "seastar/core/sstring.hh"
#include "utils/assert.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "cql3/statements/request_validations.hh"
@@ -11,7 +11,6 @@
#include <ranges>

#include <seastar/core/format.hh>
#include <stdexcept>
#include "cql3/statements/property_definitions.hh"
#include "exceptions/exceptions.hh"
#include "utils/overloaded_functor.hh"
@@ -103,18 +102,6 @@ bool property_definitions::has_property(const sstring& name) const {
return _properties.contains(name);
}

property_definitions::value_type property_definitions::extract_property(const sstring& name) {
auto it = _properties.find(name);

if (it == _properties.end()) {
throw std::out_of_range{std::format("No property of name '{}'", std::string_view(name))};
}

value_type result = std::move(it->second);
_properties.erase(it);
return result;
}

std::optional<property_definitions::value_type> property_definitions::get(const sstring& name) const {
if (auto it = _properties.find(name); it != _properties.end()) {
return it->second;

@@ -59,8 +59,6 @@ protected:
public:
bool has_property(const sstring& name) const;

value_type extract_property(const sstring& name);

std::optional<value_type> get(const sstring& name) const;

std::optional<extended_map_type> get_extended_map(const sstring& name) const;
@@ -261,8 +261,7 @@ future<> select_statement::check_access(query_processor& qp, const service::clie
auto& cf_name = s->is_view()
? s->view_info()->base_name()
: (cdc ? cdc->cf_name() : column_family());
const schema_ptr& base_schema = cdc ? cdc : _schema;
bool is_vector_indexed = secondary_index::vector_index::has_vector_index(*base_schema);
bool is_vector_indexed = secondary_index::vector_index::has_vector_index(*_schema);
co_await state.has_column_family_access(keyspace(), cf_name, auth::permission::SELECT, auth::command_desc::type::OTHER, is_vector_indexed);
} catch (const data_dictionary::no_such_column_family& e) {
// Will be validated afterwards.
@@ -1977,7 +1976,9 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
if (it == indexes.end()) {
throw exceptions::invalid_request_exception("ANN ordering by vector requires the column to be indexed using 'vector_index'");
}

if (index_opt || parameters->allow_filtering() || !(restrictions->is_empty()) || check_needs_allow_filtering_anyway(*restrictions)) {
throw exceptions::invalid_request_exception("ANN ordering by vector does not support filtering");
}
index_opt = *it;

if (!index_opt) {
@@ -2273,9 +2274,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
throw exceptions::invalid_request_exception("PER PARTITION LIMIT is not allowed with aggregate queries.");
}

bool is_ann_query = !_parameters->orderings().empty() && std::holds_alternative<select_statement::ann_vector>(_parameters->orderings().front().second);

auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering() || is_ann_query,
auto restrictions = prepare_restrictions(db, schema, ctx, selection, for_view, _parameters->allow_filtering(),
restrictions::check_indexes(!_parameters->is_mutation_fragments()));

if (_parameters->is_distinct()) {
@@ -1,67 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/

/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/

#include "cql3/statements/view_prop_defs.hh"

namespace cql3::statements {

void view_prop_defs::validate_raw(op_type op, const data_dictionary::database db, sstring ks_name,
const schema::extensions_map& exts) const
{
cf_properties::validate(db, std::move(ks_name), exts);

if (use_compact_storage()) {
throw exceptions::invalid_request_exception(format("Cannot use 'COMPACT STORAGE' when defining a materialized view"));
}

if (properties()->get_cdc_options(exts)) {
throw exceptions::invalid_request_exception("Cannot enable CDC for a materialized view");
}

if (op == op_type::create) {
const auto maybe_id = properties()->get_id();
if (maybe_id && db.try_find_table(*maybe_id)) {
const auto schema_ptr = db.find_schema(*maybe_id);
const auto& ks_name = schema_ptr->ks_name();
const auto& cf_name = schema_ptr->cf_name();

throw exceptions::invalid_request_exception(seastar::format("Table with ID {} already exists: {}.{}", *maybe_id, ks_name, cf_name));
}
}
}

void view_prop_defs::apply_to_builder(op_type op, schema_builder& builder, schema::extensions_map exts,
const data_dictionary::database db, sstring ks_name, bool is_colocated) const
{
_properties->apply_to_builder(builder, exts, db, std::move(ks_name), !is_colocated);

if (op == op_type::create) {
const auto maybe_id = properties()->get_id();
if (maybe_id) {
builder.set_uuid(*maybe_id);
}
}

if (op == op_type::alter) {
if (builder.get_gc_grace_seconds() == 0) {
throw exceptions::invalid_request_exception(
"Cannot alter gc_grace_seconds of a materialized view to 0, since this "
"value is used to TTL undelivered updates. Setting gc_grace_seconds too "
"low might cause undelivered updates to expire before being replayed.");
}
}

if (builder.default_time_to_live().count() > 0) {
throw exceptions::invalid_request_exception(
"Cannot set or alter default_time_to_live for a materialized view. "
"Data in a materialized view always expire at the same time than "
"the corresponding data in the parent table.");
}
}

} // namespace cql3::statements
@@ -1,62 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/

/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/

#pragma once

#include "cql3/statements/cf_properties.hh"

namespace cql3::statements {

/// This type represents the possible properties of the following CQL statements:
///
/// * CREATE MATERIALIZED VIEW,
/// * ALTER MATERIALIZED VIEW.
///
/// Since the sets of the valid properties may differ between those statements, this type
/// is supposed to represent a superset of them.
///
/// This type does NOT guarantee that all of the necessary validation logic will be performed
/// by it. It strives to do that, but you should keep this in mind. What does that mean?
/// Some parts of validation may require more context that's not accessible from here.
///
/// As of yet, this type does not cover all of the validation logic that could be here either.
class view_prop_defs : public cf_properties {
public:
/// The type of a schema operation on a materialized view.
/// These values will be used to guide the validation logic.
enum class op_type {
create,
alter
};

public:
template <typename... Args>
view_prop_defs(Args&&... args) : cf_properties(std::forward<Args>(args)...) {}

// Explicitly delete this method. It's declared in the inherited types.
// The user of this interface should use `validate_raw` instead.
void validate(const data_dictionary::database, sstring ks_name, const schema::extensions_map&) const = delete;

/// Validate the properties for the specified schema operation.
///
/// The validation is *raw* because we mostly validate the properties in their string form (checking if
/// a property exists or not for instance) and only focus on the properties on their own, without
/// having access to any other information.
void validate_raw(op_type, const data_dictionary::database, sstring ks_name, const schema::extensions_map&) const;

/// Apply the properties to the provided schema_builder and validate them.
///
/// NOTE: If the validation fails, this function will throw an exception. What's more important,
/// however, is that the provided schema_builder might have already been modified by that
/// point. Because of that, in presence of an exception, the schema builder should NOT be
/// used anymore.
void apply_to_builder(op_type, schema_builder&, schema::extensions_map, const data_dictionary::database,
sstring ks_name, bool is_colocated) const;
};

} // namespace cql3::statements
@@ -16,7 +16,6 @@
#include <seastar/core/semaphore.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>

@@ -378,7 +377,7 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches

for (const auto& [fm, s] : fms) {
mutations.emplace_back(fm.to_mutation(s));
co_await coroutine::maybe_yield();
co_await maybe_yield();
}

if (!mutations.empty()) {
@@ -502,9 +502,6 @@ public:
void flush_segments(uint64_t size_to_remove);
void check_no_data_older_than_allowed();

// whitebox testing
std::function<future<>()> _oversized_pre_wait_memory_func;

private:
class shutdown_marker{};

@@ -1600,15 +1597,8 @@ future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writ

scope_increment_counter allocating(totals.active_allocations);

// #27992 - whitebox testing. signal we are trying to lock out
// all allocators
if (_oversized_pre_wait_memory_func) {
co_await _oversized_pre_wait_memory_func();
}

auto permit = co_await std::move(fut);
// #27992 - task reordering _can_ force the available units to negative. this is ok.
SCYLLA_ASSERT(_request_controller.available_units() <= 0);
SCYLLA_ASSERT(_request_controller.available_units() == 0);

decltype(permit) fake_permit; // can't have allocate+sync release semaphore.
bool failed = false;
@@ -1869,15 +1859,13 @@ future<> db::commitlog::segment_manager::oversized_allocation(entry_writer& writ
}
}
}

auto avail = _request_controller.available_units();
SCYLLA_ASSERT(avail <= 0);
SCYLLA_ASSERT(_request_controller.available_units() == 0);
SCYLLA_ASSERT(permit.count() == max_request_controller_units());
auto nw = _request_controller.waiters();
permit.return_all();
// #20633 cannot guarantee controller avail is now full, since we could have had waiters when doing
// return all -> now will be less avail
SCYLLA_ASSERT(nw > 0 || _request_controller.available_units() == (avail + ssize_t(max_request_controller_units())));
SCYLLA_ASSERT(nw > 0 || _request_controller.available_units() == ssize_t(max_request_controller_units()));

if (!failed) {
clogger.trace("Oversized allocation succeeded.");
@@ -3961,9 +3949,6 @@ void db::commitlog::update_max_data_lifetime(std::optional<uint64_t> commitlog_d
_segment_manager->cfg.commitlog_data_max_lifetime_in_seconds = commitlog_data_max_lifetime_in_seconds;
}

void db::commitlog::set_oversized_pre_wait_memory_func(std::function<future<>()> f) {
_segment_manager->_oversized_pre_wait_memory_func = std::move(f);
}

future<std::vector<sstring>> db::commitlog::get_segments_to_replay() const {
return _segment_manager->get_segments_to_replay();

@@ -385,9 +385,6 @@ public:
// (Re-)set data mix lifetime.
void update_max_data_lifetime(std::optional<uint64_t> commitlog_data_max_lifetime_in_seconds);

// Whitebox testing. Do not use for production
void set_oversized_pre_wait_memory_func(std::function<future<>()>);

using commit_load_reader_func = std::function<future<>(buffer_and_replay_position)>;

class segment_error : public std::exception {};
15
db/config.cc
@@ -1318,7 +1318,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable.")
, prometheus_address(this, "prometheus_address", value_status::Used, {/* listen_address */}, "Prometheus listening address, defaulting to listen_address if not explicitly set.")
, prometheus_prefix(this, "prometheus_prefix", value_status::Used, "scylla", "Set the prefix of the exported Prometheus metrics. Changing this will break Scylla's dashboard compatibility, do not change unless you know what you are doing.")
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, true, "Enable Prometheus protobuf with native histogram. Set to false to force text exposition format.")
, prometheus_allow_protobuf(this, "prometheus_allow_protobuf", value_status::Used, false, "If set allows the experimental Prometheus protobuf with native histogram")
, abort_on_lsa_bad_alloc(this, "abort_on_lsa_bad_alloc", value_status::Used, false, "Abort when allocation in LSA region fails.")
, murmur3_partitioner_ignore_msb_bits(this, "murmur3_partitioner_ignore_msb_bits", value_status::Used, default_murmur3_partitioner_ignore_msb_bits, "Number of most significant token bits to ignore in murmur3 partitioner; increase for very large clusters.")
, unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.")
@@ -1447,10 +1447,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"SELECT statements with aggregation or GROUP BYs or a secondary index may use this page size for their internal reading data, not the page size specified in the query options.")
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.")
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.")
, alternator_port_proxy_protocol(this, "alternator_port_proxy_protocol", value_status::Used, 0,
"Port on which the Alternator API listens for clients using proxy protocol v2. Disabled (0) by default.")
, alternator_https_port_proxy_protocol(this, "alternator_https_port_proxy_protocol", value_status::Used, 0,
"Port on which the Alternator HTTPS API listens for clients using proxy protocol v2. Disabled (0) by default.")
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.")
, alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
@@ -1484,13 +1480,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
, alternator_describe_table_info_cache_validity_in_seconds(this, "alternator_describe_table_info_cache_validity_in_seconds", liveness::LiveUpdate, value_status::Used, 60 * 60 * 6,
"The validity of DescribeTable information - table size in bytes. This is how long calculated value will be reused before recalculation.")
, alternator_response_gzip_compression_level(this, "alternator_response_gzip_compression_level", liveness::LiveUpdate, value_status::Used, int8_t(6),
"Controls gzip and deflate compression level for Alternator response bodies (if the client requests it via Accept-Encoding header) Default of 6 is a compromise between speed and compression.\n"
"Valid values:\n"
"\t0 : No compression (disables gzip/deflate)\n"
"\t1-9: Compression levels (1 = fastest, 9 = best compression)")
, alternator_response_compression_threshold_in_bytes(this, "alternator_response_compression_threshold_in_bytes", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"When the compression is enabled, this value indicates the minimum size of data to compress. Smaller responses will not be compressed.")
|
||||
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
|
||||
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
|
||||
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")
|
||||
@@ -1570,8 +1559,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"\tdisabled: New keyspaces use vnodes by default, unless enabled by the tablets={'enabled':true} option\n"
|
||||
"\tenabled: New keyspaces use tablets by default, unless disabled by the tablets={'enabled':false} option\n"
|
||||
"\tenforced: New keyspaces must use tablets. Tablets cannot be disabled using the CREATE KEYSPACE option")
|
||||
, auto_repair_enabled_default(this, "auto_repair_enabled_default", liveness::LiveUpdate, value_status::Used, false, "Set true to enable auto repair for tablet tables by default. The value will be overridden by the per keyspace or per table configuration which is not implemented yet.")
|
||||
, auto_repair_threshold_default_in_seconds(this, "auto_repair_threshold_default_in_seconds", liveness::LiveUpdate, value_status::Used, 24 * 3600 , "Set the default time in seconds for the auto repair threshold for tablet tables. If the time since last repair is bigger than the configured time, the tablet is eligible for auto repair. The value will be overridden by the per keyspace or per table configuration which is not implemented yet.")
|
||||
, view_flow_control_delay_limit_in_ms(this, "view_flow_control_delay_limit_in_ms", liveness::LiveUpdate, value_status::Used, 1000,
|
||||
"The maximal amount of time that materialized-view update flow control may delay responses "
|
||||
"to try to slow down the client and prevent buildup of unfinished view updates. "
|
||||
|
||||
@@ -464,8 +464,6 @@ public:

named_value<uint16_t> alternator_port;
named_value<uint16_t> alternator_https_port;
named_value<uint16_t> alternator_port_proxy_protocol;
named_value<uint16_t> alternator_https_port_proxy_protocol;
named_value<sstring> alternator_address;
named_value<bool> alternator_enforce_authorization;
named_value<bool> alternator_warn_authorization;
@@ -480,8 +478,6 @@ public:
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
named_value<uint32_t> alternator_describe_table_info_cache_validity_in_seconds;
named_value<int> alternator_response_gzip_compression_level;
named_value<uint32_t> alternator_response_compression_threshold_in_bytes;

named_value<bool> abort_on_ebadf;

@@ -571,8 +567,6 @@ public:
named_value<double> topology_barrier_stall_detector_threshold_seconds;
named_value<bool> enable_tablets;
named_value<enum_option<tablets_mode_t>> tablets_mode_for_new_keyspaces;
named_value<bool> auto_repair_enabled_default;
named_value<int32_t> auto_repair_threshold_default_in_seconds;

bool enable_tablets_by_default() const noexcept {
switch (tablets_mode_for_new_keyspaces()) {

@@ -644,12 +644,6 @@ future<> manager::drain_for(endpoint_id host_id, gms::inet_address ip) noexcept
co_return;
}

if (!replay_allowed()) {
auto reason = seastar::format("Precondition violdated while trying to drain {} / {}: "
"hint replay is not allowed", host_id, ip);
on_internal_error(manager_logger, std::move(reason));
}

manager_logger.info("Draining starts for {}", host_id);

const auto holder = seastar::gate::holder{_draining_eps_gate};

@@ -318,10 +318,6 @@ public:
/// In both cases - removes the corresponding hints' directories after all hints have been drained and erases the
/// corresponding hint_endpoint_manager objects.
///
/// Preconditions:
/// * Hint replay must be allowed (i.e. `replay_allowed()` must be true) throughout
/// the execution of this function.
///
/// \param host_id host ID of the node that left the cluster
/// \param ip the IP of the node that left the cluster
future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
@@ -346,15 +342,15 @@ public:
return _state.contains(state::started);
}

bool replay_allowed() const noexcept {
return _state.contains(state::replay_allowed);
}

private:
void set_started() noexcept {
_state.set(state::started);
}

bool replay_allowed() const noexcept {
return _state.contains(state::replay_allowed);
}

void set_draining_all() noexcept {
_state.set(state::draining_all);
}

@@ -12,23 +12,13 @@
#include <yaml-cpp/yaml.h>

#include <boost/lexical_cast.hpp>

#include "utils/s3/creds.hh"
#include "utils/http.hh"
#include "object_storage_endpoint_param.hh"

using namespace std::string_literals;

static auto format_url(std::string_view host, unsigned port, bool use_https) {
return fmt::format("{}://{}:{}", use_https ? "https" : "http", host, port);
}

db::object_storage_endpoint_param::object_storage_endpoint_param(s3_storage s)
: _data(std::move(s))
{}
db::object_storage_endpoint_param::object_storage_endpoint_param(std::string endpoint, s3::endpoint_config config)
: object_storage_endpoint_param(s3_storage{format_url(endpoint, config.port, config.use_https), std::move(config.region), std::move(config.role_arn), true /* legacy_format */})
{}
db::object_storage_endpoint_param::object_storage_endpoint_param(gs_storage s)
: _data(std::move(s))
{}
@@ -37,29 +27,13 @@ db::object_storage_endpoint_param::object_storage_endpoint_param() = default;
db::object_storage_endpoint_param::object_storage_endpoint_param(const object_storage_endpoint_param&) = default;

std::string db::object_storage_endpoint_param::s3_storage::to_json_string() const {
if (!legacy_format) {
return fmt::format("{{ \"type\": \"s3\", \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
region, iam_role_arn
);
}

auto url = utils::http::parse_simple_url(endpoint);
return fmt::format("{{ \"port\": {}, \"use_https\": {}, \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
url.port, url.is_https(), region, iam_role_arn
return fmt::format("{{ \"type\": \"s3\", \"aws_region\": \"{}\", \"iam_role_arn\": \"{}\" }}",
region, iam_role_arn
);
}

std::string db::object_storage_endpoint_param::s3_storage::key() const {
// The `endpoint` is full URL all the time, so only return it as a key
// if it wasn't configured "the old way". In the latter case, split the
// URL and return its host part to mimic the old behavior.

if (!legacy_format) {
return endpoint;
}

auto url = utils::http::parse_simple_url(endpoint);
return url.host;
return endpoint;
}

std::string db::object_storage_endpoint_param::gs_storage::to_json_string() const {
@@ -127,21 +101,20 @@ db::object_storage_endpoint_param db::object_storage_endpoint_param::decode(cons
return tmp ? tmp.template as<std::decay_t<decltype(def)>>() : def;
};
// aws s3 endpoint.
if (!type || type.as<std::string>() == s3_type) {
if (!type || type.as<std::string>() == s3_type ) {
s3_storage ep;
auto endpoint = name.as<std::string>();
ep.legacy_format = (!endpoint.starts_with("http://") && !endpoint.starts_with("https://"));

if (!ep.legacy_format) {
ep.endpoint = std::move(endpoint);
} else {
ep.endpoint = format_url(endpoint, node["port"].as<unsigned>(), node["https"].as<bool>(false));
}

ep.endpoint = name.as<std::string>();
auto aws_region = node["aws_region"];
ep.region = aws_region ? aws_region.as<std::string>() : std::getenv("AWS_DEFAULT_REGION");
ep.iam_role_arn = get_opt(node, "iam_role_arn", ""s);

if (maybe_legacy_endpoint_name(ep.endpoint)) {
// Support legacy config for a while
auto port = node["port"].as<unsigned>();
auto use_https = node["https"].as<bool>(false);
ep.endpoint = fmt::format("http{}://{}:{}", use_https ? "s" : "", ep.endpoint, port);
}

return object_storage_endpoint_param{std::move(ep)};
}
// GCS endpoint

@@ -13,7 +13,6 @@
#include <variant>
#include <compare>
#include <fmt/core.h>
#include "utils/s3/creds.hh"

namespace YAML {
class Node;
@@ -27,7 +26,6 @@ public:
std::string endpoint;
std::string region;
std::string iam_role_arn;
bool legacy_format; // FIXME convert it to bool_class after seastar#3198

std::strong_ordering operator<=>(const s3_storage&) const = default;
std::string to_json_string() const;
@@ -45,7 +43,6 @@ public:
object_storage_endpoint_param();
object_storage_endpoint_param(const object_storage_endpoint_param&);
object_storage_endpoint_param(s3_storage);
object_storage_endpoint_param(std::string endpoint, s3::endpoint_config config);
object_storage_endpoint_param(gs_storage);

std::strong_ordering operator<=>(const object_storage_endpoint_param&) const;
@@ -79,3 +76,7 @@ template <>
struct fmt::formatter<db::object_storage_endpoint_param> : fmt::formatter<std::string_view> {
auto format(const db::object_storage_endpoint_param&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

inline bool maybe_legacy_endpoint_name(std::string_view ep) noexcept {
return !(ep.starts_with("http://") || ep.starts_with("https://"));
}

@@ -961,15 +961,15 @@ public:

auto include_pending_changes = [&table_schemas](schema_diff_per_shard d) -> future<> {
for (auto& schema : d.dropped) {
co_await coroutine::maybe_yield();
co_await maybe_yield();
table_schemas.erase(schema->id());
}
for (auto& change : d.altered) {
co_await coroutine::maybe_yield();
co_await maybe_yield();
table_schemas.insert_or_assign(change.new_schema->id(), change.new_schema);
}
for (auto& schema : d.created) {
co_await coroutine::maybe_yield();
co_await maybe_yield();
table_schemas.insert_or_assign(schema->id(), schema);
}
};

@@ -152,8 +152,7 @@ future<> backup_task_impl::do_backup() {
}

future<> backup_task_impl::process_snapshot_dir() {
auto directory = co_await io_check(open_directory, _snapshot_dir.native());
auto snapshot_dir_lister = directory_lister(directory, _snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>())
auto snapshot_dir_lister = directory_lister(_snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
size_t num_sstable_comps = 0;

try {
@@ -162,7 +161,7 @@ future<> backup_task_impl::process_snapshot_dir() {
while (auto component_ent = co_await snapshot_dir_lister.get()) {
const auto& name = component_ent->name;
auto file_path = _snapshot_dir / name;
auto st = co_await file_stat(directory, name);
auto st = co_await file_stat(file_path.native());
total += st.size;
try {
auto desc = sstables::parse_path(file_path, "", "");

@@ -140,7 +140,6 @@ namespace {
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
system_keyspace::CLIENT_ROUTES,
system_keyspace::REPAIR_TASKS,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
props.is_group0_table = true;
@@ -491,24 +490,6 @@ schema_ptr system_keyspace::repair_history() {
return schema;
}

schema_ptr system_keyspace::repair_tasks() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, REPAIR_TASKS);
return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
.with_column("task_uuid", uuid_type, column_kind::partition_key)
.with_column("operation", utf8_type, column_kind::clustering_key)
// First and last token for of the tablet
.with_column("first_token", long_type, column_kind::clustering_key)
.with_column("last_token", long_type, column_kind::clustering_key)
.with_column("timestamp", timestamp_type)
.with_column("table_uuid", uuid_type, column_kind::static_column)
.set_comment("Record tablet repair tasks")
.with_hash_version()
.build();
}();
return schema;
}

schema_ptr system_keyspace::built_indexes() {
static thread_local auto built_indexes = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
@@ -2375,7 +2356,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
corrupt_data(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(),
repair_tasks(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),
@@ -2619,32 +2599,6 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
});
}

future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
// Default to timeout the repair task entries in 10 days, this should be enough time for the management tools to query
constexpr int ttl = 10 * 24 * 3600;
sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
{entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
utils::chunked_vector<canonical_mutation> cmuts(muts.begin(), muts.end());
co_return cmuts;
}

future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_task_entry ent;
ent.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
ent.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
ent.first_token = row.get_as<int64_t>("first_token");
ent.last_token = row.get_as<int64_t>("last_token");
ent.timestamp = row.get_as<db_clock::time_point>("timestamp");
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
co_await f(std::move(ent));
co_return stop_iteration::no;
});
}

future<gms::generation_type> system_keyspace::increment_and_get_generation() {
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
@@ -3295,7 +3249,7 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features"));
}

if (row.has("topology_request") && nstate != service::node_state::left) {
if (row.has("topology_request")) {
auto req = service::topology_request_from_string(row.get_as<sstring>("topology_request"));
ret.requests.emplace(host_id, req);
switch(req) {
@@ -3845,35 +3799,4 @@ future<> system_keyspace::apply_mutation(mutation m) {
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
}

// The names are persisted in system tables so should not be changed.
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
{system_keyspace::repair_task_operation::requested, "requested"},
{system_keyspace::repair_task_operation::finished, "finished"},
};

static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
for (auto&& [v, s] : repair_task_operation_to_name) {
result.emplace(s, v);
}
return result;
});

sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
auto i = repair_task_operation_to_name.find(op);
if (i == repair_task_operation_to_name.end()) {
on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
}
return i->second;
}

system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
return repair_task_operation_from_name.at(name);
}

} // namespace db

auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{}", db::system_keyspace::repair_task_operation_to_string(op));
}

@@ -57,8 +57,6 @@ namespace paxos {
struct topology_request_state;

class group0_guard;

class raft_group0_client;
}

namespace netw {
@@ -187,7 +185,6 @@ public:
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static constexpr auto REPAIR_TASKS = "repair_tasks";
static constexpr auto GROUP0_HISTORY = "group0_history";
static constexpr auto DISCOVERY = "discovery";
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
@@ -267,7 +264,6 @@ public:
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr repair_history();
static schema_ptr repair_tasks();
static schema_ptr group0_history();
static schema_ptr discovery();
static schema_ptr broadcast_kv_store();
@@ -407,22 +403,6 @@ public:
int64_t range_end;
};

enum class repair_task_operation {
requested,
finished,
};
static sstring repair_task_operation_to_string(repair_task_operation op);
static repair_task_operation repair_task_operation_from_string(const sstring& name);

struct repair_task_entry {
tasks::task_id task_uuid;
repair_task_operation operation;
int64_t first_token;
int64_t last_token;
db_clock::time_point timestamp;
table_id table_uuid;
};

struct topology_requests_entry {
utils::UUID id;
utils::UUID initiating_host;
@@ -444,10 +424,6 @@ public:
using repair_history_consumer = noncopyable_function<future<>(const repair_history_entry&)>;
future<> get_repair_history(table_id, repair_history_consumer f);

future<utils::chunked_vector<canonical_mutation>> get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts);
using repair_task_consumer = noncopyable_function<future<>(const repair_task_entry&)>;
future<> get_repair_task(tasks::task_id task_uuid, repair_task_consumer f);

future<> save_truncation_record(const replica::column_family&, db_clock::time_point truncated_at, db::replay_position);
future<replay_positions> get_truncated_positions(table_id);
future<> drop_truncation_rp_records();
@@ -757,8 +733,3 @@ public:
}; // class system_keyspace

} // namespace db

template <>
struct fmt::formatter<db::system_keyspace::repair_task_operation> : fmt::formatter<string_view> {
auto format(const db::system_keyspace::repair_task_operation&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

228
db/view/view.cc
@@ -57,7 +57,7 @@
#include "locator/network_topology_strategy.hh"
#include "mutation/mutation.hh"
#include "mutation/mutation_partition.hh"
#include <seastar/core/on_internal_error.hh>
#include "seastar/core/on_internal_error.hh"
#include "service/migration_manager.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/storage_proxy.hh"
@@ -2244,7 +2244,7 @@ future<> view_builder::start_in_background(service::migration_manager& mm, utils
// Guard the whole startup routine with a semaphore,
// so that it's not intercepted by `on_drop_view`, `on_create_view`
// or `on_update_view` events.
auto units = co_await get_units(_sem, view_builder_semaphore_units);
auto units = co_await get_units(_sem, 1);
// Wait for schema agreement even if we're a seed node.
co_await mm.wait_for_schema_agreement(_db, db::timeout_clock::time_point::max(), &_as);

@@ -2659,7 +2659,7 @@ future<> view_builder::add_new_view(view_ptr view, build_step& step) {
co_await utils::get_local_injector().inject("add_new_view_pause_last_shard", utils::wait_for_message(5min));
}

co_await _sys_ks.register_view_for_building(view->ks_name(), view->cf_name(), step.current_token());
co_await _sys_ks.register_view_for_building_for_all_shards(view->ks_name(), view->cf_name(), step.current_token());
step.build_status.emplace(step.build_status.begin(), view_build_status{view, step.current_token(), std::nullopt});
}

@@ -2667,74 +2667,40 @@ static bool should_ignore_tablet_keyspace(const replica::database& db, const sst
return db.features().view_building_coordinator && db.has_keyspace(ks_name) && db.find_keyspace(ks_name).uses_tablets();
}

future<> view_builder::dispatch_create_view(sstring ks_name, sstring view_name) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return make_ready_future<>();
}
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
// This runs on shard 0 only; seed the global rows before broadcasting.
return handle_seed_view_build_progress(ks_name, view_name).then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return container().invoke_on_all([ks_name = std::move(ks_name), view_name = std::move(view_name)] (view_builder& vb) mutable {
return vb.handle_create_view_local(std::move(ks_name), std::move(view_name));
});
});
});
}

future<> view_builder::handle_seed_view_build_progress(sstring ks_name, sstring view_name) {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto& step = get_or_create_build_step(view->view_info()->base_id());
return _sys_ks.register_view_for_building_for_all_shards(view->ks_name(), view->cf_name(), step.current_token());
}

future<> view_builder::handle_create_view_local(sstring ks_name, sstring view_name){
if (this_shard_id() == 0) {
return handle_create_view_local_impl(std::move(ks_name), std::move(view_name));
} else {
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return handle_create_view_local_impl(std::move(ks_name), std::move(view_name));
});
}
}

future<> view_builder::handle_create_view_local_impl(sstring ks_name, sstring view_name) {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto& step = get_or_create_build_step(view->view_info()->base_id());
return when_all(step.base->await_pending_writes(), step.base->await_pending_streams()).discard_result().then([this, &step] {
return flush_base(step.base, _as);
}).then([this, view, &step] () {
// This resets the build step to the current token. It may result in views currently
// being built to receive duplicate updates, but it simplifies things as we don't have
// to keep around a list of new views to build the next time the reader crosses a token
// threshold.
return initialize_reader_at_current_token(step).then([this, view, &step] () mutable {
return add_new_view(view, step);
}).then_wrapped([this, view] (future<>&& f) {
try {
f.get();
} catch (abort_requested_exception&) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (raft::request_aborted&) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (...) {
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception());
}

// Waited on indirectly in stop().
static_cast<void>(_build_step.trigger());
});
});
}

void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) {
if (this_shard_id() != 0) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return;
}

// Do it in the background, serialized and broadcast from shard 0.
static_cast<void>(dispatch_create_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to dispatch view creation {}.{}: {}", ks_name, view_name, ep);
}));
// Do it in the background, serialized.
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto& step = get_or_create_build_step(view->view_info()->base_id());
return when_all(step.base->await_pending_writes(), step.base->await_pending_streams()).discard_result().then([this, &step] {
return flush_base(step.base, _as);
}).then([this, view, &step] () mutable {
// This resets the build step to the current token. It may result in views currently
// being built to receive duplicate updates, but it simplifies things as we don't have
// to keep around a list of new views to build the next time the reader crosses a token
// threshold.
return initialize_reader_at_current_token(step).then([this, view, &step] () mutable {
return add_new_view(view, step).then_wrapped([this, view] (future<>&& f) {
try {
f.get();
} catch (abort_requested_exception&) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (raft::request_aborted&) {
vlogger.debug("Aborted while setting up view for building {}.{}", view->ks_name(), view->cf_name());
} catch (...) {
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), std::current_exception());
}

// Waited on indirectly in stop().
(void)_build_step.trigger();
});
});
});
}).handle_exception_type([] (replica::no_such_column_family&) { });
}

void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
@@ -2743,7 +2709,7 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na
}

// Do it in the background, serialized.
(void)with_semaphore(_sem, view_builder_semaphore_units, [ks_name, view_name, this] {
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
if (step_it == _base_to_build_step.end()) {
@@ -2758,75 +2724,45 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na
}).handle_exception_type([] (replica::no_such_column_family&) { });
}

future<> view_builder::dispatch_drop_view(sstring ks_name, sstring view_name) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return make_ready_future<>();
}

return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
// This runs on shard 0 only; broadcast local cleanup before global cleanup.
return container().invoke_on_all([ks_name, view_name] (view_builder& vb) mutable {
return vb.handle_drop_view_local(std::move(ks_name), std::move(view_name));
}).then([this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return handle_drop_view_global_cleanup(std::move(ks_name), std::move(view_name));
});
});
}

future<> view_builder::handle_drop_view_local(sstring ks_name, sstring view_name) {
if (this_shard_id() == 0) {
return handle_drop_view_local_impl(std::move(ks_name), std::move(view_name));
} else {
return with_semaphore(_sem, view_builder_semaphore_units, [this, ks_name = std::move(ks_name), view_name = std::move(view_name)] () mutable {
return handle_drop_view_local_impl(std::move(ks_name), std::move(view_name));
});
}
}

future<> view_builder::handle_drop_view_local_impl(sstring ks_name, sstring view_name) {
vlogger.info0("Stopping to build view {}.{}", ks_name, view_name);
// The view is absent from the database at this point, so find it by brute force.
([&, this] {
for (auto& [_, step] : _base_to_build_step) {
if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) {
continue;
}
for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
if (it->view->cf_name() == view_name) {
_built_views.erase(it->view->id());
step.build_status.erase(it);
return;
}
}
}
})();
return make_ready_future<>();
}

future<> view_builder::handle_drop_view_global_cleanup(sstring ks_name, sstring view_name) {
if (this_shard_id() != 0) {
return make_ready_future<>();
}
vlogger.info0("Starting view global cleanup {}.{}", ks_name, view_name);
return when_all_succeed(
_sys_ks.remove_view_build_progress_across_all_shards(ks_name, view_name),
|
||||
_sys_ks.remove_built_view(ks_name, view_name),
|
||||
remove_view_build_status(ks_name, view_name))
|
||||
.discard_result()
|
||||
.handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep);
|
||||
});
|
||||
}
|
||||
|
||||
void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) {
|
||||
if (this_shard_id() != 0) {
|
||||
if (should_ignore_tablet_keyspace(_db, ks_name)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Do it in the background, serialized and broadcast from shard 0.
|
||||
static_cast<void>(dispatch_drop_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to dispatch view drop {}.{}: {}", ks_name, view_name, ep);
|
||||
}));
|
||||
vlogger.info0("Stopping to build view {}.{}", ks_name, view_name);
|
||||
// Do it in the background, serialized.
|
||||
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
|
||||
// The view is absent from the database at this point, so find it by brute force.
|
||||
([&, this] {
|
||||
for (auto& [_, step] : _base_to_build_step) {
|
||||
if (step.build_status.empty() || step.build_status.front().view->ks_name() != ks_name) {
|
||||
continue;
|
||||
}
|
||||
for (auto it = step.build_status.begin(); it != step.build_status.end(); ++it) {
|
||||
if (it->view->cf_name() == view_name) {
|
||||
_built_views.erase(it->view->id());
|
||||
step.build_status.erase(it);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
})();
|
||||
if (this_shard_id() != 0) {
|
||||
// Shard 0 can't remove the entry in the build progress system table on behalf of the
|
||||
// current shard, since shard 0 may have already processed the notification, and this
|
||||
// shard may since have updated the system table if the drop happened concurrently
|
||||
// with the build.
|
||||
return _sys_ks.remove_view_build_progress(ks_name, view_name);
|
||||
}
|
||||
return when_all_succeed(
|
||||
_sys_ks.remove_view_build_progress(ks_name, view_name),
|
||||
_sys_ks.remove_built_view(ks_name, view_name),
|
||||
remove_view_build_status(ks_name, view_name))
|
||||
.discard_result()
|
||||
.handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep);
|
||||
});
|
||||
}).handle_exception_type([] (replica::no_such_keyspace&) {});
|
||||
}
|
||||
|
||||
future<> view_builder::do_build_step() {
|
||||
@@ -2837,7 +2773,7 @@ future<> view_builder::do_build_step() {
|
||||
return seastar::async(std::move(attr), [this] {
|
||||
exponential_backoff_retry r(1s, 1min);
|
||||
while (!_base_to_build_step.empty() && !_as.abort_requested()) {
|
||||
auto units = get_units(_sem, view_builder_semaphore_units).get();
|
||||
auto units = get_units(_sem, 1).get();
|
||||
++_stats.steps_performed;
|
||||
try {
|
||||
execute(_current_step->second, exponential_backoff_retry(1s, 1min));
|
||||
@@ -3697,20 +3633,20 @@ sstring build_status_to_sstring(build_status status) {
|
||||
on_internal_error(vlogger, fmt::format("Unknown view build status: {}", (int)status));
|
||||
}
|
||||
|
||||
void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name, locator::token_metadata_ptr tmptr) {
|
||||
const auto& rs = db.find_keyspace(keyspace_name).get_replication_strategy();
|
||||
void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name) {
|
||||
const bool tablet_views_enabled = db.features().views_with_tablets;
|
||||
// Note: if the configuration option `rf_rack_valid_keyspaces` is enabled, we can be
|
||||
// sure that all tablet-based keyspaces are RF-rack-valid. We check that
|
||||
// at start-up and then we don't allow for creating RF-rack-invalid keyspaces.
|
||||
const bool rf_rack_valid_keyspaces = db.get_config().rf_rack_valid_keyspaces();
|
||||
const bool required_config = tablet_views_enabled && rf_rack_valid_keyspaces;
|
||||
|
||||
if (rs.uses_tablets() && !db.features().views_with_tablets) {
|
||||
const bool uses_tablets = db.find_keyspace(keyspace_name).get_replication_strategy().uses_tablets();
|
||||
|
||||
if (!required_config && uses_tablets) {
|
||||
throw std::logic_error("Materialized views and secondary indexes are not supported on base tables with tablets. "
|
||||
"To be able to use them, make sure all nodes in the cluster are upgraded.");
|
||||
}
|
||||
|
||||
try {
|
||||
locator::assert_rf_rack_valid_keyspace(keyspace_name, tmptr, rs);
|
||||
} catch (const std::exception& e) {
|
||||
throw std::logic_error(fmt::format(
|
||||
"Materialized views and secondary indexes are not supported on the keyspace '{}': {}",
|
||||
keyspace_name, e.what()));
|
||||
"To be able to use them, enable the configuration option `rf_rack_valid_keyspaces` and make sure "
|
||||
"that the cluster feature `VIEWS_WITH_TABLETS` is enabled.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "gc_clock.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
#include "query/query-request.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "readers/mutation_reader.hh"
|
||||
@@ -319,7 +318,7 @@ endpoints_to_update get_view_natural_endpoint(
|
||||
///
|
||||
/// Preconditions:
|
||||
/// * The provided `keyspace_name` must correspond to an existing keyspace.
|
||||
void validate_view_keyspace(const data_dictionary::database&, std::string_view keyspace_name, locator::token_metadata_ptr tmptr);
|
||||
void validate_view_keyspace(const data_dictionary::database&, std::string_view keyspace_name);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -169,11 +169,10 @@ class view_builder final : public service::migration_listener::only_view_notific
|
||||
base_to_build_step_type _base_to_build_step;
|
||||
base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
|
||||
serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
|
||||
static constexpr size_t view_builder_semaphore_units = 1;
|
||||
// Ensures bookkeeping operations are serialized, meaning that while we execute
|
||||
// a build step we don't consider newly added or removed views. This simplifies
|
||||
// the algorithms. Also synchronizes an operation wrt. a call to stop().
|
||||
seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}};
|
||||
seastar::named_semaphore _sem{1, named_semaphore_exception_factory{"view builder"}};
|
||||
seastar::abort_source _as;
|
||||
future<> _started = make_ready_future<>();
|
||||
// Used to coordinate between shards the conclusion of the build process for a particular view.
|
||||
@@ -267,14 +266,6 @@ private:
|
||||
future<> maybe_mark_view_as_built(view_ptr, dht::token);
|
||||
future<> mark_as_built(view_ptr);
|
||||
void setup_metrics();
|
||||
future<> dispatch_create_view(sstring ks_name, sstring view_name);
|
||||
future<> dispatch_drop_view(sstring ks_name, sstring view_name);
|
||||
future<> handle_seed_view_build_progress(sstring ks_name, sstring view_name);
|
||||
future<> handle_create_view_local(sstring ks_name, sstring view_name);
|
||||
future<> handle_drop_view_local(sstring ks_name, sstring view_name);
|
||||
future<> handle_create_view_local_impl(sstring ks_name, sstring view_name);
|
||||
future<> handle_drop_view_local_impl(sstring ks_name, sstring view_name);
|
||||
future<> handle_drop_view_global_cleanup(sstring ks_name, sstring view_name);
|
||||
|
||||
template <typename Func1, typename Func2>
|
||||
future<> write_view_build_status(Func1&& fn_group0, Func2&& fn_sys_dist) {
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include <seastar/core/gate.hh>
|
||||
#include "seastar/core/gate.hh"
|
||||
#include "db/view/view_building_state.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
@@ -102,13 +102,13 @@ view_update_generator::view_update_generator(replica::database& db, sharded<serv
|
||||
, _early_abort_subscription(as.subscribe([this] () noexcept { do_abort(); }))
|
||||
{
|
||||
setup_metrics();
|
||||
discover_staging_sstables();
|
||||
_db.plug_view_update_generator(*this);
|
||||
}
|
||||
|
||||
view_update_generator::~view_update_generator() {}
|
||||
|
||||
future<> view_update_generator::start() {
|
||||
discover_staging_sstables();
|
||||
_started = seastar::async([this]() mutable {
|
||||
auto drop_sstable_references = defer([&] () noexcept {
|
||||
// Clear sstable references so sstables_manager::stop() doesn't hang.
|
||||
|
||||
1 dist/common/kernel_conf/scylla_tune_sched vendored
@@ -7,6 +7,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

import os
import sys
import errno
import logging

2 dist/common/scripts/scylla-blocktune vendored
@@ -8,7 +8,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

import argparse
from scylla_blocktune import tune_yaml, tune_fs, tune_dev
from scylla_blocktune import *

if __name__ == "__main__":
    ap = argparse.ArgumentParser('Tune filesystems for ScyllaDB')

4 dist/common/scripts/scylla_coredump_setup vendored
@@ -14,9 +14,7 @@ import subprocess
import time
import tempfile
import shutil
import re
import distro
from scylla_util import out, is_debian_variant, is_suse_variant, pkg_install, is_redhat_variant, systemd_unit, is_gentoo
from scylla_util import *
from subprocess import run

5 dist/common/scripts/scylla_cpuscaling_setup vendored
@@ -10,8 +10,9 @@
import os
import sys
import argparse
import shutil
from scylla_util import is_debian_variant, pkg_install, systemd_unit, sysconfig_parser, is_gentoo, is_arch, is_amzn2, is_suse_variant, is_redhat_variant
import shlex
import distro
from scylla_util import *

UNIT_DATA= '''
[Unit]

2 dist/common/scripts/scylla_cpuset_setup vendored
@@ -10,7 +10,7 @@
import os
import sys
import argparse
from scylla_util import is_container, sysconfig_parser
from scylla_util import *

if __name__ == '__main__':
    if not is_container() and os.getuid() > 0:

2 dist/common/scripts/scylla_dev_mode_setup vendored
@@ -10,7 +10,7 @@
import os
import sys
import argparse
from scylla_util import is_nonroot, is_container, etcdir, sysconfig_parser
from scylla_util import *

if __name__ == '__main__':
    if not is_nonroot() and not is_container() and os.getuid() > 0:

2 dist/common/scripts/scylla_fstrim vendored
@@ -8,6 +8,8 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

import os
import sys
import yaml
import argparse
import subprocess

5 dist/common/scripts/scylla_fstrim_setup vendored
@@ -8,8 +8,9 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

import os
import sys
from scylla_util import systemd_unit
import subprocess
import shutil
from scylla_util import *

if __name__ == '__main__':
    if os.getuid() > 0:

2 dist/common/scripts/scylla_io_setup vendored
@@ -9,7 +9,7 @@

import os
import re
from scylla_util import etcdir, datadir, bindir, scriptsdir, is_nonroot, is_container, is_developer_mode
from scylla_util import *
import resource
import subprocess
import argparse

2 dist/common/scripts/scylla_kernel_check vendored
@@ -10,7 +10,7 @@
import os
import sys
import shutil
from scylla_util import pkg_install
from scylla_util import *
from subprocess import run, DEVNULL

if __name__ == '__main__':

3 dist/common/scripts/scylla_logrotate vendored
@@ -7,8 +7,9 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

from pathlib import Path
from datetime import datetime
from scylla_util import scylladir_p
from scylla_util import *

if __name__ == '__main__':
    log = scylladir_p() / 'scylla-server.log'

2 dist/common/scripts/scylla_memory_setup vendored
@@ -10,7 +10,7 @@
import os
import sys
import argparse
from scylla_util import etcdir_p
from scylla_util import *

if __name__ == '__main__':
    if os.getuid() > 0:

3 dist/common/scripts/scylla_ntp_setup vendored
@@ -11,9 +11,10 @@ import os
import sys
import argparse
import re
import distro
import shutil

from scylla_util import systemd_unit, is_redhat_variant, pkg_install
from scylla_util import *
from subprocess import run
from pathlib import Path

4 dist/common/scripts/scylla_prepare vendored
@@ -12,10 +12,8 @@ import sys
import glob
import platform
import distro
import re
import traceback

from scylla_util import sysconfig_parser, out, get_set_nic_and_disks_config_value, check_sysfs_numa_topology_is_valid, sysconfdir_p, scylla_excepthook, perftune_base_command
from scylla_util import *
from subprocess import run

def get_cur_cpuset():

19 dist/common/scripts/scylla_raid_setup vendored
@@ -9,6 +9,7 @@

import os
import argparse
import distutils.util
import pwd
import grp
import sys
@@ -17,22 +18,12 @@ import logging
import pyudev
import psutil
import platform
import shutil
from pathlib import Path
from scylla_util import is_unused_disk, out, pkg_install, systemd_unit, SystemdException, is_debian_variant, is_redhat_variant, is_offline
from subprocess import run, SubprocessError, Popen
from scylla_util import *
from subprocess import run, SubprocessError

LOGGER = logging.getLogger(__name__)

def strtobool(val):
    val = val.lower()
    if val in ('y', 'yes', 't', 'true', 'on', '1'):
        return 1
    elif val in ('n', 'no', 'f', 'false', 'off', '0'):
        return 0
    else:
        raise ValueError(f"invalid truth value {val!r}")

class UdevInfo:
    def __init__(self, device_file):
        self.context = pyudev.Context()
@@ -154,7 +145,7 @@ if __name__ == '__main__':
    args = parser.parse_args()

    # Allow args.online_discard to be used as a boolean value
    args.online_discard = strtobool(args.online_discard)
    args.online_discard = distutils.util.strtobool(args.online_discard)

    root = args.root.rstrip('/')
    if args.volume_role == 'all':
@@ -236,7 +227,7 @@ if __name__ == '__main__':
    with open(discard_path) as f:
        discard = f.read().strip()
    if discard != '0':
        proc = Popen(['blkdiscard', disk])
        proc = subprocess.Popen(['blkdiscard', disk])
        procs.append(proc)
    for proc in procs:
        proc.wait()

3 dist/common/scripts/scylla_rsyslog_setup vendored
@@ -8,9 +8,8 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

import os
import sys
import argparse
from scylla_util import systemd_unit
from scylla_util import *

def update_rsysconf(rsyslog_server):
    if ':' not in rsyslog_server:

3 dist/common/scripts/scylla_selinux_setup vendored
@@ -9,7 +9,8 @@

import os
import sys
from scylla_util import is_redhat_variant, out, sysconfig_parser
import subprocess
from scylla_util import *
from subprocess import run

if __name__ == '__main__':

10 dist/common/scripts/scylla_setup vendored
@@ -12,14 +12,12 @@ import sys
import argparse
import glob
import shutil
import io
import stat
import re
from scylla_util import colorprint, is_valid_nic, scylladir_p, is_offline, is_debian_variant, is_redhat_variant, is_suse_variant, is_gentoo, out, is_unused_disk, scriptsdir, is_nonroot, sysconfdir_p, sysconfig_parser, systemd_unit, swap_exists, get_product, etcdir
import distro
from scylla_util import *
from subprocess import run, DEVNULL

PRODUCT = get_product(etcdir())


interactive = False
HOUSEKEEPING_TIMEOUT = 60
def when_interactive_ask_service(interactive, msg1, msg2, default = None):
@@ -296,9 +294,11 @@ if __name__ == '__main__':
        sys.exit(1)

    disks = args.disks
    nic = args.nic
    set_nic_and_disks = args.setup_nic_and_disks
    swap_directory = args.swap_directory
    swap_size = args.swap_size
    ec2_check = not args.no_ec2_check
    kernel_check = not args.no_kernel_check
    verify_package = not args.no_verify_package
    enable_service = not args.no_enable_service

2 dist/common/scripts/scylla_stop vendored
@@ -9,7 +9,7 @@

import os
import sys
from scylla_util import sysconfig_parser, sysconfdir_p
from scylla_util import *
from subprocess import run

if __name__ == '__main__':

2 dist/common/scripts/scylla_swap_setup vendored
@@ -12,7 +12,7 @@ import sys
import argparse
import psutil
from pathlib import Path
from scylla_util import swap_exists, out, systemd_unit
from scylla_util import *
from subprocess import run

def GB(n):

3 dist/common/scripts/scylla_sysconfig_setup vendored
@@ -10,8 +10,9 @@
import os
import sys
import argparse
import subprocess
import re
from scylla_util import sysconfig_parser, sysconfdir_p, get_set_nic_and_disks_config_value, is_valid_nic, check_sysfs_numa_topology_is_valid, out, perftune_base_command, hex2list
from scylla_util import *
from subprocess import run

def bool2str(val):

15 dist/common/scripts/scylla_util.py vendored
@@ -3,9 +3,12 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0

import configparser
import glob
import io
import os
import re
import shlex
import shutil
import subprocess
import yaml
import sys
@@ -16,6 +19,7 @@ from datetime import datetime, timedelta

import distro
from scylla_sysconfdir import SYSCONFDIR
from scylla_product import PRODUCT

from multiprocessing import cpu_count

@@ -334,7 +338,7 @@ def apt_install(pkg, offline_exit=True):
        if apt_is_updated():
            break
        try:
            run('apt-get update', shell=True, check=True, stderr=PIPE, encoding='utf-8')
            res = run('apt-get update', shell=True, check=True, stderr=PIPE, encoding='utf-8')
            break
        except CalledProcessError as e:
            print(e.stderr, end='')
@@ -515,12 +519,3 @@ class sysconfig_parser:
    def commit(self):
        with open(self._filename, 'w') as f:
            f.write(self._data)

def get_product(dir):
    if dir is None:
        dir = etcdir()
    try:
        with open(os.path.join(dir, 'SCYLLA-PRODUCT-FILE')) as f:
            return f.read().strip()
    except FileNotFoundError:
        return 'scylla'

1 dist/docker/docker-entrypoint.py vendored
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
import signal
import subprocess
@@ -54,14 +54,6 @@ these default locations can overridden by specifying
`--alternator-encryption-options keyfile="..."` and
`--alternator-encryption-options certificate="..."`.

In addition to **alternator_port** and **alternator_https_port**, the two
options **alternator_port_proxy_protocol** (for HTTP) and
**alternator_https_port_proxy_protocol** (for HTTPS) allow running Alternator
behind a reverse proxy, such as HAProxy or AWS PrivateLink, and still report
the correct client address. The reverse proxy must be configured to use
[Proxy Protocol v2](https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt)
(the binary header format).

By default, ScyllaDB saves a snapshot of deleted tables. But Alternator does
not offer an API to restore these snapshots, so these snapshots are not useful
and waste disk space - deleting a table does not recover any disk space.
@@ -154,5 +146,4 @@ with Tablets enabled.
getting-started
compatibility
new-apis
network
```

@@ -1,159 +0,0 @@
# Reducing network costs in Alternator

In some deployments, the network between the application and the ScyllaDB
cluster is limited in bandwidth, or network traffic is expensive.
This document surveys some mechanisms that can be used to reduce this network
traffic.

## Compression

An easy way to reduce network traffic between the application and the
ScyllaDB cluster is to compress the requests and responses. The resulting
saving can be substantial if items are long and compress well - e.g., long
strings. But additionally, Alternator's protocol (the DynamoDB API) is
JSON-based - textual and verbose - and compresses well.

Note that enabling compression also has a downside - it requires more
computation by both client and server. So the choice of whether to enable
compression depends on the relative cost of network traffic and CPU.
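The claim that verbose JSON compresses well is easy to check; a minimal sketch, using a made-up DynamoDB-style item (the attribute names and values are illustrative only):

```python
import gzip
import json

# A hypothetical DynamoDB-style item: verbose, repetitive JSON.
item = {"Item": {f"attr{i}": {"S": "value-" + "x" * 50} for i in range(20)}}
body = json.dumps(item).encode("utf-8")

compressed = gzip.compress(body)
print(len(body), len(compressed))  # the gzip'ed body is far smaller
```

For highly repetitive items like this one, the ratio is dramatic; for short items with already-compressed content (e.g., binary blobs), the saving can be negligible, which is why the CPU-versus-bandwidth trade-off above matters.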
### Compression of requests

When the application sends a large request - notably a `PutItem` or
`BatchWriteItem` - we can reduce network usage by compressing the content
of this request.

Alternator's request protocol - the DynamoDB API - is based on HTTP or HTTPS.
The standard HTTP 1.1 mechanism for compressing a request is to compress
the request's body using some compression algorithm (e.g., gzip) and
send a header `Content-Encoding: gzip` stating which compression algorithm
was used. Alternator currently supports two compression algorithms, `gzip`
and `deflate`, both standardized in [RFC 9110](https://www.rfc-editor.org/rfc/rfc9110.html).
Other standard compression types which are listed in
[IANA's HTTP Content Coding Registry](https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding),
including `zstd` ([RFC 8878](https://www.rfc-editor.org/rfc/rfc8878.html)),
are not yet supported by Alternator.

Note that HTTP's compression only compresses the request's _body_ - not the
request's _headers_ - so it is beneficial to avoid sending unnecessary headers
in the request, as they will not be compressed. See more on this below.

To use compressed requests, the client library (SDK) used by the application
should be configured to actually compress requests. Amazon's AWS SDKs support
this feature in some languages, but not in others, and may automatically
compress only requests that are longer than a certain size. Check their website
<https://docs.aws.amazon.com/sdkref/latest/guide/feature-compression.html>
for more details for the specific SDK you are using. ScyllaDB also publishes
extensions for these SDKs which may have better support for compressed
requests (and other features mentioned in this document), so again please
consult the documentation of the specific SDK that you are using.
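Mechanically, a compressed request is just the original JSON body gzip'ed, plus the `Content-Encoding` header. A minimal sketch of what an SDK does under the hood (the payload shape is illustrative; real applications should let their SDK handle this):

```python
import gzip
import json

def compress_request(payload: dict) -> tuple[dict, bytes]:
    """Prepare the headers and gzip-compressed body of an HTTP request."""
    body = gzip.compress(json.dumps(payload).encode("utf-8"))
    headers = {
        "Content-Type": "application/x-amz-json-1.0",
        "Content-Encoding": "gzip",  # tells the server how the body is encoded
        "Content-Length": str(len(body)),
    }
    return headers, body

headers, body = compress_request({"TableName": "tab", "Item": {"p": {"S": "x" * 1000}}})
```

The `Content-Length` refers to the compressed body, since that is what actually travels on the wire.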
### Compression of responses

Some types of requests, notably `GetItem` and `BatchGetItem`, can have small
requests but large responses, so it can be beneficial to compress these
responses. The standard HTTP mechanism for doing this is that the client
provides a header like `Accept-Encoding: gzip` in the request, signalling
that it is ready to accept a gzip'ed response. Alternator may then compress
the response, or not, depending on its configuration and the response's
size. If Alternator does compress the response body, it sets a header
`Content-Encoding: gzip` in the response.

Currently, Alternator supports response compression with either `gzip`
or `deflate` encodings. If the client requests response compression
(via the `Accept-Encoding` header), then by default Alternator compresses
responses over 4 KB in length (leaving smaller responses uncompressed),
and uses compression level 6 (where 1 is the fastest, 9 is best compression).
These defaults can be changed with the configuration options
`alternator_response_compression_threshold_in_bytes` and
`alternator_response_gzip_compression_level`, respectively.

To use compressed responses, the client library (SDK) used by the application
should be configured to send an `Accept-Encoding: gzip` header and to
understand the potentially-compressed response. Although DynamoDB does
support compressing responses, it is not clear if any of Amazon's AWS SDKs can
use this feature. ScyllaDB publishes extensions for these SDKs which may
have better support for compressed responses (and other features mentioned
in this document), so please consult the documentation of the specific SDK that
you are using to check if it can make use of the response compression
feature that Alternator supports.
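On the client side, handling a potentially-compressed response amounts to checking the `Content-Encoding` header before parsing; a minimal sketch (SDKs that support this feature do it transparently):

```python
import gzip
import json

def decode_response(headers: dict, body: bytes) -> dict:
    """Parse an HTTP response body, decompressing it if marked as gzip'ed."""
    if headers.get("Content-Encoding") == "gzip":
        body = gzip.decompress(body)
    return json.loads(body)
```

Because the server compresses only responses above the size threshold, the client must be prepared for both compressed and uncompressed bodies, which is why the header check is conditional.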
## Fewer and shorter headers
|
||||
|
||||
As we saw above, the HTTP 1.1 protocol allows compressing the _bodies_ of
|
||||
requests and responses. But HTTP 1.1 does not have a mechanism to compress
|
||||
_headers_. So both client (SDK) and server (Alternator) should avoid sending
|
||||
headers that are unnecessary or unnecessarily long.
|
||||
|
||||
The Alternator server sends headers like the following in its responses:
|
||||
```
|
||||
Content-Length: 2
|
||||
Content-Type: application/x-amz-json-1.0
|
||||
Date: Tue, 30 Dec 2025 20:00:01 GMT
|
||||
Server: Seastar httpd
|
||||
```
|
||||
|
||||
This is a bit over 100 bytes. Most of it is necessary, but the `Date`
|
||||
and `Server` headers are not strictly necessary and a future version of
|
||||
Alternator will most likely make them optional (or remove them altogether).
|
||||
|
||||
The request headers add significantly larger overhead, and AWS SDKs add
|
||||
even more than necessary. Here is an example:
|
||||
```
|
||||
Content-Length: 300
|
||||
amz-sdk-request: attempt=1
|
||||
amz-sdk-invocation-id: caf3eb0e-8138-4cbd-adff-44ac357de04e
|
||||
X-Amz-Date: 20251230T200829Z
|
||||
host: 127.15.196.80:8000
|
||||
User-Agent: Boto3/1.42.12 md/Botocore#1.42.12 md/awscrt#0.27.2 ua/2.1 os/linux#6.17.12-300.fc43.x86_64 md/arch#x86_64 lang/python#3.14.2 md/pyimpl#CPython m/Z,N,e,D,P,b cfg/retry-mode#legacy Botocore/1.42.12 Resource
|
||||
Content-Type: application/x-amz-json-1.0
|
||||
Authorization: AWS4-HMAC-SHA256 Credential=cassandra/20251230/us-east-1/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-target, Signature=34100a8cd044ef131c1ae025c91c6fc3468507c28449615cdb2edb4d82298be0"
|
||||
X-Amz-Target: DynamoDB_20120810.CreateTable
|
||||
Accept-Encoding: identity
|
||||
```
|
||||

There is a lot of waste here: some headers, like `amz-sdk-invocation-id`, are
not needed at all. Others, like `User-Agent`, are useful for debugging, but the
200-byte string sent by boto3 (AWS's Python SDK) on every request is clearly
excessive. AWS's SDKs do not let the user make the request headers shorter,
but we plan to provide such a feature in ScyllaDB's extensions to AWS's SDKs.

Note that the request signing protocol used by Alternator and DynamoDB,
known as AWS Signature Version 4, needs the headers `x-amz-date` and
`Authorization`, which together use (as can be seen in the above example)
more than 230 bytes in each and every request. We could save these 230 bytes
by using SSL, instead of AWS Signature Version 4, for authentication.
This idea is not yet implemented, however: it is not supported by Alternator,
DynamoDB, or any of the client SDKs.
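As a quick sanity check of the 230-byte figure, the two signature headers can be measured directly. This is plain Python with the header strings copied verbatim from the example request above:

```python
# The two headers required by AWS Signature Version 4, copied from the
# example request shown earlier in this section.
x_amz_date = "X-Amz-Date: 20251230T200829Z"
authorization = (
    "Authorization: AWS4-HMAC-SHA256 "
    "Credential=cassandra/20251230/us-east-1/dynamodb/aws4_request, "
    "SignedHeaders=content-type;host;x-amz-date;x-amz-target, "
    "Signature=34100a8cd044ef131c1ae025c91c6fc3468507c28449615cdb2edb4d82298be0"
)

# On the wire, each header line is terminated by CRLF.
sigv4_overhead = len(x_amz_date) + len(authorization) + 2 * len("\r\n")
print(sigv4_overhead)  # comfortably over 230 bytes, per request
```

The exact number varies with the credential and region names, but the fixed parts (the 64-hex-digit signature, the algorithm name, the signed-headers list) dominate, so the overhead stays in this range for any request.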

Some middleware or gateways modify HTTP traffic going through them and add
even more headers for their own use. If you suspect this might be happening,
inspect the HTTP traffic of your workload with a packet analyzer to see if
any unnecessary HTTP headers appear in requests arriving at Alternator, or
in responses arriving back at the application.

## Rack-aware request routing

In some deployments, the network between the client and the ScyllaDB server
is only expensive if the communication crosses **rack** boundaries.
A "rack" is a ScyllaDB term roughly equivalent to AWS's "availability zone".

Typically, a ScyllaDB cluster is divided into three racks, and each item of
data is replicated once on each rack. Each instance of the client application
also lives in one of these racks. In many deployments, communication inside
the rack is free, but communicating with a different rack costs extra. It is
therefore important that the client be aware of which rack it is running
on, and send its requests to a ScyllaDB node on the same rack rather than
choosing one of the three replicas at random. This is known as "rack-aware
request routing", and should be enabled in the ScyllaDB extension of the
AWS SDK.
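As an illustration of the idea, rack-aware routing amounts to restricting the SDK's endpoint choice to nodes in the client's own rack. The sketch below is hypothetical: the node addresses, rack names, and `LOCAL_RACK` environment variable are all made up, and real deployments should rely on ScyllaDB's load-balancing extensions rather than hand-rolled routing:

```python
import itertools
import os

# Hypothetical cluster map: Alternator node addresses grouped by rack.
# In a real deployment this would come from service discovery.
NODES_BY_RACK = {
    "rack-a": ["http://10.0.1.1:8000", "http://10.0.1.2:8000"],
    "rack-b": ["http://10.0.2.1:8000"],
    "rack-c": ["http://10.0.3.1:8000"],
}

def local_endpoints(local_rack):
    """Return only the endpoints in the client's own rack, falling back
    to every node in the cluster if the rack is unknown."""
    nodes = NODES_BY_RACK.get(local_rack)
    if not nodes:  # unknown rack: fall back to cluster-wide routing
        return [n for rack in sorted(NODES_BY_RACK) for n in NODES_BY_RACK[rack]]
    return nodes

# Round-robin over same-rack nodes.
local_rack = os.environ.get("LOCAL_RACK", "rack-a")
endpoints = itertools.cycle(local_endpoints(local_rack))
print(next(endpoints))
```

Each request would then pass the next endpoint as the `endpoint_url` of its DynamoDB client, so traffic stays inside the rack as long as the local rack has live nodes.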

## Networking inside the ScyllaDB cluster

The previous sections were all about reducing the amount of network traffic
between the client application and the ScyllaDB cluster. If the network
between different ScyllaDB nodes is also metered - especially between nodes
in different racks - then this intra-cluster networking should also be reduced.
The best way to do that is to enable compression of internode communication.
Refer to [Advanced Internode (RPC) Compression](../operating-scylla/procedures/config-change/advanced-internode-compression.html)
for instructions on how to enable it.

@@ -8,22 +8,28 @@ SSTable Version Support
------------------------

.. list-table::
   :widths: 50 50
   :widths: 33 33 33
   :header-rows: 1

   * - SSTable Version
     - ScyllaDB Version
     - ScyllaDB Enterprise Version
     - ScyllaDB Open Source Version
   * - 3.x ('ms')
     - 2025.4 and above
     - None
   * - 3.x ('me')
     - 2022.2 and above
     - 5.1 and above
   * - 3.x ('md')
     - 2021.1

* The supported formats are ``me`` and ``ms``.
* The ``md`` format is used only when upgrading from an existing cluster using
  ``md``. The ``sstable_format`` parameter is ignored if it is set to ``md``.
* Note: The ``sstable_format`` parameter specifies the SSTable format used for
  **writes**. The legacy SSTable formats (``ka``, ``la``, ``mc``) remain
  supported for reads, which is essential for restoring clusters from existing
  backups.

     - 4.3, 4.4, 4.5, 4.6, 5.0
   * - 3.0 ('mc')
     - 2019.1, 2020.1
     - 3.x, 4.1, 4.2
   * - 2.2 ('la')
     - N/A
     - 2.3
   * - 2.1.8 ('ka')
     - 2018.1
     - 2.2

@@ -9,6 +9,8 @@ ScyllaDB SSTable Format

.. include:: _common/sstable_what_is.rst

In ScyllaDB 6.0 and above, *me* format is enabled by default.

For more information on each of the SSTable formats, see below:

* :doc:`SSTable 2.x <sstable2/index>`

@@ -12,6 +12,8 @@ ScyllaDB SSTable - 3.x

.. include:: ../_common/sstable_what_is.rst

In ScyllaDB 6.0 and above, the ``me`` format is mandatory, and the ``md`` format is used only when upgrading from an existing cluster using ``md``. The ``sstable_format`` parameter is ignored if it is set to ``md``.

Additional Information
----------------------

@@ -42,10 +42,9 @@ the administrator. The tablet load balancer decides where to migrate
the tablets, either within the same node to balance the shards or across
the nodes to balance the global load in the cluster.

During this process, the balancer will compute disk usage based on the
actual disk sizes of the tablets located on the nodes. It will then issue
tablet migrations which migrate tablets from nodes with higher to nodes
with lower disk utilization, equalizing the load.
The number of tablets the load balancer maintains on a node is directly
proportional to the node's storage capacity. A node with twice
the storage will have twice the number of tablets located on it.

As a table grows, each tablet can split into two, creating a new tablet.
The load balancer can migrate the split halves independently to different nodes
@@ -194,18 +193,12 @@ Limitations and Unsupported Features

.. warning::

   If a keyspace has tablets enabled and contains a materialized view or a secondary index, it must
   remain :term:`RF-rack-valid <RF-rack-valid keyspace>` throughout its lifetime. Failing to keep that
   invariant satisfied may result in data inconsistencies, performance problems, or other issues.
   If a keyspace has tablets enabled, it must remain :term:`RF-rack-valid <RF-rack-valid keyspace>`
   throughout its lifetime. Failing to keep that invariant satisfied may result in data inconsistencies,
   performance problems, or other issues.

   The invariant is enforced while the keyspace contains a materialized view or a secondary index, or
   if the `rf_rack_valid_keyspaces` option is set, by rejecting operations that would violate the RF-rack-valid property:
   altering a keyspace's replication factor, adding a node in a new rack, or removing the last node
   in a rack will be rejected if they would result in an RF-rack-invalid keyspace.

To enable materialized views and secondary indexes for tablet keyspaces, the keyspace
must be :term:`RF-rack-valid <RF-rack-valid keyspace>`.
See :ref:`Views with tablets <admin-views-with-tablets>` for details.
To enable materialized views and secondary indexes for tablet keyspaces, use
the `--rf-rack-valid-keyspaces` option. See :ref:`Views with tablets <admin-views-with-tablets>` for details.

Resharding in keyspaces with tablets enabled has the following limitations:

@@ -289,7 +289,7 @@ with tablets enabled. Keyspaces using tablets must also remain :term:`RF-rack-valid <RF-rack-valid keyspace>`
throughout their lifetime. See :ref:`Limitations and Unsupported Features <tablets-limitations>`
for details.

**The** ``initial`` **sub-option (deprecated)**
**The ``initial`` sub-option (deprecated)**

A good rule of thumb to calculate initial tablets is to divide the expected total storage used
by tables in this keyspace by (``replication_factor`` * 5GB). For example, if you expect a 30TB
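As a worked version of this rule of thumb (using the 30TB table from the truncated example above, with an assumed replication factor of 3, which is not stated in this excerpt):

```python
# Rule of thumb: initial tablets ~= expected total storage
#                                   / (replication_factor * 5 GB)
expected_storage_gb = 30 * 1024   # a 30 TB table, per the example above
replication_factor = 3            # assumed RF; not stated in this excerpt
target_tablet_size_gb = 5

initial_tablets = expected_storage_gb // (replication_factor * target_tablet_size_gb)
print(initial_tablets)  # 2048
```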
@@ -422,7 +422,7 @@ An empty list is allowed, and it's equivalent to numeric replication factor of 0

Altering from a rack list to a numeric replication factor is not supported.

Keyspaces which use rack lists are :term:`RF-rack-valid <RF-rack-valid keyspace>` if each rack in the rack list contains at least one node (excluding :doc:`zero-token nodes </architecture/zero-token-nodes>`).
Keyspaces which use rack lists are :term:`RF-rack-valid <RF-rack-valid keyspace>`.

.. _drop-keyspace-statement:

@@ -272,17 +272,9 @@ For example::
This query returns up to 5 rows whose ``embedding`` vector is closest to the provided query vector,
in this case ``[0.1, 0.2, 0.3, 0.4]``.

It is also possible to return the similarity score along with the results by using the :ref:`similarity functions <vector-similarity-functions>`.

For example::

   SELECT image_id, similarity_cosine(embedding, [0.1, 0.2, 0.3, 0.4])
   FROM ImageEmbeddings
   ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;

.. warning::

   Currently, vector queries do not support filtering with a ``WHERE`` clause,
   Currently, vector queries do not support filtering with a ``WHERE`` clause, returning similarity distances,
   grouping with ``GROUP BY``, and paging. This will be added in future releases.

@@ -227,39 +227,6 @@ A number of functions are provided to “convert” the native types into binary
takes a 64-bit ``blob`` argument and converts it to a ``bigint`` value. For example, ``bigintAsBlob(3)`` is
``0x0000000000000003`` and ``blobAsBigint(0x0000000000000003)`` is ``3``.


.. _vector-similarity-functions:

Vector similarity functions
```````````````````````````

To obtain the similarity of the given vectors, use a ``SELECT`` query::

   SELECT comment, similarity_cosine(comment_vector, [0.1, 0.15, 0.3, 0.12, 0.05])
   FROM cycling.comments_vs;

The supported functions for this type of query are:

- ``similarity_cosine``
- ``similarity_euclidean``
- ``similarity_dot_product``

with the parameters of (``<vector>``, ``<vector>``).

The ``vector`` is either the name of a float vector column or a :ref:`vector of floats <vectors>`.
Both arguments must be of the same dimension.

These functions return a ``float`` value representing the similarity between the given vectors for each row.
The similarity value is a floating-point number in the range [0, 1] that describes how similar two vectors are.
Values closer to 1 indicate greater similarity.
The ``similarity_euclidean`` and ``similarity_dot_product`` functions do not perform vector normalization prior to computing similarity.

.. note::
   The ``similarity_dot_product`` function assumes that all input vectors are L2-normalized.
   Supplying non-normalized vectors will result in dot product values that do not represent cosine similarity and are therefore not meaningful for similarity comparison.
   If the input vectors are not normalized, consider using the ``similarity_cosine`` function instead.
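To illustrate the note above, here is a small sketch (plain Python with made-up two-dimensional vectors, not a ScyllaDB API) showing that the raw dot product only agrees with cosine similarity once both inputs are L2-normalized:

```python
import math

def l2_normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def cosine(a, b):
    # Cosine similarity normalizes internally, so input scale does not matter.
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

a, b = [3.0, 4.0], [4.0, 3.0]
print(dot(a, b))                               # 24.0: a raw dot product, not a similarity
print(cosine(a, b))                            # ~0.96
print(dot(l2_normalize(a), l2_normalize(b)))   # ~0.96: matches cosine only after normalizing
```

(How these raw values map to the [0, 1] range reported by the CQL functions is not shown here; the point is only that non-normalized inputs break the dot-product shortcut.)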


.. _udfs:

User-defined functions :label-caution:`Experimental`
@@ -42,55 +42,14 @@ Creating a secondary index on a table uses the ``CREATE INDEX`` statement:

   create_index_statement: CREATE [ CUSTOM ] INDEX [ IF NOT EXISTS ] [ `index_name` ]
                         : ON `table_name` '(' `index_identifier` ')'
                         : [ USING `string` [ WITH `index_properties` ] ]
                         : [ USING `string` [ WITH OPTIONS = `map_literal` ] ]
   index_identifier: `column_name`
                   :| ( FULL ) '(' `column_name` ')'
   index_properties: index_property (AND index_property)*
   index_property: OPTIONS = `map_literal`
                 :| view_property

where `view_property` is any :ref:`property <mv-options>` that can be used when creating
a :doc:`materialized view </features/materialized-views>`. The only exception is `CLUSTERING ORDER BY`,
which is not supported by secondary indexes.

If the statement is provided with a materialized view property, it will not be applied to the index itself.
Instead, it will be applied to the index's underlying materialized view.

For instance::

   CREATE INDEX userIndex ON NerdMovies (user);
   CREATE INDEX ON Mutants (abilityId);

   -- Create a secondary index called `catsIndex` on the table `Animals`.
   -- The indexed column is `cats`. Both properties, `comment` and
   -- `synchronous_updates`, are view properties, so the underlying materialized
   -- view will be configured with: `comment = 'everyone likes cats'` and
   -- `synchronous_updates = true`.
   CREATE INDEX catsIndex ON Animals (cats) WITH comment = 'everyone likes cats' AND synchronous_updates = true;

   -- Create a secondary index called `dogsIndex` on the same table, `Animals`.
   -- This time, the indexed column is `dogs`. The property `gc_grace_seconds` is
   -- a view property, so the underlying materialized view will be configured with
   -- `gc_grace_seconds = 13`.
   CREATE INDEX dogsIndex ON Animals (dogs) WITH gc_grace_seconds = 13;

   -- The view property `CLUSTERING ORDER BY` is not supported by secondary indexes,
   -- so this statement will be rejected by Scylla.
   CREATE INDEX bearsIndex ON Animals (bears) WITH CLUSTERING ORDER BY (bears ASC);

View properties of a secondary index have the same limitations as those imposed by materialized views.
For instance, a materialized view cannot be created specifying ``gc_grace_seconds = 0``, so creating
a secondary index with the same property will not be possible either.

Example::

   -- This statement will be rejected by Scylla because creating
   -- a materialized view with `gc_grace_seconds = 0` is not possible.
   CREATE INDEX names ON clients (name) WITH gc_grace_seconds = 0;

   -- This statement will also be rejected by Scylla.
   -- It's not possible to use `COMPACT STORAGE` with a materialized view.
   CREATE INDEX names ON clients (name) WITH COMPACT STORAGE;
CREATE INDEX userIndex ON NerdMovies (user);
CREATE INDEX ON Mutants (abilityId);

The ``CREATE INDEX`` statement is used to create a new (automatic) secondary index for a given (existing) column in a
given table. A name for the index itself can be specified before the ``ON`` keyword, if desired. If data already exists

@@ -30,7 +30,7 @@ SELECT * FROM system.role_attributes WHERE role='r' and attribute_name='service
### Service Level Configuration Table

```
CREATE TABLE system.service_levels_v2 (
CREATE TABLE system_distributed.service_levels (
    service_level text PRIMARY KEY,
    timeout duration,
    workload_type text,
@@ -45,19 +45,14 @@ The table column names meanings are:
*shares* - a number that represents this service level's priority relative to other service levels.

```
select * from system.service_levels_v2 ;
select * from system_distributed.service_levels ;

 service_level | shares | timeout | workload_type
---------------+--------+---------+---------------
            sl |    100 |   500ms |   interactive
 service_level | timeout | workload_type
---------------+---------+---------------
            sl |   500ms |   interactive

```

* Please note that `system.service_levels_v2` is used as the service levels configuration table only with consistent topology and service levels on Raft,
which is enabled by default but not yet mandatory.
Otherwise, the old `system_distributed.service_levels` is used as the configuration table.
For more information, see the [service levels on Raft](#service-levels-on-raft) section. *

### Service Level Timeout

Service level timeout can be used to assign a default timeout value for all operations for a particular service level.
@@ -195,20 +190,6 @@ The command displays a table with: option name, effective service level the valu
timeout | sl1 | 2s
```

## Service levels on Raft

Since ScyllaDB 6.0 and ScyllaDB Enterprise 2024.2, service levels metadata is managed by Raft group0 and stored in `system.service_levels_v2`.

In existing clusters, service levels metadata is automatically copied from `system_distributed.service_levels` to `system.service_levels_v2`
during the topology upgrade (see the [upgrade to Raft topology section](topology-over-raft.md#upgrade-from-legacy-topology-to-raft-based-topology)). Because of this, no service level operations should be performed during the topology upgrade.

Before service levels on Raft, the service levels cache was updated at 10-second intervals by polling the `system_distributed.service_levels` table.
Once the service levels are migrated to Raft, the cache is updated on every group0 state apply fiber, if there are any mutations to
`system.service_levels_v2`, `system.role_attributes`, or `system.role_members`.

With service levels on Raft, the cache also has an additional layer, called the `service level effective cache`. This layer combines service levels
and auth information and stores the effective service level for each role (see the [effective service level section](#effective-service-level)).

## Implementation
### Integration with auth