Compare commits

..

7 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
ea26e4b3a5 Use tablet_task_info fields for tablet task timing
For tablet_virtual_task, set creation_time to tablet_task_info::request_time
and start_time to tablet_task_info::sched_time, matching the actual semantics
of when the request was created vs when it was scheduled for execution.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 15:05:50 +00:00
copilot-swe-agent[bot]
3b793ef09f Merge topology_request_tracking_mutation_builder calls for keyspace_rf_change
Combined the two separate topology_request_tracking_mutation_builder calls
into one, setting both start_time and done status in the same mutation builder
to reduce redundancy.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:53:23 +00:00
copilot-swe-agent[bot]
df0a59ba03 Fix start_time setting to be together with operation start
Refactored start_time setting for global requests to be included in the same
mutation batch that starts the actual operation, matching the pattern used for
node operations. This avoids an extra update_topology_state call and ensures
start_time is set atomically with the operation start.

Also updated nodetool tasks documentation to include creation_time field in
example outputs for status, list, and tree commands.

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:40:01 +00:00
copilot-swe-agent[bot]
69024a09b2 Add creation_time field to nodetool tasks subcommands
Extended scylla-nodetool.cc to display creation_time in all task-related outputs:
- tasks_print_status: Added creation_time to time field formatting
- tasks_print_trees: Added creation_time column to task tree display
- tasks_print_stats_list: Added creation_time column to task stats list

Co-authored-by: Deexie <56607372+Deexie@users.noreply.github.com>
2025-12-08 14:28:01 +00:00
copilot-swe-agent[bot]
bb8f28a1ab Fix start_time setting: remove from request creation, add to execution start
Remove incorrect start_time setting from request creation sites for:
- cleanup requests
- new_cdc_generation requests
- truncate_table requests
- keyspace_rf_change requests

Add start_time setting in topology_coordinator::handle_global_request
when execution begins, matching the pattern for node operations.

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 14:09:22 +00:00
copilot-swe-agent[bot]
32bc7e3a1c Add creation_time field to task status and stats API
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-08 13:26:09 +00:00
copilot-swe-agent[bot]
fb4e37248d Initial plan 2025-12-08 13:03:16 +00:00
440 changed files with 3862 additions and 15937 deletions

8
.github/CODEOWNERS vendored
View File

@@ -1,5 +1,5 @@
# AUTH
auth/* @nuivall
auth/* @nuivall @ptrsmrn
# CACHE
row_cache* @tgrabiec
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
transport/*
# CQL QUERY LANGUAGE
cql3/* @tgrabiec @nuivall
cql3/* @tgrabiec @nuivall @ptrsmrn
# COUNTERS
counters* @nuivall
tests/counter_test* @nuivall
counters* @nuivall @ptrsmrn
tests/counter_test* @nuivall @ptrsmrn
# DOCS
docs/* @annastuchlik @tzach

View File

@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
if is_draft:
labels_to_add.append("conflicts")
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
pr_comment += "Please resolve them and mark this PR as ready for review"
backport_pr.create_issue_comment(pr_comment)
# Apply all labels at once if we have any

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {

View File

@@ -0,0 +1,12 @@
name: Call Jira Status In Progress
on:
pull_request_target:
types: [opened]
jobs:
call-jira-status-in-progress:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_progress.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,12 @@
name: Call Jira Status In Review
on:
pull_request_target:
types: [ready_for_review, review_requested]
jobs:
call-jira-status-in-review:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -0,0 +1,12 @@
name: Call Jira Status Ready For Merge
on:
pull_request_target:
types: [labeled]
jobs:
call-jira-status-update:
uses: scylladb/github-automation/.github/workflows/main_update_jira_status_to_ready_for_merge.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,41 +0,0 @@
name: Sync Jira Based on PR Events
on:
pull_request_target:
types: [opened, ready_for_review, review_requested, labeled, unlabeled, closed]
permissions:
contents: read
pull-requests: write
issues: write
jobs:
jira-sync-pr-opened:
if: github.event.action == 'opened'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_opened.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-in-review:
if: github.event.action == 'ready_for_review' || github.event.action == 'review_requested'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_in_review.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-sync-add-label:
if: github.event.action == 'labeled'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_add_label.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-status-remove-label:
if: github.event.action == 'unlabeled'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_remove_label.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}
jira-status-pr-closed:
if: github.event.action == 'closed'
uses: scylladb/github-automation/.github/workflows/main_jira_sync_pr_closed.yml@main
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,14 +0,0 @@
name: Call Jira release creation for new milestone
on:
milestone:
types: [created]
jobs:
sync-milestone-to-jira:
uses: scylladb/github-automation/.github/workflows/main_sync_milestone_to_jira_release.yml@main
with:
# Comma-separated list of Jira project keys
jira_project_keys: "SCYLLADB,CUSTOMER"
secrets:
caller_jira_auth: ${{ secrets.USER_AND_KEY_FOR_JIRA_AUTOMATION }}

View File

@@ -1,13 +0,0 @@
name: validate_pr_author_email
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
jobs:
validate_pr_author_email:
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main

View File

@@ -13,5 +13,5 @@ jobs:
- uses: codespell-project/actions-codespell@master
with:
only_warn: 1
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison,iif,tread"
ignore_words_list: "ans,datas,fo,ser,ue,crate,nd,reenable,strat,stap,te,raison"
skip: "./.git,./build,./tools,*.js,*.lock,./test,./licenses,./redis/lolwut.cc,*.svg"

View File

@@ -7,7 +7,7 @@ on:
- enterprise
paths:
- '**/*.cc'
- 'scripts/metrics-config.yml'
- 'scripts/metrics-config.yml'
- 'scripts/get_description.py'
- 'docs/_ext/scylladb_metrics.py'
@@ -15,20 +15,20 @@ jobs:
validate-metrics:
runs-on: ubuntu-latest
name: Check metrics documentation coverage
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.10'
- name: Install dependencies
run: pip install PyYAML
- name: Validate metrics
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml

View File

@@ -3,13 +3,10 @@ name: Trigger Scylla CI Route
on:
issue_comment:
types: [created]
pull_request_target:
types:
- unlabeled
jobs:
trigger-jenkins:
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI-Route Jenkins Job

View File

@@ -18,7 +18,6 @@ target_sources(alternator
consumed_capacity.cc
ttl.cc
parsed_expression_cache.cc
http_compression.cc
${cql_grammar_srcs})
target_include_directories(alternator
PUBLIC

View File

@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
if (!comparison_operator.IsString()) {
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
}
std::string op = rjson::to_string(comparison_operator);
std::string op = comparison_operator.GetString();
auto it = ops.find(op);
if (it == ops.end()) {
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
}
if (kv1.name == "S") {
return cmp(rjson::to_string_view(kv1.value),
rjson::to_string_view(kv2.value));
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
}
if (kv1.name == "B") {
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
}
if (kv_v.name == "S") {
return check_BETWEEN(rjson::to_string_view(kv_v.value),
rjson::to_string_view(kv_lb.value),
rjson::to_string_view(kv_ub.value),
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
bounds_from_query);
}
if (kv_v.name == "B") {

View File

@@ -8,8 +8,6 @@
#include "consumed_capacity.hh"
#include "error.hh"
#include "utils/rjson.hh"
#include <fmt/format.h>
namespace alternator {
@@ -34,12 +32,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
if (!return_consumed->IsString()) {
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
}
std::string_view consumed = rjson::to_string_view(*return_consumed);
std::string consumed = return_consumed->GetString();
if (consumed == "INDEXES") {
throw api_error::validation("INDEXES consumed capacity is not supported");
}
if (consumed != "TOTAL") {
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
throw api_error::validation("Unknown consumed capacity "+ consumed);
}
return true;
}

View File

@@ -28,7 +28,6 @@ static logging::logger logger("alternator_controller");
controller::controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
@@ -40,7 +39,6 @@ controller::controller(
: protocol_server(sg)
, _gossiper(gossiper)
, _proxy(proxy)
, _ss(ss)
, _mm(mm)
, _sys_dist_ks(sys_dist_ks)
, _cdc_gen_svc(cdc_gen_svc)
@@ -91,7 +89,7 @@ future<> controller::start_server() {
auto get_timeout_in_ms = [] (const db::config& cfg) -> utils::updateable_value<uint32_t> {
return cfg.alternator_timeout_in_ms;
};
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
_executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks),
sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
_server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
@@ -171,7 +169,7 @@ future<> controller::request_stop_server() {
});
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
future<utils::chunked_vector<client_data>> controller::get_client_data() {
return _server.local().get_client_data();
}

View File

@@ -15,7 +15,6 @@
namespace service {
class storage_proxy;
class storage_service;
class migration_manager;
class memory_limiter;
}
@@ -58,7 +57,6 @@ class server;
class controller : public protocol_server {
sharded<gms::gossiper>& _gossiper;
sharded<service::storage_proxy>& _proxy;
sharded<service::storage_service>& _ss;
sharded<service::migration_manager>& _mm;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<cdc::generation_service>& _cdc_gen_svc;
@@ -76,7 +74,6 @@ public:
controller(
sharded<gms::gossiper>& gossiper,
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<service::migration_manager>& mm,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<cdc::generation_service>& cdc_gen_svc,
@@ -96,7 +93,7 @@ public:
// This virtual function is called (on each shard separately) when the
// virtual table "system.clients" is read. It is expected to generate a
// list of clients connected to this server (on this shard).
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
};
}

View File

@@ -67,14 +67,6 @@ using namespace std::chrono_literals;
logging::logger elogger("alternator-executor");
namespace std {
template <> struct hash<std::pair<sstring, sstring>> {
size_t operator () (const std::pair<sstring, sstring>& p) const {
return std::hash<sstring>()(p.first) * 1009 + std::hash<sstring>()(p.second) * 3;
}
};
}
namespace alternator {
// Alternator-specific table properties stored as hidden table tags:
@@ -256,66 +248,14 @@ static const rjson::value::Member& get_single_member(const rjson::value& v, cons
return *(v.MemberBegin());
}
class executor::describe_table_info_manager : public service::migration_listener::empty_listener {
executor &_executor;
struct table_info {
utils::simple_value_with_expiry<std::uint64_t> size_in_bytes;
};
std::unordered_map<std::pair<sstring, sstring>, table_info> info_for_tables;
bool active = false;
public:
describe_table_info_manager(executor& executor) : _executor(executor) {
_executor._proxy.data_dictionary().real_database_ptr()->get_notifier().register_listener(this);
active = true;
}
describe_table_info_manager(const describe_table_info_manager &) = delete;
describe_table_info_manager(describe_table_info_manager&&) = delete;
~describe_table_info_manager() {
if (active) {
on_fatal_internal_error(elogger, "describe_table_info_manager was not stopped before destruction");
}
}
describe_table_info_manager &operator = (const describe_table_info_manager &) = delete;
describe_table_info_manager &operator = (describe_table_info_manager&&) = delete;
static std::chrono::high_resolution_clock::time_point now() {
return std::chrono::high_resolution_clock::now();
}
std::optional<std::uint64_t> get_cached_size_in_bytes(const sstring &ks_name, const sstring &cf_name) const {
auto it = info_for_tables.find({ks_name, cf_name});
if (it != info_for_tables.end()) {
return it->second.size_in_bytes.get();
}
return std::nullopt;
}
void cache_size_in_bytes(sstring ks_name, sstring cf_name, std::uint64_t size_in_bytes, std::chrono::high_resolution_clock::time_point expiry) {
info_for_tables[{std::move(ks_name), std::move(cf_name)}].size_in_bytes.set_if_longer_expiry(size_in_bytes, expiry);
}
future<> stop() {
co_await _executor._proxy.data_dictionary().real_database_ptr()->get_notifier().unregister_listener(this);
active = false;
co_return;
}
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
if (!ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX)) return;
info_for_tables.erase({ks_name, cf_name});
}
};
executor::executor(gms::gossiper& gossiper,
service::storage_proxy& proxy,
service::storage_service& ss,
service::migration_manager& mm,
db::system_distributed_keyspace& sdks,
cdc::metadata& cdc_metadata,
smp_service_group ssg,
utils::updateable_value<uint32_t> default_timeout_in_ms)
: _gossiper(gossiper),
_ss(ss),
_proxy(proxy),
_mm(mm),
_sdks(sdks),
@@ -328,7 +268,6 @@ executor::executor(gms::gossiper& gossiper,
_stats))
{
s_default_timeout_in_ms = std::move(default_timeout_in_ms);
_describe_table_info_manager = std::make_unique<describe_table_info_manager>(*this);
register_metrics(_metrics, _stats);
}
@@ -480,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
if (!table_name_value->IsString()) {
throw api_error::validation("Non-string TableName field in request");
}
std::string table_name = rjson::to_string(*table_name_value);
std::string table_name = table_name_value->GetString();
return table_name;
}
@@ -607,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
// does exist but the index does not (ValidationException).
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
throw api_error::validation(
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
} else {
throw api_error::resource_not_found(
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
@@ -648,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
attribute_name, value));
}
return rjson::to_string(*attribute_value);
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
}
// Convenience function for getting the value of a boolean attribute, or a
@@ -813,44 +752,12 @@ static future<bool> is_view_built(
}
future<> executor::cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl) {
auto expiry = describe_table_info_manager::now() + ttl;
return container().invoke_on_all(
[schema, size_in_bytes, expiry] (executor& exec) {
exec._describe_table_info_manager->cache_size_in_bytes(schema->ks_name(), schema->cf_name(), size_in_bytes, expiry);
});
}
future<> executor::fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting) {
auto cached_size = _describe_table_info_manager->get_cached_size_in_bytes(schema->ks_name(), schema->cf_name());
std::uint64_t total_size = 0;
if (cached_size) {
total_size = *cached_size;
} else {
// there's no point in trying to estimate value of table that is being deleted, as other nodes more often than not might
// move forward with deletion faster than we calculate the size
if (!deleting) {
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
// Note: we don't care when the notification of other shards will finish, as long as it will be done
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
// which is also fine, as the specification doesn't give precision guarantees of any kind.
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
}
}
rjson::add(table_description, "TableSizeBytes", total_size);
}
future<rjson::value> executor::fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
static future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::storage_proxy& proxy, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit)
{
rjson::value table_description = rjson::empty_object();
auto tags_ptr = db::get_tags_of_table(schema);
rjson::add(table_description, "TableName", rjson::from_string(schema->cf_name()));
co_await fill_table_size(table_description, schema, tbl_status == table_status::deleting);
auto creation_timestamp = get_table_creation_time(*schema);
@@ -894,7 +801,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", wcu);
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
data_dictionary::table t = _proxy.data_dictionary().find_column_family(schema);
data_dictionary::table t = proxy.data_dictionary().find_column_family(schema);
if (tbl_status != table_status::deleting) {
rjson::add(table_description, "CreationDateTime", rjson::value(creation_timestamp));
@@ -931,7 +840,7 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
// (for a built view) or CREATING+Backfilling (if view building
// is in progress).
if (!is_lsi) {
if (co_await is_view_built(vptr, _proxy, client_state, trace_state, permit)) {
if (co_await is_view_built(vptr, proxy, client_state, trace_state, permit)) {
rjson::add(view_entry, "IndexStatus", "ACTIVE");
} else {
rjson::add(view_entry, "IndexStatus", "CREATING");
@@ -959,8 +868,9 @@ future<rjson::value> executor::fill_table_description(schema_ptr schema, table_s
}
rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions));
}
executor::supplement_table_stream_info(table_description, *schema, _proxy);
executor::supplement_table_stream_info(table_description, *schema, proxy);
// FIXME: still missing some response fields (issue #5026)
co_return table_description;
}
@@ -980,7 +890,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
get_stats_from_schema(_proxy, *schema)->api_operations.describe_table++;
tracing::add_alternator_table_name(trace_state, schema->cf_name());
rjson::value table_description = co_await fill_table_description(schema, table_status::active, client_state, trace_state, permit);
rjson::value table_description = co_await fill_table_description(schema, table_status::active, _proxy, client_state, trace_state, permit);
rjson::value response = rjson::empty_object();
rjson::add(response, "Table", std::move(table_description));
elogger.trace("returning {}", response);
@@ -1083,7 +993,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
auto& p = _proxy.container();
schema_ptr schema = get_table(_proxy, request);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, client_state, trace_state, permit);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
size_t retries = mm.get_concurrent_ddl_retries();
@@ -1170,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
}
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == name) {
auto type = attribute_info["AttributeType"].GetString();
data_type dt = parse_key_type(type);
if (computed_column) {
// Computed column for GSI (doesn't choose a real column as-is
@@ -1206,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First element of KeySchema must be an object");
}
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
throw api_error::validation("First key in KeySchema must be a HASH key");
}
v = rjson::find((*key_schema)[0], "AttributeName");
@@ -1214,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First key in KeySchema must have string AttributeName");
}
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
std::string hash_key = rjson::to_string(*v);
std::string hash_key = v->GetString();
std::string range_key;
if (key_schema->Size() == 2) {
if (!(*key_schema)[1].IsObject()) {
throw api_error::validation("Second element of KeySchema must be an object");
}
v = rjson::find((*key_schema)[1], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
throw api_error::validation("Second key in KeySchema must be a RANGE key");
}
v = rjson::find((*key_schema)[1], "AttributeName");
@@ -1647,7 +1557,8 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
}
future<executor::request_return_type> executor::create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode) {
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request,
service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats, const db::tablets_mode_t::mode tablets_mode) {
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
@@ -1834,7 +1745,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
if (executor::add_stream_options(*stream_specification, builder, _proxy)) {
if (executor::add_stream_options(*stream_specification, builder, sp)) {
validate_cdc_log_name_length(builder.cf_name());
}
}
@@ -1853,7 +1764,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
set_table_creation_time(tags_map, db_clock::now());
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, _stats);
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
schema_ptr schema = builder.build();
for (auto& view_builder : view_builders) {
@@ -1869,18 +1780,18 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
}
size_t retries = _mm.get_concurrent_ddl_retries();
size_t retries = mm.get_concurrent_ddl_retries();
for (;;) {
auto group0_guard = co_await _mm.start_group0_operation();
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
utils::chunked_vector<mutation> schema_mutations;
auto ksm = create_keyspace_metadata(keyspace_name, _proxy, _gossiper, ts, tags_map, _proxy.features(), tablets_mode);
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts, tags_map, sp.features(), tablets_mode);
// Alternator Streams doesn't yet work when the table uses tablets (#23838)
if (stream_specification && stream_specification->IsObject()) {
auto stream_enabled = rjson::find(*stream_specification, "StreamEnabled");
if (stream_enabled && stream_enabled->IsBool() && stream_enabled->GetBool()) {
locator::replication_strategy_params params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option());
const auto& topo = _proxy.local_db().get_token_metadata().get_topology();
const auto& topo = sp.local_db().get_token_metadata().get_topology();
auto rs = locator::abstract_replication_strategy::create_replication_strategy(ksm->strategy_name(), params, topo);
if (rs->uses_tablets()) {
co_return api_error::validation("Streams not yet supported on a table using tablets (issue #23838). "
@@ -1888,19 +1799,14 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
}
}
// Creating an index in tablets mode requires the rf_rack_valid_keyspaces option to be enabled.
// GSI and LSI indexes are based on materialized views which require this option to avoid consistency issues.
if (!view_builders.empty() && ksm->uses_tablets() && !_proxy.data_dictionary().get_config().rf_rack_valid_keyspaces()) {
co_return api_error::validation("GlobalSecondaryIndexes and LocalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
}
try {
schema_mutations = service::prepare_new_keyspace_announcement(_proxy.local_db(), ksm, ts);
schema_mutations = service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
} catch (exceptions::already_exists_exception&) {
if (_proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
co_return api_error::resource_in_use(fmt::format("Table {} already exists", table_name));
}
}
if (_proxy.data_dictionary().try_find_table(schema->id())) {
if (sp.data_dictionary().try_find_table(schema->id())) {
// This should never happen, the ID is supposed to be unique
co_return api_error::internal(format("Table with ID {} already exists", schema->id()));
}
@@ -1909,9 +1815,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
for (schema_builder& view_builder : view_builders) {
schemas.push_back(view_builder.build());
}
co_await service::prepare_new_column_families_announcement(schema_mutations, _proxy, *ksm, schemas, ts);
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
if (ksm->uses_tablets()) {
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, _proxy);
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
}
// If a role is allowed to create a table, we must give it permissions to
@@ -1936,7 +1842,7 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
std::tie(schema_mutations, group0_guard) = co_await std::move(mc).extract();
try {
co_await _mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
co_await mm.announce(std::move(schema_mutations), std::move(group0_guard), fmt::format("alternator-executor: create {} table", table_name));
break;
} catch (const service::group0_concurrent_modification& ex) {
elogger.info("Failed to execute CreateTable {} due to concurrent schema modifications. {}.",
@@ -1948,9 +1854,9 @@ future<executor::request_return_type> executor::create_table_on_shard0(service::
}
}
co_await _mm.wait_for_schema_agreement(_proxy.local_db(), db::timeout_clock::now() + 10s, nullptr);
co_await mm.wait_for_schema_agreement(sp.local_db(), db::timeout_clock::now() + 10s, nullptr);
rjson::value status = rjson::empty_object();
executor::supplement_table_info(request, *schema, _proxy);
executor::supplement_table_info(request, *schema, sp);
rjson::add(status, "TableDescription", std::move(request));
co_return rjson::print(std::move(status));
}
@@ -1959,11 +1865,10 @@ future<executor::request_return_type> executor::create_table(client_state& clien
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), &e = this->container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
const db::tablets_mode_t::mode tablets_mode = _proxy.data_dictionary().get_config().tablets_mode_for_new_keyspaces(); // type cast
// `invoke_on` hopped us to shard 0, but `this` points to `executor` is from 'old' shard, we need to hop it too.
co_return co_await e.local().create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), enforce_authorization, warn_authorization, std::move(tablets_mode));
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, e.local()._stats, std::move(tablets_mode));
});
}
@@ -1982,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string def_type = type_to_string(def.type);
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
auto type = attribute_info["AttributeType"].GetString();
if (type != def_type) {
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
}
@@ -2114,10 +2019,6 @@ future<executor::request_return_type> executor::update_table(client_state& clien
co_return api_error::validation(fmt::format(
"LSI {} already exists in table {}, can't use same name for GSI", index_name, table_name));
}
if (p.local().local_db().find_keyspace(keyspace_name).get_replication_strategy().uses_tablets() &&
!p.local().data_dictionary().get_config().rf_rack_valid_keyspaces()) {
co_return api_error::validation("GlobalSecondaryIndexes with tablets require the rf_rack_valid_keyspaces option to be enabled.");
}
elogger.trace("Adding GSI {}", index_name);
// FIXME: read and handle "Projection" parameter. This will
@@ -2461,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
@@ -2882,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
return;
}
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
if (!used.contains(rjson::to_string(it->name))) {
if (!used.contains(it->name.GetString())) {
throw api_error::validation(
format("{} has spurious '{}', not used in {}",
field_name, rjson::to_string_view(it->name), operation));
field_name, it->name.GetString(), operation));
}
}
}
@@ -3099,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
}
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
try {
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
} catch(data_dictionary::no_such_column_family&) {
@@ -3154,44 +3055,17 @@ public:
}
};
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit)
{
if (!cas_shard.this_shard()) {
_stats.shard_bounce_for_lwt++;
return container().invoke_on(cas_shard.shard(), _ssg,
[cs = client_state.move_to_other_shard(),
&mb = mutation_builders,
&dk,
ks = schema->ks_name(),
cf = schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(executor& self) mutable {
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt), &self]
(service::client_state& client_state) mutable {
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
});
}
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk, const std::vector<put_or_delete_item>& mutation_builders,
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
auto timeout = executor::default_timeout();
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
auto* op_ptr = op.get();
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
return proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
@@ -3217,11 +3091,13 @@ struct schema_decorated_key_equal {
// FIXME: if we failed writing some of the mutations, need to return a list
// of these failed mutations rather than fail the whole write (issue #5650).
future<> executor::do_batch_write(
static future<> do_batch_write(service::storage_proxy& proxy,
smp_service_group ssg,
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit) {
service_permit permit,
stats& stats) {
if (mutation_builders.empty()) {
return make_ready_future<>();
}
@@ -3243,7 +3119,7 @@ future<> executor::do_batch_write(
mutations.push_back(b.second.build(b.first, now));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
return proxy.mutate(std::move(mutations),
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(),
trace_state,
@@ -3252,7 +3128,7 @@ future<> executor::do_batch_write(
false,
cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
});
} else {
// Do the write via LWT:
@@ -3264,35 +3140,46 @@ future<> executor::do_batch_write(
schema_decorated_key_hash,
schema_decorated_key_equal>;
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto&& b : std::move(mutation_builders)) {
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
.schema = b.first,
.dk = dht::decorate_key(*b.first, b.second.pk())
});
for (auto& b : mutation_builders) {
auto dk = dht::decorate_key(*b.first, b.second.pk());
auto [it, added] = key_builders->try_emplace(schema_decorated_key{b.first, dk});
it->second.push_back(std::move(b.second));
}
auto* key_builders_ptr = key_builders.get();
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
_stats.write_using_lwt++;
return parallel_for_each(*key_builders_ptr, [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (const auto& e) {
stats.write_using_lwt++;
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
auto s = e.first.schema;
if (desired_shard.this_shard()) {
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, e.second, client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
return proxy.container().invoke_on(desired_shard.shard(), ssg,
[cs = client_state.move_to_other_shard(),
&mb = e.second,
&dk = e.first.dk,
ks = e.first.schema->ks_name(),
cf = e.first.schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(service::storage_proxy& proxy) mutable {
return do_with(cs.get(), [&proxy, &mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt)]
(service::client_state& client_state) mutable {
auto schema = proxy.data_dictionary().find_schema(ks, cf);
static const auto* injection_name = "alternator_executor_batch_write_wait";
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
const auto ks = handler.get("keyspace");
const auto cf = handler.get("table");
const auto shard = std::atoll(handler.get("shard")->data());
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
elogger.info("{}: hit", injection_name);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
elogger.info("{}: continue", injection_name);
}
}).then([&e, desired_shard = std::move(desired_shard),
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
{
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
});
// The desired_shard on the original shard remains alive for the duration
// of cas_write on this shard and prevents any tablet operations.
// However, we need a local instance of cas_shard on this shard
// to pass it to sp::cas, so we just create a new one.
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return cas_write(proxy, schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
}).finally([desired_shard = std::move(desired_shard)]{});
}
}).finally([key_builders = std::move(key_builders)]{});
}
}
@@ -3440,7 +3327,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
_stats.api_operations.batch_write_item_batch_total += total_items;
_stats.api_operations.batch_write_item_histogram.add(total_items);
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
// FIXME: Issue #5650: If we failed writing some of the updates,
// need to return a list of these failed updates in UnprocessedItems
// rather than fail the whole write (issue #5650).
@@ -3485,7 +3372,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
}
rjson::value newv = rjson::empty_object();
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
std::string attr = rjson::to_string(it->name);
std::string attr = it->name.GetString();
auto x = members.find(attr);
if (x != members.end()) {
if (x->second) {
@@ -3705,7 +3592,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
const rjson::value& attributes_to_get = req["AttributesToGet"];
attrs_to_get ret;
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
attribute_path_map_add("AttributesToGet", ret, it->GetString());
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
}
if (ret.empty()) {
@@ -4371,12 +4258,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
// Note that it.key() is the name of the column, *it is the operation
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
const column_definition* cdef = _schema->get_column_definition(column_name);
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
}
std::string action = rjson::to_string((it->value)["Action"]);
std::string action = (it->value)["Action"].GetString();
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -5573,7 +5460,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
std::vector<query::clustering_range> ck_bounds;
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
sstring key = rjson::to_sstring(it->name);
std::string key = it->name.GetString();
const rjson::value& condition = it->value;
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
@@ -5581,13 +5468,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
const column_definition& pk_cdef = schema->partition_key_columns().front();
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
if (key == pk_cdef.name_as_text()) {
if (sstring(key) == pk_cdef.name_as_text()) {
if (!partition_ranges.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
}
if (ck_cdef && key == ck_cdef->name_as_text()) {
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
if (!ck_bounds.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
@@ -5988,7 +5875,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
int limit = limit_json ? limit_json->GetInt() : 100;
if (limit < 1 || limit > 100) {
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");
@@ -6177,10 +6064,9 @@ future<> executor::start() {
}
future<> executor::stop() {
co_await _describe_table_info_manager->stop();
// disconnect from the value source, but keep the value unchanged.
s_default_timeout_in_ms = utils::updateable_value<uint32_t>{s_default_timeout_in_ms()};
co_await _parsed_expression_cache->stop();
return _parsed_expression_cache->stop();
}
} // namespace alternator

View File

@@ -17,13 +17,11 @@
#include "service/client_state.hh"
#include "service_permit.hh"
#include "db/timeout_clock.hh"
#include "db/config.hh"
#include "alternator/error.hh"
#include "stats.hh"
#include "utils/rjson.hh"
#include "utils/updateable_value.hh"
#include "utils/simple_value_with_expiry.hh"
#include "tracing/trace_state.hh"
@@ -42,8 +40,6 @@ namespace cql3::selection {
namespace service {
class storage_proxy;
class cas_shard;
class storage_service;
}
namespace cdc {
@@ -60,9 +56,7 @@ class schema_builder;
namespace alternator {
enum class table_status;
class rmw_operation;
class put_or_delete_item;
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
@@ -140,7 +134,6 @@ class expression_cache;
class executor : public peering_sharded_service<executor> {
gms::gossiper& _gossiper;
service::storage_service& _ss;
service::storage_proxy& _proxy;
service::migration_manager& _mm;
db::system_distributed_keyspace& _sdks;
@@ -153,11 +146,6 @@ class executor : public peering_sharded_service<executor> {
std::unique_ptr<parsed::expression_cache> _parsed_expression_cache;
struct describe_table_info_manager;
std::unique_ptr<describe_table_info_manager> _describe_table_info_manager;
future<> cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl);
future<> fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting);
public:
using client_state = service::client_state;
// request_return_type is the return type of the executor methods, which
@@ -183,7 +171,6 @@ public:
executor(gms::gossiper& gossiper,
service::storage_proxy& proxy,
service::storage_service& ss,
service::migration_manager& mm,
db::system_distributed_keyspace& sdks,
cdc::metadata& cdc_metadata,
@@ -231,18 +218,6 @@ private:
friend class rmw_operation;
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode);
future<> do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit);
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit);
public:
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);

View File

@@ -1,301 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "alternator/http_compression.hh"
#include "alternator/server.hh"
#include <seastar/coroutine/maybe_yield.hh>
#include <zlib.h>
static logging::logger slogger("alternator-http-compression");
namespace alternator {
static constexpr size_t compressed_buffer_size = 1024;
class zlib_compressor {
z_stream _zs;
temporary_buffer<char> _output_buf;
noncopyable_function<future<>(temporary_buffer<char>&&)> _write_func;
public:
zlib_compressor(bool gzip, int compression_level, noncopyable_function<future<>(temporary_buffer<char>&&)> write_func)
: _write_func(std::move(write_func)) {
memset(&_zs, 0, sizeof(_zs));
if (deflateInit2(&_zs, std::clamp(compression_level, Z_NO_COMPRESSION, Z_BEST_COMPRESSION), Z_DEFLATED,
(gzip ? 16 : 0) + MAX_WBITS, 8, Z_DEFAULT_STRATEGY) != Z_OK) {
// Should only happen if memory allocation fails
throw std::bad_alloc();
}
}
~zlib_compressor() {
deflateEnd(&_zs);
}
future<> close() {
return compress(nullptr, 0, true);
}
future<> compress(const char* buf, size_t len, bool is_last_chunk = false) {
_zs.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(buf));
_zs.avail_in = (uInt) len;
int mode = is_last_chunk ? Z_FINISH : Z_NO_FLUSH;
while(_zs.avail_in > 0 || is_last_chunk) {
co_await coroutine::maybe_yield();
if (_output_buf.empty()) {
if (is_last_chunk) {
uint32_t max_buffer_size = 0;
deflatePending(&_zs, &max_buffer_size, nullptr);
max_buffer_size += deflateBound(&_zs, _zs.avail_in) + 1;
_output_buf = temporary_buffer<char>(std::min(compressed_buffer_size, (size_t) max_buffer_size));
} else {
_output_buf = temporary_buffer<char>(compressed_buffer_size);
}
_zs.next_out = reinterpret_cast<unsigned char*>(_output_buf.get_write());
_zs.avail_out = compressed_buffer_size;
}
int e = deflate(&_zs, mode);
if (e < Z_OK) {
throw api_error::internal("Error during compression of response body");
}
if (e == Z_STREAM_END || _zs.avail_out < compressed_buffer_size / 4) {
_output_buf.trim(compressed_buffer_size - _zs.avail_out);
co_await _write_func(std::move(_output_buf));
if (e == Z_STREAM_END) {
break;
}
}
}
}
};
// Helper string_view functions for parsing Accept-Encoding header
struct case_insensitive_cmp_sv {
bool operator()(std::string_view s1, std::string_view s2) const {
return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(),
[](char a, char b) { return ::tolower(a) == ::tolower(b); });
}
};
static inline std::string_view trim_left(std::string_view sv) {
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.front())))
sv.remove_prefix(1);
return sv;
}
static inline std::string_view trim_right(std::string_view sv) {
while (!sv.empty() && std::isspace(static_cast<unsigned char>(sv.back())))
sv.remove_suffix(1);
return sv;
}
static inline std::string_view trim(std::string_view sv) {
return trim_left(trim_right(sv));
}
inline std::vector<std::string_view> split(std::string_view text, char separator) {
std::vector<std::string_view> tokens;
if (text == "") {
return tokens;
}
while (true) {
auto pos = text.find_first_of(separator);
if (pos != std::string_view::npos) {
tokens.emplace_back(text.data(), pos);
text.remove_prefix(pos + 1);
} else {
tokens.emplace_back(text);
break;
}
}
return tokens;
}
constexpr response_compressor::compression_type response_compressor::get_compression_type(std::string_view encoding) {
for (size_t i = 0; i < static_cast<size_t>(compression_type::count); ++i) {
if (case_insensitive_cmp_sv{}(encoding, compression_names[i])) {
return static_cast<compression_type>(i);
}
}
return compression_type::unknown;
}
response_compressor::compression_type response_compressor::find_compression(std::string_view accept_encoding, size_t response_size) {
std::optional<float> ct_q[static_cast<size_t>(compression_type::count)];
ct_q[static_cast<size_t>(compression_type::none)] = std::numeric_limits<float>::min(); // enabled, but lowest priority
compression_type selected_ct = compression_type::none;
std::vector<std::string_view> entries = split(accept_encoding, ',');
for (auto& e : entries) {
std::vector<std::string_view> params = split(e, ';');
if (params.size() == 0) {
continue;
}
compression_type ct = get_compression_type(trim(params[0]));
if (ct == compression_type::unknown) {
continue; // ignore unknown encoding types
}
if (ct_q[static_cast<size_t>(ct)].has_value() && ct_q[static_cast<size_t>(ct)] != 0.0f) {
continue; // already processed this encoding
}
if (response_size < _threshold[static_cast<size_t>(ct)]) {
continue; // below threshold treat as unknown
}
for (size_t i = 1; i < params.size(); ++i) { // find "q=" parameter
auto pos = params[i].find("q=");
if (pos == std::string_view::npos) {
continue;
}
std::string_view param = params[i].substr(pos + 2);
param = trim(param);
// parse quality value
float q_value = 1.0f;
auto [ptr, ec] = std::from_chars(param.data(), param.data() + param.size(), q_value);
if (ec != std::errc() || ptr != param.data() + param.size()) {
continue;
}
if (q_value < 0.0) {
q_value = 0.0;
} else if (q_value > 1.0) {
q_value = 1.0;
}
ct_q[static_cast<size_t>(ct)] = q_value;
break; // we parsed quality value
}
if (!ct_q[static_cast<size_t>(ct)].has_value()) {
ct_q[static_cast<size_t>(ct)] = 1.0f; // default quality value
}
// keep the highest encoding (in the order, unless 'any')
if (selected_ct == compression_type::any) {
if (ct_q[static_cast<size_t>(ct)] >= ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = ct;
}
} else {
if (ct_q[static_cast<size_t>(ct)] > ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = ct;
}
}
}
if (selected_ct == compression_type::any) {
// select any not mentioned or highest quality
selected_ct = compression_type::none;
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
if (!ct_q[i].has_value()) {
return static_cast<compression_type>(i);
}
if (ct_q[i] > ct_q[static_cast<size_t>(selected_ct)]) {
selected_ct = static_cast<compression_type>(i);
}
}
}
return selected_ct;
}
static future<chunked_content> compress(response_compressor::compression_type ct, const db::config& cfg, std::string str) {
chunked_content compressed;
auto write = [&compressed](temporary_buffer<char>&& buf) -> future<> {
compressed.push_back(std::move(buf));
return make_ready_future<>();
};
zlib_compressor compressor(ct != response_compressor::compression_type::deflate,
cfg.alternator_response_gzip_compression_level(), std::move(write));
co_await compressor.compress(str.data(), str.size(), true);
co_return compressed;
}
static sstring flatten(chunked_content&& cc) {
size_t total_size = 0;
for (const auto& chunk : cc) {
total_size += chunk.size();
}
sstring result = sstring{ sstring::initialized_later{}, total_size };
size_t offset = 0;
for (const auto& chunk : cc) {
std::copy(chunk.begin(), chunk.end(), result.begin() + offset);
offset += chunk.size();
}
return result;
}
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, std::string&& response_body) {
response_compressor::compression_type ct = find_compression(accept_encoding, response_body.size());
if (ct != response_compressor::compression_type::none) {
rep->add_header("Content-Encoding", get_encoding_name(ct));
rep->set_content_type(content_type);
return compress(ct, cfg, std::move(response_body)).then([rep = std::move(rep)] (chunked_content compressed) mutable {
rep->_content = flatten(std::move(compressed));
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
});
} else {
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(response_body);
rep->set_content_type(content_type);
}
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
template<typename Compressor>
class compressed_data_sink_impl : public data_sink_impl {
output_stream<char> _out;
Compressor _compressor;
public:
template<typename... Args>
compressed_data_sink_impl(output_stream<char>&& out, Args&&... args)
: _out(std::move(out)), _compressor(std::forward<Args>(args)..., [this](temporary_buffer<char>&& buf) {
return _out.write(std::move(buf));
}) { }
future<> put(std::span<temporary_buffer<char>> data) override {
return data_sink_impl::fallback_put(data, [this] (temporary_buffer<char>&& buf) {
return do_put(std::move(buf));
});
}
private:
future<> do_put(temporary_buffer<char> buf) {
co_return co_await _compressor.compress(buf.get(), buf.size());
}
future<> close() override {
return _compressor.close().then([this] {
return _out.close();
});
}
};
executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) {
return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream<char>&& out) mutable -> future<> {
output_stream_options opts;
opts.trim_to_size = true;
std::unique_ptr<data_sink_impl> data_sink_impl;
switch (ct) {
case response_compressor::compression_type::gzip:
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), true, level);
break;
case response_compressor::compression_type::deflate:
data_sink_impl = std::make_unique<compressed_data_sink_impl<zlib_compressor>>(std::move(out), false, level);
break;
case response_compressor::compression_type::none:
case response_compressor::compression_type::any:
case response_compressor::compression_type::unknown:
on_internal_error(slogger,"Compression not selected");
default:
on_internal_error(slogger, "Unsupported compression type for data sink");
}
return bw(output_stream<char>(data_sink(std::move(data_sink_impl)), compressed_buffer_size, opts));
};
}
future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) {
response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits<size_t>::max());
if (ct != response_compressor::compression_type::none) {
rep->add_header("Content-Encoding", get_encoding_name(ct));
rep->write_body(content_type, compress(ct, cfg, std::move(body_writer)));
} else {
rep->write_body(content_type, std::move(body_writer));
}
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
}
} // namespace alternator

View File

@@ -1,91 +0,0 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "alternator/executor.hh"
#include <seastar/http/httpd.hh>
#include "db/config.hh"
namespace alternator {
class response_compressor {
public:
enum class compression_type {
gzip,
deflate,
compressions_count,
any = compressions_count,
none,
count,
unknown = count
};
static constexpr std::string_view compression_names[] = {
"gzip",
"deflate",
"*",
"identity"
};
static sstring get_encoding_name(compression_type ct) {
return sstring(compression_names[static_cast<size_t>(ct)]);
}
static constexpr compression_type get_compression_type(std::string_view encoding);
sstring get_accepted_encoding(const http::request& req) {
if (get_threshold() == 0) {
return "";
}
return req.get_header("Accept-Encoding");
}
compression_type find_compression(std::string_view accept_encoding, size_t response_size);
response_compressor(const db::config& cfg)
: cfg(cfg)
,_gzip_level_observer(
cfg.alternator_response_gzip_compression_level.observe([this](int v) {
update_threshold();
}))
,_gzip_threshold_observer(
cfg.alternator_response_compression_threshold_in_bytes.observe([this](uint32_t v) {
update_threshold();
}))
{
update_threshold();
}
response_compressor(const response_compressor& rhs) : response_compressor(rhs.cfg) {}
private:
const db::config& cfg;
utils::observable<int>::observer _gzip_level_observer;
utils::observable<uint32_t>::observer _gzip_threshold_observer;
uint32_t _threshold[static_cast<size_t>(compression_type::count)];
size_t get_threshold() { return _threshold[static_cast<size_t>(compression_type::any)]; }
void update_threshold() {
_threshold[static_cast<size_t>(compression_type::none)] = std::numeric_limits<uint32_t>::max();
_threshold[static_cast<size_t>(compression_type::any)] = std::numeric_limits<uint32_t>::max();
uint32_t gzip = cfg.alternator_response_gzip_compression_level() <= 0 ? std::numeric_limits<uint32_t>::max()
: cfg.alternator_response_compression_threshold_in_bytes();
_threshold[static_cast<size_t>(compression_type::gzip)] = gzip;
_threshold[static_cast<size_t>(compression_type::deflate)] = gzip;
for (size_t i = 0; i < static_cast<size_t>(compression_type::compressions_count); ++i) {
if (_threshold[i] < _threshold[static_cast<size_t>(compression_type::any)]) {
_threshold[static_cast<size_t>(compression_type::any)] = _threshold[i];
}
}
}
public:
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
sstring accept_encoding, const char* content_type, std::string&& response_body);
future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer);
};
}

View File

@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
return {"", nullptr};
}
auto it = v.MemberBegin();
const std::string it_key = rjson::to_string(it->name);
const std::string it_key = it->name.GetString();
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
return {std::move(it_key), nullptr};
}

View File

@@ -34,7 +34,6 @@
#include "client_data.hh"
#include "utils/updateable_value.hh"
#include <zlib.h>
#include "alternator/http_compression.hh"
static logging::logger slogger("alternator-server");
@@ -112,12 +111,9 @@ class api_handler : public handler_base {
// type applies to all replies, both success and error.
static constexpr const char* REPLY_CONTENT_TYPE = "application/x-amz-json-1.0";
public:
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle,
const db::config& config) : _response_compressor(config), _f_handle(
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
[this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
sstring accept_encoding = _response_compressor.get_accepted_encoding(*req);
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped(
[this, rep = std::move(rep), accept_encoding=std::move(accept_encoding)](future<executor::request_return_type> resf) mutable {
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
if (resf.failed()) {
// Exceptions of type api_error are wrapped as JSON and
// returned to the client as expected. Other types of
@@ -137,20 +133,22 @@ public:
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
auto res = resf.get();
return std::visit(overloaded_functor {
std::visit(overloaded_functor {
[&] (std::string&& str) {
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
REPLY_CONTENT_TYPE, std::move(str));
// Note that despite the move, there is a copy here -
// as str is std::string and rep->_content is sstring.
rep->_content = std::move(str);
rep->set_content_type(REPLY_CONTENT_TYPE);
},
[&] (executor::body_writer&& body_writer) {
return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
REPLY_CONTENT_TYPE, std::move(body_writer));
rep->write_body(REPLY_CONTENT_TYPE, std::move(body_writer));
},
[&] (const api_error& err) {
generate_error_reply(*rep, err);
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
}
}, std::move(res));
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}) { }
@@ -179,7 +177,6 @@ protected:
slogger.trace("api_handler error case: {}", rep._content);
}
response_compressor _response_compressor;
future_handler_function _f_handle;
};
@@ -711,12 +708,8 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
auto system_clients_entry = _ongoing_requests.emplace(
req->get_client_address(), std::move(user_agent_header),
req->get_client_address(), req->get_header("User-Agent"),
username, current_scheduling_group(),
req->get_protocol_name() == "https");
@@ -761,7 +754,7 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
void server::set_routes(routes& r) {
api_handler* req_handler = new api_handler([this] (std::unique_ptr<request> req) mutable {
return handle_api_request(std::move(req));
}, _proxy.data_dictionary().get_config());
});
r.put(operation_type::POST, "/", req_handler);
r.put(operation_type::GET, "/", new health_handler(_pending_requests));
@@ -992,10 +985,10 @@ client_data server::ongoing_request::make_client_data() const {
return cd;
}
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
future<utils::chunked_vector<client_data>> server::get_client_data() {
utils::chunked_vector<client_data> ret;
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
ret.emplace_back(r.make_client_data());
});
co_return ret;
}

View File

@@ -55,7 +55,6 @@ class server : public peering_sharded_service<server> {
// though it isn't really relevant for Alternator which defines its own
// timeouts separately. We can create this object only once.
updateable_timeout_config _timeout_config;
client_options_cache_type _connection_options_keys_and_values;
alternator_callbacks_map _callbacks;
@@ -89,7 +88,7 @@ class server : public peering_sharded_service<server> {
// is called when reading the "system.clients" virtual table.
struct ongoing_request {
socket_address _client_address;
client_options_cache_entry_type _user_agent;
sstring _user_agent;
sstring _username;
scheduling_group _scheduling_group;
bool _is_https;
@@ -108,7 +107,7 @@ public:
// table "system.clients" is read. It is expected to generate a list of
// clients connected to this server (on this shard). This function is
// called by alternator::controller::get_client_data().
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
future<utils::chunked_vector<client_data>> get_client_data();
private:
void set_routes(seastar::httpd::routes& r);
// If verification succeeds, returns the authenticated user's username

View File

@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
}
sstring attribute_name = rjson::to_sstring(*v);
sstring attribute_name(v->GetString(), v->GetStringLength());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {

View File

@@ -31,7 +31,6 @@ set(swagger_files
api-doc/column_family.json
api-doc/commitlog.json
api-doc/compaction_manager.json
api-doc/client_routes.json
api-doc/config.json
api-doc/cql_server_test.json
api-doc/endpoint_snitch_info.json
@@ -69,7 +68,6 @@ target_sources(api
PRIVATE
api.cc
cache_service.cc
client_routes.cc
collectd.cc
column_family.cc
commitlog.cc

View File

@@ -1,23 +0,0 @@
, "client_routes_entry": {
"id": "client_routes_entry",
"summary": "An entry storing client routes",
"properties": {
"connection_id": {"type": "string"},
"host_id": {"type": "string", "format": "uuid"},
"address": {"type": "string"},
"port": {"type": "integer"},
"tls_port": {"type": "integer"},
"alternator_port": {"type": "integer"},
"alternator_https_port": {"type": "integer"}
},
"required": ["connection_id", "host_id", "address"]
}
, "client_routes_key": {
"id": "client_routes_key",
"summary": "A key of client_routes_entry",
"properties": {
"connection_id": {"type": "string"},
"host_id": {"type": "string", "format": "uuid"}
}
}

View File

@@ -1,74 +0,0 @@
, "/v2/client-routes":{
"get": {
"description":"List all client route entries",
"operationId":"get_client_routes",
"tags":["client_routes"],
"produces":[
"application/json"
],
"parameters":[],
"responses":{
"200":{
"schema":{
"type":"array",
"items":{ "$ref":"#/definitions/client_routes_entry" }
}
},
"default":{
"description":"unexpected error",
"schema":{"$ref":"#/definitions/ErrorModel"}
}
}
},
"post": {
"description":"Upsert one or more client route entries",
"operationId":"set_client_routes",
"tags":["client_routes"],
"parameters":[
{
"name":"body",
"in":"body",
"required":true,
"schema":{
"type":"array",
"items":{ "$ref":"#/definitions/client_routes_entry" }
}
}
],
"responses":{
"200":{ "description": "OK" },
"default":{
"description":"unexpected error",
"schema":{ "$ref":"#/definitions/ErrorModel" }
}
}
},
"delete": {
"description":"Delete one or more client route entries",
"operationId":"delete_client_routes",
"tags":["client_routes"],
"parameters":[
{
"name":"body",
"in":"body",
"required":true,
"schema":{
"type":"array",
"items":{ "$ref":"#/definitions/client_routes_key" }
}
}
],
"responses":{
"200":{
"description": "OK"
},
"default":{
"description":"unexpected error",
"schema":{
"$ref":"#/definitions/ErrorModel"
}
}
}
}
}

View File

@@ -3051,7 +3051,7 @@
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"required":false,
"allowMultiple":false,
"type":"string",

View File

@@ -349,9 +349,13 @@
"type":"long",
"description":"The shard the task is running on"
},
"creation_time":{
"type":"datetime",
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
},
"start_time":{
"type":"datetime",
"description":"The start time of the task; unspecified (equal to epoch) when state == created"
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
},
"end_time":{
"type":"datetime",
@@ -398,13 +402,17 @@
"type":"boolean",
"description":"Boolean flag indicating whether the task can be aborted"
},
"creation_time":{
"type":"datetime",
"description":"The creation time of the task (when it was queued); extracted from the task_id UUID"
},
"start_time":{
"type":"datetime",
"description":"The start time of the task"
"description":"The start time of the task (when execution began); unspecified (equal to epoch) when state == created"
},
"end_time":{
"type":"datetime",
"description":"The end time of the task (unspecified when the task is not completed)"
"description":"The end time of the task (when execution completed); unspecified (equal to epoch) when the task is not completed"
},
"error":{
"type":"string",

View File

@@ -37,7 +37,6 @@
#include "raft.hh"
#include "gms/gossip_address_map.hh"
#include "service_levels.hh"
#include "client_routes.hh"
logging::logger apilog("api");
@@ -68,11 +67,9 @@ future<> set_server_init(http_context& ctx) {
rb02->set_api_doc(r);
rb02->register_api_file(r, "swagger20_header");
rb02->register_api_file(r, "metrics");
rb02->register_api_file(r, "client_routes");
rb->register_function(r, "system",
"The system related API");
rb02->add_definitions_file(r, "metrics");
rb02->add_definitions_file(r, "client_routes");
set_system(ctx, r);
rb->register_function(r, "error_injection",
"The error injection API");
@@ -132,16 +129,6 @@ future<> unset_server_storage_service(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_storage_service(ctx, r); });
}
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr) {
return ctx.http_server.set_routes([&ctx, &cr] (routes& r) {
set_client_routes(ctx, r, cr);
});
}
future<> unset_server_client_routes(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_client_routes(ctx, r); });
}
future<> set_load_meter(http_context& ctx, service::load_meter& lm) {
return ctx.http_server.set_routes([&ctx, &lm] (routes& r) { set_load_meter(ctx, r, lm); });
}

View File

@@ -29,7 +29,6 @@ class storage_proxy;
class storage_service;
class raft_group0_client;
class raft_group_registry;
class client_routes_service;
} // namespace service
@@ -100,8 +99,6 @@ future<> set_server_snitch(http_context& ctx, sharded<locator::snitch_ptr>& snit
future<> unset_server_snitch(http_context& ctx);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, service::raft_group0_client&);
future<> unset_server_storage_service(http_context& ctx);
future<> set_server_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr);
future<> unset_server_client_routes(http_context& ctx);
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader);
future<> unset_server_sstables_loader(http_context& ctx);
future<> set_server_view_builder(http_context& ctx, sharded<db::view::view_builder>& vb, sharded<gms::gossiper>& g);

View File

@@ -1,176 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/http/short_streams.hh>
#include "client_routes.hh"
#include "api/api.hh"
#include "service/storage_service.hh"
#include "service/client_routes.hh"
#include "utils/rjson.hh"
#include "api/api-doc/client_routes.json.hh"
using namespace seastar::httpd;
using namespace std::chrono_literals;
using namespace json;
extern logging::logger apilog;
namespace api {
static void validate_client_routes_endpoint(sharded<service::client_routes_service>& cr, sstring endpoint_name) {
if (!cr.local().get_feature_service().client_routes) {
apilog.warn("{}: called before the cluster feature was enabled", endpoint_name);
throw std::runtime_error(fmt::format("{} requires all nodes to support the CLIENT_ROUTES cluster feature", endpoint_name));
}
}
static sstring parse_string(const char* name, rapidjson::Value const& v) {
const auto it = v.FindMember(name);
if (it == v.MemberEnd()) {
throw bad_param_exception(fmt::format("Missing '{}'", name));
}
if (!it->value.IsString()) {
throw bad_param_exception(fmt::format("'{}' must be a string", name));
}
return {it->value.GetString(), it->value.GetStringLength()};
}
static std::optional<uint32_t> parse_port(const char* name, rapidjson::Value const& v) {
const auto it = v.FindMember(name);
if (it == v.MemberEnd()) {
return std::nullopt;
}
if (!it->value.IsInt()) {
throw bad_param_exception(fmt::format("'{}' must be an integer", name));
}
auto port = it->value.GetInt();
if (port < 1 || port > 65535) {
throw bad_param_exception(fmt::format("'{}' value={} is outside the allowed port range", name, port));
}
return port;
}
static std::vector<service::client_routes_service::client_route_entry> parse_set_client_array(const rapidjson::Document& root) {
if (!root.IsArray()) {
throw bad_param_exception("Body must be a JSON array");
}
std::vector<service::client_routes_service::client_route_entry> v;
v.reserve(root.GetArray().Size());
for (const auto& element : root.GetArray()) {
if (!element.IsObject()) { throw bad_param_exception("Each element must be object"); }
const auto port = parse_port("port", element);
const auto tls_port = parse_port("tls_port", element);
const auto alternator_port = parse_port("alternator_port", element);
const auto alternator_https_port = parse_port("alternator_https_port", element);
if (!port.has_value() && !tls_port.has_value() && !alternator_port.has_value() && !alternator_https_port.has_value()) {
throw bad_param_exception("At least one port field ('port', 'tls_port', 'alternator_port', 'alternator_https_port') must be specified");
}
v.emplace_back(
parse_string("connection_id", element),
utils::UUID{parse_string("host_id", element)},
parse_string("address", element),
port,
tls_port,
alternator_port,
alternator_https_port
);
}
return v;
}
static
future<json::json_return_type>
rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
validate_client_routes_endpoint(cr, "rest_set_client_routes");
rapidjson::Document root;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
co_await cr.local().set_client_routes(parse_set_client_array(root));
co_return seastar::json::json_void();
}
static std::vector<service::client_routes_service::client_route_key> parse_delete_client_array(const rapidjson::Document& root) {
if (!root.IsArray()) {
throw bad_param_exception("Body must be a JSON array");
}
std::vector<service::client_routes_service::client_route_key> v;
v.reserve(root.GetArray().Size());
for (const auto& element : root.GetArray()) {
v.emplace_back(
parse_string("connection_id", element),
utils::UUID{parse_string("host_id", element)}
);
}
return v;
}
static
future<json::json_return_type>
rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
validate_client_routes_endpoint(cr, "delete_client_routes");
rapidjson::Document root;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
co_return seastar::json::json_void();
}
static
future<json::json_return_type>
rest_get_client_routes(http_context& ctx, sharded<service::client_routes_service>& cr, std::unique_ptr<http::request> req) {
validate_client_routes_endpoint(cr, "get_client_routes");
co_return co_await cr.invoke_on(0, [] (service::client_routes_service& cr) -> future<json::json_return_type> {
co_return json::json_return_type(stream_range_as_array(co_await cr.get_client_routes(), [](const service::client_routes_service::client_route_entry & entry) {
seastar::httpd::client_routes_json::client_routes_entry obj;
obj.connection_id = entry.connection_id;
obj.host_id = fmt::to_string(entry.host_id);
obj.address = entry.address;
if (entry.port.has_value()) { obj.port = entry.port.value(); }
if (entry.tls_port.has_value()) { obj.tls_port = entry.tls_port.value(); }
if (entry.alternator_port.has_value()) { obj.alternator_port = entry.alternator_port.value(); }
if (entry.alternator_https_port.has_value()) { obj.alternator_https_port = entry.alternator_https_port.value(); }
return obj;
}));
});
}
void set_client_routes(http_context& ctx, routes& r, sharded<service::client_routes_service>& cr) {
seastar::httpd::client_routes_json::set_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
return rest_set_client_routes(ctx, cr, std::move(req));
});
seastar::httpd::client_routes_json::delete_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
return rest_delete_client_routes(ctx, cr, std::move(req));
});
seastar::httpd::client_routes_json::get_client_routes.set(r, [&ctx, &cr] (std::unique_ptr<seastar::http::request> req) {
return rest_get_client_routes(ctx, cr, std::move(req));
});
}
void unset_client_routes(http_context& ctx, routes& r) {
seastar::httpd::client_routes_json::set_client_routes.unset(r);
seastar::httpd::client_routes_json::delete_client_routes.unset(r);
seastar::httpd::client_routes_json::get_client_routes.unset(r);
}
}

View File

@@ -1,20 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/sharded.hh>
#include <seastar/json/json_elements.hh>
#include "api/api_init.hh"
namespace api {
void set_client_routes(http_context& ctx, httpd::routes& r, sharded<service::client_routes_service>& cr);
void unset_client_routes(http_context& ctx, httpd::routes& r);
}

View File

@@ -547,13 +547,17 @@ void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_build
vp.insert(b.second);
}
}
std::vector<sstring> res;
replica::database& db = vb.local().get_db();
auto uuid = validate_table(db, ks, cf_name);
replica::column_family& cf = db.find_column_family(uuid);
co_return cf.get_index_manager().list_indexes()
| std::views::transform([] (const auto& i) { return i.metadata().name(); })
| std::views::filter([&vp] (const auto& n) { return vp.contains(secondary_index::index_table_name(n)); })
| std::ranges::to<std::vector>();
res.reserve(cf.get_index_manager().list_indexes().size());
for (auto&& i : cf.get_index_manager().list_indexes()) {
if (vp.contains(secondary_index::index_table_name(i.metadata().name()))) {
res.emplace_back(i.metadata().name());
}
}
co_return res;
});
}

View File

@@ -55,6 +55,7 @@ tm::task_status make_status(tasks::task_status status, sharded<gms::gossiper>& g
res.scope = status.scope;
res.state = status.state;
res.is_abortable = bool(status.is_abortable);
res.creation_time = get_time(status.creation_time);
res.start_time = get_time(status.start_time);
res.end_time = get_time(status.end_time);
res.error = status.error;
@@ -83,6 +84,7 @@ tm::task_stats make_stats(tasks::task_stats stats) {
res.table = stats.table;
res.entity = stats.entity;
res.shard = stats.shard;
res.creation_time = get_time(stats.creation_time);
res.start_time = get_time(stats.start_time);
res.end_time = get_time(stats.end_time);;
return res;

View File

@@ -9,6 +9,7 @@
#include "auth/allow_all_authenticator.hh"
#include "service/migration_manager.hh"
#include "utils/alien_worker.hh"
#include "utils/class_registrator.hh"
namespace auth {
@@ -22,6 +23,7 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
cache&,
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -14,6 +14,7 @@
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
class query_processor;
@@ -29,7 +30,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
}
virtual future<> start() override {

View File

@@ -15,7 +15,6 @@
#include "db/system_keyspace.hh"
#include "schema/schema.hh"
#include <iterator>
#include <seastar/core/abort_source.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/core/format.hh>
@@ -23,11 +22,9 @@ namespace auth {
logging::logger logger("auth-cache");
cache::cache(cql3::query_processor& qp, abort_source& as) noexcept
cache::cache(cql3::query_processor& qp) noexcept
: _current_version(0)
, _qp(qp)
, _loading_sem(1)
, _as(as) {
, _qp(qp) {
}
lw_shared_ptr<const cache::role_record> cache::get(const role_name_t& role) const noexcept {
@@ -119,8 +116,6 @@ future<> cache::load_all() {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
auto units = co_await get_units(_loading_sem, 1, _as);
++_current_version;
logger.info("Loading all roles");
@@ -151,9 +146,6 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
if (legacy_mode(_qp)) {
co_return;
}
SCYLLA_ASSERT(this_shard_id() == 0);
auto units = co_await get_units(_loading_sem, 1, _as);
for (const auto& name : roles) {
logger.info("Loading role {}", name);
auto role = co_await fetch_role(name);

View File

@@ -8,7 +8,6 @@
#pragma once
#include <seastar/core/abort_source.hh>
#include <unordered_set>
#include <unordered_map>
@@ -16,7 +15,6 @@
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/semaphore.hh>
#include <absl/container/flat_hash_map.h>
@@ -43,7 +41,7 @@ public:
version_tag_t version; // used for seamless cache reloads
};
explicit cache(cql3::query_processor& qp, abort_source& as) noexcept;
explicit cache(cql3::query_processor& qp) noexcept;
lw_shared_ptr<const role_record> get(const role_name_t& role) const noexcept;
future<> load_all();
future<> load_roles(std::unordered_set<role_name_t> roles);
@@ -54,8 +52,6 @@ private:
roles_map _roles;
version_tag_t _current_version;
cql3::query_processor& _qp;
semaphore _loading_sem;
abort_source& _as;
future<lw_shared_ptr<role_record>> fetch_role(const role_name_t& role) const;
future<> prune_all() noexcept;

View File

@@ -35,13 +35,14 @@ static const class_registrator<auth::authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
, auth::cache&
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&)
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();
@@ -76,9 +77,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
}
continue;
} catch (const std::out_of_range&) {
} catch (std::out_of_range&) {
// just fallthrough
} catch (const boost::regex_error&) {
} catch (boost::regex_error&) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
}
}

View File

@@ -10,6 +10,7 @@
#pragma once
#include "auth/authenticator.hh"
#include "utils/alien_worker.hh"
#include <boost/regex_fwd.hpp> // IWYU pragma: keep
namespace cql3 {
@@ -33,7 +34,7 @@ class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
~certificate_authenticator();
future<> start() override;

View File

@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
try {
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
} catch (const exceptions::already_exists_exception&) {}
} catch (exceptions::already_exists_exception&) {}
}
}

View File

@@ -256,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
} else {
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
}
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
}
}
@@ -293,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
[resource](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
}
});
});
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
return make_ready_future();
}

View File

@@ -49,7 +49,8 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
cache&,
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
@@ -63,13 +64,14 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
password_authenticator::~password_authenticator() {
}
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
, _hashing_worker(hashing_worker)
{}
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
@@ -328,18 +330,20 @@ future<authenticated_user> password_authenticator::authenticate(
}
salted_hash = role->salted_hash;
}
const bool password_match = co_await passwords::check(password, *salted_hash);
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
return passwords::check(password, *salted_hash);
});
if (!password_match) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
co_return username;
} catch (const std::system_error &) {
} catch (std::system_error &) {
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.what()));
} catch (const exceptions::authentication_exception& e) {
} catch (exceptions::authentication_exception& e) {
std::throw_with_nested(e);
} catch (const exceptions::unavailable_exception& e) {
} catch (exceptions::unavailable_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
} catch (...) {
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));

View File

@@ -18,6 +18,7 @@
#include "auth/passwords.hh"
#include "auth/cache.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
namespace db {
class config;
@@ -48,12 +49,13 @@ class password_authenticator : public authenticator {
shared_promise<> _superuser_created_promise;
// We used to also support bcrypt, SHA-256, and MD5 (ref. scylladb#24524).
constexpr static auth::passwords::scheme _scheme = passwords::scheme::sha_512;
utils::alien_worker& _hashing_worker;
public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
~password_authenticator();

View File

@@ -7,8 +7,6 @@
*/
#include "auth/passwords.hh"
#include "utils/crypt_sha512.hh"
#include <seastar/core/coroutine.hh>
#include <cerrno>
@@ -23,46 +21,25 @@ static thread_local crypt_data tlcrypt = {};
namespace detail {
void verify_hashing_output(const char * res) {
if (!res || (res[0] == '*')) {
throw std::system_error(errno, std::system_category());
}
}
void verify_scheme(scheme scheme) {
const sstring random_part_of_salt = "aaaabbbbccccdddd";
const sstring salt = sstring(prefix_for_scheme(scheme)) + random_part_of_salt;
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
try {
verify_hashing_output(e);
} catch (const std::system_error& ex) {
throw no_supported_schemes();
if (e && (e[0] != '*')) {
return;
}
throw no_supported_schemes();
}
sstring hash_with_salt(const sstring& pass, const sstring& salt) {
auto res = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
verify_hashing_output(res);
return res;
}
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt) {
sstring res;
// Only SHA-512 hashes for passphrases shorter than 256 bytes can be computed using
// the __crypt_sha512 method. For other computations, we fall back to the
// crypt_r implementation from `<crypt.h>`, which can stall.
if (salt.starts_with(prefix_for_scheme(scheme::sha_512)) && pass.size() <= 255) {
char buf[128];
const char * output_ptr = co_await __crypt_sha512(pass.c_str(), salt.c_str(), buf);
verify_hashing_output(output_ptr);
res = output_ptr;
} else {
const char * output_ptr = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
verify_hashing_output(output_ptr);
res = output_ptr;
if (!res || (res[0] == '*')) {
throw std::system_error(errno, std::system_category());
}
co_return res;
return res;
}
std::string_view prefix_for_scheme(scheme c) noexcept {
@@ -81,9 +58,8 @@ no_supported_schemes::no_supported_schemes()
: std::runtime_error("No allowed hashing schemes are supported on this system") {
}
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash) {
const auto pwd_hash = co_await detail::hash_with_salt_async(pass, salted_hash);
co_return pwd_hash == salted_hash;
bool check(const sstring& pass, const sstring& salted_hash) {
return detail::hash_with_salt(pass, salted_hash) == salted_hash;
}
} // namespace auth::passwords

View File

@@ -11,7 +11,6 @@
#include <random>
#include <stdexcept>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
@@ -76,19 +75,10 @@ sstring generate_salt(RandomNumberEngine& g, scheme scheme) {
///
/// Hash a password combined with an implementation-specific salt string.
/// Deprecated in favor of `hash_with_salt_async`.
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
[[deprecated("Use hash_with_salt_async instead")]] sstring hash_with_salt(const sstring& pass, const sstring& salt);
///
/// Async version of `hash_with_salt` that returns a future.
/// If possible, hashing uses `coroutine::maybe_yield` to prevent reactor stalls.
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt);
sstring hash_with_salt(const sstring& pass, const sstring& salt);
} // namespace detail
@@ -117,6 +107,6 @@ sstring hash(const sstring& pass, RandomNumberEngine& g, scheme scheme) {
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash);
bool check(const sstring& pass, const sstring& salted_hash);
} // namespace auth::passwords

View File

@@ -35,9 +35,10 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
cache&,
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -12,6 +12,7 @@
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
class query_processor;
@@ -29,7 +30,7 @@ namespace auth {
class saslauthd_authenticator : public authenticator {
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
public:
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
future<> start() override;

View File

@@ -191,7 +191,8 @@ service::service(
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache)
cache& cache,
utils::alien_worker& hashing_worker)
: service(
std::move(c),
cache,
@@ -199,7 +200,7 @@ service::service(
g0,
mn,
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
used_by_maintenance_socket) {
}
@@ -225,7 +226,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
try {
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
} catch (const ::service::group0_concurrent_modification&) {
} catch (::service::group0_concurrent_modification&) {
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
}
}

View File

@@ -27,6 +27,7 @@
#include "cql3/description.hh"
#include "seastarx.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
#include "utils/observable.hh"
#include "utils/serialized_action.hh"
#include "service/maintenance_mode.hh"
@@ -130,7 +131,8 @@ public:
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&);
cache&,
utils::alien_worker&);
future<> start(::service::migration_manager&, db::system_keyspace&);

View File

@@ -192,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
log.info("Created default superuser role '{}'.", _superuser);
} catch (const exceptions::unavailable_exception& e) {
} catch(const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were not ready; will retry");
throw e;
}

View File

@@ -38,8 +38,8 @@ class transitional_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
@@ -81,7 +81,7 @@ public:
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -126,7 +126,7 @@ public:
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
_complete = true;
return {};
}
@@ -141,7 +141,7 @@ public:
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -241,7 +241,8 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
auth::cache&,
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<
auth::authorizer,

View File

@@ -10,9 +10,7 @@
#include <seastar/net/inet_address.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
#include "utils/loading_shared_values.hh"
#include <list>
#include <optional>
enum class client_type {
@@ -29,20 +27,6 @@ enum class client_connection_stage {
ready,
};
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
struct options_cache_value_type {};
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
using client_options_cache_key_type = client_options_cache_type::key_type;
// This struct represents a single OPTION key-value pair from the client's connection options.
// Both key and value are represented by corresponding "references" to their cached values.
// Each "reference" is effectively a lw_shared_ptr value.
struct client_option_key_value_cached_entry {
client_options_cache_entry_type key;
client_options_cache_entry_type value;
};
sstring to_string(client_connection_stage ct);
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
@@ -53,8 +37,8 @@ struct client_data {
client_connection_stage connection_stage = client_connection_stage::established;
int32_t shard_id; /// ID of server-side shard which is processing the connection.
std::optional<client_options_cache_entry_type> driver_name;
std::optional<client_options_cache_entry_type> driver_version;
std::optional<sstring> driver_name;
std::optional<sstring> driver_version;
std::optional<sstring> hostname;
std::optional<int32_t> protocol_version;
std::optional<sstring> ssl_cipher_suite;
@@ -62,7 +46,6 @@ struct client_data {
std::optional<sstring> ssl_protocol;
std::optional<sstring> username;
std::optional<sstring> scheduling_group_name;
std::list<client_option_key_value_cached_entry> client_options;
sstring stage_str() const { return to_string(connection_stage); }
sstring client_type_str() const { return to_string(ct); }

View File

@@ -125,6 +125,10 @@ if(target_arch)
add_compile_options("-march=${target_arch}")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
endif()
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")

View File

@@ -12,7 +12,6 @@
#include <seastar/core/condition-variable.hh>
#include "schema/schema_fwd.hh"
#include "sstables/open_info.hh"
#include "compaction_descriptor.hh"
class reader_permit;
@@ -45,7 +44,7 @@ public:
virtual compaction_strategy_state& get_compaction_strategy_state() noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const = 0;
virtual sstables::shared_sstable make_sstable() const = 0;
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
virtual api::timestamp_type min_memtable_timestamp() const = 0;
virtual api::timestamp_type min_memtable_live_timestamp() const = 0;

View File

@@ -416,9 +416,7 @@ future<compaction_result> compaction_task_executor::compact_sstables(compaction_
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
}
descriptor.creator = [&t] (shard_id) {
// All compaction types going through this path will work on normal input sstables only.
// Off-strategy, for example, waits until the sstables move out of staging state.
return t.make_sstable(sstables::sstable_state::normal);
return t.make_sstable();
};
descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) {
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
@@ -1849,10 +1847,6 @@ protected:
throw make_compaction_stopped_exception();
}
}, false);
if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) {
throw make_compaction_stopped_exception();
}
co_return co_await do_rewrite_sstable(std::move(sst));
}
};
@@ -2290,16 +2284,12 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
}
future<std::vector<sstables::shared_sstable>>
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
// Throw an error if split cannot be performed due to e.g. out of space prevention.
// We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization,
// which is unneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
if (is_disabled()) {
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
"reason might be out of space prevention", sst->get_filename()))));
if (!can_proceed(&t)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
std::vector<sstables::shared_sstable> ret;
@@ -2307,11 +2297,8 @@ compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compac
compaction_progress_monitor monitor;
compaction_data info = create_compaction_data();
compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
desc.creator = [&t, sst] (shard_id _) {
// NOTE: preserves the sstable state, since we want the output to be on the same state as the original.
// For example, if base table has views, it's important that sstable produced by repair will be
// in the staging state.
return t.make_sstable(sst->state());
desc.creator = [&t] (shard_id _) {
return t.make_sstable();
};
desc.replacer = [&] (compaction_completion_desc d) {
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));

View File

@@ -376,8 +376,7 @@ public:
// Splits a single SSTable by segregating all its data according to the classifier.
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
// Exception is thrown if the input sstable cannot be split due to e.g. out of space prevention.
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
// Run a custom job for a given table, defined by a function
// it completes when future returned by job is ready or returns immediately

View File

@@ -571,10 +571,10 @@ commitlog_total_space_in_mb: -1
# - "none": auditing is disabled (default)
# - "table": save audited events in audit.audit_log column family
# - "syslog": send audited events via syslog (depends on OS, but usually to /dev/log)
audit: "table"
# audit: "none"
#
# List of statement categories that should be audited.
audit_categories: "DCL,DDL,AUTH,ADMIN"
# audit_categories: "DCL,DDL,AUTH"
#
# List of tables that should be audited.
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"

View File

@@ -368,87 +368,6 @@ def find_ninja():
sys.exit(1)
def find_compiler(name):
"""
Find a compiler by name, skipping ccache wrapper directories.
This is useful when using sccache to avoid double-caching through ccache.
Args:
name: The compiler name (e.g., 'clang++', 'clang', 'gcc')
Returns:
Path to the compiler, skipping ccache directories, or None if not found.
"""
ccache_dirs = {'/usr/lib/ccache', '/usr/lib64/ccache'}
for path_dir in os.environ.get('PATH', '').split(os.pathsep):
# Skip ccache wrapper directories
if os.path.realpath(path_dir) in ccache_dirs or path_dir in ccache_dirs:
continue
candidate = os.path.join(path_dir, name)
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
return candidate
return None
def resolve_compilers_for_compiler_cache(args, compiler_cache):
"""
When using a compiler cache, resolve compiler paths to avoid ccache directories.
This prevents double-caching when ccache symlinks are in PATH.
Args:
args: The argument namespace with cc and cxx attributes.
compiler_cache: Path to the compiler cache binary, or None.
"""
if not compiler_cache:
return
if not os.path.isabs(args.cxx):
real_cxx = find_compiler(args.cxx)
if real_cxx:
args.cxx = real_cxx
if not os.path.isabs(args.cc):
real_cc = find_compiler(args.cc)
if real_cc:
args.cc = real_cc
def find_compiler_cache(preference):
"""
Find a compiler cache based on the preference.
Args:
preference: One of 'auto', 'sccache', 'ccache', 'none', or a path to a binary.
Returns:
Path to the compiler cache binary, or None if not found/disabled.
"""
if preference == 'none':
return None
if preference == 'auto':
# Prefer sccache over ccache
for cache in ['sccache', 'ccache']:
path = which(cache)
if path:
return path
return None
if preference in ('sccache', 'ccache'):
path = which(preference)
if path:
return path
print(f"Warning: {preference} not found on PATH, disabling compiler cache")
return None
# Assume it's a path to a binary
if os.path.isfile(preference) and os.access(preference, os.X_OK):
return preference
print(f"Warning: compiler cache '{preference}' not found or not executable, disabling compiler cache")
return None
modes = {
'debug': {
'cxxflags': '-DDEBUG -DSANITIZE -DDEBUG_LSA_SANITIZER -DSCYLLA_ENABLE_ERROR_INJECTION',
@@ -813,8 +732,6 @@ arg_parser.add_argument('--compiler', action='store', dest='cxx', default='clang
help='C++ compiler path')
arg_parser.add_argument('--c-compiler', action='store', dest='cc', default='clang',
help='C compiler path')
arg_parser.add_argument('--compiler-cache', action='store', dest='compiler_cache', default='auto',
help='Compiler cache to use: auto (default, prefers sccache), sccache, ccache, none, or a path to a binary')
add_tristate(arg_parser, name='dpdk', dest='dpdk', default=False,
help='Use dpdk (from seastar dpdk sources)')
arg_parser.add_argument('--dpdk-target', action='store', dest='dpdk_target', default='',
@@ -942,7 +859,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/alien_worker.cc',
'utils/array-search.cc',
'utils/base64.cc',
'utils/crypt_sha512.cc',
'utils/logalloc.cc',
'utils/large_bitset.cc',
'utils/buffer_input_stream.cc',
@@ -1034,7 +950,6 @@ scylla_core = (['message/messaging_service.cc',
'cql3/functions/aggregate_fcts.cc',
'cql3/functions/castas_fcts.cc',
'cql3/functions/error_injection_fcts.cc',
'cql3/functions/vector_similarity_fcts.cc',
'cql3/statements/cf_prop_defs.cc',
'cql3/statements/cf_statement.cc',
'cql3/statements/authentication_statement.cc',
@@ -1242,7 +1157,6 @@ scylla_core = (['message/messaging_service.cc',
'locator/topology.cc',
'locator/util.cc',
'service/client_state.cc',
'service/client_routes.cc',
'service/storage_service.cc',
'service/session.cc',
'service/task_manager_module.cc',
@@ -1403,8 +1317,6 @@ api = ['api/api.cc',
'api/storage_proxy.cc',
Json2Code('api/api-doc/cache_service.json'),
'api/cache_service.cc',
Json2Code('api/api-doc/client_routes.json'),
'api/client_routes.cc',
Json2Code('api/api-doc/collectd.json'),
'api/collectd.cc',
Json2Code('api/api-doc/endpoint_snitch_info.json'),
@@ -1454,7 +1366,6 @@ alternator = [
'alternator/auth.cc',
'alternator/streams.cc',
'alternator/ttl.cc',
'alternator/http_compression.cc'
]
idls = ['idl/gossip_digest.idl.hh',
@@ -1568,6 +1479,7 @@ deps = {
pure_boost_tests = set([
'test/boost/anchorless_list_test',
'test/boost/auth_passwords_test',
'test/boost/auth_resource_test',
'test/boost/big_decimal_test',
'test/boost/caching_options_test',
@@ -1700,7 +1612,6 @@ deps['test/boost/combined_tests'] += [
'test/boost/schema_registry_test.cc',
'test/boost/secondary_index_test.cc',
'test/boost/sessions_test.cc',
'test/boost/simple_value_with_expiry_test.cc',
'test/boost/sstable_compaction_test.cc',
'test/boost/sstable_compressor_factory_test.cc',
'test/boost/sstable_compression_config_test.cc',
@@ -1784,18 +1695,6 @@ deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vect
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
# We need to link these files to all Boost tests to make sure that
# we can execute `--list_json_content` on them. That will produce
# a similar result as calling `--list_content={HRF,DOT}`.
# Unfortunately, to be able to do that, we're forced to link the
# relevant code by hand.
for key in deps.keys():
for prefix in boost_tests_prefixes:
if key.startswith(prefix):
deps[key] += ["test/lib/boost_tree_lister_injector.cc", "test/lib/boost_test_tree_lister.cc"]
wasm_deps = {}
wasm_deps['wasm/return_input.wat'] = 'test/resource/wasm/rust/return_input.rs'
@@ -2100,7 +1999,7 @@ def semicolon_separated(*flags):
def real_relpath(path, start):
return os.path.relpath(os.path.realpath(path), os.path.realpath(start))
def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
def configure_seastar(build_dir, mode, mode_config):
seastar_cxx_ld_flags = mode_config['cxx_ld_flags']
# We want to "undo" coverage for seastar if we have it enabled.
if args.coverage:
@@ -2147,10 +2046,6 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
'-DSeastar_IO_URING=ON',
]
if compiler_cache:
seastar_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
if args.stack_guards is not None:
stack_guards = 'ON' if args.stack_guards else 'OFF'
seastar_cmake_args += ['-DSeastar_STACK_GUARDS={}'.format(stack_guards)]
@@ -2182,7 +2077,7 @@ def configure_seastar(build_dir, mode, mode_config, compiler_cache=None):
subprocess.check_call(seastar_cmd, shell=False, cwd=cmake_dir)
def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
def configure_abseil(build_dir, mode, mode_config):
abseil_cflags = mode_config['lib_cflags']
cxx_flags = mode_config['cxxflags']
if '-DSANITIZE' in cxx_flags:
@@ -2208,10 +2103,6 @@ def configure_abseil(build_dir, mode, mode_config, compiler_cache=None):
'-DABSL_PROPAGATE_CXX_STD=ON',
]
if compiler_cache:
abseil_cmake_args += [f'-DCMAKE_CXX_COMPILER_LAUNCHER={compiler_cache}',
f'-DCMAKE_C_COMPILER_LAUNCHER={compiler_cache}']
cmake_args = abseil_cmake_args[:]
abseil_build_dir = os.path.join(build_dir, mode, 'abseil')
abseil_cmd = ['cmake', '-G', 'Ninja', real_relpath('abseil', abseil_build_dir)] + cmake_args
@@ -2357,6 +2248,15 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
if debuginfo and mode_config['can_have_debug_info']:
cxxflags += ['-g', '-gz']
if 'clang' in cxx:
# Since AssignmentTracking was enabled by default in clang
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
# coroutine frame debugging info (`coro_frame_ty`) is broken.
#
# It seems that we aren't losing much by disabling AssigmentTracking,
# so for now we choose to disable it to get `coro_frame_ty` back.
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
return cxxflags
@@ -2384,15 +2284,10 @@ def write_build_file(f,
scylla_product,
scylla_version,
scylla_release,
compiler_cache,
args):
use_precompiled_header = not args.disable_precompiled_header
warnings = get_warning_options(args.cxx)
rustc_target = pick_rustc_target('wasm32-wasi', 'wasm32-wasip1')
# If compiler cache is available, prefix the compiler with it
cxx_with_cache = f'{compiler_cache} {args.cxx}' if compiler_cache else args.cxx
# For Rust, sccache is used via RUSTC_WRAPPER environment variable
rustc_wrapper = f'RUSTC_WRAPPER={compiler_cache} ' if compiler_cache and 'sccache' in compiler_cache else ''
f.write(textwrap.dedent('''\
configure_args = {configure_args}
builddir = {outdir}
@@ -2455,7 +2350,7 @@ def write_build_file(f,
command = clang --target=wasm32 --no-standard-libraries -Wl,--export-all -Wl,--no-entry $in -o $out
description = C2WASM $out
rule rust2wasm
command = {rustc_wrapper}cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
command = cargo build --target={rustc_target} --example=$example --locked --manifest-path=test/resource/wasm/rust/Cargo.toml --target-dir=$builddir/wasm/ $
&& wasm-opt -Oz $builddir/wasm/{rustc_target}/debug/examples/$example.wasm -o $builddir/wasm/$example.wasm $
&& wasm-strip $builddir/wasm/$example.wasm
description = RUST2WASM $out
@@ -2471,7 +2366,7 @@ def write_build_file(f,
command = llvm-profdata merge $in -output=$out
''').format(configure_args=configure_args,
outdir=outdir,
cxx=cxx_with_cache,
cxx=args.cxx,
user_cflags=user_cflags,
warnings=warnings,
defines=defines,
@@ -2479,7 +2374,6 @@ def write_build_file(f,
user_ldflags=user_ldflags,
libs=libs,
rustc_target=rustc_target,
rustc_wrapper=rustc_wrapper,
link_pool_depth=link_pool_depth,
seastar_path=args.seastar_path,
ninja=ninja,
@@ -2564,10 +2458,10 @@ def write_build_file(f,
description = TEST {mode}
# This rule is unused for PGO stages. They use the rust lib from the parent mode.
rule rust_lib.{mode}
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' {rustc_wrapper}cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
command = CARGO_BUILD_DEP_INFO_BASEDIR='.' cargo build --locked --manifest-path=rust/Cargo.toml --target-dir=$builddir/{mode} --profile=rust-{mode} $
&& touch $out
description = RUST_LIB $out
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, rustc_wrapper=rustc_wrapper, **modeval))
''').format(mode=mode, antlr3_exec=args.antlr3_exec, fmt_lib=fmt_lib, test_repeat=args.test_repeat, test_timeout=args.test_timeout, **modeval))
f.write(
'build {mode}-build: phony {artifacts} {wasms} {vector_search_validator_bins}\n'.format(
mode=mode,
@@ -2631,7 +2525,7 @@ def write_build_file(f,
# In debug/sanitize modes, we compile with fsanitizers,
# so must use the same options during the link:
if '-DSANITIZE' in modes[mode]['cxxflags']:
f.write(' libs = -fsanitize=address -fsanitize=undefined -lubsan\n')
f.write(' libs = -fsanitize=address -fsanitize=undefined\n')
else:
f.write(' libs =\n')
f.write(f'build $builddir/{mode}/{binary}.stripped: strip $builddir/{mode}/{binary}\n')
@@ -3027,9 +2921,6 @@ def create_build_system(args):
os.makedirs(outdir, exist_ok=True)
compiler_cache = find_compiler_cache(args.compiler_cache)
resolve_compilers_for_compiler_cache(args, compiler_cache)
scylla_product, scylla_version, scylla_release = generate_version(args.date_stamp)
for mode, mode_config in build_modes.items():
@@ -3046,8 +2937,8 @@ def create_build_system(args):
# {outdir}/{mode}/seastar/build.ninja, and
# {outdir}/{mode}/seastar/seastar.pc is queried for building flags
for mode, mode_config in build_modes.items():
configure_seastar(outdir, mode, mode_config, compiler_cache)
configure_abseil(outdir, mode, mode_config, compiler_cache)
configure_seastar(outdir, mode, mode_config)
configure_abseil(outdir, mode, mode_config)
user_cflags += ' -isystem abseil'
for mode, mode_config in build_modes.items():
@@ -3070,7 +2961,6 @@ def create_build_system(args):
scylla_product,
scylla_version,
scylla_release,
compiler_cache,
args)
generate_compdb('compile_commands.json', ninja, args.buildfile, selected_modes)
@@ -3113,10 +3003,6 @@ def configure_using_cmake(args):
selected_modes = args.selected_modes or default_modes
selected_configs = ';'.join(build_modes[mode].cmake_build_type for mode
in selected_modes)
compiler_cache = find_compiler_cache(args.compiler_cache)
resolve_compilers_for_compiler_cache(args, compiler_cache)
settings = {
'CMAKE_CONFIGURATION_TYPES': selected_configs,
'CMAKE_CROSS_CONFIGS': selected_configs,
@@ -3134,14 +3020,6 @@ def configure_using_cmake(args):
'Scylla_WITH_DEBUG_INFO' : 'ON' if args.debuginfo else 'OFF',
'Scylla_USE_PRECOMPILED_HEADER': 'OFF' if args.disable_precompiled_header else 'ON',
}
if compiler_cache:
settings['CMAKE_CXX_COMPILER_LAUNCHER'] = compiler_cache
settings['CMAKE_C_COMPILER_LAUNCHER'] = compiler_cache
# For Rust, sccache is used via RUSTC_WRAPPER
if 'sccache' in compiler_cache:
settings['Scylla_RUSTC_WRAPPER'] = compiler_cache
if args.date_stamp:
settings['Scylla_DATE_STAMP'] = args.date_stamp
if args.staticboost:
@@ -3173,7 +3051,7 @@ def configure_using_cmake(args):
if not args.dist_only:
for mode in selected_modes:
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode], compiler_cache)
configure_seastar(build_dir, build_modes[mode].cmake_build_type, modes[mode])
cmake_command = ['cmake']
cmake_command += [f'-D{var}={value}' for var, value in settings.items()]

View File

@@ -47,7 +47,6 @@ target_sources(cql3
functions/aggregate_fcts.cc
functions/castas_fcts.cc
functions/error_injection_fcts.cc
functions/vector_similarity_fcts.cc
statements/cf_prop_defs.cc
statements/cf_statement.cc
statements/authentication_statement.cc

View File

@@ -431,7 +431,6 @@ unaliasedSelector returns [uexpression tmp]
| K_TTL '(' c=cident ')' { tmp = column_mutation_attribute{column_mutation_attribute::attribute_kind::ttl,
unresolved_identifier{std::move(c)}}; }
| f=functionName args=selectionFunctionArgs { tmp = function_call{std::move(f), std::move(args)}; }
| f=similarityFunctionName args=vectorSimilarityArgs { tmp = function_call{std::move(f), std::move(args)}; }
| K_CAST '(' arg=unaliasedSelector K_AS t=native_type ')' { tmp = cast{.style = cast::cast_style::sql, .arg = std::move(arg), .type = std::move(t)}; }
)
( '.' fi=cident { tmp = field_selection{std::move(tmp), std::move(fi)}; }
@@ -446,18 +445,6 @@ selectionFunctionArgs returns [std::vector<expression> a]
')'
;
vectorSimilarityArgs returns [std::vector<expression> a]
: '(' ')'
| '(' v1=vectorSimilarityArg { a.push_back(std::move(v1)); }
( ',' vn=vectorSimilarityArg { a.push_back(std::move(vn)); } )*
')'
;
vectorSimilarityArg returns [uexpression a]
: s=unaliasedSelector { a = std::move(s); }
| v=value { a = std::move(v); }
;
countArgument
: '*'
| i=INTEGER { if (i->getText() != "1") {
@@ -1696,10 +1683,6 @@ functionName returns [cql3::functions::function_name s]
: (ks=keyspaceName '.')? f=allowedFunctionName { $s.keyspace = std::move(ks); $s.name = std::move(f); }
;
similarityFunctionName returns [cql3::functions::function_name s]
: f=allowedSimilarityFunctionName { $s = cql3::functions::function_name::native_function(std::move(f)); }
;
allowedFunctionName returns [sstring s]
: f=IDENT { $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
| f=QUOTED_NAME { $s = $f.text; }
@@ -1708,11 +1691,6 @@ allowedFunctionName returns [sstring s]
| K_COUNT { $s = "count"; }
;
allowedSimilarityFunctionName returns [sstring s]
: f=(K_SIMILARITY_COSINE | K_SIMILARITY_EUCLIDEAN | K_SIMILARITY_DOT_PRODUCT)
{ $s = $f.text; std::transform(s.begin(), s.end(), s.begin(), ::tolower); }
;
functionArgs returns [std::vector<expression> a]
: '(' ')'
| '(' t1=term { a.push_back(std::move(t1)); }
@@ -2409,10 +2387,6 @@ K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;
K_VECTOR_SEARCH_INDEXING: V E C T O R '_' S E A R C H '_' I N D E X I N G;
K_SIMILARITY_EUCLIDEAN: S I M I L A R I T Y '_' E U C L I D E A N;
K_SIMILARITY_COSINE: S I M I L A R I T Y '_' C O S I N E;
K_SIMILARITY_DOT_PRODUCT: S I M I L A R I T Y '_' D O T '_' P R O D U C T;
// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');

View File

@@ -25,11 +25,6 @@ public:
NOT_ASSIGNABLE,
};
struct vector_test_result {
test_result result;
std::optional<size_t> dimension_opt;
};
static bool is_assignable(test_result tr) {
return tr != test_result::NOT_ASSIGNABLE;
}
@@ -49,8 +44,6 @@ public:
*/
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const = 0;
virtual vector_test_result test_assignment_any_size_float_vector() const = 0;
virtual std::optional<data_type> assignment_testable_type_opt() const = 0;
// for error reporting

View File

@@ -1434,112 +1434,6 @@ test_assignment(const expression& expr, data_dictionary::database db, const sstr
}, expr);
}
template <cql3_type::kind... Kinds>
assignment_testable::vector_test_result
test_assignment_any_size_float_vector(const expression& expr) {
using test_result = assignment_testable::vector_test_result;
const test_result NOT_ASSIGNABLE = {assignment_testable::test_result::NOT_ASSIGNABLE, std::nullopt};
const test_result WEAKLY_ASSIGNABLE = {assignment_testable::test_result::WEAKLY_ASSIGNABLE, std::nullopt};
auto is_float_or_bind = [] (const expression& e) {
return expr::visit(overloaded_functor{
[] (const bind_variable&) {
return true;
},
[] (const untyped_constant& uc) {
return uc.partial_type == untyped_constant::type_class::floating_point
|| uc.partial_type == untyped_constant::type_class::integer;
},
[] (const constant& value) {
auto kind = value.type->as_cql3_type().get_kind();
return cql3_type::kind_enum_set::frozen<Kinds...>().contains(kind);
},
[] (const auto&) {
return false;
},
}, e);
};
auto validate_assignment = [&] (const data_type& dt) -> test_result {
auto vt = dynamic_pointer_cast<const vector_type_impl>(dt->underlying_type());
if (!vt) {
return NOT_ASSIGNABLE;
}
auto elem_kind = vt->get_elements_type()->as_cql3_type().get_kind();
if (cql3_type::kind_enum_set::frozen<Kinds...>().contains(elem_kind)) {
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, vt->get_dimension()};
}
return NOT_ASSIGNABLE;
};
return expr::visit(overloaded_functor{
[&] (const constant& value) -> test_result {
return validate_assignment(value.type);
},
[&] (const binary_operator&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const conjunction&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const column_value& col_val) -> test_result {
return validate_assignment(col_val.col->type);
},
[&] (const subscript&) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const unresolved_identifier& ui) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const column_mutation_attribute& cma) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const function_call& fc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const cast& c) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const field_selection& fs) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const bind_variable& bv) -> test_result {
return WEAKLY_ASSIGNABLE;
},
[&] (const untyped_constant& uc) -> test_result {
return uc.partial_type == untyped_constant::type_class::null
? WEAKLY_ASSIGNABLE
: NOT_ASSIGNABLE;
},
[&] (const tuple_constructor& tc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const collection_constructor& c) -> test_result {
switch (c.style) {
case collection_constructor::style_type::list_or_vector: {
if(std::ranges::all_of(c.elements, is_float_or_bind)) {
return {assignment_testable::test_result::WEAKLY_ASSIGNABLE, c.elements.size()};
}
return NOT_ASSIGNABLE;
}
case collection_constructor::style_type::set: return NOT_ASSIGNABLE;
case collection_constructor::style_type::map: return NOT_ASSIGNABLE;
case collection_constructor::style_type::vector:
on_internal_error(expr_logger, "vector style type found in test_assignment, should have been introduced post-prepare");
}
on_internal_error(expr_logger, fmt::format("unexpected collection_constructor style {}", static_cast<unsigned>(c.style)));
},
[&] (const usertype_constructor& uc) -> test_result {
return NOT_ASSIGNABLE;
},
[&] (const temporary& t) -> test_result {
return NOT_ASSIGNABLE;
},
}, expr);
}
assignment_testable::vector_test_result
test_assignment_any_size_float_vector(const expression& expr) {
return test_assignment_any_size_float_vector<cql3_type::kind::FLOAT, cql3_type::kind::DOUBLE>(expr);
}
expression
prepare_expression(const expression& expr, data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, lw_shared_ptr<column_specification> receiver) {
auto e_opt = try_prepare_expression(expr, db, keyspace, schema_opt, std::move(receiver));
@@ -1573,9 +1467,6 @@ public:
virtual test_result test_assignment(data_dictionary::database db, const sstring& keyspace, const schema* schema_opt, const column_specification& receiver) const override {
return expr::test_assignment(_e, db, keyspace, schema_opt, receiver);
}
virtual vector_test_result test_assignment_any_size_float_vector() const override {
return expr::test_assignment_any_size_float_vector(_e);
}
virtual sstring assignment_testable_source_context() const override {
return fmt::format("{}", _e);
}

View File

@@ -16,7 +16,6 @@
#include "cql3/functions/user_function.hh"
#include "cql3/functions/user_aggregate.hh"
#include "cql3/functions/uuid_fcts.hh"
#include "cql3/functions/vector_similarity_fcts.hh"
#include "data_dictionary/data_dictionary.hh"
#include "as_json_function.hh"
#include "cql3/prepare_context.hh"
@@ -399,14 +398,6 @@ functions::get(data_dictionary::database db,
}
});
const auto func_name = name.has_keyspace() ? name : name.as_native_function();
if (SIMILARITY_FUNCTIONS.contains(func_name)) {
auto arg_types = retrieve_vector_arg_types(func_name, provided_args);
auto fun = ::make_shared<vector_similarity_fct>(func_name.name, arg_types);
validate_types(db, keyspace, schema.get(), fun, provided_args, receiver_ks, receiver_cf);
return fun;
}
if (name.has_keyspace()
? name == TOKEN_FUNCTION_NAME
: name.name == TOKEN_FUNCTION_NAME.name) {

View File

@@ -1,150 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "vector_similarity_fcts.hh"
#include "types/types.hh"
#include "types/vector.hh"
#include "exceptions/exceptions.hh"
namespace cql3 {
namespace functions {
namespace {
// The computations of similarity scores match the exact formulas of Cassandra's (jVector's) implementation to ensure compatibility.
// There exist tests checking the compliance of the results.
// Reference:
// https://github.com/datastax/jvector/blob/f967f1c9249035b63b55a566fac7d4dc38380349/jvector-base/src/main/java/io/github/jbellis/jvector/vector/VectorSimilarityFunction.java#L36-L69
// You should only use this function if you need to preserve the original vectors and cannot normalize
// them in advance.
float compute_cosine_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double dot_product = 0.0;
double squared_norm_a = 0.0;
double squared_norm_b = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
dot_product += a * b;
squared_norm_a += a * a;
squared_norm_b += b * b;
}
if (squared_norm_a == 0 || squared_norm_b == 0) {
throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
}
// The cosine similarity is in the range [-1, 1].
// It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
// for consistency with other similarity functions.
return (1 + (dot_product / (std::sqrt(squared_norm_a * squared_norm_b)))) / 2;
}
float compute_euclidean_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double sum = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
double diff = a - b;
sum += diff * diff;
}
// The squared Euclidean (L2) distance is of range [0, inf).
// It is mapped to a similarity score in the range (0, 1] (0 -> 1, inf -> 0)
// for consistency with other similarity functions.
return (1 / (1 + sum));
}
// Assumes that both vectors are L2-normalized.
// This similarity is intended as an optimized way to perform cosine similarity calculation.
float compute_dot_product_similarity(const std::vector<data_value>& v1, const std::vector<data_value>& v2) {
double dot_product = 0.0;
for (size_t i = 0; i < v1.size(); ++i) {
double a = value_cast<float>(v1[i]);
double b = value_cast<float>(v2[i]);
dot_product += a * b;
}
// The dot product is in the range [-1, 1] for L2-normalized vectors.
// It is mapped to a similarity score in the range [0, 1] (-1 -> 0, 1 -> 1)
// for consistency with other similarity functions.
return ((1 + dot_product) / 2);
}
} // namespace
thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS = {
{SIMILARITY_COSINE_FUNCTION_NAME, compute_cosine_similarity},
{SIMILARITY_EUCLIDEAN_FUNCTION_NAME, compute_euclidean_similarity},
{SIMILARITY_DOT_PRODUCT_FUNCTION_NAME, compute_dot_product_similarity},
};
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args) {
if (provided_args.size() != 2) {
throw exceptions::invalid_request_exception(fmt::format("Invalid number of arguments for function {}(vector<float, n>, vector<float, n>)", name));
}
auto [first_result, first_dim_opt] = provided_args[0]->test_assignment_any_size_float_vector();
auto [second_result, second_dim_opt] = provided_args[1]->test_assignment_any_size_float_vector();
auto invalid_type_error_message = [&name](const shared_ptr<assignment_testable>& arg) {
auto type = arg->assignment_testable_type_opt();
const auto& source_context = arg->assignment_testable_source_context();
if (type) {
return fmt::format("Function {} requires a float vector argument, but found {} of type {}", name, source_context, type.value()->cql3_type_name());
} else {
return fmt::format("Function {} requires a float vector argument, but found {}", name, source_context);
}
};
if (!is_assignable(first_result)) {
throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[0]));
}
if (!is_assignable(second_result)) {
throw exceptions::invalid_request_exception(invalid_type_error_message(provided_args[1]));
}
if (!first_dim_opt && !second_dim_opt) {
throw exceptions::invalid_request_exception(fmt::format("Cannot infer type of argument {} for function {}(vector<float, n>, vector<float, n>)",
provided_args[0]->assignment_testable_source_context(), name));
}
if (first_dim_opt && second_dim_opt) {
if (*first_dim_opt != *second_dim_opt) {
throw exceptions::invalid_request_exception(fmt::format(
"All arguments must have the same vector dimensions, but found vector<float, {}> and vector<float, {}>", *first_dim_opt, *second_dim_opt));
}
}
size_t dimension = first_dim_opt ? *first_dim_opt : *second_dim_opt;
auto type = vector_type_impl::get_instance(float_type, dimension);
return {type, type};
}
bytes_opt vector_similarity_fct::execute(std::span<const bytes_opt> parameters) {
if (std::any_of(parameters.begin(), parameters.end(), [](const auto& param) {
return !param;
})) {
return std::nullopt;
}
const auto& type = arg_types()[0];
data_value v1 = type->deserialize(*parameters[0]);
data_value v2 = type->deserialize(*parameters[1]);
const auto& v1_elements = value_cast<std::vector<data_value>>(v1);
const auto& v2_elements = value_cast<std::vector<data_value>>(v2);
float result = SIMILARITY_FUNCTIONS.at(_name)(v1_elements, v2_elements);
return float_type->decompose(result);
}
} // namespace functions
} // namespace cql3

View File

@@ -1,37 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "native_scalar_function.hh"
#include "cql3/assignment_testable.hh"
#include "cql3/functions/function_name.hh"
namespace cql3 {
namespace functions {
static const function_name SIMILARITY_COSINE_FUNCTION_NAME = function_name::native_function("similarity_cosine");
static const function_name SIMILARITY_EUCLIDEAN_FUNCTION_NAME = function_name::native_function("similarity_euclidean");
static const function_name SIMILARITY_DOT_PRODUCT_FUNCTION_NAME = function_name::native_function("similarity_dot_product");
using similarity_function_t = float (*)(const std::vector<data_value>&, const std::vector<data_value>&);
extern thread_local const std::unordered_map<function_name, similarity_function_t> SIMILARITY_FUNCTIONS;
std::vector<data_type> retrieve_vector_arg_types(const function_name& name, const std::vector<shared_ptr<assignment_testable>>& provided_args);
class vector_similarity_fct : public native_scalar_function {
public:
vector_similarity_fct(const sstring& name, const std::vector<data_type>& arg_types)
: native_scalar_function(name, float_type, arg_types) {
}
virtual bytes_opt execute(std::span<const bytes_opt> parameters) override;
};
} // namespace functions
} // namespace cql3

View File

@@ -64,10 +64,6 @@ bool query_processor::topology_global_queue_empty() {
return remote().first.get().ss.topology_global_queue_empty();
}
future<bool> query_processor::ongoing_rf_change(const service::group0_guard& guard, sstring ks) {
return remote().first.get().ss.ongoing_rf_change(guard, std::move(ks));
}
static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}

View File

@@ -474,7 +474,6 @@ public:
void reset_cache();
bool topology_global_queue_empty();
future<bool> ongoing_rf_change(const service::group0_guard& guard, sstring ks);
query_options make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,

View File

@@ -1322,10 +1322,6 @@ const std::vector<expr::expression>& statement_restrictions::index_restrictions(
return _index_restrictions;
}
bool statement_restrictions::is_empty() const {
return !_where.has_value();
}
// Current score table:
// local and restrictions include full partition key: 2
// global: 1

View File

@@ -408,8 +408,6 @@ public:
/// Checks that the primary key restrictions don't contain null values, throws invalid_request_exception otherwise.
void validate_primary_key(const query_options& options) const;
bool is_empty() const;
};
statement_restrictions analyze_statement_restrictions(

View File

@@ -32,7 +32,7 @@ bool
selectable_processes_selection(const expr::expression& selectable) {
return expr::visit(overloaded_functor{
[&] (const expr::constant&) -> bool {
return true;
on_internal_error(slogger, "no way to express SELECT constant in the grammar yet");
},
[&] (const expr::conjunction& conj) -> bool {
on_internal_error(slogger, "no way to express 'SELECT a AND b' in the grammar yet");

View File

@@ -19,7 +19,6 @@
#include "locator/abstract_replication_strategy.hh"
#include "mutation/canonical_mutation.hh"
#include "prepared_statement.hh"
#include "seastar/coroutine/exception.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
@@ -139,7 +138,6 @@ bool cql3::statements::alter_keyspace_statement::changes_tablets(query_processor
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::query_state& state, const query_options& options, service::group0_batch& mc) const {
using namespace cql_transport;
bool unknown_keyspace = false;
try {
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
auto ks = qp.db().find_keyspace(_name);
@@ -160,19 +158,14 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
// when in reality nothing or only schema is being changed
if (changes_tablets(qp)) {
if (!qp.proxy().features().topology_global_request_queue && !qp.topology_global_queue_empty()) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
}
if (qp.proxy().features().rack_list_rf && co_await qp.ongoing_rf_change(mc.guard(),_name)) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception(format("Another RF change for this keyspace {} ongoing, please retry.", _name)));
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
}
qp.db().real_database().validate_keyspace_update(*ks_md_update);
service::topology_mutation_builder builder(ts);
service::topology_request_tracking_mutation_builder rtbuilder{global_request_id, qp.proxy().features().topology_requests_type_column};
rtbuilder.set("done", false)
.set("start_time", db_clock::now());
rtbuilder.set("done", false);
if (!qp.proxy().features().topology_global_request_queue) {
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_global_topology_request_id(global_request_id);
@@ -248,15 +241,10 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
target_type,
keyspace());
mc.add_mutations(std::move(muts), "CQL alter keyspace");
co_return std::make_tuple(std::move(ret), warnings);
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), warnings));
} catch (data_dictionary::no_such_keyspace& e) {
unknown_keyspace = true;
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
}
if (unknown_keyspace) {
co_await coroutine::return_exception(
exceptions::invalid_request_exception("Unknown keyspace " + _name));
}
std::unreachable();
}
std::unique_ptr<cql3::statements::prepared_statement>

View File

@@ -190,7 +190,7 @@ future<utils::chunked_vector<mutation>> batch_statement::get_mutations(query_pro
co_return vresult;
}
void batch_statement::verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) const {
void batch_statement::verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) {
if (mutations.size() <= 1) {
return; // We only warn for batch spanning multiple mutations
}
@@ -209,9 +209,8 @@ void batch_statement::verify_batch_size(query_processor& qp, const utils::chunke
for (auto&& m : mutations) {
ks_cf_pairs.insert(m.schema()->ks_name() + "." + m.schema()->cf_name());
}
const auto batch_type = _type == type::LOGGED ? "Logged" : "Unlogged";
return seastar::format("{} batch modifying {:d} partitions in {} is of size {:d} bytes, exceeding specified {} threshold of {:d} by {:d}.",
batch_type, mutations.size(), fmt::join(ks_cf_pairs, ", "), size, type, threshold, size - threshold);
return seastar::format("Batch modifying {:d} partitions in {} is of size {:d} bytes, exceeding specified {} threshold of {:d} by {:d}.",
mutations.size(), fmt::join(ks_cf_pairs, ", "), size, type, threshold, size - threshold);
};
if (size > fail_threshold) {
_logger.error("{}", error("FAIL", fail_threshold).c_str());

View File

@@ -116,7 +116,7 @@ public:
* Checks batch size to ensure threshold is met. If not, a warning is logged.
* @param cfs ColumnFamilies that will store the batch's mutations.
*/
void verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations) const;
static void verify_batch_size(query_processor& qp, const utils::chunked_vector<mutation>& mutations);
virtual future<shared_ptr<cql_transport::messages::result_message>> execute(
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const override;

View File

@@ -710,12 +710,11 @@ std::vector<lw_shared_ptr<column_specification>> listing_describe_statement::get
future<std::vector<std::vector<managed_bytes_opt>>> listing_describe_statement::describe(cql3::query_processor& qp, const service::client_state& client_state) const {
auto db = qp.db();
auto raw_ks = client_state.get_raw_keyspace();
std::vector<sstring> keyspaces;
// For most describe statements we should limit the results to the USEd
// keyspace (client_state.get_raw_keyspace()), if any. However for DESC
// KEYSPACES we must list all keyspaces, not just the USEd one.
if (_element != element_type::keyspace && !client_state.get_raw_keyspace().empty()) {
keyspaces.push_back(client_state.get_raw_keyspace());
if (!raw_ks.empty()) {
keyspaces.push_back(raw_ks);
} else {
keyspaces = db.get_all_keyspaces();
std::ranges::sort(keyspaces);

View File

@@ -61,7 +61,7 @@ expand_to_racks(const locator::token_metadata& tm,
// Handle ALTER:
// ([]|0) -> numeric is allowed, there are no existing replicas
// numeric -> numeric' is not supported unless numeric == numeric'. User should convert RF to rack list of equal count first.
// numeric -> numeric' is not supported. User should convert RF to rack list of equal count first.
// rack_list -> len(rack_list) is allowed (no-op)
// rack_list -> numeric is not allowed
if (old_options.contains(dc)) {
@@ -75,8 +75,6 @@ expand_to_racks(const locator::token_metadata& tm,
"Cannot change replication factor for '{}' from {} to numeric {}, use rack list instead",
dc, old_rf_val, data.count()));
}
} else if (old_rf.count() == data.count()) {
return rf;
} else if (old_rf.count() > 0) {
throw exceptions::configuration_exception(fmt::format(
"Cannot change replication factor for '{}' from {} to {}, only rack list is allowed",
@@ -155,8 +153,6 @@ static locator::replication_strategy_config_options prepare_options(
}
// Validate options.
bool numeric_to_rack_list_transition = false;
bool rf_change = false;
for (auto&& [dc, opt] : options) {
locator::replication_factor_data rf(opt);
@@ -166,7 +162,6 @@ static locator::replication_strategy_config_options prepare_options(
old_rf = locator::replication_factor_data(i->second);
}
rf_change = rf_change || (old_rf && old_rf->count() != rf.count()) || (!old_rf && rf.count() != 0);
if (!rf.is_rack_based()) {
if (old_rf && old_rf->is_rack_based() && rf.count() != 0) {
if (old_rf->count() != rf.count()) {
@@ -192,11 +187,12 @@ static locator::replication_strategy_config_options prepare_options(
throw exceptions::configuration_exception(fmt::format(
"Rack list for '{}' contains duplicate entries", dc));
}
numeric_to_rack_list_transition = numeric_to_rack_list_transition || (old_rf && !old_rf->is_rack_based() && old_rf->count() != 0);
}
if (numeric_to_rack_list_transition && rf_change) {
throw exceptions::configuration_exception("Cannot change replication factor from numeric to rack list and rf value at the same time");
if (old_rf && !old_rf->is_rack_based() && old_rf->count() != 0) {
// FIXME: Allow this if replicas already conform to the given rack list.
// FIXME: Implement automatic colocation to allow transition to rack list.
throw exceptions::configuration_exception(fmt::format(
"Cannot change replication factor from numeric to rack list for '{}'", dc));
}
}
if (!rf && options.empty() && old_options.empty()) {
@@ -416,7 +412,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(s
? std::optional<unsigned>(0) : std::nullopt;
auto initial_tablets = get_initial_tablets(default_initial_tablets, cfg.enforce_tablets());
bool uses_tablets = initial_tablets.has_value();
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
bool rack_list_enabled = feat.rack_list_rf;
auto options = prepare_options(sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), {}, rack_list_enabled, uses_tablets);
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
std::move(options), initial_tablets, get_consistency_option(), get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
@@ -432,7 +428,7 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
throw exceptions::invalid_request_exception("Cannot alter replication strategy vnode/tablets flavor");
}
auto sc = get_replication_strategy_class();
bool rack_list_enabled = utils::get_local_injector().enter("create_with_numeric") ? false : feat.rack_list_rf;
bool rack_list_enabled = feat.rack_list_rf;
if (sc) {
options = prepare_options(*sc, tm, cfg.rf_rack_valid_keyspaces(), get_replication_options(), old_options, rack_list_enabled, uses_tablets);
} else {

View File

@@ -1976,7 +1976,7 @@ mutation_fragments_select_statement::do_execute(query_processor& qp, service::qu
if (it == indexes.end()) {
throw exceptions::invalid_request_exception("ANN ordering by vector requires the column to be indexed using 'vector_index'");
}
if (index_opt || parameters->allow_filtering() || !(restrictions->is_empty()) || check_needs_allow_filtering_anyway(*restrictions)) {
if (index_opt || parameters->allow_filtering() || restrictions->need_filtering() || check_needs_allow_filtering_anyway(*restrictions)) {
throw exceptions::invalid_request_exception("ANN ordering by vector does not support filtering");
}
index_opt = *it;

View File

@@ -42,11 +42,6 @@ table::get_index_manager() const {
return _ops->get_index_manager(*this);
}
db_clock::time_point
table::get_truncation_time() const {
return _ops->get_truncation_time(*this);
}
lw_shared_ptr<keyspace_metadata>
keyspace::metadata() const {
return _ops->get_keyspace_metadata(*this);

View File

@@ -77,7 +77,6 @@ public:
schema_ptr schema() const;
const std::vector<view_ptr>& views() const;
const secondary_index::secondary_index_manager& get_index_manager() const;
db_clock::time_point get_truncation_time() const;
};
class keyspace {

View File

@@ -27,7 +27,6 @@ public:
virtual std::optional<table> try_find_table(database db, table_id id) const = 0;
virtual const secondary_index::secondary_index_manager& get_index_manager(table t) const = 0;
virtual schema_ptr get_table_schema(table t) const = 0;
virtual db_clock::time_point get_truncation_time(table t) const = 0;
virtual lw_shared_ptr<keyspace_metadata> get_keyspace_metadata(keyspace ks) const = 0;
virtual bool is_internal(keyspace ks) const = 0;
virtual const locator::abstract_replication_strategy& get_replication_strategy(keyspace ks) const = 0;

View File

@@ -1,20 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "mutation/mutation.hh"
#include "utils/UUID.hh"
namespace db {
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db_clock::time_point now, const utils::UUID& id);
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clock::time_point now, const utils::UUID& id);
}

View File

@@ -10,24 +10,20 @@
#include <chrono>
#include <exception>
#include <ranges>
#include <seastar/core/future-util.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include "batchlog_manager.hh"
#include "batchlog.hh"
#include "data_dictionary/data_dictionary.hh"
#include "mutation/canonical_mutation.hh"
#include "service/storage_proxy.hh"
#include "system_keyspace.hh"
#include "utils/rate_limiter.hh"
#include "utils/log.hh"
#include "utils/murmur_hash.hh"
#include "db_clock.hh"
#include "unimplemented.hh"
#include "idl/frozen_schema.dist.hh"
@@ -37,94 +33,17 @@
#include "cql3/untyped_result_set.hh"
#include "service_permit.hh"
#include "cql3/query_processor.hh"
#include "replica/database.hh"
static logging::logger blogger("batchlog_manager");
namespace db {
// Yields 256 batchlog shards. Even on the largest nodes we currently run on,
// this should be enough to give every core a batchlog partition.
static constexpr unsigned batchlog_shard_bits = 8;
int32_t batchlog_shard_of(db_clock::time_point written_at) {
const int64_t count = written_at.time_since_epoch().count();
std::array<uint64_t, 2> result;
utils::murmur_hash::hash3_x64_128(bytes_view(reinterpret_cast<const signed char*>(&count), sizeof(count)), 0, result);
uint64_t hash = result[0] ^ result[1];
return hash & ((1ULL << batchlog_shard_bits) - 1);
}
std::pair<partition_key, clustering_key>
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, int32_t batchlog_shard, db_clock::time_point written_at, std::optional<utils::UUID> id) {
auto pkey = partition_key::from_exploded(schema, {serialized(version), serialized(int8_t(stage)), serialized(batchlog_shard)});
std::vector<bytes> ckey_components;
ckey_components.reserve(2);
ckey_components.push_back(serialized(written_at));
if (id) {
ckey_components.push_back(serialized(*id));
}
auto ckey = clustering_key::from_exploded(schema, ckey_components);
return {std::move(pkey), std::move(ckey)};
}
std::pair<partition_key, clustering_key>
get_batchlog_key(const schema& schema, int32_t version, db::batchlog_stage stage, db_clock::time_point written_at, std::optional<utils::UUID> id) {
return get_batchlog_key(schema, version, stage, batchlog_shard_of(written_at), written_at, id);
}
mutation get_batchlog_mutation_for(schema_ptr schema, managed_bytes data, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id);
auto timestamp = api::new_timestamp();
mutation m(schema, key);
// Avoid going through data_value and therefore `bytes`, as it can be large (#24809).
auto cdef_data = schema->get_column_definition(to_bytes("data"));
m.set_cell(ckey, *cdef_data, atomic_cell::make_live(*cdef_data->type, timestamp, std::move(data)));
return m;
}
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
auto data = [&mutations] {
utils::chunked_vector<canonical_mutation> fm(mutations.begin(), mutations.end());
bytes_ostream out;
for (auto& m : fm) {
ser::serialize(out, m);
}
return std::move(out).to_managed_bytes();
}();
return get_batchlog_mutation_for(std::move(schema), std::move(data), version, stage, now, id);
}
mutation get_batchlog_mutation_for(schema_ptr schema, const utils::chunked_vector<mutation>& mutations, int32_t version, db_clock::time_point now, const utils::UUID& id) {
return get_batchlog_mutation_for(std::move(schema), mutations, version, batchlog_stage::initial, now, id);
}
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db::batchlog_stage stage, db_clock::time_point now, const utils::UUID& id) {
auto [key, ckey] = get_batchlog_key(*schema, version, stage, now, id);
mutation m(schema, key);
auto timestamp = api::new_timestamp();
m.partition().apply_delete(*schema, ckey, tombstone(timestamp, gc_clock::now()));
return m;
}
mutation get_batchlog_delete_mutation(schema_ptr schema, int32_t version, db_clock::time_point now, const utils::UUID& id) {
return get_batchlog_delete_mutation(std::move(schema), version, batchlog_stage::initial, now, id);
}
} // namespace db
const std::chrono::seconds db::batchlog_manager::replay_interval;
const uint32_t db::batchlog_manager::page_size;
db::batchlog_manager::batchlog_manager(cql3::query_processor& qp, db::system_keyspace& sys_ks, batchlog_manager_config config)
: _qp(qp)
, _sys_ks(sys_ks)
, _replay_timeout(config.replay_timeout)
, _write_request_timeout(std::chrono::duration_cast<db_clock::duration>(config.write_request_timeout))
, _replay_rate(config.replay_rate)
, _delay(config.delay)
, _replay_cleanup_after_replays(config.replay_cleanup_after_replays)
@@ -233,75 +152,18 @@ future<> db::batchlog_manager::stop() {
}
future<size_t> db::batchlog_manager::count_all_batches() const {
sstring query = format("SELECT count(*) FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
sstring query = format("SELECT count(*) FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG);
return _qp.execute_internal(query, cql3::query_processor::cache_internal::yes).then([](::shared_ptr<cql3::untyped_result_set> rs) {
return size_t(rs->one().get_as<int64_t>("count"));
});
}
future<> db::batchlog_manager::maybe_migrate_v1_to_v2() {
if (_migration_done) {
return make_ready_future<>();
}
return with_gate(_gate, [this] () mutable -> future<> {
blogger.info("Migrating batchlog entries from v1 -> v2");
auto schema_v1 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
auto schema_v2 = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
auto batch = [this, schema_v1, schema_v2] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
// check version of serialization format
if (!row.has("version")) {
blogger.warn("Not migrating logged batch because of unknown version");
co_return stop_iteration::no;
}
auto version = row.get_as<int32_t>("version");
if (version != netw::messaging_service::current_version) {
blogger.warn("Not migrating logged batch because of incorrect version");
co_return stop_iteration::no;
}
auto id = row.get_as<utils::UUID>("id");
auto written_at = row.get_as<db_clock::time_point>("written_at");
auto data = row.get_blob_fragmented("data");
auto& sp = _qp.proxy();
utils::get_local_injector().inject("batchlog_manager_fail_migration", [] { throw std::runtime_error("Error injection: failing batchlog migration"); });
auto migrate_mut = get_batchlog_mutation_for(schema_v2, std::move(data), version, batchlog_stage::failed_replay, written_at, id);
co_await sp.mutate_locally(migrate_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
mutation delete_mut(schema_v1, partition_key::from_single_value(*schema_v1, serialized(id)));
delete_mut.partition().apply_delete(*schema_v1, clustering_key_prefix::make_empty(), tombstone(api::new_timestamp(), gc_clock::now()));
co_await sp.mutate_locally(delete_mut, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
co_return stop_iteration::no;
};
try {
co_await _qp.query_internal(
format("SELECT * FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
db::consistency_level::ONE,
{},
page_size,
std::move(batch));
} catch (...) {
blogger.warn("Batchlog v1 to v2 migration failed: {}; will retry", std::current_exception());
co_return;
}
co_await container().invoke_on_all([] (auto& bm) {
bm._migration_done = true;
});
blogger.info("Done migrating batchlog entries from v1 -> v2");
});
db_clock::duration db::batchlog_manager::get_batch_log_timeout() const {
// enough time for the actual write + BM removal mutation
return _write_request_timeout * 2;
}
future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches(post_replay_cleanup cleanup) {
co_await maybe_migrate_v1_to_v2();
typedef db_clock::rep clock_type;
db::all_batches_replayed all_replayed = all_batches_replayed::yes;
@@ -310,26 +172,21 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
auto throttle = _replay_rate / _qp.proxy().get_token_metadata_ptr()->count_normal_token_owners();
auto limiter = make_lw_shared<utils::rate_limiter>(throttle);
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
struct replay_stats {
std::optional<db_clock::time_point> min_too_fresh;
bool need_cleanup = false;
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG);
auto delete_batch = [this, schema = std::move(schema)] (utils::UUID id) {
auto key = partition_key::from_singular(*schema, id);
mutation m(schema, key);
auto now = service::client_state(service::client_state::internal_tag()).get_timestamp();
m.partition().apply_delete(*schema, clustering_key_prefix::make_empty(), tombstone(now, gc_clock::now()));
return _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
};
std::unordered_map<int32_t, replay_stats> replay_stats_per_shard;
// Use a stable `now` across all batches, so skip/replay decisions are the
// same across a while prefix of written_at (across all ids).
const auto now = db_clock::now();
auto batch = [this, cleanup, limiter, schema, &all_replayed, &replay_stats_per_shard, now] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
const auto stage = static_cast<batchlog_stage>(row.get_as<int8_t>("stage"));
const auto batch_shard = row.get_as<int32_t>("shard");
auto batch = [this, limiter, delete_batch = std::move(delete_batch), &all_replayed](const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
auto written_at = row.get_as<db_clock::time_point>("written_at");
auto id = row.get_as<utils::UUID>("id");
// enough time for the actual write + batchlog entry mutation delivery (two separate requests).
auto timeout = _replay_timeout;
auto now = db_clock::now();
auto timeout = get_batch_log_timeout();
if (utils::get_local_injector().is_enabled("skip_batch_replay")) {
blogger.debug("Skipping batch replay due to skip_batch_replay injection");
@@ -337,48 +194,52 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
co_return stop_iteration::no;
}
// check version of serialization format
if (!row.has("version")) {
blogger.warn("Skipping logged batch because of unknown version");
co_await delete_batch(id);
co_return stop_iteration::no;
}
auto version = row.get_as<int32_t>("version");
if (version != netw::messaging_service::current_version) {
blogger.warn("Skipping logged batch because of incorrect version {}; current version = {}", version, netw::messaging_service::current_version);
co_await delete_batch(id);
co_return stop_iteration::no;
}
auto data = row.get_blob_unfragmented("data");
blogger.debug("Replaying batch {} from stage {} and batch shard {}", id, int32_t(stage), batch_shard);
utils::chunked_vector<mutation> mutations;
bool send_failed = false;
auto& shard_written_at = replay_stats_per_shard.try_emplace(batch_shard, replay_stats{}).first->second;
blogger.debug("Replaying batch {}", id);
try {
utils::chunked_vector<std::pair<canonical_mutation, schema_ptr>> fms;
auto fms = make_lw_shared<std::deque<canonical_mutation>>();
auto in = ser::as_input_stream(data);
while (in.size()) {
auto fm = ser::deserialize(in, std::type_identity<canonical_mutation>());
const auto tbl = _qp.db().try_find_table(fm.column_family_id());
if (!tbl) {
continue;
}
if (written_at <= tbl->get_truncation_time()) {
continue;
}
schema_ptr s = tbl->schema();
if (s->tombstone_gc_options().mode() == tombstone_gc_mode::repair) {
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
}
fms.emplace_back(std::move(fm), std::move(s));
fms->emplace_back(ser::deserialize(in, std::type_identity<canonical_mutation>()));
schema_ptr s = _qp.db().find_schema(fms->back().column_family_id());
timeout = std::min(timeout, std::chrono::duration_cast<db_clock::duration>(s->tombstone_gc_options().propagation_delay_in_seconds()));
}
if (now < written_at + timeout) {
blogger.debug("Skipping replay of {}, too fresh", id);
shard_written_at.min_too_fresh = std::min(shard_written_at.min_too_fresh.value_or(written_at), written_at);
co_return stop_iteration::no;
}
auto size = data.size();
for (const auto& [fm, s] : fms) {
mutations.emplace_back(fm.to_mutation(s));
co_await maybe_yield();
}
auto mutations = co_await map_reduce(*fms, [this, written_at] (canonical_mutation& fm) {
const auto& cf = _qp.proxy().local_db().find_column_family(fm.column_family_id());
return make_ready_future<canonical_mutation*>(written_at > cf.get_truncation_time() ? &fm : nullptr);
},
utils::chunked_vector<mutation>(),
[this] (utils::chunked_vector<mutation> mutations, canonical_mutation* fm) {
if (fm) {
schema_ptr s = _qp.db().find_schema(fm->column_family_id());
mutations.emplace_back(fm->to_mutation(s));
}
return mutations;
});
if (!mutations.empty()) {
const auto ttl = [written_at]() -> clock_type {
@@ -404,11 +265,7 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
co_await limiter->reserve(size);
_stats.write_attempts += mutations.size();
auto timeout = db::timeout_clock::now() + write_timeout;
if (cleanup) {
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(mutations, timeout);
} else {
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
}
co_await _qp.proxy().send_batchlog_replay_to_all_replicas(std::move(mutations), timeout);
}
}
} catch (data_dictionary::no_such_keyspace& ex) {
@@ -422,80 +279,31 @@ future<db::all_batches_replayed> db::batchlog_manager::replay_all_failed_batches
// Do _not_ remove the batch, assuning we got a node write error.
// Since we don't have hints (which origin is satisfied with),
// we have to resort to keeping this batch to next lap.
if (!cleanup || stage == batchlog_stage::failed_replay) {
co_return stop_iteration::no;
}
send_failed = true;
co_return stop_iteration::no;
}
auto& sp = _qp.proxy();
if (send_failed) {
blogger.debug("Moving batch {} to stage failed_replay", id);
auto m = get_batchlog_mutation_for(schema, mutations, netw::messaging_service::current_version, batchlog_stage::failed_replay, written_at, id);
co_await sp.mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
}
// delete batch
auto m = get_batchlog_delete_mutation(schema, netw::messaging_service::current_version, stage, written_at, id);
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
shard_written_at.need_cleanup = true;
co_await delete_batch(id);
co_return stop_iteration::no;
};
co_await with_gate(_gate, [this, cleanup, &all_replayed, batch = std::move(batch), now, &replay_stats_per_shard] () mutable -> future<> {
blogger.debug("Started replayAllFailedBatches with cleanup: {}", cleanup);
co_await with_gate(_gate, [this, cleanup, batch = std::move(batch)] () mutable -> future<> {
blogger.debug("Started replayAllFailedBatches (cpu {})", this_shard_id());
co_await utils::get_local_injector().inject("add_delay_to_batch_replay", std::chrono::milliseconds(1000));
auto schema = _qp.db().find_schema(system_keyspace::NAME, system_keyspace::BATCHLOG_V2);
co_await coroutine::parallel_for_each(std::views::iota(0, 16), [&] (int32_t chunk) -> future<> {
const int32_t batchlog_chunk_base = chunk * 16;
for (int32_t i = 0; i < 16; ++i) {
int32_t batchlog_shard = batchlog_chunk_base + i;
co_await _qp.query_internal(
format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2),
db::consistency_level::ONE,
{data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::failed_replay)), data_value(batchlog_shard)},
page_size,
batch);
co_await _qp.query_internal(
format("SELECT * FROM {}.{} WHERE version = ? AND stage = ? AND shard = ? BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG_V2),
db::consistency_level::ONE,
{data_value(netw::messaging_service::current_version), data_value(int8_t(batchlog_stage::initial)), data_value(batchlog_shard)},
page_size,
batch);
if (cleanup != post_replay_cleanup::yes) {
continue;
}
auto it = replay_stats_per_shard.find(batchlog_shard);
if (it == replay_stats_per_shard.end() || !it->second.need_cleanup) {
// Nothing was replayed on this batchlog shard, nothing to cleanup.
continue;
}
const auto write_time = it->second.min_too_fresh.value_or(now - _replay_timeout);
const auto end_weight = it->second.min_too_fresh ? bound_weight::before_all_prefixed : bound_weight::after_all_prefixed;
auto [key, ckey] = get_batchlog_key(*schema, netw::messaging_service::current_version, batchlog_stage::initial, batchlog_shard, write_time, {});
auto end_pos = position_in_partition(partition_region::clustered, end_weight, std::move(ckey));
range_tombstone rt(position_in_partition::before_all_clustered_rows(), std::move(end_pos), tombstone(api::new_timestamp(), gc_clock::now()));
blogger.trace("Clean up batchlog shard {} with range tombstone {}", batchlog_shard, rt);
mutation m(schema, key);
m.partition().apply_row_tombstone(*schema, std::move(rt));
co_await _qp.proxy().mutate_locally(m, tracing::trace_state_ptr(), db::commitlog::force_sync::no);
co_await _qp.query_internal(
format("SELECT id, data, written_at, version FROM {}.{} BYPASS CACHE", system_keyspace::NAME, system_keyspace::BATCHLOG),
db::consistency_level::ONE,
{},
page_size,
std::move(batch)).then([this, cleanup] {
if (cleanup == post_replay_cleanup::no) {
return make_ready_future<>();
}
// Replaying batches could have generated tombstones, flush to disk,
// where they can be compacted away.
return replica::database::flush_table_on_all_shards(_qp.proxy().get_db(), system_keyspace::NAME, system_keyspace::BATCHLOG);
}).then([] {
blogger.debug("Finished replayAllFailedBatches");
});
blogger.debug("Finished replayAllFailedBatches with all_replayed: {}", all_replayed);
});
co_return all_replayed;

View File

@@ -34,17 +34,12 @@ class system_keyspace;
using all_batches_replayed = bool_class<struct all_batches_replayed_tag>;
struct batchlog_manager_config {
db_clock::duration replay_timeout;
std::chrono::duration<double> write_request_timeout;
uint64_t replay_rate = std::numeric_limits<uint64_t>::max();
std::chrono::milliseconds delay = std::chrono::milliseconds(0);
unsigned replay_cleanup_after_replays;
};
enum class batchlog_stage : int8_t {
initial,
failed_replay
};
class batchlog_manager : public peering_sharded_service<batchlog_manager> {
public:
using post_replay_cleanup = bool_class<class post_replay_cleanup_tag>;
@@ -64,7 +59,7 @@ private:
cql3::query_processor& _qp;
db::system_keyspace& _sys_ks;
db_clock::duration _replay_timeout;
db_clock::duration _write_request_timeout;
uint64_t _replay_rate;
std::chrono::milliseconds _delay;
unsigned _replay_cleanup_after_replays = 100;
@@ -76,14 +71,6 @@ private:
gc_clock::time_point _last_replay;
// Was the v1 -> v2 migration already done since last restart?
// The migration is attempted once after each restart. This is redundant but
// keeps thing simple. Once no upgrade path exists from a ScyllaDB version
// which can still produce v1 entries, this migration code can be removed.
bool _migration_done = false;
future<> maybe_migrate_v1_to_v2();
future<all_batches_replayed> replay_all_failed_batches(post_replay_cleanup cleanup);
public:
// Takes a QP, not a distributes. Because this object is supposed
@@ -98,13 +85,10 @@ public:
future<all_batches_replayed> do_batch_log_replay(post_replay_cleanup cleanup);
future<size_t> count_all_batches() const;
db_clock::duration get_batch_log_timeout() const;
gc_clock::time_point get_last_replay() const {
return _last_replay;
}
const stats& stats() const {
return _stats;
}
private:
future<> batchlog_replay_loop();
};

View File

@@ -54,14 +54,12 @@ public:
uint64_t applied_mutations = 0;
uint64_t corrupt_bytes = 0;
uint64_t truncated_at = 0;
uint64_t broken_files = 0;
stats& operator+=(const stats& s) {
invalid_mutations += s.invalid_mutations;
skipped_mutations += s.skipped_mutations;
applied_mutations += s.applied_mutations;
corrupt_bytes += s.corrupt_bytes;
broken_files += s.broken_files;
return *this;
}
stats operator+(const stats& s) const {
@@ -194,8 +192,6 @@ db::commitlog_replayer::impl::recover(const commitlog::descriptor& d, const comm
s->corrupt_bytes += e.bytes();
} catch (commitlog::segment_truncation& e) {
s->truncated_at = e.position();
} catch (commitlog::header_checksum_error&) {
++s->broken_files;
} catch (...) {
throw;
}
@@ -374,9 +370,6 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files, sstring fna
if (stats.truncated_at != 0) {
rlogger.warn("Truncated file: {} at position {}.", f, stats.truncated_at);
}
if (stats.broken_files != 0) {
rlogger.warn("Corrupted file header: {}. Skipped.", f);
}
rlogger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
, f
, stats.applied_mutations

View File

@@ -1105,14 +1105,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Like native_transport_port, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
, native_shard_aware_transport_port_ssl(this, "native_shard_aware_transport_port_ssl", value_status::Used, 19142,
"Like native_transport_port_ssl, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
, native_transport_port_proxy_protocol(this, "native_transport_port_proxy_protocol", value_status::Used, 0,
"Port on which the CQL native transport listens for clients using proxy protocol v2. Disabled (0) by default.")
, native_transport_port_ssl_proxy_protocol(this, "native_transport_port_ssl_proxy_protocol", value_status::Used, 0,
"Port on which the CQL TLS native transport listens for clients using proxy protocol v2. Disabled (0) by default.")
, native_shard_aware_transport_port_proxy_protocol(this, "native_shard_aware_transport_port_proxy_protocol", value_status::Used, 0,
"Like native_transport_port_proxy_protocol, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
, native_shard_aware_transport_port_ssl_proxy_protocol(this, "native_shard_aware_transport_port_ssl_proxy_protocol", value_status::Used, 0,
"Like native_transport_port_ssl_proxy_protocol, but clients-side port number (modulo smp) is used to route the connection to the specific shard.")
, native_transport_max_threads(this, "native_transport_max_threads", value_status::Invalid, 128,
"The maximum number of thread handling requests. The meaning is the same as rpc_max_threads.\n"
"Default is different (128 versus unlimited).\n"
@@ -1160,7 +1152,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Number of threads with which to deliver hints. In multiple data-center deployments, consider increasing this number because cross data-center handoff is generally slower.")
, batchlog_replay_throttle_in_kb(this, "batchlog_replay_throttle_in_kb", value_status::Unused, 1024,
"Total maximum throttle. Throttling is reduced proportionally to the number of nodes in the cluster.")
, batchlog_replay_cleanup_after_replays(this, "batchlog_replay_cleanup_after_replays", liveness::LiveUpdate, value_status::Used, 1,
, batchlog_replay_cleanup_after_replays(this, "batchlog_replay_cleanup_after_replays", liveness::LiveUpdate, value_status::Used, 60,
"Clean up batchlog memtable after every N replays. Replays are issued on a timer, every 60 seconds. So if batchlog_replay_cleanup_after_replays is set to 60, the batchlog memtable is flushed every 60 * 60 seconds.")
/**
* @Group Request scheduler properties
@@ -1478,15 +1470,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_max_expression_cache_entries_per_shard(this, "alternator_max_expression_cache_entries_per_shard", liveness::LiveUpdate, value_status::Used, 2000, "Maximum number of cached parsed request expressions, per shard.")
, alternator_max_users_query_size_in_trace_output(this, "alternator_max_users_query_size_in_trace_output", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"Maximum size of user's command in trace output (`alternator_op` entry). Larger traces will be truncated and have `<truncated>` message appended - which doesn't count to the maximum limit.")
, alternator_describe_table_info_cache_validity_in_seconds(this, "alternator_describe_table_info_cache_validity_in_seconds", liveness::LiveUpdate, value_status::Used, 60 * 60 * 6,
"The validity of DescribeTable information - table size in bytes. This is how long calculated value will be reused before recalculation.")
, alternator_response_gzip_compression_level(this, "alternator_response_gzip_compression_level", liveness::LiveUpdate, value_status::Used, int8_t(6),
"Controls gzip and deflate compression level for Alternator response bodies (if the client requests it via Accept-Encoding header) Default of 6 is a compromise between speed and compression.\n"
"Valid values:\n"
"\t0 : No compression (disables gzip/deflate)\n"
"\t1-9: Compression levels (1 = fastest, 9 = best compression)")
, alternator_response_compression_threshold_in_bytes(this, "alternator_response_compression_threshold_in_bytes", liveness::LiveUpdate, value_status::Used, uint64_t(4096),
"When the compression is enabled, this value indicates the minimum size of data to compress. Smaller responses will not be compressed.")
, abort_on_ebadf(this, "abort_on_ebadf", value_status::Used, true, "Abort the server on incorrect file descriptor access. Throws exception when disabled.")
, sanitizer_report_backtrace(this, "sanitizer_report_backtrace", value_status::Used, false,
"In debug mode, report log-structured allocator sanitizer violations with a backtrace. Slow.")
@@ -1583,12 +1566,6 @@ db::config::config(std::shared_ptr<db::extensions> exts)
// Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration.
, tablet_load_stats_refresh_interval_in_seconds(this, "tablet_load_stats_refresh_interval_in_seconds", liveness::LiveUpdate, value_status::Used, 60,
"Tablet load stats refresh rate in seconds.")
, force_capacity_based_balancing(this, "force_capacity_based_balancing", liveness::LiveUpdate, value_status::Used, false,
"Forces the load balancer to perform capacity based balancing, instead of size based balancing.")
, size_based_balance_threshold_percentage(this, "size_based_balance_threshold_percentage", liveness::LiveUpdate, value_status::Used, 1.0,
"Sets the maximum difference in percentages between the most loaded and least loaded nodes, below which the load balancer considers nodes balanced.")
, minimal_tablet_size_for_balancing(this, "minimal_tablet_size_for_balancing", liveness::LiveUpdate, value_status::Used, service::default_target_tablet_size / 100,
"Sets the minimal tablet size for the load balancer. For any tablet smaller than this, the balancer will use this size instead of the actual tablet size.")
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")

View File

@@ -324,10 +324,6 @@ public:
named_value<uint16_t> native_transport_port_ssl;
named_value<uint16_t> native_shard_aware_transport_port;
named_value<uint16_t> native_shard_aware_transport_port_ssl;
named_value<uint16_t> native_transport_port_proxy_protocol;
named_value<uint16_t> native_transport_port_ssl_proxy_protocol;
named_value<uint16_t> native_shard_aware_transport_port_proxy_protocol;
named_value<uint16_t> native_shard_aware_transport_port_ssl_proxy_protocol;
named_value<uint32_t> native_transport_max_threads;
named_value<uint32_t> native_transport_max_frame_size_in_mb;
named_value<sstring> broadcast_rpc_address;
@@ -477,9 +473,6 @@ public:
named_value<bool> alternator_allow_system_table_write;
named_value<uint32_t> alternator_max_expression_cache_entries_per_shard;
named_value<uint64_t> alternator_max_users_query_size_in_trace_output;
named_value<uint32_t> alternator_describe_table_info_cache_validity_in_seconds;
named_value<int> alternator_response_gzip_compression_level;
named_value<uint32_t> alternator_response_compression_threshold_in_bytes;
named_value<bool> abort_on_ebadf;
@@ -597,9 +590,6 @@ public:
named_value<bool> rf_rack_valid_keyspaces;
named_value<uint32_t> tablet_load_stats_refresh_interval_in_seconds;
named_value<bool> force_capacity_based_balancing;
named_value<float> size_based_balance_threshold_percentage;
named_value<uint64_t> minimal_tablet_size_for_balancing;
static const sstring default_tls_priority;
private:

View File

@@ -248,7 +248,7 @@ future<db::commitlog> hint_endpoint_manager::add_store() noexcept {
// which is larger than the segment ID of the RP of the last written hint.
cfg.base_segment_id = _last_written_rp.base_id();
return commitlog::create_commitlog(std::move(cfg)).then([this] (this auto, commitlog l) -> future<commitlog> {
return commitlog::create_commitlog(std::move(cfg)).then([this] (commitlog l) -> future<commitlog> {
// add_store() is triggered every time hint files are forcefully flushed to I/O (every hints_flush_period).
// When this happens we want to refill _sender's segments only if it has finished with the segments he had before.
if (_sender.have_segments()) {

View File

@@ -26,7 +26,6 @@
#include <seastar/core/smp.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/file.hh>
// Boost features.
@@ -644,12 +643,6 @@ future<> manager::drain_for(endpoint_id host_id, gms::inet_address ip) noexcept
co_return;
}
if (!replay_allowed()) {
auto reason = seastar::format("Precondition violdated while trying to drain {} / {}: "
"hint replay is not allowed", host_id, ip);
on_internal_error(manager_logger, std::move(reason));
}
manager_logger.info("Draining starts for {}", host_id);
const auto holder = seastar::gate::holder{_draining_eps_gate};
@@ -906,7 +899,7 @@ future<> manager::migrate_ip_directories() {
co_await coroutine::parallel_for_each(dirs_to_remove, [] (auto& directory) -> future<> {
try {
manager_logger.warn("Removing hint directory {}", directory.native());
co_await seastar::recursive_remove_directory(directory);
co_await lister::rmdir(directory);
} catch (...) {
on_internal_error(manager_logger,
seastar::format("Removing a hint directory has failed. Reason: {}", std::current_exception()));

View File

@@ -318,10 +318,6 @@ public:
/// In both cases - removes the corresponding hints' directories after all hints have been drained and erases the
/// corresponding hint_endpoint_manager objects.
///
/// Preconditions:
/// * Hint replay must be allowed (i.e. `replay_allowed()` must be true) throughout
/// the execution of this function.
///
/// \param host_id host ID of the node that left the cluster
/// \param ip the IP of the node that left the cluster
future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
@@ -346,15 +342,15 @@ public:
return _state.contains(state::started);
}
bool replay_allowed() const noexcept {
return _state.contains(state::replay_allowed);
}
private:
void set_started() noexcept {
_state.set(state::started);
}
bool replay_allowed() const noexcept {
return _state.contains(state::replay_allowed);
}
void set_draining_all() noexcept {
_state.set(state::draining_all);
}

View File

@@ -135,5 +135,5 @@ const std::string db::object_storage_endpoint_param::gs_type = "gs";
auto fmt::formatter<db::object_storage_endpoint_param>::format(const db::object_storage_endpoint_param& e, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "object_storage_endpoint_param{}", e.to_json_string());
return fmt::format_to(ctx.out(), "object_storage_endpoint_param{{}}", e.to_json_string());
}

View File

@@ -1262,9 +1262,16 @@ static future<> do_merge_schema(sharded<service::storage_proxy>& proxy, sharded
{
slogger.trace("do_merge_schema: {}", mutations);
schema_applier ap(proxy, ss, sys_ks, reload);
co_await execute_do_merge_schema(proxy, ap, std::move(mutations)).finally([&ap]() {
return ap.destroy();
});
std::exception_ptr ex;
try {
co_await execute_do_merge_schema(proxy, ap, std::move(mutations));
} catch (...) {
ex = std::current_exception();
}
co_await ap.destroy();
if (ex) {
throw ex;
}
}
/**

View File

@@ -152,8 +152,7 @@ future<> backup_task_impl::do_backup() {
}
future<> backup_task_impl::process_snapshot_dir() {
auto directory = co_await io_check(open_directory, _snapshot_dir.native());
auto snapshot_dir_lister = directory_lister(directory, _snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
auto snapshot_dir_lister = directory_lister(_snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
size_t num_sstable_comps = 0;
try {
@@ -162,7 +161,7 @@ future<> backup_task_impl::process_snapshot_dir() {
while (auto component_ent = co_await snapshot_dir_lister.get()) {
const auto& name = component_ent->name;
auto file_path = _snapshot_dir / name;
auto st = co_await file_stat(directory, name);
auto st = co_await file_stat(file_path.native());
total += st.size;
try {
auto desc = sstables::parse_path(file_path, "", "");

View File

@@ -55,7 +55,6 @@
#include "message/shared_dict.hh"
#include "replica/database.hh"
#include "db/compaction_history_entry.hh"
#include "mutation/async_utils.hh"
#include <unordered_map>
@@ -111,7 +110,6 @@ namespace {
system_keyspace::v3::CDC_LOCAL,
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
system_keyspace::CLIENT_ROUTES,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
props.enable_schema_commitlog();
@@ -139,7 +137,6 @@ namespace {
system_keyspace::ROLE_PERMISSIONS,
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
system_keyspace::CLIENT_ROUTES,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
props.is_group0_table = true;
@@ -216,30 +213,6 @@ schema_ptr system_keyspace::batchlog() {
return batchlog;
}
schema_ptr system_keyspace::batchlog_v2() {
static thread_local auto batchlog_v2 = [] {
schema_builder builder(generate_legacy_id(NAME, BATCHLOG_V2), NAME, BATCHLOG_V2,
// partition key
{{"version", int32_type}, {"stage", byte_type}, {"shard", int32_type}},
// clustering key
{{"written_at", timestamp_type}, {"id", uuid_type}},
// regular columns
{{"data", bytes_type}},
// static columns
{},
// regular column name type
utf8_type,
// comment
"batches awaiting replay"
);
builder.set_gc_grace_seconds(0);
builder.set_caching_options(caching_options::get_disabled_caching_options());
builder.with_hash_version();
return builder.build(schema_builder::compact_storage::no);
}();
return batchlog_v2;
}
/*static*/ schema_ptr system_keyspace::paxos() {
static thread_local auto paxos = [] {
// FIXME: switch to the new schema_builder interface (with_column(...), etc)
@@ -312,7 +285,6 @@ schema_ptr system_keyspace::topology() {
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
.with_column("upgrade_state", utf8_type, column_kind::static_column)
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
.with_column("paused_rf_change_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
.set_comment("Current state of topology change machine")
.with_hash_version()
.build();
@@ -1419,23 +1391,6 @@ schema_ptr system_keyspace::view_building_tasks() {
return schema;
}
schema_ptr system_keyspace::client_routes() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, CLIENT_ROUTES);
return schema_builder(NAME, CLIENT_ROUTES, std::make_optional(id))
.with_column("connection_id", utf8_type, column_kind::partition_key)
.with_column("host_id", uuid_type, column_kind::clustering_key)
.with_column("address", utf8_type)
.with_column("port", int32_type)
.with_column("tls_port", int32_type)
.with_column("alternator_port", int32_type)
.with_column("alternator_https_port", int32_type)
.with_hash_version()
.build();
}();
return schema;
}
future<system_keyspace::local_info> system_keyspace::load_local_info() {
auto msg = co_await execute_cql(format("SELECT host_id, cluster_name, data_center, rack FROM system.{} WHERE key=?", LOCAL), sstring(LOCAL));
@@ -2349,7 +2304,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
std::copy(schema_tables.begin(), schema_tables.end(), std::back_inserter(r));
auto auth_tables = system_keyspace::auth_tables();
std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
r.insert(r.end(), { built_indexes(), hints(), batchlog(), batchlog_v2(), paxos(), local(),
r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(),
peers(), peer_events(), range_xfers(),
compactions_in_progress(), compaction_history(),
sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(),
@@ -2363,7 +2318,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
v3::cdc_local(),
raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery(),
topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(), view_build_status_v2(),
dicts(), view_building_tasks(), client_routes(), cdc_streams_state(), cdc_streams_history()
dicts(), view_building_tasks(), cdc_streams_state(), cdc_streams_history()
});
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
@@ -2380,9 +2335,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
}
static bool maybe_write_in_user_memory(schema_ptr s) {
return (s.get() == system_keyspace::batchlog().get())
|| (s.get() == system_keyspace::batchlog_v2().get())
|| (s.get() == system_keyspace::paxos().get())
return (s.get() == system_keyspace::batchlog().get()) || (s.get() == system_keyspace::paxos().get())
|| s == system_keyspace::v3::scylla_views_builds_in_progress();
}
@@ -3000,9 +2953,7 @@ future<mutation> system_keyspace::get_group0_history(sharded<replica::database>&
SCYLLA_ASSERT(rs);
auto& ps = rs->partitions();
for (auto& p: ps) {
// Note: we could decorate the frozen_mutation's key to check if it's the expected one
// but since this is a single partition table, we can just check after unfreezing the whole mutation.
auto mut = co_await unfreeze_gently(p.mut(), s);
auto mut = p.mut().unfreeze(s);
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
if (partition_key == GROUP0_HISTORY_KEY) {
co_return mut;
@@ -3160,10 +3111,7 @@ static bool must_have_tokens(service::node_state nst) {
// A decommissioning node doesn't have tokens at the end, they are
// removed during transition to the left_token_ring state.
case service::node_state::decommissioning: return false;
// A removing node might or might not have tokens depending on whether
// REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
// cases, we allow removing nodes to not have tokens.
case service::node_state::removing: return false;
case service::node_state::removing: return true;
case service::node_state::rebuilding: return true;
case service::node_state::normal: return true;
case service::node_state::left: return false;
@@ -3403,12 +3351,6 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
}
}
if (some_row.has("paused_rf_change_requests")) {
for (auto&& v : deserialize_set_column(*topology(), some_row, "paused_rf_change_requests")) {
ret.paused_rf_change_requests.insert(value_cast<utils::UUID>(v));
}
}
if (some_row.has("enabled_features")) {
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
}
@@ -3620,43 +3562,35 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
return entry;
}
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id) {
auto r = co_await get_topology_request_entry_opt(id);
if (!r) {
on_internal_error(slogger, format("no entry for request id {}", id));
}
co_return std::move(*r);
}
future<std::optional<system_keyspace::topology_requests_entry>> system_keyspace::get_topology_request_entry_opt(utils::UUID id) {
future<system_keyspace::topology_requests_entry> system_keyspace::get_topology_request_entry(utils::UUID id, bool require_entry) {
auto rs = co_await execute_cql(
format("SELECT * FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
if (!rs || rs->empty()) {
co_return std::nullopt;
if (require_entry) {
on_internal_error(slogger, format("no entry for request id {}", id));
} else {
co_return topology_requests_entry{
.id = utils::null_uuid()
};
}
}
const auto& row = rs->one();
co_return topology_request_row_to_entry(id, row);
}
future<system_keyspace::topology_requests_entries> system_keyspace::get_topology_request_entries(std::vector<std::variant<service::topology_request, service::global_topology_request>> request_types, db_clock::time_point end_time_limit) {
sstring request_types_str = "";
bool first = true;
for (const auto& rt : request_types) {
if (!std::exchange(first, false)) {
request_types_str += ", ";
}
request_types_str += std::visit([] (auto&& arg) { return fmt::format("'{}'", arg); }, rt);
}
future<system_keyspace::topology_requests_entries> system_keyspace::get_node_ops_request_entries(db_clock::time_point end_time_limit) {
// Running requests.
auto rs_running = co_await execute_cql(
format("SELECT * FROM system.{} WHERE done = false AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, request_types_str));
format("SELECT * FROM system.{} WHERE done = false AND request_type IN ('{}', '{}', '{}', '{}', '{}') ALLOW FILTERING", TOPOLOGY_REQUESTS,
service::topology_request::join, service::topology_request::replace, service::topology_request::rebuild, service::topology_request::leave, service::topology_request::remove));
// Requests which finished after end_time_limit.
auto rs_done = co_await execute_cql(
format("SELECT * FROM system.{} WHERE end_time > {} AND request_type IN ({}) ALLOW FILTERING", TOPOLOGY_REQUESTS, end_time_limit.time_since_epoch().count(), request_types_str));
format("SELECT * FROM system.{} WHERE end_time > {} AND request_type IN ('{}', '{}', '{}', '{}', '{}') ALLOW FILTERING", TOPOLOGY_REQUESTS, end_time_limit.time_since_epoch().count(),
service::topology_request::join, service::topology_request::replace, service::topology_request::rebuild, service::topology_request::leave, service::topology_request::remove));
topology_requests_entries m;
for (const auto& row: *rs_done) {
@@ -3674,16 +3608,6 @@ future<system_keyspace::topology_requests_entries> system_keyspace::get_topology
co_return m;
}
future<system_keyspace::topology_requests_entries> system_keyspace::get_node_ops_request_entries(db_clock::time_point end_time_limit) {
return get_topology_request_entries({
service::topology_request::join,
service::topology_request::replace,
service::topology_request::rebuild,
service::topology_request::leave,
service::topology_request::remove
}, end_time_limit);
}
future<mutation> system_keyspace::get_insert_dict_mutation(
std::string_view name,
bytes data,

View File

@@ -163,7 +163,6 @@ public:
static constexpr auto NAME = "system";
static constexpr auto HINTS = "hints";
static constexpr auto BATCHLOG = "batchlog";
static constexpr auto BATCHLOG_V2 = "batchlog_v2";
static constexpr auto PAXOS = "paxos";
static constexpr auto BUILT_INDEXES = "IndexInfo";
static constexpr auto LOCAL = "local";
@@ -199,8 +198,6 @@ public:
static constexpr auto VIEW_BUILD_STATUS_V2 = "view_build_status_v2";
static constexpr auto DICTS = "dicts";
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
static constexpr auto CLIENT_ROUTES = "client_routes";
static constexpr auto VERSIONS = "versions";
// auth
static constexpr auto ROLES = "roles";
@@ -258,7 +255,6 @@ public:
static schema_ptr hints();
static schema_ptr batchlog();
static schema_ptr batchlog_v2();
static schema_ptr paxos();
static schema_ptr built_indexes(); // TODO (from Cassandra): make private
static schema_ptr raft();
@@ -278,7 +274,6 @@ public:
static schema_ptr view_build_status_v2();
static schema_ptr dicts();
static schema_ptr view_building_tasks();
static schema_ptr client_routes();
// auth
static schema_ptr roles();
@@ -670,9 +665,7 @@ public:
future<service::topology_request_state> get_topology_request_state(utils::UUID id, bool require_entry);
topology_requests_entry topology_request_row_to_entry(utils::UUID id, const cql3::untyped_result_set_row& row);
future<topology_requests_entry> get_topology_request_entry(utils::UUID id);
future<std::optional<topology_requests_entry>> get_topology_request_entry_opt(utils::UUID id);
future<system_keyspace::topology_requests_entries> get_topology_request_entries(std::vector<std::variant<service::topology_request, service::global_topology_request>> request_types, db_clock::time_point end_time_limit);
future<topology_requests_entry> get_topology_request_entry(utils::UUID id, bool require_entry);
future<topology_requests_entries> get_node_ops_request_entries(db_clock::time_point end_time_limit);
public:

View File

@@ -1744,115 +1744,6 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
&& std::ranges::contains(shards, this_shard_id());
}
static endpoints_to_update get_view_natural_endpoint_vnodes(
locator::host_id me,
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
locator::endpoint_dc_rack my_location,
const locator::network_topology_strategy* network_topology,
replica::cf_stats& cf_stats) {
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector base_endpoints, view_endpoints;
auto& my_datacenter = my_location.dc;
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (!network_topology || node.get().dc() == my_datacenter) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
view_endpoints.push_back(view_node);
}
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
}
static std::optional<locator::host_id> get_unpaired_view_endpoint(
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
replica::cf_stats& cf_stats) {
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
for (auto&& base_node : base_nodes) {
if (base_dc_racks.contains(base_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
base_node.get().dc(), base_node.get().rack());
return std::nullopt;
}
base_dc_racks.insert(base_node.get().dc_rack());
}
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
for (auto&& view_node : view_nodes) {
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
view_node.get().dc(), view_node.get().rack());
return std::nullopt;
}
// Track unpaired replicas in both sets
if (base_dc_racks.contains(view_node.get().dc_rack())) {
paired_view_dc_racks.insert(view_node.get().dc_rack());
} else {
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
}
}
if (unpaired_view_dc_rack_replicas.size() > 0) {
// There are view replicas that can't be paired with any base replica
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
if (unpaired_view_dc_rack_replicas.size() > 0) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
unpaired_view_dc_rack_replicas | std::views::values);
}
return extra_replica;
}
return std::nullopt;
}
// Calculate the node ("natural endpoint") to which this node should send
// a view update.
//
@@ -1865,19 +1756,29 @@ static std::optional<locator::host_id> get_unpaired_view_endpoint(
// of this function is to find, assuming that this node is one of the base
// replicas for a given partition, the paired view replica.
//
// When using vnodes, we have an optimization called "self-pairing" - if a single
// node is both a base replica and a view replica for a write, the pairing is
// modified so that this node sends the update to itself and this node is removed
// from the lists of nodes paired by index. This self-pairing optimization can
// cause the pairing to change after view ranges are moved between nodes.
// In the past, we used an optimization called "self-pairing" that if a single
// node was both a base replica and a view replica for a write, the pairing is
// modified so that this node would send the update to itself. This self-
// pairing optimization could cause the pairing to change after view ranges
// are moved between nodes, so currently we only use it if
// use_legacy_self_pairing is set to true. When using tablets - where range
// movements are common - it is strongly recommended to set it to false.
//
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
// we pair only nodes in the same datacenter.
//
// If the table uses tablets, then pairing is rack-aware. In this case, in each
// rack where we have a base replica there is also one replica of each view tablet.
// Therefore, the base replicas are naturally paired with the view replicas that
// are in the same rack.
// When use_legacy_self_pairing is enabled, if one of the base replicas
// also happens to be a view replica, it is paired with itself
// (with the other nodes paired by order in the list
// after taking this node out).
//
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
// and the replication factor in the node's datacenter is a multiple of the number
// of racks in the datacenter, then pairing is rack-aware. In this case,
// all racks have the same number of replicas, and those are never migrated
// outside their racks. Therefore, the base replicas are naturally paired with the
// view replicas that are in the same rack, based on the ordinal position.
// Note that typically, there is a single replica per rack and pairing is trivial.
//
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
@@ -1905,12 +1806,19 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_tablets,
bool use_legacy_self_pairing,
bool use_tablets_rack_aware_view_pairing,
replica::cf_stats& cf_stats) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
auto& my_location = topology.get_location(me);
auto& my_datacenter = my_location.dc;
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
bool simple_rack_aware_pairing = false;
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector orig_base_endpoints, orig_view_endpoints;
node_vector base_endpoints, view_endpoints;
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
if (auto* np = topology.find_node(ep)) {
@@ -1921,7 +1829,6 @@ endpoints_to_update get_view_natural_endpoint(
// We need to use get_replicas() for pairing to be stable in case base or view tablet
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
return resolve(topology, ep, false);
}) | std::ranges::to<node_vector>();
@@ -1945,43 +1852,231 @@ endpoints_to_update get_view_natural_endpoint(
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
auto leaving_base = it->get().host_id();
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, replication_strategy, base_token,
view_token, use_tablets, cf_stats);
view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
}
}
}
if (!use_tablets) {
return get_view_natural_endpoint_vnodes(
me,
base_nodes,
view_nodes,
my_location,
network_topology,
cf_stats);
std::function<bool(const locator::node&)> is_candidate;
if (network_topology) {
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
} else {
is_candidate = [&] (const locator::node&) { return true; };
}
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (is_candidate(node)) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
std::optional<locator::host_id> paired_replica;
for (auto&& view_node : view_nodes) {
if (view_node.get().dc_rack() == my_location) {
paired_replica = view_node.get().host_id();
break;
if (use_legacy_self_pairing) {
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (is_candidate(view_node)) {
view_endpoints.push_back(view_node);
}
}
} else {
for (auto&& view_node : view_nodes) {
process_candidate(view_endpoints, view_node);
}
}
if (paired_replica && base_nodes.size() == view_nodes.size()) {
// We don't need to find any extra replicas, so we can return early
return {.natural_endpoint = paired_replica};
// Try optimizing for simple rack-aware pairing
// If the numbers of base and view replica differ, that means an RF change is taking place
// and we can't use simple rack-aware pairing.
if (rack_aware_pairing && base_endpoints.size() == view_endpoints.size()) {
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
// Simple rack-aware pairing is possible when the datacenter replication factor
// is a multiple of the number of racks in the datacenter.
if (dc_rf % racks.size() == 0) {
simple_rack_aware_pairing = true;
size_t rack_rf = dc_rf / racks.size();
// If any rack doesn't have enough nodes to satisfy the per-rack rf
// simple rack-aware pairing is disabled.
for (const auto& [rack, nodes] : racks) {
if (nodes.size() < rack_rf) {
simple_rack_aware_pairing = false;
break;
}
}
}
if (dc_rf != base_endpoints.size()) {
// If the datacenter replication factor is not equal to the number of base replicas,
// we're in progress of a RF change and we can't use simple rack-aware pairing.
simple_rack_aware_pairing = false;
}
if (simple_rack_aware_pairing) {
std::erase_if(base_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
std::erase_if(view_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
}
}
if (!paired_replica) {
// We couldn't find any view replica in our rack
orig_base_endpoints = base_endpoints;
orig_view_endpoints = view_endpoints;
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
// Use best-match, for the minimum number of base and view replicas in each rack,
// and ordinal match for the rest.
std::optional<std::reference_wrapper<const locator::node>> paired_replica;
if (rack_aware_pairing && !simple_rack_aware_pairing) {
struct indexed_replica {
size_t idx;
std::reference_wrapper<const locator::node> node;
};
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
// First, index all replicas by rack
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
size_t idx = 0;
for (const auto& r: replicas) {
racks[r.get().rack()].emplace_back(idx++, r);
}
};
index_replica_set(base_racks, base_endpoints);
index_replica_set(view_racks, view_endpoints);
// Try optimistically pairing `me` first
const auto& my_base_replicas = base_racks[my_location.rack];
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
if (base_it == my_base_replicas.end()) {
return {};
}
const auto& my_view_replicas = view_racks[my_location.rack];
size_t idx = base_it - my_base_replicas.begin();
if (idx < my_view_replicas.size()) {
if (orig_view_endpoints.size() <= orig_base_endpoints.size()) {
return {.natural_endpoint = my_view_replicas[idx].node.get().host_id()};
} else {
// If the number of view replicas is larger than the number of base replicas,
// we need to find the unpaired view replica, so we can't return yet.
paired_replica = my_view_replicas[idx].node;
}
}
// Collect all unpaired base and view replicas,
// where the number of replicas in the base rack is different than the respective view rack
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
for (const auto& [rack, base_replicas] : base_racks) {
const auto& view_replicas = view_racks[rack];
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
unpaired_base_replicas.emplace_back(base_replicas[i]);
}
}
for (const auto& [rack, view_replicas] : view_racks) {
const auto& base_replicas = base_racks[rack];
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
unpaired_view_replicas.emplace_back(view_replicas[i]);
}
}
// Sort by the original ordinality, and copy the sorted results
// back into {base,view}_endpoints, for backward compatible processing below.
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
base_endpoints.clear();
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
view_endpoints.clear();
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (!paired_replica && base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
std::optional<std::reference_wrapper<const locator::node>> no_pairing_replica;
if (!paired_replica && idx >= view_endpoints.size()) {
// There are fewer view replicas than base replicas
// FIXME: This might still happen when reducing replication factor with tablets,
// see https://github.com/scylladb/scylladb/issues/21492
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
me,
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
} else if (base_endpoints.size() < view_endpoints.size()) {
// There are fewer base replicas than view replicas.
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_endpoints.back();
if (base_endpoints.size() < view_endpoints.size() - 1) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
std::span(view_endpoints.begin() + base_endpoints.size(), view_endpoints.end() - 1) | std::views::transform(std::mem_fn(&locator::node::host_id)));
}
}
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
return {.natural_endpoint = paired_replica,
.endpoint_with_no_pairing = no_pairing_replica};
if (!paired_replica) {
paired_replica = view_endpoints[idx];
}
if (!no_pairing_replica && base_nodes.size() < view_nodes.size()) {
// This can happen when the view replica with no pairing is in another DC.
// We need to send an update to it if there are no base replicas in that DC yet,
// as it won't receive updates otherwise.
std::unordered_set<sstring> dcs_with_base_replicas;
for (const auto& base_node : base_nodes) {
dcs_with_base_replicas.insert(base_node.get().dc());
}
for (const auto& view_node : view_nodes) {
if (!dcs_with_base_replicas.contains(view_node.get().dc())) {
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_node;
break;
}
}
}
// https://github.com/scylladb/scylladb/issues/19439
// With tablets, a node being replaced might transition to "left" state
// but still be kept as a replica.
// As of writing this hints are not prepared to handle nodes that are left
// but are still replicas. Therefore, there is no other sensible option
// right now but to give up attempt to send the update or write a hint
// to the paired, permanently down replica.
// We use the same workaround for the extra replica.
auto return_host_id_if_not_left = [] (const auto& replica) -> std::optional<locator::host_id> {
if (!replica) {
return std::nullopt;
}
const auto& node = replica->get();
if (!node.left()) {
return node.host_id();
} else {
return std::nullopt;
}
};
return {.natural_endpoint = return_host_id_if_not_left(paired_replica),
.endpoint_with_no_pairing = return_host_id_if_not_left(no_pairing_replica)};
}
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
@@ -2041,6 +2136,12 @@ future<> view_update_generator::mutate_MV(
{
auto& ks = _db.find_keyspace(base->ks_name());
auto& replication = ks.get_replication_strategy();
// We set legacy self-pairing for old vnode-based tables (for backward
// compatibility), and unset it for tablets - where range movements
// are more frequent and backward compatibility is less important.
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
// on a view, like we have the synchronous_updates_flag.
bool use_legacy_self_pairing = !ks.uses_tablets();
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
auto get_erm = [&] (table_id id) {
auto it = erms.find(id);
@@ -2053,6 +2154,10 @@ future<> view_update_generator::mutate_MV(
for (const auto& mut : view_updates) {
(void)get_erm(mut.s->id());
}
// Enable rack-aware view updates pairing for tablets
// when the cluster feature is enabled so that all replicas agree
// on the pairing algorithm.
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
auto me = base_ermp->get_topology().my_host_id();
static constexpr size_t max_concurrent_updates = 128;
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
@@ -2060,7 +2165,7 @@ future<> view_update_generator::mutate_MV(
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto view_ermp = erms.at(mut.s->id());
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
ks.uses_tablets(), cf_stats);
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
if (no_pairing_endpoint) {

View File

@@ -305,7 +305,8 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_tablets,
bool use_legacy_self_pairing,
bool use_tablets_basic_rack_aware_view_pairing,
replica::cf_stats& cf_stats);
/// Verify that the provided keyspace is eligible for storing materialized views.

View File

@@ -198,7 +198,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
future<> view_building_worker::run_staging_sstables_registrator() {
while (!_as.abort_requested()) {
bool sleep = false;
try {
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
co_await create_staging_sstable_tasks();
@@ -215,14 +214,6 @@ future<> view_building_worker::run_staging_sstables_registrator() {
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
} catch (raft::request_aborted&) {
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
} catch (...) {
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
sleep = true;
}
if (sleep) {
vbw_logger.debug("Sleeping after exception.");
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
}
}
}
@@ -426,12 +417,9 @@ future<> view_building_worker::check_for_aborted_tasks() {
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
auto it = vbw._state._batch->tasks.begin();
while (it != vbw._state._batch->tasks.end()) {
auto id = it->first;
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
++it; // Advance the iterator before potentially removing the entry from the map.
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
for (auto& [id, t]: tasks_map) {
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
if (!task_opt || task_opt->get().aborted) {
co_await vbw._state._batch->abort_task(id);
}
@@ -461,7 +449,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
}) | std::ranges::to<std::unordered_set>();;
}
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
// clear the state, save and flush new base table
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
if (processing_base_table != building_state.currently_processed_base_table) {
@@ -583,6 +571,8 @@ future<> view_building_worker::batch::do_work() {
break;
}
}
_vbw.local()._vb_state_machine.event.broadcast();
}
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
@@ -784,15 +774,13 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
tasks.insert({id, *task_opt});
}
#ifdef SEASTAR_DEBUG
{
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
}
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
}
#endif
@@ -823,6 +811,25 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
co_return collect_completed_tasks();
}
}
}

View File

@@ -605,8 +605,8 @@ public:
}
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
auto id = generate_legacy_id(system_keyspace::NAME, "versions");
return schema_builder(system_keyspace::NAME, "versions", std::make_optional(id))
.with_column("key", utf8_type, column_kind::partition_key)
.with_column("version", utf8_type)
.with_column("build_mode", utf8_type)
@@ -749,7 +749,6 @@ class clients_table : public streaming_virtual_table {
.with_column("ssl_protocol", utf8_type)
.with_column("username", utf8_type)
.with_column("scheduling_group", utf8_type)
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
.with_hash_version()
.build();
}
@@ -767,7 +766,7 @@ class clients_table : public streaming_virtual_table {
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// Collect
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
using client_data_vec = utils::chunked_vector<client_data>;
using shard_client_data = std::vector<client_data_vec>;
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
cd_vec.resize(smp::count);
@@ -807,13 +806,13 @@ class clients_table : public streaming_virtual_table {
for (unsigned i = 0; i < smp::count; i++) {
for (auto&& ps_cdc : *cd_vec[i]) {
for (auto&& cd : ps_cdc) {
if (cd_map.contains(cd->ip)) {
cd_map[cd->ip].emplace_back(std::move(cd));
if (cd_map.contains(cd.ip)) {
cd_map[cd.ip].emplace_back(std::move(cd));
} else {
dht::decorated_key key = make_partition_key(cd->ip);
dht::decorated_key key = make_partition_key(cd.ip);
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
ips.insert(decorated_ip{std::move(key), cd->ip});
cd_map[cd->ip].emplace_back(std::move(cd));
ips.insert(decorated_ip{std::move(key), cd.ip});
cd_map[cd.ip].emplace_back(std::move(cd));
}
}
co_await coroutine::maybe_yield();
@@ -826,58 +825,39 @@ class clients_table : public streaming_virtual_table {
co_await result.emit_partition_start(dip.key);
auto& clients = cd_map[dip.ip];
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
return a->port < b->port || a->client_type_str() < b->client_type_str();
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
return a.port < b.port || a.client_type_str() < b.client_type_str();
});
for (const auto& cd : clients) {
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
set_cell(cr.cells(), "shard_id", cd->shard_id);
set_cell(cr.cells(), "connection_stage", cd->stage_str());
if (cd->driver_name) {
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
set_cell(cr.cells(), "shard_id", cd.shard_id);
set_cell(cr.cells(), "connection_stage", cd.stage_str());
if (cd.driver_name) {
set_cell(cr.cells(), "driver_name", *cd.driver_name);
}
if (cd->driver_version) {
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
if (cd.driver_version) {
set_cell(cr.cells(), "driver_version", *cd.driver_version);
}
if (cd->hostname) {
set_cell(cr.cells(), "hostname", *cd->hostname);
if (cd.hostname) {
set_cell(cr.cells(), "hostname", *cd.hostname);
}
if (cd->protocol_version) {
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
if (cd.protocol_version) {
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
}
if (cd->ssl_cipher_suite) {
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
if (cd.ssl_cipher_suite) {
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
}
if (cd->ssl_enabled) {
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
if (cd.ssl_enabled) {
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
}
if (cd->ssl_protocol) {
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
if (cd.ssl_protocol) {
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
}
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
if (cd->scheduling_group_name) {
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
if (cd.scheduling_group_name) {
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
}
auto map_type = map_type_impl::get_instance(
utf8_type,
utf8_type,
false
);
auto prepare_client_options = [] (const auto& client_options) {
map_type_impl::native_type tmp;
for (auto& co: client_options) {
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
tmp.push_back(std::move(map_element));
}
return tmp;
};
set_cell(cr.cells(), "client_options",
make_map_value(map_type, prepare_client_options(cd->client_options)));
co_await result.emit_row(std::move(cr));
}
co_await result.emit_partition_end();
@@ -1120,10 +1100,9 @@ public:
}
auto tm = _db.local().get_token_metadata_ptr();
auto target_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
const uint64_t default_tablet_size = _db.local().get_config().target_tablet_size_in_bytes();
locator::load_sketch load(tm, stats, default_tablet_size);
locator::load_sketch load(tm);
co_await load.populate();
tm->get_topology().for_each_node([&] (const auto& node) {
@@ -1137,23 +1116,18 @@ public:
if (auto ip = _gossiper.local().get_address_map().find(host)) {
set_cell(r.cells(), "ip", data_value(inet_address(*ip)));
}
set_cell(r.cells(), "tablets_allocated", int64_t(load.get_tablet_count(host)));
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_tablet_count(host))));
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_tablet_count(host) * default_tablet_size)));
set_cell(r.cells(), "tablets_allocated", load.get_load(host));
set_cell(r.cells(), "tablets_allocated_per_shard", data_value(double(load.get_real_avg_shard_load(host))));
set_cell(r.cells(), "storage_allocated_load", data_value(int64_t(load.get_load(host) * target_tablet_size)));
if (stats && stats->capacity.contains(host)) {
auto capacity = stats->capacity.at(host);
set_cell(r.cells(), "storage_capacity", data_value(int64_t(capacity)));
if (auto utilization = load.get_allocated_utilization(host)) {
auto utilization = load.get_allocated_utilization(host, *stats, target_tablet_size);
if (utilization) {
set_cell(r.cells(), "storage_allocated_utilization", data_value(double(*utilization)));
}
if (load.has_complete_data(host)) {
if (auto utilization = load.get_storage_utilization(host)) {
set_cell(r.cells(), "storage_utilization", data_value(double(*utilization)));
}
set_cell(r.cells(), "storage_load", data_value(int64_t(load.get_disk_used(host))));
}
}
mutation_sink(m);
});
@@ -1173,8 +1147,6 @@ private:
.with_column("storage_capacity", long_type)
.with_column("storage_allocated_load", long_type)
.with_column("storage_allocated_utilization", double_type)
.with_column("storage_load", long_type)
.with_column("storage_utilization", double_type)
.with_sharder(1, 0) // shard0-only
.with_hash_version()
.build();

Some files were not shown because too many files have changed in this diff Show More