Compare commits
41 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91dd15d3bf | ||
|
|
c06e63daed | ||
|
|
c1c3b2c5bb | ||
|
|
7e7e378a4b | ||
|
|
77ee7f3417 | ||
|
|
0ff89a58be | ||
|
|
f7ffa395a8 | ||
|
|
3fa3b920de | ||
|
|
e7ca52ee79 | ||
|
|
730eca5dac | ||
|
|
c8cff94a5a | ||
|
|
5fae4cdf80 | ||
|
|
8bbcaacba1 | ||
|
|
3dfa5ebd7f | ||
|
|
24264e24bb | ||
|
|
0c64e3be9a | ||
|
|
d5b63df46e | ||
|
|
f545ed37bc | ||
|
|
5f13880a91 | ||
|
|
11ad32c85e | ||
|
|
4c8c9cd548 | ||
|
|
98f431dd81 | ||
|
|
4ffdb0721f | ||
|
|
775906d749 | ||
|
|
11eca621b0 | ||
|
|
d7818b56df | ||
|
|
033fed5734 | ||
|
|
c6c30b7d0a | ||
|
|
5afcec4a3d | ||
|
|
d3e199984e | ||
|
|
8822c23ad4 | ||
|
|
be9992cfb3 | ||
|
|
daf00a7f24 | ||
|
|
62962f33bb | ||
|
|
060c2f7c0d | ||
|
|
64149b57c3 | ||
|
|
4b004fcdfc | ||
|
|
5e38b3071b | ||
|
|
225b3351fc | ||
|
|
c30b326033 | ||
|
|
807fc68dc5 |
8
.github/CODEOWNERS
vendored
8
.github/CODEOWNERS
vendored
@@ -1,5 +1,5 @@
|
||||
# AUTH
|
||||
auth/* @nuivall @ptrsmrn
|
||||
auth/* @nuivall
|
||||
|
||||
# CACHE
|
||||
row_cache* @tgrabiec
|
||||
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
|
||||
transport/*
|
||||
|
||||
# CQL QUERY LANGUAGE
|
||||
cql3/* @tgrabiec @nuivall @ptrsmrn
|
||||
cql3/* @tgrabiec @nuivall
|
||||
|
||||
# COUNTERS
|
||||
counters* @nuivall @ptrsmrn
|
||||
tests/counter_test* @nuivall @ptrsmrn
|
||||
counters* @nuivall
|
||||
tests/counter_test* @nuivall
|
||||
|
||||
# DOCS
|
||||
docs/* @annastuchlik @tzach
|
||||
|
||||
2
.github/scripts/auto-backport.py
vendored
2
.github/scripts/auto-backport.py
vendored
@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
|
||||
if is_draft:
|
||||
labels_to_add.append("conflicts")
|
||||
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
|
||||
pr_comment += "Please resolve them and mark this PR as ready for review"
|
||||
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
|
||||
backport_pr.create_issue_comment(pr_comment)
|
||||
|
||||
# Apply all labels at once if we have any
|
||||
|
||||
@@ -18,7 +18,7 @@ jobs:
|
||||
|
||||
// Regular expression pattern to check for "Fixes" prefix
|
||||
// Adjusted to dynamically insert the repository full name
|
||||
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
|
||||
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
|
||||
const regex = new RegExp(pattern);
|
||||
|
||||
if (!regex.test(body)) {
|
||||
|
||||
10
.github/workflows/docs-validate-metrics.yml
vendored
10
.github/workflows/docs-validate-metrics.yml
vendored
@@ -7,7 +7,7 @@ on:
|
||||
- enterprise
|
||||
paths:
|
||||
- '**/*.cc'
|
||||
- 'scripts/metrics-config.yml'
|
||||
- 'scripts/metrics-config.yml'
|
||||
- 'scripts/get_description.py'
|
||||
- 'docs/_ext/scylladb_metrics.py'
|
||||
|
||||
@@ -15,20 +15,20 @@ jobs:
|
||||
validate-metrics:
|
||||
runs-on: ubuntu-latest
|
||||
name: Check metrics documentation coverage
|
||||
|
||||
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: '3.10'
|
||||
|
||||
|
||||
- name: Install dependencies
|
||||
run: pip install PyYAML
|
||||
|
||||
|
||||
- name: Validate metrics
|
||||
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml
|
||||
|
||||
5
.github/workflows/trigger-scylla-ci.yaml
vendored
5
.github/workflows/trigger-scylla-ci.yaml
vendored
@@ -3,10 +3,13 @@ name: Trigger Scylla CI Route
|
||||
on:
|
||||
issue_comment:
|
||||
types: [created]
|
||||
pull_request_target:
|
||||
types:
|
||||
- unlabeled
|
||||
|
||||
jobs:
|
||||
trigger-jenkins:
|
||||
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
|
||||
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Trigger Scylla-CI-Route Jenkins Job
|
||||
|
||||
@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
|
||||
if (!comparison_operator.IsString()) {
|
||||
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
|
||||
}
|
||||
std::string op = comparison_operator.GetString();
|
||||
std::string op = rjson::to_string(comparison_operator);
|
||||
auto it = ops.find(op);
|
||||
if (it == ops.end()) {
|
||||
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
|
||||
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
|
||||
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
|
||||
}
|
||||
if (kv1.name == "S") {
|
||||
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
|
||||
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
|
||||
return cmp(rjson::to_string_view(kv1.value),
|
||||
rjson::to_string_view(kv2.value));
|
||||
}
|
||||
if (kv1.name == "B") {
|
||||
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
|
||||
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
|
||||
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "S") {
|
||||
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
|
||||
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
|
||||
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
|
||||
return check_BETWEEN(rjson::to_string_view(kv_v.value),
|
||||
rjson::to_string_view(kv_lb.value),
|
||||
rjson::to_string_view(kv_ub.value),
|
||||
bounds_from_query);
|
||||
}
|
||||
if (kv_v.name == "B") {
|
||||
|
||||
@@ -8,6 +8,8 @@
|
||||
|
||||
#include "consumed_capacity.hh"
|
||||
#include "error.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include <fmt/format.h>
|
||||
|
||||
namespace alternator {
|
||||
|
||||
@@ -32,12 +34,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
|
||||
if (!return_consumed->IsString()) {
|
||||
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
|
||||
}
|
||||
std::string consumed = return_consumed->GetString();
|
||||
std::string_view consumed = rjson::to_string_view(*return_consumed);
|
||||
if (consumed == "INDEXES") {
|
||||
throw api_error::validation("INDEXES consumed capacity is not supported");
|
||||
}
|
||||
if (consumed != "TOTAL") {
|
||||
throw api_error::validation("Unknown consumed capacity "+ consumed);
|
||||
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -419,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
|
||||
if (!table_name_value->IsString()) {
|
||||
throw api_error::validation("Non-string TableName field in request");
|
||||
}
|
||||
std::string table_name = table_name_value->GetString();
|
||||
std::string table_name = rjson::to_string(*table_name_value);
|
||||
return table_name;
|
||||
}
|
||||
|
||||
@@ -546,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
|
||||
// does exist but the index does not (ValidationException).
|
||||
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
|
||||
throw api_error::validation(
|
||||
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
|
||||
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
|
||||
} else {
|
||||
throw api_error::resource_not_found(
|
||||
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
|
||||
@@ -587,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
|
||||
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
|
||||
attribute_name, value));
|
||||
}
|
||||
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
|
||||
return rjson::to_string(*attribute_value);
|
||||
}
|
||||
|
||||
// Convenience function for getting the value of a boolean attribute, or a
|
||||
@@ -1080,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
|
||||
}
|
||||
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (attribute_info["AttributeName"].GetString() == name) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
|
||||
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
|
||||
data_type dt = parse_key_type(type);
|
||||
if (computed_column) {
|
||||
// Computed column for GSI (doesn't choose a real column as-is
|
||||
@@ -1116,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
throw api_error::validation("First element of KeySchema must be an object");
|
||||
}
|
||||
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
|
||||
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
|
||||
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
|
||||
throw api_error::validation("First key in KeySchema must be a HASH key");
|
||||
}
|
||||
v = rjson::find((*key_schema)[0], "AttributeName");
|
||||
@@ -1124,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
|
||||
throw api_error::validation("First key in KeySchema must have string AttributeName");
|
||||
}
|
||||
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
|
||||
std::string hash_key = v->GetString();
|
||||
std::string hash_key = rjson::to_string(*v);
|
||||
std::string range_key;
|
||||
if (key_schema->Size() == 2) {
|
||||
if (!(*key_schema)[1].IsObject()) {
|
||||
throw api_error::validation("Second element of KeySchema must be an object");
|
||||
}
|
||||
v = rjson::find((*key_schema)[1], "KeyType");
|
||||
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
|
||||
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
|
||||
throw api_error::validation("Second key in KeySchema must be a RANGE key");
|
||||
}
|
||||
v = rjson::find((*key_schema)[1], "AttributeName");
|
||||
@@ -1887,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
|
||||
std::string def_type = type_to_string(def.type);
|
||||
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
|
||||
const rjson::value& attribute_info = *it;
|
||||
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
|
||||
auto type = attribute_info["AttributeType"].GetString();
|
||||
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
|
||||
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
|
||||
if (type != def_type) {
|
||||
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
|
||||
}
|
||||
@@ -2362,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
_cells = std::vector<cell>();
|
||||
_cells->reserve(item.MemberCount());
|
||||
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
bytes column_name = to_bytes(rjson::to_string_view(it->name));
|
||||
validate_value(it->value, "PutItem");
|
||||
const column_definition* cdef = find_attribute(*schema, column_name);
|
||||
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
|
||||
@@ -2783,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
|
||||
return;
|
||||
}
|
||||
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
|
||||
if (!used.contains(it->name.GetString())) {
|
||||
if (!used.contains(rjson::to_string(it->name))) {
|
||||
throw api_error::validation(
|
||||
format("{} has spurious '{}', not used in {}",
|
||||
field_name, it->name.GetString(), operation));
|
||||
field_name, rjson::to_string_view(it->name), operation));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3000,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
|
||||
}
|
||||
|
||||
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
|
||||
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
|
||||
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
|
||||
try {
|
||||
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
|
||||
} catch(data_dictionary::no_such_column_family&) {
|
||||
@@ -3386,7 +3386,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
|
||||
}
|
||||
rjson::value newv = rjson::empty_object();
|
||||
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
|
||||
std::string attr = it->name.GetString();
|
||||
std::string attr = rjson::to_string(it->name);
|
||||
auto x = members.find(attr);
|
||||
if (x != members.end()) {
|
||||
if (x->second) {
|
||||
@@ -3606,7 +3606,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
|
||||
const rjson::value& attributes_to_get = req["AttributesToGet"];
|
||||
attrs_to_get ret;
|
||||
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
|
||||
attribute_path_map_add("AttributesToGet", ret, it->GetString());
|
||||
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
|
||||
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
|
||||
}
|
||||
if (ret.empty()) {
|
||||
@@ -4272,12 +4272,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
|
||||
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
|
||||
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
|
||||
// Note that it.key() is the name of the column, *it is the operation
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
bytes column_name = to_bytes(rjson::to_string_view(it->name));
|
||||
const column_definition* cdef = _schema->get_column_definition(column_name);
|
||||
if (cdef && cdef->is_primary_key()) {
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
|
||||
}
|
||||
std::string action = (it->value)["Action"].GetString();
|
||||
std::string action = rjson::to_string((it->value)["Action"]);
|
||||
if (action == "DELETE") {
|
||||
// The DELETE operation can do two unrelated tasks. Without a
|
||||
// "Value" option, it is used to delete an attribute. With a
|
||||
@@ -5474,7 +5474,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
|
||||
std::vector<query::clustering_range> ck_bounds;
|
||||
|
||||
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
|
||||
std::string key = it->name.GetString();
|
||||
sstring key = rjson::to_sstring(it->name);
|
||||
const rjson::value& condition = it->value;
|
||||
|
||||
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
|
||||
@@ -5482,13 +5482,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
|
||||
|
||||
const column_definition& pk_cdef = schema->partition_key_columns().front();
|
||||
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
|
||||
if (sstring(key) == pk_cdef.name_as_text()) {
|
||||
if (key == pk_cdef.name_as_text()) {
|
||||
if (!partition_ranges.empty()) {
|
||||
throw api_error::validation("Currently only a single restriction per key is allowed");
|
||||
}
|
||||
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
|
||||
}
|
||||
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
|
||||
if (ck_cdef && key == ck_cdef->name_as_text()) {
|
||||
if (!ck_bounds.empty()) {
|
||||
throw api_error::validation("Currently only a single restriction per key is allowed");
|
||||
}
|
||||
@@ -5889,7 +5889,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
|
||||
|
||||
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
|
||||
rjson::value* limit_json = rjson::find(request, "Limit");
|
||||
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
|
||||
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
|
||||
int limit = limit_json ? limit_json->GetInt() : 100;
|
||||
if (limit < 1 || limit > 100) {
|
||||
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");
|
||||
|
||||
@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
|
||||
return {"", nullptr};
|
||||
}
|
||||
auto it = v.MemberBegin();
|
||||
const std::string it_key = it->name.GetString();
|
||||
const std::string it_key = rjson::to_string(it->name);
|
||||
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
|
||||
return {std::move(it_key), nullptr};
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
|
||||
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
|
||||
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
|
||||
}
|
||||
sstring attribute_name(v->GetString(), v->GetStringLength());
|
||||
sstring attribute_name = rjson::to_sstring(*v);
|
||||
|
||||
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
|
||||
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
|
||||
|
||||
@@ -729,14 +729,6 @@
|
||||
"allowMultiple":false,
|
||||
"type":"boolean",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"use_sstable_identifier",
|
||||
"description":"Use the sstable identifier UUID, if available, rather than the sstable generation.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"boolean",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
},
|
||||
@@ -3059,7 +3051,7 @@
|
||||
},
|
||||
{
|
||||
"name":"incremental_mode",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
|
||||
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
|
||||
@@ -2020,16 +2020,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
auto tag = req->get_query_param("tag");
|
||||
auto column_families = split(req->get_query_param("cf"), ",");
|
||||
auto sfopt = req->get_query_param("sf");
|
||||
auto usiopt = req->get_query_param("use_sstable_identifier");
|
||||
db::snapshot_options opts = {
|
||||
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
|
||||
.use_sstable_identifier = strcasecmp(usiopt.c_str(), "true") == 0
|
||||
};
|
||||
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
|
||||
|
||||
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
|
||||
try {
|
||||
if (column_families.empty()) {
|
||||
co_await snap_ctl.local().take_snapshot(tag, keynames, opts);
|
||||
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
@@ -2037,7 +2033,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
|
||||
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
|
||||
}
|
||||
co_return json_void();
|
||||
} catch (...) {
|
||||
@@ -2072,8 +2068,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
|
||||
auto info = parse_scrub_options(ctx, std::move(req));
|
||||
|
||||
if (!info.snapshot_tag.empty()) {
|
||||
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
|
||||
}
|
||||
|
||||
compaction::compaction_stats stats;
|
||||
|
||||
@@ -146,8 +146,7 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
|
||||
auto info = parse_scrub_options(ctx, std::move(req));
|
||||
|
||||
if (!info.snapshot_tag.empty()) {
|
||||
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
|
||||
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
#include "auth/allow_all_authenticator.hh"
|
||||
|
||||
#include "service/migration_manager.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
namespace auth {
|
||||
@@ -23,7 +22,6 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&,
|
||||
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
|
||||
|
||||
}
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "auth/common.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
@@ -30,7 +29,7 @@ extern const std::string_view allow_all_authenticator_name;
|
||||
|
||||
class allow_all_authenticator final : public authenticator {
|
||||
public:
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
|
||||
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {
|
||||
}
|
||||
|
||||
virtual future<> start() override {
|
||||
|
||||
@@ -35,14 +35,13 @@ static const class_registrator<auth::authenticator
|
||||
, cql3::query_processor&
|
||||
, ::service::raft_group0_client&
|
||||
, ::service::migration_manager&
|
||||
, auth::cache&
|
||||
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
|
||||
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
|
||||
|
||||
enum class auth::certificate_authenticator::query_source {
|
||||
subject, altname
|
||||
};
|
||||
|
||||
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
|
||||
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&)
|
||||
: _queries([&] {
|
||||
auto& conf = qp.db().get_config();
|
||||
auto queries = conf.auth_certificate_role_queries();
|
||||
@@ -77,9 +76,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
|
||||
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
|
||||
}
|
||||
continue;
|
||||
} catch (std::out_of_range&) {
|
||||
} catch (const std::out_of_range&) {
|
||||
// just fallthrough
|
||||
} catch (boost::regex_error&) {
|
||||
} catch (const boost::regex_error&) {
|
||||
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include <boost/regex_fwd.hpp> // IWYU pragma: keep
|
||||
|
||||
namespace cql3 {
|
||||
@@ -34,7 +33,7 @@ class certificate_authenticator : public authenticator {
|
||||
enum class query_source;
|
||||
std::vector<std::pair<query_source, boost::regex>> _queries;
|
||||
public:
|
||||
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
|
||||
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
~certificate_authenticator();
|
||||
|
||||
future<> start() override;
|
||||
|
||||
@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
|
||||
try {
|
||||
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
|
||||
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
|
||||
} catch (exceptions::already_exists_exception&) {}
|
||||
} catch (const exceptions::already_exists_exception&) {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -256,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
|
||||
} else {
|
||||
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
|
||||
}
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
|
||||
}
|
||||
}
|
||||
@@ -293,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
|
||||
[resource](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
|
||||
}
|
||||
|
||||
});
|
||||
});
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
|
||||
return make_ready_future();
|
||||
}
|
||||
|
||||
@@ -49,8 +49,7 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&,
|
||||
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
|
||||
|
||||
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
|
||||
|
||||
@@ -64,14 +63,13 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
|
||||
password_authenticator::~password_authenticator() {
|
||||
}
|
||||
|
||||
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
|
||||
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
: _qp(qp)
|
||||
, _group0_client(g0)
|
||||
, _migration_manager(mm)
|
||||
, _cache(cache)
|
||||
, _stopped(make_ready_future<>())
|
||||
, _superuser(default_superuser(qp.db().get_config()))
|
||||
, _hashing_worker(hashing_worker)
|
||||
{}
|
||||
|
||||
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
|
||||
@@ -330,20 +328,18 @@ future<authenticated_user> password_authenticator::authenticate(
|
||||
}
|
||||
salted_hash = role->salted_hash;
|
||||
}
|
||||
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
|
||||
return passwords::check(password, *salted_hash);
|
||||
});
|
||||
const bool password_match = co_await passwords::check(password, *salted_hash);
|
||||
if (!password_match) {
|
||||
throw exceptions::authentication_exception("Username and/or password are incorrect");
|
||||
}
|
||||
co_return username;
|
||||
} catch (std::system_error &) {
|
||||
} catch (const std::system_error &) {
|
||||
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
|
||||
} catch (exceptions::request_execution_exception& e) {
|
||||
} catch (const exceptions::request_execution_exception& e) {
|
||||
std::throw_with_nested(exceptions::authentication_exception(e.what()));
|
||||
} catch (exceptions::authentication_exception& e) {
|
||||
} catch (const exceptions::authentication_exception& e) {
|
||||
std::throw_with_nested(e);
|
||||
} catch (exceptions::unavailable_exception& e) {
|
||||
} catch (const exceptions::unavailable_exception& e) {
|
||||
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
|
||||
} catch (...) {
|
||||
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
#include "auth/passwords.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace db {
|
||||
class config;
|
||||
@@ -49,13 +48,12 @@ class password_authenticator : public authenticator {
|
||||
shared_promise<> _superuser_created_promise;
|
||||
// We used to also support bcrypt, SHA-256, and MD5 (ref. scylladb#24524).
|
||||
constexpr static auth::passwords::scheme _scheme = passwords::scheme::sha_512;
|
||||
utils::alien_worker& _hashing_worker;
|
||||
|
||||
public:
|
||||
static db::consistency_level consistency_for_user(std::string_view role_name);
|
||||
static std::string default_superuser(const db::config&);
|
||||
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
|
||||
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
|
||||
~password_authenticator();
|
||||
|
||||
|
||||
@@ -7,6 +7,8 @@
|
||||
*/
|
||||
|
||||
#include "auth/passwords.hh"
|
||||
#include "utils/crypt_sha512.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
#include <cerrno>
|
||||
|
||||
@@ -21,27 +23,48 @@ static thread_local crypt_data tlcrypt = {};
|
||||
|
||||
namespace detail {
|
||||
|
||||
void verify_hashing_output(const char * res) {
|
||||
if (!res || (res[0] == '*')) {
|
||||
throw std::system_error(errno, std::system_category());
|
||||
}
|
||||
}
|
||||
|
||||
void verify_scheme(scheme scheme) {
|
||||
const sstring random_part_of_salt = "aaaabbbbccccdddd";
|
||||
|
||||
const sstring salt = sstring(prefix_for_scheme(scheme)) + random_part_of_salt;
|
||||
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
|
||||
|
||||
if (e && (e[0] != '*')) {
|
||||
return;
|
||||
try {
|
||||
verify_hashing_output(e);
|
||||
} catch (const std::system_error& ex) {
|
||||
throw no_supported_schemes();
|
||||
}
|
||||
|
||||
throw no_supported_schemes();
|
||||
}
|
||||
|
||||
sstring hash_with_salt(const sstring& pass, const sstring& salt) {
|
||||
auto res = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
|
||||
if (!res || (res[0] == '*')) {
|
||||
throw std::system_error(errno, std::system_category());
|
||||
}
|
||||
verify_hashing_output(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt) {
|
||||
sstring res;
|
||||
// Only SHA-512 hashes for passphrases shorter than 256 bytes can be computed using
|
||||
// the __crypt_sha512 method. For other computations, we fall back to the
|
||||
// crypt_r implementation from `<crypt.h>`, which can stall.
|
||||
if (salt.starts_with(prefix_for_scheme(scheme::sha_512)) && pass.size() <= 255) {
|
||||
char buf[128];
|
||||
const char * output_ptr = co_await __crypt_sha512(pass.c_str(), salt.c_str(), buf);
|
||||
verify_hashing_output(output_ptr);
|
||||
res = output_ptr;
|
||||
} else {
|
||||
const char * output_ptr = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
|
||||
verify_hashing_output(output_ptr);
|
||||
res = output_ptr;
|
||||
}
|
||||
co_return res;
|
||||
}
|
||||
|
||||
std::string_view prefix_for_scheme(scheme c) noexcept {
|
||||
switch (c) {
|
||||
case scheme::bcrypt_y: return "$2y$";
|
||||
@@ -58,8 +81,9 @@ no_supported_schemes::no_supported_schemes()
|
||||
: std::runtime_error("No allowed hashing schemes are supported on this system") {
|
||||
}
|
||||
|
||||
bool check(const sstring& pass, const sstring& salted_hash) {
|
||||
return detail::hash_with_salt(pass, salted_hash) == salted_hash;
|
||||
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash) {
|
||||
const auto pwd_hash = co_await detail::hash_with_salt_async(pass, salted_hash);
|
||||
co_return pwd_hash == salted_hash;
|
||||
}
|
||||
|
||||
} // namespace auth::passwords
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include <random>
|
||||
#include <stdexcept>
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
|
||||
#include "seastarx.hh"
|
||||
@@ -75,10 +76,19 @@ sstring generate_salt(RandomNumberEngine& g, scheme scheme) {
|
||||
|
||||
///
|
||||
/// Hash a password combined with an implementation-specific salt string.
|
||||
/// Deprecated in favor of `hash_with_salt_async`.
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
sstring hash_with_salt(const sstring& pass, const sstring& salt);
|
||||
[[deprecated("Use hash_with_salt_async instead")]] sstring hash_with_salt(const sstring& pass, const sstring& salt);
|
||||
|
||||
///
|
||||
/// Async version of `hash_with_salt` that returns a future.
|
||||
/// If possible, hashing uses `coroutine::maybe_yield` to prevent reactor stalls.
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt);
|
||||
|
||||
} // namespace detail
|
||||
|
||||
@@ -107,6 +117,6 @@ sstring hash(const sstring& pass, RandomNumberEngine& g, scheme scheme) {
|
||||
///
|
||||
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
|
||||
///
|
||||
bool check(const sstring& pass, const sstring& salted_hash);
|
||||
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash);
|
||||
|
||||
} // namespace auth::passwords
|
||||
|
||||
@@ -35,10 +35,9 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
cache&,
|
||||
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
|
||||
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
|
||||
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
|
||||
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
|
||||
: _socket_path(qp.db().get_config().saslauthd_socket_path())
|
||||
{}
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
|
||||
#include "auth/authenticator.hh"
|
||||
#include "auth/cache.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
@@ -30,7 +29,7 @@ namespace auth {
|
||||
class saslauthd_authenticator : public authenticator {
|
||||
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
|
||||
public:
|
||||
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
|
||||
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
|
||||
|
||||
future<> start() override;
|
||||
|
||||
|
||||
@@ -191,8 +191,7 @@ service::service(
|
||||
::service::migration_manager& mm,
|
||||
const service_config& sc,
|
||||
maintenance_socket_enabled used_by_maintenance_socket,
|
||||
cache& cache,
|
||||
utils::alien_worker& hashing_worker)
|
||||
cache& cache)
|
||||
: service(
|
||||
std::move(c),
|
||||
cache,
|
||||
@@ -200,7 +199,7 @@ service::service(
|
||||
g0,
|
||||
mn,
|
||||
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
|
||||
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
|
||||
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
|
||||
used_by_maintenance_socket) {
|
||||
}
|
||||
@@ -226,7 +225,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
|
||||
try {
|
||||
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
|
||||
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
|
||||
} catch (::service::group0_concurrent_modification&) {
|
||||
} catch (const ::service::group0_concurrent_modification&) {
|
||||
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
#include "cql3/description.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "utils/alien_worker.hh"
|
||||
#include "utils/observable.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "service/maintenance_mode.hh"
|
||||
@@ -131,8 +130,7 @@ public:
|
||||
::service::migration_manager&,
|
||||
const service_config&,
|
||||
maintenance_socket_enabled,
|
||||
cache&,
|
||||
utils::alien_worker&);
|
||||
cache&);
|
||||
|
||||
future<> start(::service::migration_manager&, db::system_keyspace&);
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
|
||||
{_superuser},
|
||||
cql3::query_processor::cache_internal::no).discard_result();
|
||||
log.info("Created default superuser role '{}'.", _superuser);
|
||||
} catch(const exceptions::unavailable_exception& e) {
|
||||
} catch (const exceptions::unavailable_exception& e) {
|
||||
log.warn("Skipped default role setup: some nodes were not ready; will retry");
|
||||
throw e;
|
||||
}
|
||||
|
||||
@@ -38,8 +38,8 @@ class transitional_authenticator : public authenticator {
|
||||
public:
|
||||
static const sstring PASSWORD_AUTHENTICATOR_NAME;
|
||||
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
|
||||
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
|
||||
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
|
||||
}
|
||||
transitional_authenticator(std::unique_ptr<authenticator> a)
|
||||
: _authenticator(std::move(a)) {
|
||||
@@ -81,7 +81,7 @@ public:
|
||||
}).handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
@@ -126,7 +126,7 @@ public:
|
||||
virtual bytes evaluate_response(bytes_view client_response) override {
|
||||
try {
|
||||
return _sasl->evaluate_response(client_response);
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
_complete = true;
|
||||
return {};
|
||||
}
|
||||
@@ -141,7 +141,7 @@ public:
|
||||
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (exceptions::authentication_exception&) {
|
||||
} catch (const exceptions::authentication_exception&) {
|
||||
// return anon user
|
||||
return make_ready_future<authenticated_user>(anonymous_user());
|
||||
}
|
||||
@@ -241,8 +241,7 @@ static const class_registrator<
|
||||
cql3::query_processor&,
|
||||
::service::raft_group0_client&,
|
||||
::service::migration_manager&,
|
||||
auth::cache&,
|
||||
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
|
||||
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
|
||||
|
||||
static const class_registrator<
|
||||
auth::authorizer,
|
||||
|
||||
@@ -859,6 +859,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/alien_worker.cc',
|
||||
'utils/array-search.cc',
|
||||
'utils/base64.cc',
|
||||
'utils/crypt_sha512.cc',
|
||||
'utils/logalloc.cc',
|
||||
'utils/large_bitset.cc',
|
||||
'utils/buffer_input_stream.cc',
|
||||
@@ -1479,7 +1480,6 @@ deps = {
|
||||
|
||||
pure_boost_tests = set([
|
||||
'test/boost/anchorless_list_test',
|
||||
'test/boost/auth_passwords_test',
|
||||
'test/boost/auth_resource_test',
|
||||
'test/boost/big_decimal_test',
|
||||
'test/boost/caching_options_test',
|
||||
|
||||
@@ -304,8 +304,6 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
|
||||
}
|
||||
}));
|
||||
#endif
|
||||
verify_batch_size(qp, mutations);
|
||||
|
||||
bool mutate_atomic = true;
|
||||
if (_type != type::LOGGED) {
|
||||
_stats.batches_pure_unlogged += 1;
|
||||
@@ -313,6 +311,7 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
|
||||
} else {
|
||||
if (mutations.size() > 1) {
|
||||
_stats.batches_pure_logged += 1;
|
||||
verify_batch_size(qp, mutations);
|
||||
} else {
|
||||
_stats.batches_unlogged_from_logged += 1;
|
||||
mutate_atomic = false;
|
||||
|
||||
@@ -65,7 +65,7 @@ future<> snapshot_ctl::run_snapshot_modify_operation(noncopyable_function<future
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
|
||||
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
|
||||
if (tag.empty()) {
|
||||
throw std::runtime_error("You must supply a snapshot name.");
|
||||
}
|
||||
@@ -74,21 +74,21 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
|
||||
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names));
|
||||
};
|
||||
|
||||
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), opts, this] () mutable {
|
||||
return do_take_snapshot(std::move(tag), std::move(keyspace_names), opts);
|
||||
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
|
||||
return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
|
||||
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
|
||||
return check_snapshot_not_exist(ks_name, tag);
|
||||
});
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), opts] (const auto& ks_name) {
|
||||
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, opts);
|
||||
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
|
||||
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
|
||||
if (ks_name.empty()) {
|
||||
throw std::runtime_error("You must supply a keyspace name");
|
||||
}
|
||||
@@ -99,14 +99,14 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
|
||||
throw std::runtime_error("You must supply a snapshot name.");
|
||||
}
|
||||
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
|
||||
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), opts);
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
|
||||
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
|
||||
});
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
|
||||
co_await check_snapshot_not_exist(ks_name, tag, tables);
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {
|
||||
|
||||
@@ -38,13 +38,10 @@ class backup_task_impl;
|
||||
|
||||
} // snapshot namespace
|
||||
|
||||
struct snapshot_options {
|
||||
bool skip_flush = false;
|
||||
bool use_sstable_identifier = false;
|
||||
};
|
||||
|
||||
class snapshot_ctl : public peering_sharded_service<snapshot_ctl> {
|
||||
public:
|
||||
using skip_flush = bool_class<class skip_flush_tag>;
|
||||
|
||||
struct table_snapshot_details {
|
||||
int64_t total;
|
||||
int64_t live;
|
||||
@@ -73,8 +70,8 @@ public:
|
||||
*
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
*/
|
||||
future<> take_snapshot(sstring tag, snapshot_options opts = {}) {
|
||||
return take_snapshot(tag, {}, opts);
|
||||
future<> take_snapshot(sstring tag, skip_flush sf = skip_flush::no) {
|
||||
return take_snapshot(tag, {}, sf);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -83,7 +80,7 @@ public:
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
* @param keyspace_names the names of the keyspaces to snapshot; empty means "all"
|
||||
*/
|
||||
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {});
|
||||
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
|
||||
|
||||
/**
|
||||
* Takes the snapshot of multiple tables. A snapshot name must be specified.
|
||||
@@ -92,7 +89,7 @@ public:
|
||||
* @param tables a vector of tables names to snapshot
|
||||
* @param tag the tag given to the snapshot; may not be null or empty
|
||||
*/
|
||||
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
|
||||
|
||||
/**
|
||||
* Remove the snapshot with the given name from the given keyspaces.
|
||||
@@ -130,8 +127,8 @@ private:
|
||||
|
||||
friend class snapshot::backup_task_impl;
|
||||
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
|
||||
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -137,8 +137,6 @@ namespace {
|
||||
system_keyspace::ROLE_PERMISSIONS,
|
||||
system_keyspace::DICTS,
|
||||
system_keyspace::VIEW_BUILDING_TASKS,
|
||||
// repair tasks
|
||||
system_keyspace::REPAIR_TASKS,
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
props.is_group0_table = true;
|
||||
@@ -464,24 +462,6 @@ schema_ptr system_keyspace::repair_history() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::repair_tasks() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, REPAIR_TASKS);
|
||||
return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
|
||||
.with_column("task_uuid", uuid_type, column_kind::partition_key)
|
||||
.with_column("operation", utf8_type, column_kind::clustering_key)
|
||||
// First and last token for of the tablet
|
||||
.with_column("first_token", long_type, column_kind::clustering_key)
|
||||
.with_column("last_token", long_type, column_kind::clustering_key)
|
||||
.with_column("timestamp", timestamp_type)
|
||||
.with_column("table_uuid", uuid_type, column_kind::static_column)
|
||||
.set_comment("Record tablet repair tasks")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::built_indexes() {
|
||||
static thread_local auto built_indexes = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
|
||||
@@ -2331,7 +2311,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
corrupt_data(),
|
||||
scylla_local(), db::schema_tables::scylla_table_schema_history(),
|
||||
repair_history(),
|
||||
repair_tasks(),
|
||||
v3::views_builds_in_progress(), v3::built_views(),
|
||||
v3::scylla_views_builds_in_progress(),
|
||||
v3::truncated(),
|
||||
@@ -2573,32 +2552,6 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
|
||||
// Default to timeout the repair task entries in 10 days, this should be enough time for the management tools to query
|
||||
constexpr int ttl = 10 * 24 * 3600;
|
||||
sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
|
||||
auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
|
||||
{entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
|
||||
entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
|
||||
utils::chunked_vector<canonical_mutation> cmuts = {muts.begin(), muts.end()};
|
||||
co_return cmuts;
|
||||
}
|
||||
|
||||
future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
|
||||
sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
|
||||
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
|
||||
repair_task_entry ent;
|
||||
ent.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
|
||||
ent.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
|
||||
ent.first_token = row.get_as<int64_t>("first_token");
|
||||
ent.last_token = row.get_as<int64_t>("last_token");
|
||||
ent.timestamp = row.get_as<db_clock::time_point>("timestamp");
|
||||
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
|
||||
co_await f(std::move(ent));
|
||||
co_return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
future<gms::generation_type> system_keyspace::increment_and_get_generation() {
|
||||
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
|
||||
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
|
||||
@@ -3770,35 +3723,4 @@ future<> system_keyspace::apply_mutation(mutation m) {
|
||||
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
|
||||
}
|
||||
|
||||
// The names are persisted in system tables so should not be changed.
|
||||
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
|
||||
{system_keyspace::repair_task_operation::requested, "requested"},
|
||||
{system_keyspace::repair_task_operation::finished, "finished"},
|
||||
};
|
||||
|
||||
static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
|
||||
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
|
||||
for (auto&& [v, s] : repair_task_operation_to_name) {
|
||||
result.emplace(s, v);
|
||||
}
|
||||
return result;
|
||||
});
|
||||
|
||||
sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
|
||||
auto i = repair_task_operation_to_name.find(op);
|
||||
if (i == repair_task_operation_to_name.end()) {
|
||||
on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
|
||||
}
|
||||
return i->second;
|
||||
}
|
||||
|
||||
system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
|
||||
return repair_task_operation_from_name.at(name);
|
||||
}
|
||||
|
||||
} // namespace db
|
||||
|
||||
auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
|
||||
-> decltype(ctx.out()) {
|
||||
return fmt::format_to(ctx.out(), "{}", db::system_keyspace::repair_task_operation_to_string(op));
|
||||
}
|
||||
|
||||
@@ -57,8 +57,6 @@ namespace paxos {
|
||||
struct topology_request_state;
|
||||
|
||||
class group0_guard;
|
||||
|
||||
class raft_group0_client;
|
||||
}
|
||||
|
||||
namespace netw {
|
||||
@@ -186,7 +184,6 @@ public:
|
||||
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
|
||||
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
|
||||
static constexpr auto REPAIR_HISTORY = "repair_history";
|
||||
static constexpr auto REPAIR_TASKS = "repair_tasks";
|
||||
static constexpr auto GROUP0_HISTORY = "group0_history";
|
||||
static constexpr auto DISCOVERY = "discovery";
|
||||
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
|
||||
@@ -263,7 +260,6 @@ public:
|
||||
static schema_ptr raft();
|
||||
static schema_ptr raft_snapshots();
|
||||
static schema_ptr repair_history();
|
||||
static schema_ptr repair_tasks();
|
||||
static schema_ptr group0_history();
|
||||
static schema_ptr discovery();
|
||||
static schema_ptr broadcast_kv_store();
|
||||
@@ -402,22 +398,6 @@ public:
|
||||
int64_t range_end;
|
||||
};
|
||||
|
||||
enum class repair_task_operation {
|
||||
requested,
|
||||
finished,
|
||||
};
|
||||
static sstring repair_task_operation_to_string(repair_task_operation op);
|
||||
static repair_task_operation repair_task_operation_from_string(const sstring& name);
|
||||
|
||||
struct repair_task_entry {
|
||||
tasks::task_id task_uuid;
|
||||
repair_task_operation operation;
|
||||
int64_t first_token;
|
||||
int64_t last_token;
|
||||
db_clock::time_point timestamp;
|
||||
table_id table_uuid;
|
||||
};
|
||||
|
||||
struct topology_requests_entry {
|
||||
utils::UUID id;
|
||||
utils::UUID initiating_host;
|
||||
@@ -439,10 +419,6 @@ public:
|
||||
using repair_history_consumer = noncopyable_function<future<>(const repair_history_entry&)>;
|
||||
future<> get_repair_history(table_id, repair_history_consumer f);
|
||||
|
||||
future<utils::chunked_vector<canonical_mutation>> get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts);
|
||||
using repair_task_consumer = noncopyable_function<future<>(const repair_task_entry&)>;
|
||||
future<> get_repair_task(tasks::task_id task_uuid, repair_task_consumer f);
|
||||
|
||||
future<> save_truncation_record(const replica::column_family&, db_clock::time_point truncated_at, db::replay_position);
|
||||
future<replay_positions> get_truncated_positions(table_id);
|
||||
future<> drop_truncation_rp_records();
|
||||
@@ -750,8 +726,3 @@ public:
|
||||
}; // class system_keyspace
|
||||
|
||||
} // namespace db
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<db::system_keyspace::repair_task_operation> : fmt::formatter<string_view> {
|
||||
auto format(const db::system_keyspace::repair_task_operation&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
@@ -41,6 +41,8 @@ class MetricsProcessor:
|
||||
# Get metrics from the file
|
||||
try:
|
||||
metrics_file = metrics.get_metrics_from_file(relative_path, "scylla_", metrics_info, strict=strict)
|
||||
except SystemExit:
|
||||
pass
|
||||
finally:
|
||||
os.chdir(old_cwd)
|
||||
if metrics_file:
|
||||
|
||||
@@ -28,7 +28,8 @@ Incremental Repair is only supported for tables that use the tablets architectur
|
||||
Incremental Repair Modes
|
||||
------------------------
|
||||
|
||||
While incremental repair is the default and recommended mode, you can control its behavior for a given repair operation using the ``incremental_mode`` parameter. This is useful for situations where you might need to force a full data validation.
|
||||
Incremental is currently disabled by default. You can control its behavior for a given repair operation using the ``incremental_mode`` parameter.
|
||||
This is useful for enabling incremental repair, or in situations where you might need to force a full data validation.
|
||||
|
||||
The available modes are:
|
||||
|
||||
|
||||
@@ -53,13 +53,13 @@ ScyllaDB nodetool cluster repair command supports the following options:
|
||||
|
||||
nodetool cluster repair --tablet-tokens 1,10474535988
|
||||
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
|
||||
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled'.
|
||||
|
||||
For example:
|
||||
|
||||
::
|
||||
|
||||
nodetool cluster repair --incremental-mode regular
|
||||
nodetool cluster repair --incremental-mode disabled
|
||||
|
||||
- ``keyspace`` executes a repair on a specific keyspace. The default is all keyspaces.
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ SYNOPSIS
|
||||
[(-u <username> | --username <username>)] snapshot
|
||||
[(-cf <table> | --column-family <table> | --table <table>)]
|
||||
[(-kc <kclist> | --kc.list <kclist>)]
|
||||
[(-sf | --skip-flush)] [--use-sstable-identifier] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
|
||||
[(-sf | --skip-flush)] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
|
||||
|
||||
OPTIONS
|
||||
.......
|
||||
@@ -37,8 +37,6 @@ Parameter Descriptio
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
--use-sstable-identifier Use the sstable identifier UUID, if available, rather than the sstable generation.
|
||||
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
|
||||
-t <tag> / --tag <tag> The name of the snapshot
|
||||
==================================================================== =====================================================================================
|
||||
|
||||
|
||||
@@ -143,7 +143,6 @@ public:
|
||||
|
||||
gms::feature tablet_incremental_repair { *this, "TABLET_INCREMENTAL_REPAIR"sv };
|
||||
gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
|
||||
gms::feature tablet_repair_tasks_table { *this, "TABLET_REPAIR_TASKS_TABLE"sv };
|
||||
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
|
||||
gms::feature tablet_rack_aware_view_pairing { *this, "TABLET_RACK_AWARE_VIEW_PAIRING"sv };
|
||||
|
||||
|
||||
@@ -15,3 +15,22 @@ with the Apache License (version 2) and ScyllaDB-Source-Available-1.0.
|
||||
They contain the following tag:
|
||||
|
||||
SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
||||
|
||||
### `musl libc` files
|
||||
|
||||
`licenses/musl-license.txt` is obtained from:
|
||||
https://git.musl-libc.org/cgit/musl/tree/COPYRIGHT
|
||||
|
||||
`utils/crypt_sha512.cc` is obtained from:
|
||||
https://git.musl-libc.org/cgit/musl/tree/src/crypt/crypt_sha512.c
|
||||
|
||||
Both files are obtained from git.musl-libc.org.
|
||||
Import commit:
|
||||
commit 1b76ff0767d01df72f692806ee5adee13c67ef88
|
||||
Author: Alex Rønne Petersen <alex@alexrp.com>
|
||||
Date: Sun Oct 12 05:35:19 2025 +0200
|
||||
|
||||
s390x: shuffle register usage in __tls_get_offset to avoid r0 as address
|
||||
|
||||
musl as a whole is licensed under the standard MIT license included in
|
||||
`licenses/musl-license.txt`.
|
||||
|
||||
193
licenses/musl-license.txt
Normal file
193
licenses/musl-license.txt
Normal file
@@ -0,0 +1,193 @@
|
||||
musl as a whole is licensed under the following standard MIT license:
|
||||
|
||||
----------------------------------------------------------------------
|
||||
Copyright © 2005-2020 Rich Felker, et al.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
"Software"), to deal in the Software without restriction, including
|
||||
without limitation the rights to use, copy, modify, merge, publish,
|
||||
distribute, sublicense, and/or sell copies of the Software, and to
|
||||
permit persons to whom the Software is furnished to do so, subject to
|
||||
the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be
|
||||
included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
----------------------------------------------------------------------
|
||||
|
||||
Authors/contributors include:
|
||||
|
||||
A. Wilcox
|
||||
Ada Worcester
|
||||
Alex Dowad
|
||||
Alex Suykov
|
||||
Alexander Monakov
|
||||
Andre McCurdy
|
||||
Andrew Kelley
|
||||
Anthony G. Basile
|
||||
Aric Belsito
|
||||
Arvid Picciani
|
||||
Bartosz Brachaczek
|
||||
Benjamin Peterson
|
||||
Bobby Bingham
|
||||
Boris Brezillon
|
||||
Brent Cook
|
||||
Chris Spiegel
|
||||
Clément Vasseur
|
||||
Daniel Micay
|
||||
Daniel Sabogal
|
||||
Daurnimator
|
||||
David Carlier
|
||||
David Edelsohn
|
||||
Denys Vlasenko
|
||||
Dmitry Ivanov
|
||||
Dmitry V. Levin
|
||||
Drew DeVault
|
||||
Emil Renner Berthing
|
||||
Fangrui Song
|
||||
Felix Fietkau
|
||||
Felix Janda
|
||||
Gianluca Anzolin
|
||||
Hauke Mehrtens
|
||||
He X
|
||||
Hiltjo Posthuma
|
||||
Isaac Dunham
|
||||
Jaydeep Patil
|
||||
Jens Gustedt
|
||||
Jeremy Huntwork
|
||||
Jo-Philipp Wich
|
||||
Joakim Sindholt
|
||||
John Spencer
|
||||
Julien Ramseier
|
||||
Justin Cormack
|
||||
Kaarle Ritvanen
|
||||
Khem Raj
|
||||
Kylie McClain
|
||||
Leah Neukirchen
|
||||
Luca Barbato
|
||||
Luka Perkov
|
||||
Lynn Ochs
|
||||
M Farkas-Dyck (Strake)
|
||||
Mahesh Bodapati
|
||||
Markus Wichmann
|
||||
Masanori Ogino
|
||||
Michael Clark
|
||||
Michael Forney
|
||||
Mikhail Kremnyov
|
||||
Natanael Copa
|
||||
Nicholas J. Kain
|
||||
orc
|
||||
Pascal Cuoq
|
||||
Patrick Oppenlander
|
||||
Petr Hosek
|
||||
Petr Skocik
|
||||
Pierre Carrier
|
||||
Reini Urban
|
||||
Rich Felker
|
||||
Richard Pennington
|
||||
Ryan Fairfax
|
||||
Samuel Holland
|
||||
Segev Finer
|
||||
Shiz
|
||||
sin
|
||||
Solar Designer
|
||||
Stefan Kristiansson
|
||||
Stefan O'Rear
|
||||
Szabolcs Nagy
|
||||
Timo Teräs
|
||||
Trutz Behn
|
||||
Will Dietz
|
||||
William Haddon
|
||||
William Pitcock
|
||||
|
||||
Portions of this software are derived from third-party works licensed
|
||||
under terms compatible with the above MIT license:
|
||||
|
||||
The TRE regular expression implementation (src/regex/reg* and
|
||||
src/regex/tre*) is Copyright © 2001-2008 Ville Laurikari and licensed
|
||||
under a 2-clause BSD license (license text in the source files). The
|
||||
included version has been heavily modified by Rich Felker in 2012, in
|
||||
the interests of size, simplicity, and namespace cleanliness.
|
||||
|
||||
Much of the math library code (src/math/* and src/complex/*) is
|
||||
Copyright © 1993,2004 Sun Microsystems or
|
||||
Copyright © 2003-2011 David Schultz or
|
||||
Copyright © 2003-2009 Steven G. Kargl or
|
||||
Copyright © 2003-2009 Bruce D. Evans or
|
||||
Copyright © 2008 Stephen L. Moshier or
|
||||
Copyright © 2017-2018 Arm Limited
|
||||
and labelled as such in comments in the individual source files. All
|
||||
have been licensed under extremely permissive terms.
|
||||
|
||||
The ARM memcpy code (src/string/arm/memcpy.S) is Copyright © 2008
|
||||
The Android Open Source Project and is licensed under a two-clause BSD
|
||||
license. It was taken from Bionic libc, used on Android.
|
||||
|
||||
The AArch64 memcpy and memset code (src/string/aarch64/*) are
|
||||
Copyright © 1999-2019, Arm Limited.
|
||||
|
||||
The implementation of DES for crypt (src/crypt/crypt_des.c) is
|
||||
Copyright © 1994 David Burren. It is licensed under a BSD license.
|
||||
|
||||
The implementation of blowfish crypt (src/crypt/crypt_blowfish.c) was
|
||||
originally written by Solar Designer and placed into the public
|
||||
domain. The code also comes with a fallback permissive license for use
|
||||
in jurisdictions that may not recognize the public domain.
|
||||
|
||||
The smoothsort implementation (src/stdlib/qsort.c) is Copyright © 2011
|
||||
Lynn Ochs and is licensed under an MIT-style license.
|
||||
|
||||
The x86_64 port was written by Nicholas J. Kain and is licensed under
|
||||
the standard MIT terms.
|
||||
|
||||
The mips and microblaze ports were originally written by Richard
|
||||
Pennington for use in the ellcc project. The original code was adapted
|
||||
by Rich Felker for build system and code conventions during upstream
|
||||
integration. It is licensed under the standard MIT terms.
|
||||
|
||||
The mips64 port was contributed by Imagination Technologies and is
|
||||
licensed under the standard MIT terms.
|
||||
|
||||
The powerpc port was also originally written by Richard Pennington,
|
||||
and later supplemented and integrated by John Spencer. It is licensed
|
||||
under the standard MIT terms.
|
||||
|
||||
All other files which have no copyright comments are original works
|
||||
produced specifically for use as part of this library, written either
|
||||
by Rich Felker, the main author of the library, or by one or more
|
||||
contibutors listed above. Details on authorship of individual files
|
||||
can be found in the git version control history of the project. The
|
||||
omission of copyright and license comments in each file is in the
|
||||
interest of source tree size.
|
||||
|
||||
In addition, permission is hereby granted for all public header files
|
||||
(include/* and arch/*/bits/*) and crt files intended to be linked into
|
||||
applications (crt/*, ldso/dlstart.c, and arch/*/crt_arch.h) to omit
|
||||
the copyright notice and permission notice otherwise required by the
|
||||
license, and to use these files without any requirement of
|
||||
attribution. These files include substantial contributions from:
|
||||
|
||||
Bobby Bingham
|
||||
John Spencer
|
||||
Nicholas J. Kain
|
||||
Rich Felker
|
||||
Richard Pennington
|
||||
Stefan Kristiansson
|
||||
Szabolcs Nagy
|
||||
|
||||
all of whom have explicitly granted such permission.
|
||||
|
||||
This file previously contained text expressing a belief that most of
|
||||
the files covered by the above exception were sufficiently trivial not
|
||||
to be subject to copyright, resulting in confusion over whether it
|
||||
negated the permissions granted in the license. In the spirit of
|
||||
permissive licensing, and of not having licensing issues being an
|
||||
obstacle to adoption, that text has been removed.
|
||||
@@ -200,7 +200,10 @@ enum class tablet_repair_incremental_mode : uint8_t {
|
||||
disabled,
|
||||
};
|
||||
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
|
||||
// FIXME: Incremental repair is disabled by default due to
|
||||
// https://github.com/scylladb/scylladb/issues/26041 and
|
||||
// https://github.com/scylladb/scylladb/issues/27414
|
||||
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::disabled};
|
||||
|
||||
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
|
||||
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);
|
||||
|
||||
9
main.cc
9
main.cc
@@ -748,8 +748,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// inherit Seastar's CPU affinity masks. We want this thread to be free
|
||||
// to migrate between CPUs; we think that's what makes the most sense.
|
||||
auto rpc_dict_training_worker = utils::alien_worker(startlog, 19, "rpc-dict");
|
||||
// niceness=10 is ~10% of normal process time
|
||||
auto hashing_worker = utils::alien_worker(startlog, 10, "pwd-hash");
|
||||
|
||||
return app.run(ac, av, [&] () -> future<int> {
|
||||
|
||||
@@ -779,8 +777,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
return seastar::async([&app, cfg, ext, &disk_space_monitor_shard0, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
|
||||
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service, &gossiper, &snitch,
|
||||
&token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
|
||||
&repair, &sst_loader, &auth_cache, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
|
||||
&hashing_worker, &vector_store_client] {
|
||||
&repair, &sst_loader, &auth_cache, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker, &vector_store_client] {
|
||||
try {
|
||||
if (opts.contains("relabel-config-file") && !opts["relabel-config-file"].as<sstring>().empty()) {
|
||||
// calling update_relabel_config_from_file can cause an exception that would stop startup
|
||||
@@ -2060,7 +2057,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
|
||||
maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};
|
||||
|
||||
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache), std::ref(hashing_worker)).get();
|
||||
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache)).get();
|
||||
|
||||
cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group);
|
||||
|
||||
@@ -2336,7 +2333,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
auth_config.authenticator_java_name = qualified_authenticator_name;
|
||||
auth_config.role_manager_java_name = qualified_role_manager_name;
|
||||
|
||||
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache), std::ref(hashing_worker)).get();
|
||||
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache)).get();
|
||||
|
||||
std::any stop_auth_service;
|
||||
// Has to be called after node joined the cluster (join_cluster())
|
||||
|
||||
@@ -3844,83 +3844,3 @@ future<uint32_t> repair_service::get_next_repair_meta_id() {
|
||||
locator::host_id repair_service::my_host_id() const noexcept {
|
||||
return _gossiper.local().my_host_id();
|
||||
}
|
||||
|
||||
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2) {
|
||||
if (ranges1.empty() || ranges2.empty()) {
|
||||
co_return 0;
|
||||
}
|
||||
|
||||
auto sort = [] (utils::chunked_vector<tablet_token_range>& ranges) {
|
||||
std::sort(ranges.begin(), ranges.end(), [] (const auto& a, const auto& b) {
|
||||
if (a.first_token != b.first_token) {
|
||||
return a.first_token < b.first_token;
|
||||
}
|
||||
return a.last_token < b.last_token;
|
||||
});
|
||||
};
|
||||
|
||||
// First, merge overlapping and adjacent ranges in ranges2.
|
||||
sort(ranges2);
|
||||
utils::chunked_vector<tablet_token_range> merged;
|
||||
merged.push_back(ranges2[0]);
|
||||
for (size_t i = 1; i < ranges2.size(); ++i) {
|
||||
co_await coroutine::maybe_yield();
|
||||
// To avoid overflow with max() + 1, we check adjacency with `a - 1 <= b` instead of `a <= b + 1`
|
||||
if (ranges2[i].first_token - 1 <= merged.back().last_token) {
|
||||
merged.back().last_token = std::max(merged.back().last_token, ranges2[i].last_token);
|
||||
} else {
|
||||
merged.push_back(ranges2[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Count covered ranges using a linear scan
|
||||
size_t covered_count = 0;
|
||||
auto it = merged.begin();
|
||||
auto end = merged.end();
|
||||
sort(ranges1);
|
||||
for (const auto& r1 : ranges1) {
|
||||
co_await coroutine::maybe_yield();
|
||||
// Advance the merged iterator only if the current merged range ends
|
||||
// before the current r1 starts.
|
||||
while (it != end && it->last_token < r1.first_token) {
|
||||
co_await coroutine::maybe_yield();
|
||||
++it;
|
||||
}
|
||||
// If we have exhausted the merged ranges, no further r1 can be covered
|
||||
if (it == end) {
|
||||
break;
|
||||
}
|
||||
// Check if the current merged range covers r1.
|
||||
if (it->first_token <= r1.first_token && r1.last_token <= it->last_token) {
|
||||
covered_count++;
|
||||
}
|
||||
}
|
||||
|
||||
co_return covered_count;
|
||||
}
|
||||
|
||||
future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_task_progress(tasks::task_id task_uuid) {
|
||||
utils::chunked_vector<tablet_token_range> requested_tablets;
|
||||
utils::chunked_vector<tablet_token_range> finished_tablets;
|
||||
table_id tid;
|
||||
if (!_db.local().features().tablet_repair_tasks_table) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
co_await _sys_ks.local().get_repair_task(task_uuid, [&tid, &requested_tablets, &finished_tablets] (const db::system_keyspace::repair_task_entry& entry) -> future<> {
|
||||
rlogger.debug("repair_task_progress: Get entry operation={} first_token={} last_token={}", entry.operation, entry.first_token, entry.last_token);
|
||||
if (entry.operation == db::system_keyspace::repair_task_operation::requested) {
|
||||
requested_tablets.push_back({entry.first_token, entry.last_token});
|
||||
} else if (entry.operation == db::system_keyspace::repair_task_operation::finished) {
|
||||
finished_tablets.push_back({entry.first_token, entry.last_token});
|
||||
}
|
||||
tid = entry.table_uuid;
|
||||
co_return;
|
||||
});
|
||||
auto requested = requested_tablets.size();
|
||||
auto finished_nomerge = finished_tablets.size();
|
||||
auto finished = co_await count_finished_tablets(std::move(requested_tablets), std::move(finished_tablets));
|
||||
auto progress = repair_task_progress{requested, finished, tid};
|
||||
rlogger.debug("repair_task_progress: task_uuid={} table_uuid={} requested_tablets={} finished_tablets={} progress={} finished_nomerge={}",
|
||||
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
|
||||
co_return progress;
|
||||
}
|
||||
|
||||
@@ -99,15 +99,6 @@ public:
|
||||
|
||||
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
|
||||
|
||||
struct repair_task_progress {
|
||||
size_t requested;
|
||||
size_t finished;
|
||||
table_id table_uuid;
|
||||
float progress() const {
|
||||
return requested == 0 ? 1.0 : float(finished) / requested;
|
||||
}
|
||||
};
|
||||
|
||||
class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
sharded<service::topology_state_machine>& _tsm;
|
||||
sharded<gms::gossiper>& _gossiper;
|
||||
@@ -231,9 +222,6 @@ private:
|
||||
public:
|
||||
future<gc_clock::time_point> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid, tasks::task_info global_tablet_repair_task_info, service::frozen_topology_guard topo_guard, std::optional<locator::tablet_replica_set> rebuild_replicas, locator::tablet_transition_stage stage);
|
||||
|
||||
|
||||
future<std::optional<repair_task_progress>> get_tablet_repair_task_progress(tasks::task_id task_uuid);
|
||||
|
||||
private:
|
||||
|
||||
future<repair_update_system_table_response> repair_update_system_table_handler(
|
||||
@@ -338,12 +326,3 @@ future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
|
||||
schema_ptr s, uint64_t seed, repair_master is_master,
|
||||
reader_permit permit, repair_hasher hasher);
|
||||
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, std::optional<small_table_optimization_params> small_table_optimization = std::nullopt, repair_meta* rm = nullptr);
|
||||
|
||||
// A struct to hold the first and last token of a tablet.
|
||||
struct tablet_token_range {
|
||||
int64_t first_token;
|
||||
int64_t last_token;
|
||||
};
|
||||
|
||||
// Function to count the number of ranges in ranges1 covered by the merged ranges of ranges2.
|
||||
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2);
|
||||
|
||||
@@ -2810,26 +2810,26 @@ future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& shar
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, db::snapshot_options opts) {
|
||||
if (!opts.skip_flush) {
|
||||
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, bool skip_flush) {
|
||||
if (!skip_flush) {
|
||||
co_await flush_table_on_all_shards(sharded_db, uuid);
|
||||
}
|
||||
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag, opts);
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
|
||||
}
|
||||
|
||||
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts) {
|
||||
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), opts] (auto& table_name) {
|
||||
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
|
||||
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), skip_flush] (auto& table_name) {
|
||||
auto uuid = sharded_db.local().find_uuid(ks_name, table_name);
|
||||
return snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
|
||||
return snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts) {
|
||||
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush) {
|
||||
auto& ks = sharded_db.local().find_keyspace(ks_name);
|
||||
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), opts] (const auto& pair) -> future<> {
|
||||
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
|
||||
auto uuid = pair.second->id();
|
||||
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
|
||||
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2951,12 +2951,7 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
|
||||
auto truncated_at = truncated_at_opt.value_or(db_clock::now());
|
||||
auto name = snapshot_name_opt.value_or(
|
||||
format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()));
|
||||
// Use the sstable identifier in snapshot names to allow de-duplication of sstables
|
||||
// at backup time even if they were migrated across shards or nodes and were renamed a given a new generation.
|
||||
// We hard-code that here since we have no way to pass this option to auto-snapshot and
|
||||
// it is always safe to use the sstable identifier for the sstable generation.
|
||||
auto opts = db::snapshot_options{.use_sstable_identifier = true};
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name, opts);
|
||||
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name);
|
||||
}
|
||||
|
||||
co_await sharded_db.invoke_on_all([&] (database& db) {
|
||||
|
||||
@@ -1040,12 +1040,12 @@ public:
|
||||
private:
|
||||
using snapshot_file_set = foreign_ptr<std::unique_ptr<std::unordered_set<sstring>>>;
|
||||
|
||||
future<snapshot_file_set> take_snapshot(sstring jsondir, db::snapshot_options opts);
|
||||
future<snapshot_file_set> take_snapshot(sstring jsondir);
|
||||
// Writes the table schema and the manifest of all files in the snapshot directory.
|
||||
future<> finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets);
|
||||
static future<> seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets);
|
||||
public:
|
||||
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts);
|
||||
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name);
|
||||
|
||||
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
|
||||
static future<snapshot_details> get_snapshot_details(std::filesystem::path snapshot_dir, std::filesystem::path datadir);
|
||||
@@ -2009,9 +2009,9 @@ public:
|
||||
static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
|
||||
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
|
||||
|
||||
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, db::snapshot_options opts);
|
||||
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
|
||||
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
|
||||
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, bool skip_flush);
|
||||
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
|
||||
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
|
||||
|
||||
public:
|
||||
bool update_column_family(schema_ptr s);
|
||||
|
||||
@@ -3268,7 +3268,7 @@ future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstrin
|
||||
}
|
||||
|
||||
// Runs the orchestration code on an arbitrary shard to balance the load.
|
||||
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts) {
|
||||
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
|
||||
auto* so = std::get_if<storage_options::local>(&table_shards->get_storage_options().value);
|
||||
if (so == nullptr) {
|
||||
throw std::runtime_error("Snapshotting non-local tables is not implemented");
|
||||
@@ -3291,7 +3291,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
|
||||
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
|
||||
co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
|
||||
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
|
||||
return table_shards->take_snapshot(jsondir, opts);
|
||||
return table_shards->take_snapshot(jsondir);
|
||||
}));
|
||||
});
|
||||
co_await io_check(sync_directory, jsondir);
|
||||
@@ -3300,22 +3300,19 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
|
||||
});
|
||||
}
|
||||
|
||||
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir, db::snapshot_options opts) {
|
||||
tlogger.trace("take_snapshot {}: use_sstable_identifier={}", jsondir, opts.use_sstable_identifier);
|
||||
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir) {
|
||||
tlogger.trace("take_snapshot {}", jsondir);
|
||||
|
||||
auto sstable_deletion_guard = co_await get_sstable_list_permit();
|
||||
|
||||
auto tables = *_sstables->all() | std::ranges::to<std::vector<sstables::shared_sstable>>();
|
||||
auto table_names = std::make_unique<std::unordered_set<sstring>>();
|
||||
|
||||
auto& ks_name = schema()->ks_name();
|
||||
auto& cf_name = schema()->cf_name();
|
||||
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&, opts] (sstables::shared_sstable sstable) -> future<> {
|
||||
auto gen = co_await io_check([sstable, &dir = jsondir, opts] {
|
||||
return sstable->snapshot(dir, opts.use_sstable_identifier);
|
||||
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
|
||||
table_names->insert(sstable->component_basename(sstables::component_type::Data));
|
||||
return io_check([sstable, &dir = jsondir] {
|
||||
return sstable->snapshot(dir);
|
||||
});
|
||||
auto fname = sstable->component_basename(ks_name, cf_name, sstable->get_version(), gen, sstable->get_format(), sstables::component_type::Data);
|
||||
table_names->insert(fname);
|
||||
});
|
||||
co_return make_foreign(std::move(table_names));
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ format_match = re.compile(r'\s*(?:seastar::)?format\(\s*"([^"]+)"\s*,\s*(.*)\s*'
|
||||
def handle_error(message, strict=True, verbose_mode=False):
|
||||
if strict:
|
||||
print(f"[ERROR] {message}")
|
||||
exit(-1)
|
||||
exit(1)
|
||||
elif verbose_mode:
|
||||
print(f"[WARNING] {message}")
|
||||
|
||||
@@ -180,12 +180,11 @@ def get_metrics_from_file(file_name, prefix, metrics_information, verb=None, str
|
||||
groups = {}
|
||||
if clean_name in metrics_information:
|
||||
if (isinstance(metrics_information[clean_name], str) and metrics_information[clean_name] == "skip") or "skip" in metrics_information[clean_name]:
|
||||
exit(0)
|
||||
return {}
|
||||
param_mapping = metrics_information[clean_name]["params"] if clean_name in metrics_information and "params" in metrics_information[clean_name] else {}
|
||||
groups = metrics_information[clean_name]["groups"] if clean_name in metrics_information and "groups" in metrics_information[clean_name] else {}
|
||||
|
||||
metrics = {}
|
||||
multi_line = False
|
||||
names = undefined
|
||||
typ = undefined
|
||||
line_number = 0;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"cdc/log.cc":
|
||||
params:
|
||||
cdc_group_name: cdc
|
||||
cdc_group_name: "cdc"
|
||||
part_name;suffix: [["static_row", "total"],["clustering_row", "total"], ["map", "total"], ["set", "total"], ["list", "total"], ["udt", "total"], ["range_tombstone", "total"],["partition_delete", "total"],["row_delete", "total"], ["static_row", "failed"],["clustering_row", "failed"], ["map", "failed"], ["set", "failed"], ["list", "failed"], ["udt", "failed"], ["range_tombstone", "failed"],["partition_delete", "failed"],["row_delete", "failed"]]
|
||||
kind: ["total", "failed"]
|
||||
"db/commitlog/commitlog.cc":
|
||||
@@ -9,7 +9,7 @@
|
||||
"cfg.max_active_flushes": "cfg.max_active_flushes"
|
||||
"cql3/query_processor.cc":
|
||||
groups:
|
||||
"80": query_processor
|
||||
"80": "query_processor"
|
||||
"replica/dirty_memory_manager.cc":
|
||||
params:
|
||||
namestr: ["regular", "system"]
|
||||
@@ -19,10 +19,11 @@
|
||||
"replica/database.cc":
|
||||
params:
|
||||
"_dirty_memory_manager.throttle_threshold()": "throttle threshold"
|
||||
"seastar/apps/metrics_tester/metrics_tester.cc": skip
|
||||
"seastar/tests/unit/metrics_test.cc": skip
|
||||
"seastar/tests/unit/metrics_tester.cc": skip
|
||||
"seastar/tests/unit/prometheus_http_test.cc": skip
|
||||
"seastar/apps/metrics_tester/metrics_tester.cc": "skip"
|
||||
"seastar/tests/unit/metrics_test.cc": "skip"
|
||||
"seastar/tests/unit/metrics_tester.cc": "skip"
|
||||
"seastar/tests/unit/prometheus_http_test.cc": "skip"
|
||||
"seastar/tests/unit/prometheus_text_test.cc": "skip"
|
||||
"service/storage_proxy.cc":
|
||||
params:
|
||||
COORDINATOR_STATS_CATEGORY: "storage_proxy_coordinator"
|
||||
@@ -32,25 +33,25 @@
|
||||
_short_description_prefix: ["total_write_attempts", "write_errors", "background_replica_writes_failed", "read_repair_write_attempts"]
|
||||
_long_description_prefix: ["total number of write requests", "number of write requests that failed", "background_replica_writes_failed", "number of write operations in a read repair context"]
|
||||
_category: "storage_proxy_coordinator"
|
||||
"thrift/server.cc": skip
|
||||
"thrift/server.cc": "skip"
|
||||
"tracing/tracing.cc":
|
||||
params:
|
||||
"max_pending_trace_records + write_event_records_threshold": "max_pending_trace_records + write_event_records_threshold"
|
||||
"transport/server.cc":
|
||||
groups:
|
||||
"200": transport
|
||||
"200": "transport"
|
||||
params:
|
||||
"_config.max_request_size": "max_request_size"
|
||||
"seastar/src/net/dpdk.cc": skip
|
||||
"seastar/src/net/dpdk.cc": "skip"
|
||||
"db/hints/manager.cc":
|
||||
params:
|
||||
"group_name": ["hints_for_views_manager", "hints_manager"]
|
||||
"seastar/src/core/execution_stage.cc":
|
||||
groups:
|
||||
"100": execution_stages
|
||||
"100": "execution_stages"
|
||||
"seastar/src/core/fair_queue.cc":
|
||||
groups:
|
||||
"300": io_queue
|
||||
"300": "io_queue"
|
||||
"seastar/src/net/net.cc":
|
||||
params:
|
||||
_stats_plugin_name: ["stats_plugin_name"]
|
||||
|
||||
@@ -6822,7 +6822,6 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
|
||||
});
|
||||
}
|
||||
|
||||
auto ts = db_clock::now();
|
||||
for (const auto& token : tokens) {
|
||||
auto tid = tmap.get_tablet_id(token);
|
||||
auto& tinfo = tmap.get_tablet_info(tid);
|
||||
@@ -6836,20 +6835,6 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
|
||||
tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
|
||||
.set_repair_task_info(last_token, repair_task_info, _feature_service)
|
||||
.build());
|
||||
db::system_keyspace::repair_task_entry entry{
|
||||
.task_uuid = tasks::task_id(repair_task_info.tablet_task_id.uuid()),
|
||||
.operation = db::system_keyspace::repair_task_operation::requested,
|
||||
.first_token = dht::token::to_int64(tmap.get_first_token(tid)),
|
||||
.last_token = dht::token::to_int64(tmap.get_last_token(tid)),
|
||||
.timestamp = ts,
|
||||
.table_uuid = table,
|
||||
};
|
||||
if (_feature_service.tablet_repair_tasks_table) {
|
||||
auto cmuts = co_await _sys_ks.local().get_update_repair_task_mutations(entry, guard.write_timestamp());
|
||||
for (auto& m : cmuts) {
|
||||
updates.push_back(std::move(m));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sstring reason = format("Repair tablet by API request tokens={} tablet_task_id={}", tokens, repair_task_info.tablet_task_id);
|
||||
|
||||
@@ -136,17 +136,6 @@ db::tablet_options combine_tablet_options(R&& opts) {
|
||||
return combined_opts;
|
||||
}
|
||||
|
||||
static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
|
||||
auto tokens_view = s | std::views::split(delimiter)
|
||||
| std::views::transform([](auto&& range) {
|
||||
return std::string_view(&*range.begin(), std::ranges::distance(range));
|
||||
})
|
||||
| std::views::transform([](std::string_view sv) {
|
||||
return locator::tablet_id(std::stoul(std::string(sv)));
|
||||
});
|
||||
return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
|
||||
}
|
||||
|
||||
// Used to compare different migration choices in regard to impact on load imbalance.
|
||||
// There is a total order on migration_badness such that better migrations are ordered before worse ones.
|
||||
struct migration_badness {
|
||||
@@ -904,8 +893,6 @@ public:
|
||||
co_await coroutine::maybe_yield();
|
||||
auto& config = tmap.repair_scheduler_config();
|
||||
auto now = db_clock::now();
|
||||
auto skip = utils::get_local_injector().inject_parameter<std::string_view>("tablet_repair_skip_sched");
|
||||
auto skip_tablets = skip ? split_string_to_tablet_id(*skip, ',') : std::unordered_set<locator::tablet_id>();
|
||||
co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
|
||||
auto gid = locator::global_tablet_id{table, id};
|
||||
// Skip tablet that is in transitions.
|
||||
@@ -926,11 +913,6 @@ public:
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (skip_tablets.contains(id)) {
|
||||
lblogger.debug("Skipped tablet repair for tablet={} by error injector", gid);
|
||||
co_return;
|
||||
}
|
||||
|
||||
// Avoid rescheduling a failed tablet repair in a loop
|
||||
// TODO: Allow user to config
|
||||
const auto min_reschedule_time = std::chrono::seconds(5);
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
#include "replica/database.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "repair/row_level.hh"
|
||||
#include "service/task_manager_module.hh"
|
||||
#include "tasks/task_handler.hh"
|
||||
#include "tasks/virtual_task_hint.hh"
|
||||
@@ -110,16 +109,6 @@ future<std::optional<tasks::virtual_task_hint>> tablet_virtual_task::contains(ta
|
||||
tid = tmap.next_tablet(*tid);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the task id is present in the repair task table
|
||||
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(task_id);
|
||||
if (progress && progress->requested > 0) {
|
||||
co_return tasks::virtual_task_hint{
|
||||
.table_id = progress->table_uuid,
|
||||
.task_type = locator::tablet_task_type::user_repair,
|
||||
.tablet_id = std::nullopt,
|
||||
};
|
||||
}
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
@@ -254,20 +243,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
|
||||
size_t sched_nr = 0;
|
||||
auto tmptr = _ss.get_token_metadata_ptr();
|
||||
auto& tmap = tmptr->tablets().get_tablet_map(table);
|
||||
bool repair_task_finished = false;
|
||||
bool repair_task_pending = false;
|
||||
if (is_repair_task(task_type)) {
|
||||
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(id);
|
||||
if (progress) {
|
||||
res.status.progress.completed = progress->finished;
|
||||
res.status.progress.total = progress->requested;
|
||||
res.status.progress_units = "tablets";
|
||||
if (progress->requested > 0 && progress->requested == progress->finished) {
|
||||
repair_task_finished = true;
|
||||
} if (progress->requested > 0 && progress->requested > progress->finished) {
|
||||
repair_task_pending = true;
|
||||
}
|
||||
}
|
||||
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
|
||||
auto& task_info = info.repair_task_info;
|
||||
if (task_info.tablet_task_id.uuid() == id.uuid()) {
|
||||
@@ -299,17 +275,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
|
||||
res.status.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
|
||||
co_return res;
|
||||
}
|
||||
|
||||
if (repair_task_pending) {
|
||||
// When repair_task_pending is true, the res.tablets will be empty iff the request is aborted by user.
|
||||
res.status.state = res.tablets.empty() ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::running;
|
||||
co_return res;
|
||||
}
|
||||
if (repair_task_finished) {
|
||||
res.status.state = tasks::task_manager::task_state::done;
|
||||
co_return res;
|
||||
}
|
||||
|
||||
// FIXME: Show finished tasks.
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
|
||||
@@ -1205,8 +1205,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
|
||||
// Record the repair_time returned by the repair_tablet rpc call
|
||||
db_clock::time_point repair_time;
|
||||
// Record the repair task update muations
|
||||
utils::chunked_vector<canonical_mutation> repair_task_updates;
|
||||
service::session_id session_id;
|
||||
};
|
||||
|
||||
@@ -1739,14 +1737,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
dst = dst_opt.value().host;
|
||||
}
|
||||
// Update repair task
|
||||
db::system_keyspace::repair_task_entry entry{
|
||||
.task_uuid = tasks::task_id(tinfo.repair_task_info.tablet_task_id.uuid()),
|
||||
.operation = db::system_keyspace::repair_task_operation::finished,
|
||||
.first_token = dht::token::to_int64(tmap.get_first_token(gid.tablet)),
|
||||
.last_token = dht::token::to_int64(tmap.get_last_token(gid.tablet)),
|
||||
.table_uuid = gid.table,
|
||||
};
|
||||
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
|
||||
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
|
||||
service::session_id::create_random_id() : trinfo->session_id;
|
||||
@@ -1755,10 +1745,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
|
||||
auto& tablet_state = _tablets[tablet];
|
||||
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
|
||||
if (_feature_service.tablet_repair_tasks_table) {
|
||||
entry.timestamp = db_clock::now();
|
||||
tablet_state.repair_task_updates = co_await _sys_ks.get_update_repair_task_mutations(entry, api::new_timestamp());
|
||||
}
|
||||
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
|
||||
dst, tablet, duration, res.repair_time);
|
||||
})) {
|
||||
@@ -1777,9 +1763,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
|
||||
.del_repair_task_info(last_token, _feature_service)
|
||||
.del_session(last_token);
|
||||
for (auto& m : tablet_state.repair_task_updates) {
|
||||
updates.push_back(std::move(m));
|
||||
}
|
||||
// Skip update repair time in case hosts filter or dcs filter is set.
|
||||
if (valid && is_filter_off) {
|
||||
auto sched_time = tinfo.repair_task_info.sched_time;
|
||||
|
||||
@@ -2117,14 +2117,11 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
|
||||
}
|
||||
|
||||
sstable_id sid;
|
||||
// Force a random sstable_id for testing purposes
|
||||
bool random_sstable_identifier = utils::get_local_injector().is_enabled("random_sstable_identifier");
|
||||
if (!random_sstable_identifier && generation().is_uuid_based()) {
|
||||
if (generation().is_uuid_based()) {
|
||||
sid = sstable_id(generation().as_uuid());
|
||||
} else {
|
||||
sid = sstable_id(utils::UUID_gen::get_time_UUID());
|
||||
auto msg = random_sstable_identifier ? "forced random sstable_id" : "has numerical generation";
|
||||
sstlog.info("SSTable {} {}. SSTable identifier in scylla_metadata set to {}", get_filename(), msg, sid);
|
||||
sstlog.info("SSTable {} has numerical generation. SSTable identifier in scylla_metadata set to {}", get_filename(), sid);
|
||||
}
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::SSTableIdentifier>(scylla_metadata::sstable_identifier{sid});
|
||||
|
||||
@@ -2538,11 +2535,8 @@ std::vector<std::pair<component_type, sstring>> sstable::all_components() const
|
||||
return all;
|
||||
}
|
||||
|
||||
future<generation_type> sstable::snapshot(const sstring& dir, bool use_sstable_identifier) const {
|
||||
// Use the sstable identifier UUID if available to enable global de-duplication of sstables in backup.
|
||||
generation_type gen = (use_sstable_identifier && _sstable_identifier) ? generation_type(_sstable_identifier->uuid()) : _generation;
|
||||
co_await _storage->snapshot(*this, dir, storage::absolute_path::yes, gen);
|
||||
co_return gen;
|
||||
future<> sstable::snapshot(const sstring& dir) const {
|
||||
return _storage->snapshot(*this, dir, storage::absolute_path::yes);
|
||||
}
|
||||
|
||||
future<> sstable::change_state(sstable_state to, delayed_commit_changes* delay_commit) {
|
||||
|
||||
@@ -397,10 +397,6 @@ public:
|
||||
return _version;
|
||||
}
|
||||
|
||||
format_types get_format() const {
|
||||
return _format;
|
||||
}
|
||||
|
||||
// Returns the total bytes of all components.
|
||||
uint64_t bytes_on_disk() const;
|
||||
file_size_stats get_file_size_stats() const;
|
||||
@@ -442,10 +438,7 @@ public:
|
||||
|
||||
std::vector<std::pair<component_type, sstring>> all_components() const;
|
||||
|
||||
// When use_sstable_identifier is true and the sstable identifier is available,
|
||||
// use it to name the sstable in the snapshot, rather than the sstable generation.
|
||||
// Returns the generation used for snapshot.
|
||||
future<generation_type> snapshot(const sstring& dir, bool use_sstable_identifier = false) const;
|
||||
future<> snapshot(const sstring& dir) const;
|
||||
|
||||
// Delete the sstable by unlinking all sstable files
|
||||
// Ignores all errors.
|
||||
|
||||
@@ -12,7 +12,7 @@ add_scylla_test(alternator_unit_test
|
||||
add_scylla_test(anchorless_list_test
|
||||
KIND BOOST)
|
||||
add_scylla_test(auth_passwords_test
|
||||
KIND BOOST
|
||||
KIND SEASTAR
|
||||
LIBRARIES auth)
|
||||
add_scylla_test(auth_resource_test
|
||||
KIND BOOST)
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_MODULE core
|
||||
#include <seastar/testing/test_case.hh>
|
||||
|
||||
#include <array>
|
||||
#include <random>
|
||||
@@ -16,15 +16,21 @@
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
#include "seastarx.hh"
|
||||
|
||||
extern "C" {
|
||||
#include <crypt.h>
|
||||
#include <unistd.h>
|
||||
}
|
||||
|
||||
static auto rng_for_salt = std::default_random_engine(std::random_device{}());
|
||||
|
||||
//
|
||||
// The same password hashed multiple times will result in different strings because the salt will be different.
|
||||
//
|
||||
BOOST_AUTO_TEST_CASE(passwords_are_salted) {
|
||||
SEASTAR_TEST_CASE(passwords_are_salted) {
|
||||
const char* const cleartext = "my_excellent_password";
|
||||
std::unordered_set<sstring> observed_passwords{};
|
||||
|
||||
@@ -33,12 +39,13 @@ BOOST_AUTO_TEST_CASE(passwords_are_salted) {
|
||||
BOOST_REQUIRE(!observed_passwords.contains(e));
|
||||
observed_passwords.insert(e);
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
|
||||
//
|
||||
// A hashed password will authenticate against the same password in cleartext.
|
||||
//
|
||||
BOOST_AUTO_TEST_CASE(correct_passwords_authenticate) {
|
||||
SEASTAR_TEST_CASE(correct_passwords_authenticate) {
|
||||
// Common passwords.
|
||||
std::array<const char*, 3> passwords{
|
||||
"12345",
|
||||
@@ -47,14 +54,85 @@ BOOST_AUTO_TEST_CASE(correct_passwords_authenticate) {
|
||||
};
|
||||
|
||||
for (const char* p : passwords) {
|
||||
BOOST_REQUIRE(auth::passwords::check(p, auth::passwords::hash(p, rng_for_salt, auth::passwords::scheme::sha_512)));
|
||||
BOOST_REQUIRE(co_await auth::passwords::check(p, auth::passwords::hash(p, rng_for_salt, auth::passwords::scheme::sha_512)));
|
||||
}
|
||||
}
|
||||
|
||||
std::string long_password(uint32_t len) {
|
||||
std::string out;
|
||||
auto pattern = "0123456789";
|
||||
for (uint32_t i = 0; i < len; ++i) {
|
||||
out.push_back(pattern[i % strlen(pattern)]);
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(same_hashes_as_crypt_h) {
|
||||
|
||||
std::string long_pwd_254 = long_password(254);
|
||||
std::string long_pwd_255 = long_password(255);
|
||||
std::string long_pwd_511 = long_password(511);
|
||||
|
||||
std::array<const char*, 8> passwords{
|
||||
"12345",
|
||||
"1_am_the_greatest!",
|
||||
"password1",
|
||||
// Some special characters
|
||||
"!@#$%^&*()_+-=[]{}|\n;:'\",.<>/?",
|
||||
// UTF-8 characters
|
||||
"こんにちは、世界!",
|
||||
// Passwords close to __crypt_sha512 length limit
|
||||
long_pwd_254.c_str(),
|
||||
long_pwd_255.c_str(),
|
||||
// Password of maximal accepted length
|
||||
long_pwd_511.c_str(),
|
||||
};
|
||||
|
||||
auto salt = "$6$aaaabbbbccccdddd";
|
||||
|
||||
for (const char* p : passwords) {
|
||||
auto res = co_await auth::passwords::detail::hash_with_salt_async(p, salt);
|
||||
BOOST_REQUIRE(res == auth::passwords::detail::hash_with_salt(p, salt));
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(too_long_password) {
|
||||
auto p1 = long_password(71);
|
||||
auto p2 = long_password(72);
|
||||
auto p3 = long_password(73);
|
||||
auto too_long_password = long_password(512);
|
||||
|
||||
auto salt_bcrypt = "$2a$05$mAyzaIeJu41dWUkxEbn8hO";
|
||||
auto h1_bcrypt = co_await auth::passwords::detail::hash_with_salt_async(p1, salt_bcrypt);
|
||||
auto h2_bcrypt = co_await auth::passwords::detail::hash_with_salt_async(p2, salt_bcrypt);
|
||||
auto h3_bcrypt = co_await auth::passwords::detail::hash_with_salt_async(p3, salt_bcrypt);
|
||||
BOOST_REQUIRE(h1_bcrypt != h2_bcrypt);
|
||||
|
||||
// The check below documents the behavior of the current bcrypt
|
||||
// implementation that compares only the first 72 bytes of the password.
|
||||
// Although we don't typically use bcrypt for password hashing, it is
|
||||
// possible to insert such a hash using`CREATE ROLE ... WITH HASHED PASSWORD ...`.
|
||||
// Refs: scylladb/scylladb#26842
|
||||
BOOST_REQUIRE(h2_bcrypt == h3_bcrypt);
|
||||
|
||||
// The current implementation of bcrypt password hasing fails with passwords of length 512 and above
|
||||
BOOST_CHECK_THROW(co_await auth::passwords::detail::hash_with_salt_async(too_long_password, salt_bcrypt), std::system_error);
|
||||
|
||||
auto salt_sha512 = "$6$aaaabbbbccccdddd";
|
||||
auto h1_sha512 = co_await auth::passwords::detail::hash_with_salt_async(p1, salt_sha512);
|
||||
auto h2_sha512 = co_await auth::passwords::detail::hash_with_salt_async(p2, salt_sha512);
|
||||
auto h3_sha512 = co_await auth::passwords::detail::hash_with_salt_async(p3, salt_sha512);
|
||||
BOOST_REQUIRE(h1_sha512 != h2_sha512);
|
||||
BOOST_REQUIRE(h2_sha512 != h3_sha512);
|
||||
// The current implementation of SHA-512 password hasing fails with passwords of length 512 and above
|
||||
BOOST_CHECK_THROW(co_await auth::passwords::detail::hash_with_salt_async(too_long_password, salt_sha512), std::system_error);
|
||||
}
|
||||
|
||||
//
|
||||
// A hashed password that does not match the password in cleartext does not authenticate.
|
||||
//
|
||||
BOOST_AUTO_TEST_CASE(incorrect_passwords_do_not_authenticate) {
|
||||
SEASTAR_TEST_CASE(incorrect_passwords_do_not_authenticate) {
|
||||
const sstring hashed_password = auth::passwords::hash("actual_password", rng_for_salt,auth::passwords::scheme::sha_512);
|
||||
BOOST_REQUIRE(!auth::passwords::check("password_guess", hashed_password));
|
||||
BOOST_REQUIRE(!co_await auth::passwords::check("password_guess", hashed_password));
|
||||
}
|
||||
|
||||
@@ -31,7 +31,6 @@
|
||||
#include "replica/database.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/lister.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "test/lib/mutation_source_test.hh"
|
||||
@@ -39,7 +38,6 @@
|
||||
#include "service/migration_manager.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/generation_type.hh"
|
||||
#include "sstables/sstable_version.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
@@ -53,7 +51,6 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "replica/mutation_dump.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
using namespace sstables;
|
||||
@@ -615,13 +612,13 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
|
||||
});
|
||||
}
|
||||
|
||||
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) {
|
||||
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", bool skip_flush = false) {
|
||||
try {
|
||||
auto uuid = e.db().local().find_uuid(ks_name, cf_name);
|
||||
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, opts);
|
||||
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, skip_flush);
|
||||
} catch (...) {
|
||||
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={} use_sstable_identifier={}: {}",
|
||||
ks_name, cf_name, snapshot_name, opts.skip_flush, opts.use_sstable_identifier, std::current_exception());
|
||||
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
|
||||
ks_name, cf_name, snapshot_name, skip_flush, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
@@ -635,37 +632,6 @@ future<std::set<sstring>> collect_files(fs::path path) {
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
static bool is_component(const sstring& fname, const sstring& suffix) {
|
||||
return fname.ends_with(suffix);
|
||||
}
|
||||
|
||||
static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, const sstring& suffix) {
|
||||
// Verify manifest against the files in the snapshots dir
|
||||
auto pred = [&suffix] (const sstring& fname) {
|
||||
return is_component(fname, suffix);
|
||||
};
|
||||
return std::ranges::filter_view(all_files, pred) | std::ranges::to<std::set<sstring>>();
|
||||
}
|
||||
|
||||
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
|
||||
static future<> validate_manifest(const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir) {
|
||||
sstring suffix = "-Data.db";
|
||||
auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
|
||||
|
||||
std::set<sstring> sstables_in_manifest;
|
||||
auto manifest_str = co_await util::read_entire_file_contiguous(snapshot_dir / "manifest.json");
|
||||
auto manifest_json = rjson::parse(manifest_str);
|
||||
auto& manifest_files = manifest_json["files"];
|
||||
BOOST_REQUIRE(manifest_files.IsArray());
|
||||
for (auto& f : manifest_files.GetArray()) {
|
||||
if (is_component(f.GetString(), suffix)) {
|
||||
sstables_in_manifest.insert(f.GetString());
|
||||
}
|
||||
}
|
||||
testlog.debug("SSTables in manifest.json: {}", fmt::join(sstables_in_manifest, ", "));
|
||||
BOOST_REQUIRE_EQUAL(sstables_in_snapshot, sstables_in_manifest);
|
||||
}
|
||||
|
||||
static future<> snapshot_works(const std::string& table_name) {
|
||||
return do_with_some_data({"cf"}, [table_name] (cql_test_env& e) {
|
||||
take_snapshot(e, "ks", table_name).get();
|
||||
@@ -685,8 +651,6 @@ static future<> snapshot_works(const std::string& table_name) {
|
||||
// all files were copied and manifest was generated
|
||||
BOOST_REQUIRE_EQUAL(in_table_dir, in_snapshot_dir);
|
||||
|
||||
validate_manifest(snapshot_dir, in_snapshot_dir).get();
|
||||
|
||||
return make_ready_future<>();
|
||||
}, true);
|
||||
}
|
||||
@@ -705,8 +669,7 @@ SEASTAR_TEST_CASE(index_snapshot_works) {
|
||||
|
||||
SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
|
||||
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
|
||||
db::snapshot_options opts = {.skip_flush = true};
|
||||
take_snapshot(e, "ks", "cf", "test", opts).get();
|
||||
take_snapshot(e, "ks", "cf", "test", true /* skip_flush */).get();
|
||||
|
||||
auto& cf = e.local_db().find_column_family("ks", "cf");
|
||||
|
||||
@@ -719,41 +682,6 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(snapshot_use_sstable_identifier_works) {
|
||||
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
|
||||
return make_ready_future<>();
|
||||
#endif
|
||||
sstring table_name = "cf";
|
||||
// Force random sstable identifiers, otherwise the initial sstable_id is equal
|
||||
// to the sstable generation and the test can't distinguish between them.
|
||||
utils::get_local_injector().enable("random_sstable_identifier", false);
|
||||
return do_with_some_data({table_name}, [table_name] (cql_test_env& e) -> future<> {
|
||||
sstring tag = "test";
|
||||
db::snapshot_options opts = {.use_sstable_identifier = true};
|
||||
co_await take_snapshot(e, "ks", table_name, tag, opts);
|
||||
|
||||
auto& cf = e.local_db().find_column_family("ks", table_name);
|
||||
auto table_directory = table_dir(cf);
|
||||
auto snapshot_dir = table_directory / sstables::snapshots_dir / tag;
|
||||
auto in_table_dir = co_await collect_files(table_directory);
|
||||
// snapshot triggered a flush and wrote the data down.
|
||||
BOOST_REQUIRE_GE(in_table_dir.size(), 9);
|
||||
testlog.info("Files in table dir: {}", fmt::join(in_table_dir, ", "));
|
||||
|
||||
auto in_snapshot_dir = co_await collect_files(snapshot_dir);
|
||||
testlog.info("Files in snapshot dir: {}", fmt::join(in_snapshot_dir, ", "));
|
||||
|
||||
in_table_dir.insert("manifest.json");
|
||||
in_table_dir.insert("schema.cql");
|
||||
// all files were copied and manifest was generated
|
||||
BOOST_REQUIRE_EQUAL(in_table_dir.size(), in_snapshot_dir.size());
|
||||
BOOST_REQUIRE_NE(in_table_dir, in_snapshot_dir);
|
||||
|
||||
co_await validate_manifest(snapshot_dir, in_snapshot_dir);
|
||||
}, true);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(snapshot_list_okay) {
|
||||
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
|
||||
auto& cf = e.local_db().find_column_family("ks", "cf");
|
||||
@@ -1528,7 +1456,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
|
||||
}
|
||||
BOOST_REQUIRE(found);
|
||||
|
||||
co_await take_snapshot(e, "ks", "cf", "test", db::snapshot_options{.skip_flush = true});
|
||||
co_await take_snapshot(e, "ks", "cf", "test", true /* skip_flush */);
|
||||
|
||||
testlog.debug("Expected: {}", expected);
|
||||
|
||||
|
||||
@@ -346,60 +346,4 @@ SEASTAR_TEST_CASE(repair_rows_size_considers_external_memory) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_tablet_token_range_count) {
|
||||
{
|
||||
// Simple case: one large range covers a smaller one
|
||||
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
|
||||
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
|
||||
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
|
||||
}
|
||||
{
|
||||
// r2 ranges overlap and should merge to cover r1
|
||||
// r2: [0, 50] + [40, 100] -> merges to [0, 100]
|
||||
// r1: [10, 90] should be covered
|
||||
utils::chunked_vector<tablet_token_range> r1 = {{10, 90}};
|
||||
utils::chunked_vector<tablet_token_range> r2 = {{0, 50}, {40, 100}};
|
||||
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
|
||||
}
|
||||
{
|
||||
// r2 ranges are adjacent (contiguous) and should merge
|
||||
// r2: [0, 10] + [11, 20] -> merges to [0, 20]
|
||||
// r1: [5, 15] should be covered
|
||||
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}};
|
||||
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}, {11, 20}};
|
||||
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
|
||||
}
|
||||
{
|
||||
// r1 overlaps r2 but is not FULLY contained
|
||||
// r2: [0, 10]
|
||||
// r1: [5, 15] (Ends too late), [ -5, 5 ] (Starts too early)
|
||||
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}, {-5, 5}};
|
||||
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}};
|
||||
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 0);
|
||||
}
|
||||
{
|
||||
// A single merged range in r2 covers multiple distinct ranges in r1
|
||||
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}, {30, 40}, {50, 60}};
|
||||
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
|
||||
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 3);
|
||||
}
|
||||
{
|
||||
// Inputs are provided in random order, ensuring the internal sort works
|
||||
utils::chunked_vector<tablet_token_range> r1 = {{50, 60}, {10, 20}};
|
||||
utils::chunked_vector<tablet_token_range> r2 = {{50, 100}, {0, 40}};
|
||||
// r2 merges effectively to [0, 40] and [50, 100]
|
||||
// Both r1 items are covered
|
||||
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 2);
|
||||
}
|
||||
{
|
||||
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
|
||||
utils::chunked_vector<tablet_token_range> r2_empty = {};
|
||||
utils::chunked_vector<tablet_token_range> r1_empty = {};
|
||||
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
|
||||
|
||||
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2_empty) == 0);
|
||||
BOOST_REQUIRE(co_await count_finished_tablets(r1_empty, r2) == 0);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -25,7 +25,7 @@ from typing import Any, Optional, override
|
||||
import pytest
|
||||
import requests
|
||||
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
|
||||
from cassandra.cluster import NoHostAvailable, Session
|
||||
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
|
||||
from cassandra.query import SimpleStatement, named_tuple_factory
|
||||
from ccmlib.scylla_node import ScyllaNode, NodeError
|
||||
|
||||
@@ -1135,6 +1135,14 @@ class TestCQLAudit(AuditTester):
|
||||
|
||||
session.execute("DROP TABLE test1")
|
||||
|
||||
def _get_attempt_count(self, session: Session, *, execution_profile=EXEC_PROFILE_DEFAULT, consistency_level: ConsistencyLevel = ConsistencyLevel.ONE) -> int:
|
||||
# dtest env is using FlakyRetryPolicy which has `max_retries` attribute
|
||||
cl_profile = session.execution_profile_clone_update(execution_profile, consistency_level=consistency_level)
|
||||
policy = cl_profile.retry_policy
|
||||
retries = getattr(policy, "max_retries", None)
|
||||
assert retries is not None
|
||||
return 1 + retries
|
||||
|
||||
def _test_insert_failure_doesnt_report_success_assign_nodes(self, session: Session = None):
|
||||
all_nodes: set[ScyllaNode] = set(self.cluster.nodelist())
|
||||
assert len(all_nodes) == 7
|
||||
@@ -1154,6 +1162,7 @@ class TestCQLAudit(AuditTester):
|
||||
for i in range(256):
|
||||
stmt = SimpleStatement(f"INSERT INTO ks.test1 (k, v1) VALUES ({i}, 1337)", consistency_level=ConsistencyLevel.THREE)
|
||||
session.execute(stmt)
|
||||
attempt_count = self._get_attempt_count(session, consistency_level=ConsistencyLevel.THREE)
|
||||
|
||||
token = rows_to_list(session.execute(f"SELECT token(k) FROM ks.test1 WHERE k = {i}"))[0][0]
|
||||
|
||||
@@ -1168,9 +1177,9 @@ class TestCQLAudit(AuditTester):
|
||||
audit_partition_nodes = [address_to_node[address] for address in audit_nodes]
|
||||
insert_node = address_to_node[insert_node.pop()]
|
||||
kill_node = address_to_node[partitions.pop()]
|
||||
return audit_partition_nodes, insert_node, kill_node, stmt.query_string
|
||||
return audit_partition_nodes, insert_node, kill_node, stmt.query_string, attempt_count
|
||||
|
||||
return [], [], None, None
|
||||
return [], [], None, None, None
|
||||
|
||||
@pytest.mark.exclude_errors("audit - Unexpected exception when writing log with: node_ip")
|
||||
def test_insert_failure_doesnt_report_success(self):
|
||||
@@ -1192,7 +1201,7 @@ class TestCQLAudit(AuditTester):
|
||||
with self.assert_exactly_n_audit_entries_were_added(session, 1):
|
||||
conn.execute(stmt)
|
||||
|
||||
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
|
||||
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail, query_fail_count = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
|
||||
|
||||
# TODO: remove the loop when scylladb#24473 is fixed
|
||||
# We call get_host_id only to cache host_id
|
||||
@@ -1231,8 +1240,8 @@ class TestCQLAudit(AuditTester):
|
||||
# If any audit mode is not done yet, continue polling.
|
||||
all_modes_done = True
|
||||
for mode, rows in rows_dict.items():
|
||||
rows_with_error = list(filter(lambda r: r.error, rows))
|
||||
if len(rows_with_error) == 6:
|
||||
rows_with_error = [row for row in rows if row.error and row.operation == query_to_fail]
|
||||
if len(rows_with_error) == query_fail_count:
|
||||
logger.info(f"audit mode {mode} log updated after {i} iterations ({i / 10}s)")
|
||||
assert rows_with_error[0].error is True
|
||||
assert rows_with_error[0].consistency == "THREE"
|
||||
|
||||
@@ -220,14 +220,14 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
|
||||
|
||||
await insert_keys(cql, ks, 0, 100)
|
||||
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
skipped_bytes = get_incremental_repair_sst_skipped_bytes(servers[0])
|
||||
read_bytes = get_incremental_repair_sst_read_bytes(servers[0])
|
||||
# Nothing to skip. Repair all data.
|
||||
assert skipped_bytes == 0
|
||||
assert read_bytes > 0
|
||||
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
skipped_bytes2 = get_incremental_repair_sst_skipped_bytes(servers[0])
|
||||
read_bytes2 = get_incremental_repair_sst_read_bytes(servers[0])
|
||||
# Skip all. Nothing to repair
|
||||
@@ -236,7 +236,7 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
|
||||
|
||||
await insert_keys(cql, ks, 200, 300)
|
||||
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
skipped_bytes3 = get_incremental_repair_sst_skipped_bytes(servers[0])
|
||||
read_bytes3 = get_incremental_repair_sst_read_bytes(servers[0])
|
||||
# Both skipped and read bytes should grow
|
||||
@@ -272,7 +272,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
|
||||
assert get_sstables_repaired_at(map0, token) == sstables_repaired_at
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
map1 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
|
||||
logging.info(f'map1={map1}')
|
||||
# Check sstables_repaired_at is increased by 1
|
||||
@@ -288,7 +288,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
|
||||
assert len(enable) == 1
|
||||
|
||||
# Second repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
map2 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
|
||||
logging.info(f'map2={map2}')
|
||||
# Check sstables_repaired_at is increased by 1
|
||||
@@ -313,7 +313,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
|
||||
# Repair should not finish with error
|
||||
await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
|
||||
try:
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, timeout=10)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental', timeout=10)
|
||||
assert False # Check the tablet repair is not supposed to finish
|
||||
except TimeoutError:
|
||||
logger.info("Repair timeout as expected")
|
||||
@@ -329,7 +329,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
token = -1
|
||||
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
# 1 add 0 skip 1 mark
|
||||
for log in logs:
|
||||
sst_add, sst_skip, sst_mark = await get_sst_status("First", log)
|
||||
@@ -355,7 +355,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
|
||||
else:
|
||||
assert False # Wrong ops
|
||||
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# 1 add 1 skip 1 mark
|
||||
for log in logs:
|
||||
@@ -394,7 +394,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
||||
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert more keys
|
||||
await insert_keys(cql, ks, current_key, current_key + nr_keys)
|
||||
@@ -402,7 +402,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
||||
current_key += nr_keys
|
||||
|
||||
# Second repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
# Insert more keys and flush to get 2 more sstables
|
||||
for _ in range(2):
|
||||
@@ -436,7 +436,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
|
||||
|
||||
# Insert more keys
|
||||
@@ -445,7 +445,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
|
||||
current_key += nr_keys
|
||||
|
||||
# Second repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 3
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 3
|
||||
|
||||
# Insert more keys and flush to get 2 more sstables
|
||||
for _ in range(2):
|
||||
@@ -505,7 +505,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
|
||||
|
||||
await manager.server_start(servers[1].server_id)
|
||||
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
scylla_path = get_scylla_path(cql)
|
||||
|
||||
@@ -521,8 +521,8 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
|
||||
|
||||
# Insert more keys
|
||||
await insert_keys(cql, ks, current_key, current_key + nr_keys)
|
||||
@@ -532,7 +532,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
|
||||
# Second repair
|
||||
await inject_error_on(manager, "repair_tablet_no_update_sstables_repair_at", servers)
|
||||
# some sstable repaired_at = 3, but sstables_repaired_at = 2
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
|
||||
await inject_error_off(manager, "repair_tablet_no_update_sstables_repair_at", servers)
|
||||
|
||||
scylla_path = get_scylla_path(cql)
|
||||
@@ -561,8 +561,8 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
|
||||
|
||||
# Insert more keys
|
||||
await insert_keys(cql, ks, current_key, current_key + nr_keys)
|
||||
@@ -574,7 +574,7 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
|
||||
last_tokens = [t.last_token for t in replicas]
|
||||
for t in last_tokens[0::2]:
|
||||
logging.info(f"Start repair for token={t}");
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t) # sstables_repaired_at 3
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t, incremental_mode='incremental') # sstables_repaired_at 3
|
||||
|
||||
scylla_path = get_scylla_path(cql)
|
||||
|
||||
@@ -595,7 +595,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
|
||||
# Insert more keys
|
||||
await insert_keys(cql, ks, current_key, current_key + nr_keys)
|
||||
@@ -659,13 +659,18 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
assert read1 == 0
|
||||
assert skip2 == 0
|
||||
assert read2 > 0
|
||||
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
|
||||
await do_repair_and_check('incremental', 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
|
||||
|
||||
def check2(skip1, read1, skip2, read2):
|
||||
assert skip1 == skip2
|
||||
assert read1 == read2
|
||||
await do_repair_and_check('disabled', 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
|
||||
|
||||
# FIXME: Incremental repair is disabled by default due to
|
||||
# https://github.com/scylladb/scylladb/issues/26041 and
|
||||
# https://github.com/scylladb/scylladb/issues/27414
|
||||
await do_repair_and_check(None, 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
|
||||
|
||||
def check3(skip1, read1, skip2, read2):
|
||||
assert skip1 < skip2
|
||||
assert read1 == read2
|
||||
@@ -677,14 +682,14 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
await do_repair_and_check('full', 1, rf'Starting tablet repair by API .* incremental_mode=full.*', check4)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
|
||||
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
|
||||
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
||||
time1 = 0
|
||||
time2 = 0
|
||||
|
||||
for s in servers:
|
||||
time1 += get_repair_tablet_time_ms(s)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
for s in servers:
|
||||
time2 += get_repair_tablet_time_ms(s)
|
||||
|
||||
@@ -694,7 +699,7 @@ async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
|
||||
# Reproducer for https://github.com/scylladb/scylladb/issues/26346
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_repair_finishes_when_tablet_skips_end_repair_stage(manager):
|
||||
async def test_incremental_repair_finishes_when_tablet_skips_end_repair_stage(manager):
|
||||
servers = await manager.servers_add(3, auto_rack_dc="dc1")
|
||||
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
|
||||
@@ -719,7 +724,7 @@ async def test_repair_finishes_when_tablet_skips_end_repair_stage(manager):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_repair_rejoin_do_tablet_operation(manager):
|
||||
async def test_incremental_repair_rejoin_do_tablet_operation(manager):
|
||||
cmdline = ['--logger-log-level', 'raft_topology=debug']
|
||||
servers = await manager.servers_add(3, auto_rack_dc="dc1", cmdline=cmdline)
|
||||
|
||||
|
||||
@@ -43,86 +43,6 @@ async def guarantee_repair_time_next_second():
|
||||
# different than the previous one.
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def do_test_tablet_repair_progress_split_merge(manager: ManagerClient, do_split=False, do_merge=False):
|
||||
nr_tablets = 16
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=True, tablets=nr_tablets)
|
||||
token = 'all'
|
||||
logs = []
|
||||
for s in servers:
|
||||
logs.append(await manager.server_open_log(s.server_id))
|
||||
|
||||
# Skip repair for the listed tablet id
|
||||
nr_tablets_skipped = 4
|
||||
nr_tablets_repaired = nr_tablets - nr_tablets_skipped
|
||||
await inject_error_on(manager, "tablet_repair_skip_sched", servers, params={'value':"0,1,5,8"})
|
||||
|
||||
# Request to repair all tablets
|
||||
repair_res = await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
|
||||
logging.info(f'{repair_res=}')
|
||||
tablet_task_id = repair_res['tablet_task_id']
|
||||
|
||||
async def get_task_status(desc):
|
||||
task_status = await manager.api.get_task_status(servers[0].ip_addr, tablet_task_id)
|
||||
completed = int(task_status['progress_completed'])
|
||||
total = int(task_status['progress_total'])
|
||||
logging.info(f'{desc=} {completed=} {total=} {task_status=}')
|
||||
return completed, total
|
||||
|
||||
async def wait_task_progress(wanted_complete, wanted_total):
|
||||
while True:
|
||||
completed, total = await get_task_status("wait_task_progress")
|
||||
if completed == wanted_complete and total == wanted_total:
|
||||
break
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def get_task_status_and_check(desc):
|
||||
completed, total = await get_task_status(desc)
|
||||
assert completed == nr_tablets_repaired
|
||||
assert total == nr_tablets
|
||||
|
||||
# 12 out of 16 tablets should finish
|
||||
await wait_task_progress(nr_tablets_repaired, nr_tablets)
|
||||
|
||||
if do_split:
|
||||
await get_task_status_and_check("before_split")
|
||||
|
||||
s1_mark = await logs[0].mark()
|
||||
await inject_error_on(manager, "tablet_force_tablet_count_increase", servers)
|
||||
await logs[0].wait_for('Detected tablet split for table', from_mark=s1_mark)
|
||||
await inject_error_off(manager, "tablet_force_tablet_count_increase", servers)
|
||||
|
||||
await get_task_status_and_check("after_split")
|
||||
|
||||
if do_merge:
|
||||
await get_task_status_and_check("before_merge")
|
||||
|
||||
s1_mark = await logs[0].mark()
|
||||
await inject_error_on(manager, "tablet_force_tablet_count_decrease", servers)
|
||||
await logs[0].wait_for('Detected tablet merge for table', from_mark=s1_mark)
|
||||
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
|
||||
|
||||
await get_task_status_and_check("after_merge")
|
||||
|
||||
# Wait for all repair to finish after all tablets can be scheduled to run repair
|
||||
await inject_error_off(manager, "tablet_repair_skip_sched", servers)
|
||||
await wait_task_progress(nr_tablets, nr_tablets)
|
||||
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_progress(manager: ManagerClient):
|
||||
await do_test_tablet_repair_progress_split_merge(manager, do_split=False, do_merge=False)
|
||||
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_progress_split(manager: ManagerClient):
|
||||
await do_test_tablet_repair_progress_split_merge(manager, do_split=True)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/26844")
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_tablet_repair_progress_merge(manager: ManagerClient):
|
||||
await do_test_tablet_repair_progress_split_merge(manager, do_merge=True)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_manual_repair(manager: ManagerClient):
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
|
||||
|
||||
@@ -67,11 +67,11 @@ nodetool_cmd.conf = False
|
||||
|
||||
# Run the external "nodetool" executable (can be overridden by the NODETOOL
|
||||
# environment variable). Only call this if the REST API doesn't work.
|
||||
def run_nodetool(cql, *args):
|
||||
def run_nodetool(cql, *args, **subprocess_kwargs):
|
||||
# TODO: We may need to change this function or its callers to add proper
|
||||
# support for testing on multi-node clusters.
|
||||
host = cql.cluster.contact_points[0]
|
||||
subprocess.run([nodetool_cmd(), '-h', host, *args])
|
||||
return subprocess.run([nodetool_cmd(), '-h', host, *args], **subprocess_kwargs)
|
||||
|
||||
def flush(cql, table):
|
||||
ks, cf = table.split('.')
|
||||
@@ -115,7 +115,7 @@ def compact_keyspace(cql, ks, flush_memtables=True):
|
||||
args.extend([ks, cf])
|
||||
run_nodetool(cql, "compact", *args)
|
||||
|
||||
def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
|
||||
def take_snapshot(cql, table, tag, skip_flush):
|
||||
ks, cf = table.split('.')
|
||||
if has_rest_api(cql):
|
||||
requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params={'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush})
|
||||
@@ -123,8 +123,6 @@ def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
|
||||
args = ['--tag', tag, '--table', cf]
|
||||
if skip_flush:
|
||||
args.append('--skip-flush')
|
||||
if use_sstable_identifier:
|
||||
args.append('--use-sstable-identifier')
|
||||
args.append(ks)
|
||||
run_nodetool(cql, "snapshot", *args)
|
||||
|
||||
@@ -159,6 +157,28 @@ def disablebinary(cql):
|
||||
else:
|
||||
run_nodetool(cql, "disablebinary")
|
||||
|
||||
def getlogginglevel(cql, logger):
|
||||
if has_rest_api(cql):
|
||||
resp = requests.get(f'{rest_api_url(cql)}/system/logger/{logger}')
|
||||
if resp.ok:
|
||||
return resp.text.strip()
|
||||
raise RuntimeError(f"failed to fetch logging level for {logger}: {resp.status_code} {resp.text}")
|
||||
|
||||
result = run_nodetool(
|
||||
cql,
|
||||
"getlogginglevels",
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
for line in result.stdout.splitlines():
|
||||
stripped = line.strip()
|
||||
parts = stripped.split()
|
||||
if len(parts) >= 2 and parts[0] == logger:
|
||||
return parts[-1]
|
||||
|
||||
raise RuntimeError(f"logger {logger} not found in getlogginglevels output")
|
||||
|
||||
def setlogginglevel(cql, logger, level):
|
||||
if has_rest_api(cql):
|
||||
requests.post(f'{rest_api_url(cql)}/system/logger/{logger}', params={'level': level})
|
||||
|
||||
@@ -74,3 +74,58 @@ def test_batch_with_error(cql, table1):
|
||||
# exceptions::exception_code::SERVER_ERROR, it gets converted to NoHostAvailable by the driver
|
||||
with pytest.raises(NoHostAvailable, match="Value too large"):
|
||||
cql.execute(generate_big_batch(table1, 100) + injection_key)
|
||||
|
||||
|
||||
def test_unlogged_batch_size_not_checked(cql, test_keyspace):
|
||||
"""Verifies that UNLOGGED batches are NOT subject to batch size limits.
|
||||
|
||||
Unlogged batches are applied as independent mutations and don't go through
|
||||
the system.batchlog table, so their collective size is irrelevant.
|
||||
This test should succeed even with a batch larger than the fail threshold.
|
||||
"""
|
||||
with new_test_table(cql, test_keyspace, "k int primary key, t text") as table:
|
||||
# Create a batch larger than the fail threshold (1024 KB)
|
||||
# This would fail for a logged batch, but should succeed for unlogged
|
||||
statements = [f"INSERT INTO {table} (k, t) VALUES ({idx}, '{'x' * 743}')" for idx in range(1100)]
|
||||
unlogged_batch = "BEGIN UNLOGGED BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
|
||||
|
||||
# This should not raise an exception
|
||||
cql.execute(unlogged_batch)
|
||||
|
||||
# Verify the data was inserted
|
||||
result = cql.execute(f"SELECT COUNT(*) FROM {table}")
|
||||
assert result.one()[0] == 1100
|
||||
|
||||
|
||||
def test_logged_multi_partition_batch_size_checked(cql, test_keyspace):
|
||||
"""Verifies that LOGGED batches targeting multiple partitions ARE subject to batch size limits.
|
||||
|
||||
Logged multi-partition batches go through system.batchlog and must be size-checked.
|
||||
"""
|
||||
with new_test_table(cql, test_keyspace, "k int primary key, t text") as table:
|
||||
# Create a batch larger than the fail threshold (1024 KB) with multiple partitions
|
||||
statements = [f"INSERT INTO {table} (k, t) VALUES ({idx}, '{'x' * 743}')" for idx in range(1100)]
|
||||
logged_batch = "BEGIN BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
|
||||
|
||||
# This should raise "Batch too large" exception
|
||||
with pytest.raises(InvalidRequest, match="Batch too large"):
|
||||
cql.execute(logged_batch)
|
||||
|
||||
|
||||
def test_logged_single_partition_batch_size_not_checked(cql, test_keyspace):
|
||||
"""Verifies that LOGGED batches targeting a single partition are NOT subject to batch size limits.
|
||||
|
||||
Logged single-partition batches don't go through system.batchlog,
|
||||
so their collective size is not relevant.
|
||||
"""
|
||||
with new_test_table(cql, test_keyspace, "k int, c int, t text, primary key (k, c)") as table:
|
||||
# Create a batch larger than the fail threshold (1024 KB) but all targeting the same partition
|
||||
statements = [f"INSERT INTO {table} (k, c, t) VALUES (1, {idx}, '{'x' * 743}')" for idx in range(1100)]
|
||||
logged_batch = "BEGIN BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
|
||||
|
||||
# This should not raise an exception since it's a single-partition batch
|
||||
cql.execute(logged_batch)
|
||||
|
||||
# Verify the data was inserted
|
||||
result = cql.execute(f"SELECT COUNT(*) FROM {table} WHERE k = 1")
|
||||
assert result.one()[0] == 1100
|
||||
|
||||
@@ -10,6 +10,7 @@ import re
|
||||
import requests
|
||||
import socket
|
||||
import struct
|
||||
from test.cqlpy import nodetool
|
||||
from test.cqlpy.util import cql_session
|
||||
|
||||
def get_protocol_error_metrics(host) -> int:
|
||||
@@ -58,11 +59,50 @@ def try_connect(host, port, creds, protocol_version):
|
||||
with cql_with_protocol(host, port, creds, protocol_version) as session:
|
||||
return 1 if session else 0
|
||||
|
||||
@pytest.fixture
|
||||
def debug_exceptions_logging(request, cql):
|
||||
def _read_level() -> str | None:
|
||||
try:
|
||||
level = nodetool.getlogginglevel(cql, "exception")
|
||||
if level:
|
||||
level = level.strip().strip('"').lower()
|
||||
return level
|
||||
except Exception as exc:
|
||||
print(f"Failed to read exception logger level: {exc}")
|
||||
return None
|
||||
|
||||
def _set_and_verify(level: str) -> bool:
|
||||
try:
|
||||
nodetool.setlogginglevel(cql, "exception", level)
|
||||
except Exception as exc:
|
||||
print(f"Failed to set exception logger level to '{level}': {exc}")
|
||||
return False
|
||||
|
||||
observed = _read_level()
|
||||
if observed == level:
|
||||
return True
|
||||
|
||||
print(f"Exception logger level observed as '{observed}' while expecting '{level}'")
|
||||
return False
|
||||
|
||||
def _restore_logging():
|
||||
if not enabled and previous_level is None:
|
||||
return
|
||||
|
||||
target_level = previous_level or "info"
|
||||
_set_and_verify(target_level)
|
||||
|
||||
previous_level = _read_level()
|
||||
enabled = _set_and_verify("debug")
|
||||
|
||||
yield
|
||||
_restore_logging()
|
||||
|
||||
# If there is a protocol version mismatch, the server should
|
||||
# raise a protocol error, which is counted in the metrics.
|
||||
def test_protocol_version_mismatch(scylla_only, request, host):
|
||||
run_count = 100
|
||||
cpp_exception_threshold = 10
|
||||
def test_protocol_version_mismatch(scylla_only, debug_exceptions_logging, request, host):
|
||||
run_count = 200
|
||||
cpp_exception_threshold = 20
|
||||
|
||||
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
|
||||
protocol_exception_metrics_before = get_protocol_error_metrics(host)
|
||||
@@ -244,8 +284,8 @@ def _protocol_error_impl(
|
||||
s.close()
|
||||
|
||||
def _test_impl(host, flag):
|
||||
run_count = 100
|
||||
cpp_exception_threshold = 10
|
||||
run_count = 200
|
||||
cpp_exception_threshold = 20
|
||||
|
||||
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
|
||||
protocol_exception_metrics_before = get_protocol_error_metrics(host)
|
||||
@@ -267,47 +307,47 @@ def no_ssl(request):
|
||||
yield
|
||||
|
||||
# Malformed BATCH with an invalid kind triggers a protocol error.
|
||||
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, host):
|
||||
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_bad_batch")
|
||||
|
||||
# Send OPTIONS during AUTHENTICATE to trigger auth-state error.
|
||||
def test_unexpected_message_during_auth(scylla_only, no_ssl, host):
|
||||
def test_unexpected_message_during_auth(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_unexpected_auth")
|
||||
|
||||
# STARTUP with an invalid/missing string-map entry should produce a protocol error.
|
||||
def test_process_startup_invalid_string_map(scylla_only, no_ssl, host):
|
||||
def test_process_startup_invalid_string_map(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_process_startup_invalid_string_map")
|
||||
|
||||
# STARTUP with unknown COMPRESSION option should produce a protocol error.
|
||||
def test_unknown_compression_algorithm(scylla_only, no_ssl, host):
|
||||
def test_unknown_compression_algorithm(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_unknown_compression")
|
||||
|
||||
# QUERY long-string truncation: declared length > provided bytes triggers protocol error.
|
||||
def test_process_query_internal_malformed_query(scylla_only, no_ssl, host):
|
||||
def test_process_query_internal_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_process_query_internal_malformed_query")
|
||||
|
||||
# QUERY options malformed: PAGE_SIZE flag set but page_size truncated triggers protocol error.
|
||||
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, host):
|
||||
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_process_query_internal_fail_read_options")
|
||||
|
||||
# PREPARE long-string truncation: declared length > provided bytes triggers protocol error.
|
||||
def test_process_prepare_malformed_query(scylla_only, no_ssl, host):
|
||||
def test_process_prepare_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_process_prepare_malformed_query")
|
||||
|
||||
# EXECUTE cache-key malformed: short-bytes length > provided bytes triggers protocol error.
|
||||
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, host):
|
||||
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_process_execute_internal_malformed_cache_key")
|
||||
|
||||
# REGISTER malformed string list: declared string length > provided bytes triggers protocol error.
|
||||
def test_process_register_malformed_string_list(scylla_only, no_ssl, host):
|
||||
def test_process_register_malformed_string_list(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
_test_impl(host, "trigger_process_register_malformed_string_list")
|
||||
|
||||
# Test if the protocol exceptions do not decrease after running the test happy path.
|
||||
# This is to ensure that the protocol exceptions are not cleared or reset
|
||||
# during the test execution.
|
||||
def test_no_protocol_exceptions(scylla_only, no_ssl, host):
|
||||
run_count = 100
|
||||
cpp_exception_threshold = 10
|
||||
def test_no_protocol_exceptions(scylla_only, no_ssl, debug_exceptions_logging, host):
|
||||
run_count = 200
|
||||
cpp_exception_threshold = 20
|
||||
|
||||
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
|
||||
protocol_exception_metrics_before = get_protocol_error_metrics(host)
|
||||
|
||||
@@ -1128,11 +1128,8 @@ private:
|
||||
auth_config.authenticator_java_name = qualified_authenticator_name;
|
||||
auth_config.role_manager_java_name = qualified_role_manager_name;
|
||||
|
||||
_auth_service.start(perm_cache_config, std::ref(_qp), std::ref(group0_client), std::ref(_mnotifier), std::ref(_mm), auth_config, maintenance_socket_enabled::no, std::ref(_auth_cache)).get();
|
||||
|
||||
|
||||
const uint64_t niceness = 19;
|
||||
auto hashing_worker = utils::alien_worker(startlog, niceness, "pwd-hash");
|
||||
_auth_service.start(perm_cache_config, std::ref(_qp), std::ref(group0_client), std::ref(_mnotifier), std::ref(_mm), auth_config, maintenance_socket_enabled::no, std::ref(_auth_cache), std::ref(hashing_worker)).get();
|
||||
_auth_service.invoke_on_all([this] (auth::service& auth) {
|
||||
return auth.start(_mm.local(), _sys_ks.local());
|
||||
}).get();
|
||||
|
||||
@@ -99,7 +99,7 @@ def test_listsnapshots_no_snapshots(nodetool, request):
|
||||
assert res.stdout == "Snapshot Details: \nThere are no snapshots\n"
|
||||
|
||||
|
||||
def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=False):
|
||||
def check_snapshot_out(res, tag, ktlist, skip_flush):
|
||||
"""Check that the output of nodetool snapshot contains the expected messages"""
|
||||
|
||||
if len(ktlist) == 0:
|
||||
@@ -110,7 +110,7 @@ def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=Fals
|
||||
pattern = re.compile("Requested creating snapshot\\(s\\)"
|
||||
f" for \\[{keyspaces}\\]"
|
||||
f" with snapshot name \\[(.+)\\]"
|
||||
f" and options \\{{skip_flush={str(skip_flush).lower()}, use_sstable_identifier={str(use_sstable_identifier).lower()}\\}}")
|
||||
f" and options \\{{skipFlush={str(skip_flush).lower()}\\}}")
|
||||
|
||||
print(res)
|
||||
print(pattern)
|
||||
@@ -138,13 +138,13 @@ def test_snapshot_keyspace(nodetool):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", "ks2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1", "ks2"], False)
|
||||
|
||||
@@ -155,13 +155,13 @@ def test_snapshot_keyspace_with_table(nodetool, option_name):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl1,tbl2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1"], False)
|
||||
|
||||
@@ -186,7 +186,7 @@ class kn_param(NamedTuple):
|
||||
def test_snapshot_keyspace_table_single_arg(nodetool, param, scylla_only):
|
||||
tag = "my_snapshot"
|
||||
|
||||
req_params = {"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": param.kn}
|
||||
req_params = {"tag": tag, "sf": "false", "kn": param.kn}
|
||||
if param.cf:
|
||||
req_params["cf"] = param.cf
|
||||
|
||||
@@ -202,19 +202,19 @@ def test_snapshot_ktlist(nodetool, option_name):
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1.tbl1"], False)
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1,ks2.tbl2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1.tbl1,ks2.tbl2"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1.tbl1,ks2.tbl2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1.tbl1", "ks2.tbl2"], False)
|
||||
|
||||
res = nodetool("snapshot", "--tag", tag, option_name, "ks1,ks2", expected_requests=[
|
||||
expected_request("POST", "/storage_service/snapshots",
|
||||
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
|
||||
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
|
||||
])
|
||||
check_snapshot_out(res.stdout, tag, ["ks1" ,"ks2"], False)
|
||||
|
||||
@@ -229,8 +229,7 @@ def test_snapshot_ktlist(nodetool, option_name):
|
||||
{"ks": ["ks1", "ks2"], "tbl": []},
|
||||
])
|
||||
@pytest.mark.parametrize("skip_flush", [False, True])
|
||||
@pytest.mark.parametrize("use_sstable_identifier", [False, True])
|
||||
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_identifier):
|
||||
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
|
||||
cmd = ["snapshot"]
|
||||
params = {}
|
||||
|
||||
@@ -243,11 +242,8 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_
|
||||
|
||||
if skip_flush:
|
||||
cmd.append("--skip-flush")
|
||||
if use_sstable_identifier:
|
||||
cmd.append("--use-sstable-identifier")
|
||||
|
||||
params["sf"] = str(skip_flush).lower()
|
||||
params["use_sstable_identifier"] = str(use_sstable_identifier).lower()
|
||||
|
||||
if ktlist:
|
||||
if "tbl" in ktlist:
|
||||
@@ -277,7 +273,7 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_
|
||||
expected_request("POST", "/storage_service/snapshots", params=params)
|
||||
])
|
||||
|
||||
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush, use_sstable_identifier)
|
||||
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush)
|
||||
|
||||
|
||||
def test_snapshot_multiple_keyspace_with_table(nodetool):
|
||||
|
||||
@@ -654,7 +654,7 @@ void cluster_repair_operation(scylla_rest_client& client, const bpo::variables_m
|
||||
for (const auto& table : tables.empty() ? ks_to_cfs[keyspace] : tables) {
|
||||
repair_params["table"] = table;
|
||||
try {
|
||||
sstring task_id = client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"].GetString();
|
||||
sstring task_id = rjson::to_sstring(client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"]);
|
||||
|
||||
log("Starting repair with task_id={} keyspace={} table={}", task_id, keyspace, table);
|
||||
|
||||
@@ -2362,23 +2362,16 @@ void snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm
|
||||
params["sf"] = "false";
|
||||
}
|
||||
|
||||
if (vm.contains("use-sstable-identifier")) {
|
||||
params["use_sstable_identifier"] = "true";
|
||||
} else {
|
||||
params["use_sstable_identifier"] = "false";
|
||||
}
|
||||
|
||||
client.post("/storage_service/snapshots", params);
|
||||
|
||||
if (kn_msg.empty()) {
|
||||
kn_msg = params["kn"];
|
||||
}
|
||||
|
||||
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skip_flush={}, use_sstable_identifier={}}}\n",
|
||||
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}}}\n",
|
||||
kn_msg,
|
||||
params["tag"],
|
||||
params["sf"],
|
||||
params["use_sstable_identifier"]);
|
||||
params["sf"]);
|
||||
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
|
||||
}
|
||||
|
||||
@@ -4605,7 +4598,6 @@ For more information, see: {}
|
||||
typed_option<sstring>("keyspace-table-list", "The keyspace.table pair(s) to snapshot, multiple ones can be joined with ','"),
|
||||
typed_option<sstring>("tag,t", "The name of the snapshot"),
|
||||
typed_option<>("skip-flush", "Do not flush memtables before snapshotting (snapshot will not contain unflushed data)"),
|
||||
typed_option<>("use-sstable-identifier", "Use the sstable identifier UUID, if available, rather than the sstable generation for the sstable file names within the snapshot dir and the manifest file"),
|
||||
},
|
||||
{
|
||||
typed_option<std::vector<sstring>>("keyspaces", "The keyspaces to snapshot", -1),
|
||||
|
||||
@@ -29,11 +29,8 @@ class counted_data_source_impl : public data_source_impl {
|
||||
if (_cpu_concurrency.stopped) {
|
||||
return fun();
|
||||
}
|
||||
return futurize_invoke([this] () {
|
||||
_cpu_concurrency.units.return_all();
|
||||
}).then([fun = std::move(fun)] () {
|
||||
return fun();
|
||||
}).finally([this] () {
|
||||
_cpu_concurrency.units.return_all();
|
||||
return fun().finally([this] () {
|
||||
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
|
||||
});
|
||||
};
|
||||
@@ -60,11 +57,8 @@ class counted_data_sink_impl : public data_sink_impl {
|
||||
if (_cpu_concurrency.stopped) {
|
||||
return fun();
|
||||
}
|
||||
return futurize_invoke([this] () {
|
||||
_cpu_concurrency.units.return_all();
|
||||
}).then([fun = std::move(fun)] () mutable {
|
||||
return fun();
|
||||
}).finally([this] () {
|
||||
_cpu_concurrency.units.return_all();
|
||||
return fun().finally([this] () {
|
||||
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
|
||||
});
|
||||
};
|
||||
|
||||
@@ -832,6 +832,12 @@ to_bytes(bytes_view x) {
|
||||
return bytes(x.begin(), x.size());
|
||||
}
|
||||
|
||||
inline
|
||||
bytes
|
||||
to_bytes(std::string_view x) {
|
||||
return to_bytes(to_bytes_view(x));
|
||||
}
|
||||
|
||||
inline
|
||||
bytes_opt
|
||||
to_bytes_opt(bytes_view_opt bv) {
|
||||
|
||||
@@ -15,6 +15,7 @@ target_sources(utils
|
||||
buffer_input_stream.cc
|
||||
build_id.cc
|
||||
config_file.cc
|
||||
crypt_sha512.cc
|
||||
directories.cc
|
||||
disk-error-handler.cc
|
||||
disk_space_monitor.cc
|
||||
|
||||
381
utils/crypt_sha512.cc
Normal file
381
utils/crypt_sha512.cc
Normal file
@@ -0,0 +1,381 @@
|
||||
/*
|
||||
* This file originates from musl libc (git.musl-libc.org).
|
||||
* Modifications have been made and are licensed under the following terms:
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*
|
||||
* public domain sha512 crypt implementation
|
||||
*
|
||||
* original sha crypt design: http://people.redhat.com/drepper/SHA-crypt.txt
|
||||
* in this implementation at least 32bit int is assumed,
|
||||
* key length is limited, the $6$ prefix is mandatory, '\n' and ':' is rejected
|
||||
* in the salt and rounds= setting must contain a valid iteration count,
|
||||
* on error "*" is returned.
|
||||
*/
|
||||
#include <ctype.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include "crypt_sha512.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
|
||||
/* public domain sha512 implementation based on fips180-3 */
|
||||
/* >=2^64 bits messages are not supported (about 2000 peta bytes) */
|
||||
|
||||
struct sha512 {
|
||||
uint64_t len; /* processed message length */
|
||||
uint64_t h[8]; /* hash state */
|
||||
uint8_t buf[128]; /* message block buffer */
|
||||
};
|
||||
|
||||
static uint64_t ror(uint64_t n, int k) { return (n >> k) | (n << (64-k)); }
|
||||
#define Ch(x,y,z) (z ^ (x & (y ^ z)))
|
||||
#define Maj(x,y,z) ((x & y) | (z & (x | y)))
|
||||
#define S0(x) (ror(x,28) ^ ror(x,34) ^ ror(x,39))
|
||||
#define S1(x) (ror(x,14) ^ ror(x,18) ^ ror(x,41))
|
||||
#define R0(x) (ror(x,1) ^ ror(x,8) ^ (x>>7))
|
||||
#define R1(x) (ror(x,19) ^ ror(x,61) ^ (x>>6))
|
||||
|
||||
static const uint64_t K[80] = {
|
||||
0x428a2f98d728ae22ULL, 0x7137449123ef65cdULL, 0xb5c0fbcfec4d3b2fULL, 0xe9b5dba58189dbbcULL,
|
||||
0x3956c25bf348b538ULL, 0x59f111f1b605d019ULL, 0x923f82a4af194f9bULL, 0xab1c5ed5da6d8118ULL,
|
||||
0xd807aa98a3030242ULL, 0x12835b0145706fbeULL, 0x243185be4ee4b28cULL, 0x550c7dc3d5ffb4e2ULL,
|
||||
0x72be5d74f27b896fULL, 0x80deb1fe3b1696b1ULL, 0x9bdc06a725c71235ULL, 0xc19bf174cf692694ULL,
|
||||
0xe49b69c19ef14ad2ULL, 0xefbe4786384f25e3ULL, 0x0fc19dc68b8cd5b5ULL, 0x240ca1cc77ac9c65ULL,
|
||||
0x2de92c6f592b0275ULL, 0x4a7484aa6ea6e483ULL, 0x5cb0a9dcbd41fbd4ULL, 0x76f988da831153b5ULL,
|
||||
0x983e5152ee66dfabULL, 0xa831c66d2db43210ULL, 0xb00327c898fb213fULL, 0xbf597fc7beef0ee4ULL,
|
||||
0xc6e00bf33da88fc2ULL, 0xd5a79147930aa725ULL, 0x06ca6351e003826fULL, 0x142929670a0e6e70ULL,
|
||||
0x27b70a8546d22ffcULL, 0x2e1b21385c26c926ULL, 0x4d2c6dfc5ac42aedULL, 0x53380d139d95b3dfULL,
|
||||
0x650a73548baf63deULL, 0x766a0abb3c77b2a8ULL, 0x81c2c92e47edaee6ULL, 0x92722c851482353bULL,
|
||||
0xa2bfe8a14cf10364ULL, 0xa81a664bbc423001ULL, 0xc24b8b70d0f89791ULL, 0xc76c51a30654be30ULL,
|
||||
0xd192e819d6ef5218ULL, 0xd69906245565a910ULL, 0xf40e35855771202aULL, 0x106aa07032bbd1b8ULL,
|
||||
0x19a4c116b8d2d0c8ULL, 0x1e376c085141ab53ULL, 0x2748774cdf8eeb99ULL, 0x34b0bcb5e19b48a8ULL,
|
||||
0x391c0cb3c5c95a63ULL, 0x4ed8aa4ae3418acbULL, 0x5b9cca4f7763e373ULL, 0x682e6ff3d6b2b8a3ULL,
|
||||
0x748f82ee5defb2fcULL, 0x78a5636f43172f60ULL, 0x84c87814a1f0ab72ULL, 0x8cc702081a6439ecULL,
|
||||
0x90befffa23631e28ULL, 0xa4506cebde82bde9ULL, 0xbef9a3f7b2c67915ULL, 0xc67178f2e372532bULL,
|
||||
0xca273eceea26619cULL, 0xd186b8c721c0c207ULL, 0xeada7dd6cde0eb1eULL, 0xf57d4f7fee6ed178ULL,
|
||||
0x06f067aa72176fbaULL, 0x0a637dc5a2c898a6ULL, 0x113f9804bef90daeULL, 0x1b710b35131c471bULL,
|
||||
0x28db77f523047d84ULL, 0x32caab7b40c72493ULL, 0x3c9ebe0a15c9bebcULL, 0x431d67c49c100d4cULL,
|
||||
0x4cc5d4becb3e42b6ULL, 0x597f299cfc657e2aULL, 0x5fcb6fab3ad6faecULL, 0x6c44198c4a475817ULL
|
||||
};
|
||||
|
||||
static void processblock(struct sha512 *s, const uint8_t *buf)
|
||||
{
|
||||
uint64_t W[80], t1, t2, a, b, c, d, e, f, g, h;
|
||||
int i;
|
||||
|
||||
for (i = 0; i < 16; i++) {
|
||||
W[i] = (uint64_t)buf[8*i]<<56;
|
||||
W[i] |= (uint64_t)buf[8*i+1]<<48;
|
||||
W[i] |= (uint64_t)buf[8*i+2]<<40;
|
||||
W[i] |= (uint64_t)buf[8*i+3]<<32;
|
||||
W[i] |= (uint64_t)buf[8*i+4]<<24;
|
||||
W[i] |= (uint64_t)buf[8*i+5]<<16;
|
||||
W[i] |= (uint64_t)buf[8*i+6]<<8;
|
||||
W[i] |= buf[8*i+7];
|
||||
}
|
||||
for (; i < 80; i++)
|
||||
W[i] = R1(W[i-2]) + W[i-7] + R0(W[i-15]) + W[i-16];
|
||||
a = s->h[0];
|
||||
b = s->h[1];
|
||||
c = s->h[2];
|
||||
d = s->h[3];
|
||||
e = s->h[4];
|
||||
f = s->h[5];
|
||||
g = s->h[6];
|
||||
h = s->h[7];
|
||||
for (i = 0; i < 80; i++) {
|
||||
t1 = h + S1(e) + Ch(e,f,g) + K[i] + W[i];
|
||||
t2 = S0(a) + Maj(a,b,c);
|
||||
h = g;
|
||||
g = f;
|
||||
f = e;
|
||||
e = d + t1;
|
||||
d = c;
|
||||
c = b;
|
||||
b = a;
|
||||
a = t1 + t2;
|
||||
}
|
||||
s->h[0] += a;
|
||||
s->h[1] += b;
|
||||
s->h[2] += c;
|
||||
s->h[3] += d;
|
||||
s->h[4] += e;
|
||||
s->h[5] += f;
|
||||
s->h[6] += g;
|
||||
s->h[7] += h;
|
||||
}
|
||||
|
||||
static void pad(struct sha512 *s)
|
||||
{
|
||||
unsigned r = s->len % 128;
|
||||
|
||||
s->buf[r++] = 0x80;
|
||||
if (r > 112) {
|
||||
memset(s->buf + r, 0, 128 - r);
|
||||
r = 0;
|
||||
processblock(s, s->buf);
|
||||
}
|
||||
memset(s->buf + r, 0, 120 - r);
|
||||
s->len *= 8;
|
||||
s->buf[120] = s->len >> 56;
|
||||
s->buf[121] = s->len >> 48;
|
||||
s->buf[122] = s->len >> 40;
|
||||
s->buf[123] = s->len >> 32;
|
||||
s->buf[124] = s->len >> 24;
|
||||
s->buf[125] = s->len >> 16;
|
||||
s->buf[126] = s->len >> 8;
|
||||
s->buf[127] = s->len;
|
||||
processblock(s, s->buf);
|
||||
}
|
||||
|
||||
static void sha512_init(struct sha512 *s)
|
||||
{
|
||||
s->len = 0;
|
||||
s->h[0] = 0x6a09e667f3bcc908ULL;
|
||||
s->h[1] = 0xbb67ae8584caa73bULL;
|
||||
s->h[2] = 0x3c6ef372fe94f82bULL;
|
||||
s->h[3] = 0xa54ff53a5f1d36f1ULL;
|
||||
s->h[4] = 0x510e527fade682d1ULL;
|
||||
s->h[5] = 0x9b05688c2b3e6c1fULL;
|
||||
s->h[6] = 0x1f83d9abfb41bd6bULL;
|
||||
s->h[7] = 0x5be0cd19137e2179ULL;
|
||||
}
|
||||
|
||||
static void sha512_sum(struct sha512 *s, uint8_t *md)
|
||||
{
|
||||
int i;
|
||||
|
||||
pad(s);
|
||||
for (i = 0; i < 8; i++) {
|
||||
md[8*i] = s->h[i] >> 56;
|
||||
md[8*i+1] = s->h[i] >> 48;
|
||||
md[8*i+2] = s->h[i] >> 40;
|
||||
md[8*i+3] = s->h[i] >> 32;
|
||||
md[8*i+4] = s->h[i] >> 24;
|
||||
md[8*i+5] = s->h[i] >> 16;
|
||||
md[8*i+6] = s->h[i] >> 8;
|
||||
md[8*i+7] = s->h[i];
|
||||
}
|
||||
}
|
||||
|
||||
static void sha512_update(struct sha512 *s, const void *m, unsigned long len)
|
||||
{
|
||||
const uint8_t *p = (const uint8_t *)m;
|
||||
unsigned r = s->len % 128;
|
||||
|
||||
s->len += len;
|
||||
if (r) {
|
||||
if (len < 128 - r) {
|
||||
memcpy(s->buf + r, p, len);
|
||||
return;
|
||||
}
|
||||
memcpy(s->buf + r, p, 128 - r);
|
||||
len -= 128 - r;
|
||||
p += 128 - r;
|
||||
processblock(s, s->buf);
|
||||
}
|
||||
for (; len >= 128; len -= 128, p += 128)
|
||||
processblock(s, p);
|
||||
memcpy(s->buf, p, len);
|
||||
}
|
||||
|
||||
static const unsigned char b64[] =
|
||||
"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
|
||||
|
||||
static char *to64(char *s, unsigned int u, int n)
|
||||
{
|
||||
while (--n >= 0) {
|
||||
*s++ = b64[u % 64];
|
||||
u /= 64;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
/* key limit is not part of the original design, added for DoS protection.
|
||||
* rounds limit has been lowered (versus the reference/spec), also for DoS
|
||||
* protection. runtime is O(klen^2 + klen*rounds) */
|
||||
#define KEY_MAX 256
|
||||
#define SALT_MAX 16
|
||||
#define ROUNDS_DEFAULT 5000
|
||||
#define ROUNDS_MIN 1000
|
||||
#define ROUNDS_MAX 9999999
|
||||
|
||||
/* hash n bytes of the repeated md message digest */
|
||||
static void hashmd(struct sha512 *s, unsigned int n, const void *md)
|
||||
{
|
||||
unsigned int i;
|
||||
|
||||
for (i = n; i > 64; i -= 64)
|
||||
sha512_update(s, md, 64);
|
||||
sha512_update(s, md, i);
|
||||
}
|
||||
|
||||
static seastar::future<char *> sha512crypt(const char *key, const char *setting, char *output)
|
||||
{
|
||||
struct sha512 ctx;
|
||||
unsigned char md[64], kmd[64], smd[64];
|
||||
unsigned int i, r, klen, slen;
|
||||
char rounds[20] = "";
|
||||
const char *salt;
|
||||
char *p;
|
||||
|
||||
/* reject large keys */
|
||||
for (i = 0; i <= KEY_MAX && key[i]; i++);
|
||||
if (i > KEY_MAX)
|
||||
co_return nullptr;
|
||||
klen = i;
|
||||
|
||||
/* setting: $6$rounds=n$salt$ (rounds=n$ and closing $ are optional) */
|
||||
if (strncmp(setting, "$6$", 3) != 0)
|
||||
co_return nullptr;
|
||||
salt = setting + 3;
|
||||
|
||||
r = ROUNDS_DEFAULT;
|
||||
if (strncmp(salt, "rounds=", sizeof "rounds=" - 1) == 0) {
|
||||
unsigned long u;
|
||||
char *end;
|
||||
|
||||
/*
|
||||
* this is a deviation from the reference:
|
||||
* bad rounds setting is rejected if it is
|
||||
* - empty
|
||||
* - unterminated (missing '$')
|
||||
* - begins with anything but a decimal digit
|
||||
* the reference implementation treats these bad
|
||||
* rounds as part of the salt or parse them with
|
||||
* strtoul semantics which may cause problems
|
||||
* including non-portable hashes that depend on
|
||||
* the host's value of ULONG_MAX.
|
||||
*/
|
||||
salt += sizeof "rounds=" - 1;
|
||||
if (!isdigit(*salt))
|
||||
co_return nullptr;
|
||||
u = strtoul(salt, &end, 10);
|
||||
if (*end != '$')
|
||||
co_return nullptr;
|
||||
salt = end+1;
|
||||
if (u < ROUNDS_MIN)
|
||||
r = ROUNDS_MIN;
|
||||
else if (u > ROUNDS_MAX)
|
||||
co_return nullptr;
|
||||
else
|
||||
r = u;
|
||||
/* needed when rounds is zero prefixed or out of bounds */
|
||||
sprintf(rounds, "rounds=%u$", r);
|
||||
}
|
||||
|
||||
for (i = 0; i < SALT_MAX && salt[i] && salt[i] != '$'; i++)
|
||||
/* reject characters that interfere with /etc/shadow parsing */
|
||||
if (salt[i] == '\n' || salt[i] == ':')
|
||||
co_return nullptr;
|
||||
slen = i;
|
||||
|
||||
/* B = sha(key salt key) */
|
||||
sha512_init(&ctx);
|
||||
sha512_update(&ctx, key, klen);
|
||||
sha512_update(&ctx, salt, slen);
|
||||
sha512_update(&ctx, key, klen);
|
||||
sha512_sum(&ctx, md);
|
||||
|
||||
/* A = sha(key salt repeat-B alternate-B-key) */
|
||||
sha512_init(&ctx);
|
||||
sha512_update(&ctx, key, klen);
|
||||
sha512_update(&ctx, salt, slen);
|
||||
hashmd(&ctx, klen, md);
|
||||
for (i = klen; i > 0; i >>= 1)
|
||||
if (i & 1)
|
||||
sha512_update(&ctx, md, sizeof md);
|
||||
else
|
||||
sha512_update(&ctx, key, klen);
|
||||
sha512_sum(&ctx, md);
|
||||
|
||||
/* DP = sha(repeat-key), this step takes O(klen^2) time */
|
||||
sha512_init(&ctx);
|
||||
for (i = 0; i < klen; i++)
|
||||
sha512_update(&ctx, key, klen);
|
||||
sha512_sum(&ctx, kmd);
|
||||
|
||||
/* DS = sha(repeat-salt) */
|
||||
sha512_init(&ctx);
|
||||
for (i = 0; i < 16 + md[0]; i++)
|
||||
sha512_update(&ctx, salt, slen);
|
||||
sha512_sum(&ctx, smd);
|
||||
|
||||
/* iterate A = f(A,DP,DS), this step takes O(rounds*klen) time */
|
||||
for (i = 0; i < r; i++) {
|
||||
sha512_init(&ctx);
|
||||
if (i % 2)
|
||||
hashmd(&ctx, klen, kmd);
|
||||
else
|
||||
sha512_update(&ctx, md, sizeof md);
|
||||
if (i % 3)
|
||||
sha512_update(&ctx, smd, slen);
|
||||
if (i % 7)
|
||||
hashmd(&ctx, klen, kmd);
|
||||
if (i % 2)
|
||||
sha512_update(&ctx, md, sizeof md);
|
||||
else
|
||||
hashmd(&ctx, klen, kmd);
|
||||
sha512_sum(&ctx, md);
|
||||
co_await seastar::coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
/* output is $6$rounds=n$salt$hash */
|
||||
p = output;
|
||||
p += sprintf(p, "$6$%s%.*s$", rounds, slen, salt);
|
||||
#if 1
|
||||
static const unsigned char perm[][3] = {
|
||||
{0,21,42},{22,43,1},{44,2,23},{3,24,45},{25,46,4},
|
||||
{47,5,26},{6,27,48},{28,49,7},{50,8,29},{9,30,51},
|
||||
{31,52,10},{53,11,32},{12,33,54},{34,55,13},{56,14,35},
|
||||
{15,36,57},{37,58,16},{59,17,38},{18,39,60},{40,61,19},
|
||||
{62,20,41} };
|
||||
for (i=0; i<21; i++) p = to64(p,
|
||||
(md[perm[i][0]]<<16)|(md[perm[i][1]]<<8)|md[perm[i][2]], 4);
|
||||
#else
|
||||
p = to64(p, (md[0]<<16)|(md[21]<<8)|md[42], 4);
|
||||
p = to64(p, (md[22]<<16)|(md[43]<<8)|md[1], 4);
|
||||
p = to64(p, (md[44]<<16)|(md[2]<<8)|md[23], 4);
|
||||
p = to64(p, (md[3]<<16)|(md[24]<<8)|md[45], 4);
|
||||
p = to64(p, (md[25]<<16)|(md[46]<<8)|md[4], 4);
|
||||
p = to64(p, (md[47]<<16)|(md[5]<<8)|md[26], 4);
|
||||
p = to64(p, (md[6]<<16)|(md[27]<<8)|md[48], 4);
|
||||
p = to64(p, (md[28]<<16)|(md[49]<<8)|md[7], 4);
|
||||
p = to64(p, (md[50]<<16)|(md[8]<<8)|md[29], 4);
|
||||
p = to64(p, (md[9]<<16)|(md[30]<<8)|md[51], 4);
|
||||
p = to64(p, (md[31]<<16)|(md[52]<<8)|md[10], 4);
|
||||
p = to64(p, (md[53]<<16)|(md[11]<<8)|md[32], 4);
|
||||
p = to64(p, (md[12]<<16)|(md[33]<<8)|md[54], 4);
|
||||
p = to64(p, (md[34]<<16)|(md[55]<<8)|md[13], 4);
|
||||
p = to64(p, (md[56]<<16)|(md[14]<<8)|md[35], 4);
|
||||
p = to64(p, (md[15]<<16)|(md[36]<<8)|md[57], 4);
|
||||
p = to64(p, (md[37]<<16)|(md[58]<<8)|md[16], 4);
|
||||
p = to64(p, (md[59]<<16)|(md[17]<<8)|md[38], 4);
|
||||
p = to64(p, (md[18]<<16)|(md[39]<<8)|md[60], 4);
|
||||
p = to64(p, (md[40]<<16)|(md[61]<<8)|md[19], 4);
|
||||
p = to64(p, (md[62]<<16)|(md[20]<<8)|md[41], 4);
|
||||
#endif
|
||||
p = to64(p, md[63], 2);
|
||||
*p = 0;
|
||||
co_return output;
|
||||
}
|
||||
|
||||
seastar::future<const char *> __crypt_sha512(const char *key, const char *setting, char *output)
|
||||
{
|
||||
static const char testkey[] = "Xy01@#\x01\x02\x80\x7f\xff\r\n\x81\t !";
|
||||
static const char testsetting[] = "$6$rounds=1234$abc0123456789$";
|
||||
static const char testhash[] = "$6$rounds=1234$abc0123456789$BCpt8zLrc/RcyuXmCDOE1ALqMXB2MH6n1g891HhFj8.w7LxGv.FTkqq6Vxc/km3Y0jE0j24jY5PIv/oOu6reg1";
|
||||
char testbuf[128];
|
||||
char *p, *q;
|
||||
|
||||
p = co_await sha512crypt(key, setting, output);
|
||||
/* self test and stack cleanup */
|
||||
q = co_await sha512crypt(testkey, testsetting, testbuf);
|
||||
if (!p || q != testbuf || memcmp(testbuf, testhash, sizeof testhash))
|
||||
co_return "*";
|
||||
co_return p;
|
||||
}
|
||||
13
utils/crypt_sha512.hh
Normal file
13
utils/crypt_sha512.hh
Normal file
@@ -0,0 +1,13 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
|
||||
seastar::future<const char *> __crypt_sha512(const char *key, const char *setting, char *output);
|
||||
@@ -204,7 +204,7 @@ public:
|
||||
|
||||
public:
|
||||
template <typename Clock, typename Duration>
|
||||
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr) {
|
||||
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr, std::source_location loc = std::source_location::current()) {
|
||||
if (!_shared_data) {
|
||||
on_internal_error(errinj_logger, "injection_shared_data is not initialized");
|
||||
}
|
||||
@@ -234,7 +234,8 @@ public:
|
||||
throw;
|
||||
}
|
||||
catch (const std::exception& e) {
|
||||
on_internal_error(errinj_logger, "Error injection wait_for_message timeout: " + std::string(e.what()));
|
||||
on_internal_error(errinj_logger, fmt::format("Error injection [{}] wait_for_message timeout: Called from `{}` @ {}:{}:{:d}: {}",
|
||||
_shared_data->injection_name, loc.function_name(), loc.file_name(), loc.line(), loc.column(), e.what()));
|
||||
}
|
||||
++_read_messages_counter;
|
||||
}
|
||||
|
||||
@@ -174,13 +174,6 @@ future<> print_with_extra_array(const rjson::value& value,
|
||||
seastar::output_stream<char>& os,
|
||||
size_t max_nested_level = default_max_nested_level);
|
||||
|
||||
// Returns a string_view to the string held in a JSON value (which is
|
||||
// assumed to hold a string, i.e., v.IsString() == true). This is a view
|
||||
// to the existing data - no copying is done.
|
||||
inline std::string_view to_string_view(const rjson::value& v) {
|
||||
return std::string_view(v.GetString(), v.GetStringLength());
|
||||
}
|
||||
|
||||
// Copies given JSON value - involves allocation
|
||||
rjson::value copy(const rjson::value& value);
|
||||
|
||||
@@ -236,6 +229,27 @@ rjson::value parse_yieldable(chunked_content&&, size_t max_nested_level = defaul
|
||||
rjson::value from_string(const char* str, size_t size);
|
||||
rjson::value from_string(std::string_view view);
|
||||
|
||||
// Returns a string_view to the string held in a JSON value (which is
|
||||
// assumed to hold a string, i.e., v.IsString() == true). This is a view
|
||||
// to the existing data - no copying is done.
|
||||
inline std::string_view to_string_view(const rjson::value& v) {
|
||||
return std::string_view(v.GetString(), v.GetStringLength());
|
||||
}
|
||||
|
||||
// Those functions must be called on json string object.
|
||||
// They make a copy of underlying data so it's safe to destroy
|
||||
// rjson::value afterwards.
|
||||
//
|
||||
// Rapidjson's GetString method alone is not good enough
|
||||
// for string conversion because it needs to scan the string
|
||||
// unnecessarily and GetStringLength could be used to avoid that.
|
||||
inline sstring to_sstring(const rjson::value& str) {
|
||||
return sstring(str.GetString(), str.GetStringLength());
|
||||
}
|
||||
inline std::string to_string(const rjson::value& str) {
|
||||
return std::string(str.GetString(), str.GetStringLength());
|
||||
}
|
||||
|
||||
// Returns a pointer to JSON member if it exists, nullptr otherwise
|
||||
rjson::value* find(rjson::value& value, std::string_view name);
|
||||
const rjson::value* find(const rjson::value& value, std::string_view name);
|
||||
@@ -377,7 +391,7 @@ rjson::value from_string_map(const std::map<sstring, sstring>& map);
|
||||
sstring quote_json_string(const sstring& value);
|
||||
|
||||
inline bytes base64_decode(const value& v) {
|
||||
return ::base64_decode(std::string_view(v.GetString(), v.GetStringLength()));
|
||||
return ::base64_decode(to_string_view(v));
|
||||
}
|
||||
|
||||
// A writer which allows writing json into an std::ostream in a streaming manner.
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "utils/http.hh"
|
||||
#include "utils/s3/client.hh"
|
||||
#include "utils/s3/default_aws_retry_strategy.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include <rapidjson/document.h>
|
||||
#include <rapidjson/error/en.h>
|
||||
#include <seastar/http/client.hh>
|
||||
@@ -72,7 +73,7 @@ future<> instance_profile_credentials_provider::update_credentials() {
|
||||
}
|
||||
|
||||
s3::aws_credentials instance_profile_credentials_provider::parse_creds(const sstring& creds_response) {
|
||||
rapidjson::Document document;
|
||||
rjson::document document;
|
||||
document.Parse(creds_response.data());
|
||||
|
||||
if (document.HasParseError()) {
|
||||
@@ -81,9 +82,9 @@ s3::aws_credentials instance_profile_credentials_provider::parse_creds(const sst
|
||||
}
|
||||
|
||||
// Retrieve credentials
|
||||
return {.access_key_id = document["AccessKeyId"].GetString(),
|
||||
.secret_access_key = document["SecretAccessKey"].GetString(),
|
||||
.session_token = document["Token"].GetString(),
|
||||
return {.access_key_id = rjson::to_string(document["AccessKeyId"]),
|
||||
.secret_access_key = rjson::to_string(document["SecretAccessKey"]),
|
||||
.session_token = rjson::to_string(document["Token"]),
|
||||
// Set the expiration to one minute earlier to ensure credentials are renewed slightly before they expire
|
||||
.expires_at = seastar::lowres_clock::now() + std::chrono::seconds(session_duration - 60)};
|
||||
}
|
||||
|
||||
@@ -152,7 +152,7 @@ seastar::future<bool> client::check_status() {
|
||||
}
|
||||
auto resp = co_await std::move(f);
|
||||
auto json = rjson::parse(std::move(resp.content));
|
||||
co_return json.IsString() && json.GetString() == std::string_view("SERVING");
|
||||
co_return json.IsString() && rjson::to_string_view(json) == "SERVING";
|
||||
}
|
||||
|
||||
seastar::future<> client::close() {
|
||||
|
||||
Reference in New Issue
Block a user