Compare commits


4 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
8c48b82b84 test_ssl: fix indentation 2026-01-09 10:27:17 +01:00
Piotr Smaron
2bcbebe92d generic_server: improve logging broken TLS connection
Previously we were logging a broken TLS connection and then it was
logged again later. Now, instead of logging right away, we construct an
exception with a message extended with TLS info; when it is caught
later, its full message is still logged.
2026-01-09 10:24:55 +01:00
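
A rough sketch of the idea in this commit, in illustrative C++ (the names and the TLS detail string are assumptions, not the actual generic_server code):

#include <iostream>
#include <stdexcept>
#include <string>

// Instead of logging a broken TLS connection immediately (and then again
// when the error surfaces), attach the TLS details to the exception
// message so the eventual catch site logs everything exactly once.
void serve_connection(bool tls_broken, const std::string& tls_info) {
    if (tls_broken) {
        throw std::runtime_error("connection closed (" + tls_info + ")");
    }
}

int main() {
    try {
        serve_connection(true, "TLS: handshake failed");
    } catch (const std::exception& e) {
        std::cerr << "error: " << e.what() << '\n'; // single log line
    }
}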
Piotr Smaron
7016fc4835 test_ssl: improve timeout and readability
1. With this change the test really waits 10s; previously (in case
   something went wrong) the timeout could take far longer.
2. Added `else` to the `if` above to clarify the execution flow; it
   doesn't change the logic, but makes it easier to follow.
2026-01-09 10:22:19 +01:00
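
Point 1 amounts to computing one absolute deadline instead of restarting a relative timeout on every retry. A minimal C++ sketch of that pattern (the 10s figure comes from the commit message; the rest is assumed, the real test lives in test_ssl):

#include <chrono>
#include <thread>

// The whole wait is bounded by a single deadline; polling with a fresh
// timeout per attempt (the old behavior) could take far longer than 10s.
bool poll_until_ready(bool (*ready)()) {
    using clock = std::chrono::steady_clock;
    const auto deadline = clock::now() + std::chrono::seconds(10);
    while (clock::now() < deadline) {
        if (ready()) {
            return true;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return false; // timed out after ~10s in total
}

int main() {
    return poll_until_ready([] { return true; }) ? 0 : 1;
}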
copilot-swe-agent[bot]
d25d295e84 alternator/server: update SSL comment 2025-12-29 09:34:08 +01:00
57 changed files with 779 additions and 264 deletions

.github/CODEOWNERS
View File

@@ -1,5 +1,5 @@
# AUTH
auth/* @nuivall
auth/* @nuivall @ptrsmrn
# CACHE
row_cache* @tgrabiec
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
transport/*
# CQL QUERY LANGUAGE
cql3/* @tgrabiec @nuivall
cql3/* @tgrabiec @nuivall @ptrsmrn
# COUNTERS
counters* @nuivall
tests/counter_test* @nuivall
counters* @nuivall @ptrsmrn
tests/counter_test* @nuivall @ptrsmrn
# DOCS
docs/* @annastuchlik @tzach

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {
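
The narrowed pattern drops the JIRA-style alternative ([A-Z]+-\d+), so only issue references match. A quick standalone check of the new pattern, with ${repo} expanded to scylladb/scylladb for illustration:

#include <cassert>
#include <regex>
#include <string>

int main() {
    const std::regex re(
        R"(Fixes:? (?:#|scylladb\/scylladb#|https:\/\/github\.com\/scylladb\/scylladb\/issues\/)(\d+))");
    assert(std::regex_search(std::string("Fixes #1234"), re));
    assert(std::regex_search(std::string("Fixes: scylladb/scylladb#1234"), re));
    assert(!std::regex_search(std::string("Fixes: SCT-1234"), re)); // no longer accepted
}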

View File

@@ -3,13 +3,10 @@ name: Trigger Scylla CI Route
on:
issue_comment:
types: [created]
pull_request_target:
types:
- unlabeled
jobs:
trigger-jenkins:
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI-Route Jenkins Job

View File

@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
if (!comparison_operator.IsString()) {
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
}
std::string op = rjson::to_string(comparison_operator);
std::string op = comparison_operator.GetString();
auto it = ops.find(op);
if (it == ops.end()) {
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
}
if (kv1.name == "S") {
return cmp(rjson::to_string_view(kv1.value),
rjson::to_string_view(kv2.value));
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
}
if (kv1.name == "B") {
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
}
if (kv_v.name == "S") {
return check_BETWEEN(rjson::to_string_view(kv_v.value),
rjson::to_string_view(kv_lb.value),
rjson::to_string_view(kv_ub.value),
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
bounds_from_query);
}
if (kv_v.name == "B") {
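
These changes construct a std::string_view straight from rapidjson's accessors. A self-contained sketch of why GetStringLength() is paired with GetString() (rjson is ScyllaDB's wrapper; this example uses rapidjson directly):

#include <rapidjson/document.h>
#include <iostream>
#include <string_view>

int main() {
    rapidjson::Document d;
    d.Parse(R"({"S":"hello"})");
    const rapidjson::Value& v = d["S"];
    // Using the parsed length avoids an strlen() rescan and keeps strings
    // with embedded NUL bytes intact.
    std::string_view sv(v.GetString(), v.GetStringLength());
    std::cout << sv.size() << ' ' << sv << '\n'; // prints: 5 hello
}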

View File

@@ -8,8 +8,6 @@
#include "consumed_capacity.hh"
#include "error.hh"
#include "utils/rjson.hh"
#include <fmt/format.h>
namespace alternator {
@@ -34,12 +32,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
if (!return_consumed->IsString()) {
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
}
std::string_view consumed = rjson::to_string_view(*return_consumed);
std::string consumed = return_consumed->GetString();
if (consumed == "INDEXES") {
throw api_error::validation("INDEXES consumed capacity is not supported");
}
if (consumed != "TOTAL") {
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
throw api_error::validation("Unknown consumed capacity "+ consumed);
}
return true;
}

View File

@@ -419,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
if (!table_name_value->IsString()) {
throw api_error::validation("Non-string TableName field in request");
}
std::string table_name = rjson::to_string(*table_name_value);
std::string table_name = table_name_value->GetString();
return table_name;
}
@@ -546,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
// does exist but the index does not (ValidationException).
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
throw api_error::validation(
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
} else {
throw api_error::resource_not_found(
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
@@ -587,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
attribute_name, value));
}
return rjson::to_string(*attribute_value);
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
}
// Convenience function for getting the value of a boolean attribute, or a
@@ -1080,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
}
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == name) {
auto type = attribute_info["AttributeType"].GetString();
data_type dt = parse_key_type(type);
if (computed_column) {
// Computed column for GSI (doesn't choose a real column as-is
@@ -1116,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First element of KeySchema must be an object");
}
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
throw api_error::validation("First key in KeySchema must be a HASH key");
}
v = rjson::find((*key_schema)[0], "AttributeName");
@@ -1124,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First key in KeySchema must have string AttributeName");
}
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
std::string hash_key = rjson::to_string(*v);
std::string hash_key = v->GetString();
std::string range_key;
if (key_schema->Size() == 2) {
if (!(*key_schema)[1].IsObject()) {
throw api_error::validation("Second element of KeySchema must be an object");
}
v = rjson::find((*key_schema)[1], "KeyType");
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
throw api_error::validation("Second key in KeySchema must be a RANGE key");
}
v = rjson::find((*key_schema)[1], "AttributeName");
@@ -1887,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string def_type = type_to_string(def.type);
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
auto type = attribute_info["AttributeType"].GetString();
if (type != def_type) {
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
}
@@ -2362,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
@@ -2783,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
return;
}
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
if (!used.contains(rjson::to_string(it->name))) {
if (!used.contains(it->name.GetString())) {
throw api_error::validation(
format("{} has spurious '{}', not used in {}",
field_name, rjson::to_string_view(it->name), operation));
field_name, it->name.GetString(), operation));
}
}
}
@@ -3000,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
}
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
try {
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
} catch(data_dictionary::no_such_column_family&) {
@@ -3386,7 +3386,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
}
rjson::value newv = rjson::empty_object();
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
std::string attr = rjson::to_string(it->name);
std::string attr = it->name.GetString();
auto x = members.find(attr);
if (x != members.end()) {
if (x->second) {
@@ -3606,7 +3606,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
const rjson::value& attributes_to_get = req["AttributesToGet"];
attrs_to_get ret;
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
attribute_path_map_add("AttributesToGet", ret, it->GetString());
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
}
if (ret.empty()) {
@@ -4272,12 +4272,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
// Note that it.key() is the name of the column, *it is the operation
bytes column_name = to_bytes(rjson::to_string_view(it->name));
bytes column_name = to_bytes(it->name.GetString());
const column_definition* cdef = _schema->get_column_definition(column_name);
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
}
std::string action = rjson::to_string((it->value)["Action"]);
std::string action = (it->value)["Action"].GetString();
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -5474,7 +5474,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
std::vector<query::clustering_range> ck_bounds;
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
sstring key = rjson::to_sstring(it->name);
std::string key = it->name.GetString();
const rjson::value& condition = it->value;
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
@@ -5482,13 +5482,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
const column_definition& pk_cdef = schema->partition_key_columns().front();
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
if (key == pk_cdef.name_as_text()) {
if (sstring(key) == pk_cdef.name_as_text()) {
if (!partition_ranges.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
}
if (ck_cdef && key == ck_cdef->name_as_text()) {
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
if (!ck_bounds.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
@@ -5889,7 +5889,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
int limit = limit_json ? limit_json->GetInt() : 100;
if (limit < 1 || limit > 100) {
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");

View File

@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
return {"", nullptr};
}
auto it = v.MemberBegin();
const std::string it_key = rjson::to_string(it->name);
const std::string it_key = it->name.GetString();
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
return {std::move(it_key), nullptr};
}

View File

@@ -979,9 +979,8 @@ client_data server::ongoing_request::make_client_data() const {
// and keep "driver_version" unset.
cd.driver_name = _user_agent;
// Leave "protocol_version" unset, it has no meaning in Alternator.
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset.
// As reported in issue #9216, we never set these fields in CQL
// either (see cql_server::connection::make_client_data()).
// Leave "hostname", "ssl_protocol" and "ssl_cipher_suite" unset for Alternator.
// Note: CQL sets ssl_protocol and ssl_cipher_suite via generic_server::connection base class.
return cd;
}

View File

@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
}
sstring attribute_name = rjson::to_sstring(*v);
sstring attribute_name(v->GetString(), v->GetStringLength());
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {

View File

@@ -729,6 +729,14 @@
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"use_sstable_identifier",
"description":"Use the sstable identifier UUID, if available, rather than the sstable generation.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
},
@@ -3051,7 +3059,7 @@
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"required":false,
"allowMultiple":false,
"type":"string",

View File

@@ -2020,12 +2020,16 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf");
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
auto usiopt = req->get_query_param("use_sstable_identifier");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
.use_sstable_identifier = strcasecmp(usiopt.c_str(), "true") == 0
};
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
try {
if (column_families.empty()) {
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
co_await snap_ctl.local().take_snapshot(tag, keynames, opts);
} else {
if (keynames.empty()) {
throw httpd::bad_param_exception("The keyspace of column families must be specified");
@@ -2033,7 +2037,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
}
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
}
co_return json_void();
} catch (...) {
@@ -2068,7 +2072,8 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
}
compaction::compaction_stats stats;
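
The handler above folds both query parameters into the new options struct. A self-contained sketch of that parsing pattern (the struct mirrors the diff; the helper function is illustrative):

#include <strings.h> // strcasecmp (POSIX)

struct snapshot_options {
    bool skip_flush = false;
    bool use_sstable_identifier = false;
};

// Boolean query parameters are matched case-insensitively, as in the handler.
snapshot_options parse_snapshot_options(const char* sf, const char* usi) {
    return snapshot_options{
        .skip_flush = strcasecmp(sf, "true") == 0,
        .use_sstable_identifier = strcasecmp(usi, "true") == 0,
    };
}

int main() {
    auto opts = parse_snapshot_options("TRUE", "false");
    return (opts.skip_flush && !opts.use_sstable_identifier) ? 0 : 1;
}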

View File

@@ -146,7 +146,8 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();

View File

@@ -77,9 +77,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
}
continue;
} catch (const std::out_of_range&) {
} catch (std::out_of_range&) {
// just fallthrough
} catch (const boost::regex_error&) {
} catch (boost::regex_error&) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
}
}

View File

@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
try {
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
} catch (const exceptions::already_exists_exception&) {}
} catch (exceptions::already_exists_exception&) {}
}
}

View File

@@ -256,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
} else {
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
}
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
}
}
@@ -293,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
[resource](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
}
});
});
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
return make_ready_future();
}

View File

@@ -337,13 +337,13 @@ future<authenticated_user> password_authenticator::authenticate(
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
co_return username;
} catch (const std::system_error &) {
} catch (std::system_error &) {
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
} catch (const exceptions::request_execution_exception& e) {
} catch (exceptions::request_execution_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.what()));
} catch (const exceptions::authentication_exception& e) {
} catch (exceptions::authentication_exception& e) {
std::throw_with_nested(e);
} catch (const exceptions::unavailable_exception& e) {
} catch (exceptions::unavailable_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
} catch (...) {
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));

View File

@@ -226,7 +226,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
try {
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
} catch (const ::service::group0_concurrent_modification&) {
} catch (::service::group0_concurrent_modification&) {
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
}
}

View File

@@ -192,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
log.info("Created default superuser role '{}'.", _superuser);
} catch (const exceptions::unavailable_exception& e) {
} catch(const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were not ready; will retry");
throw e;
}

View File

@@ -81,7 +81,7 @@ public:
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -126,7 +126,7 @@ public:
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
_complete = true;
return {};
}
@@ -141,7 +141,7 @@ public:
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (const exceptions::authentication_exception&) {
} catch (exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}

View File

@@ -65,7 +65,7 @@ future<> snapshot_ctl::run_snapshot_modify_operation(noncopyable_function<future
});
}
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
if (tag.empty()) {
throw std::runtime_error("You must supply a snapshot name.");
}
@@ -74,21 +74,21 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names));
};
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), opts, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), opts);
});
}
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
return check_snapshot_not_exist(ks_name, tag);
});
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), opts] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, opts);
});
}
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
if (ks_name.empty()) {
throw std::runtime_error("You must supply a keyspace name");
}
@@ -99,14 +99,14 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
throw std::runtime_error("You must supply a snapshot name.");
}
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), opts);
});
}
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
co_await check_snapshot_not_exist(ks_name, tag, tables);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
}
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {

View File

@@ -38,10 +38,13 @@ class backup_task_impl;
} // snapshot namespace
struct snapshot_options {
bool skip_flush = false;
bool use_sstable_identifier = false;
};
class snapshot_ctl : public peering_sharded_service<snapshot_ctl> {
public:
using skip_flush = bool_class<class skip_flush_tag>;
struct table_snapshot_details {
int64_t total;
int64_t live;
@@ -70,8 +73,8 @@ public:
*
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_snapshot(sstring tag, skip_flush sf = skip_flush::no) {
return take_snapshot(tag, {}, sf);
future<> take_snapshot(sstring tag, snapshot_options opts = {}) {
return take_snapshot(tag, {}, opts);
}
/**
@@ -80,7 +83,7 @@ public:
* @param tag the tag given to the snapshot; may not be null or empty
* @param keyspace_names the names of the keyspaces to snapshot; empty means "all"
*/
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {});
/**
* Takes the snapshot of multiple tables. A snapshot name must be specified.
@@ -89,7 +92,7 @@ public:
* @param tables a vector of tables names to snapshot
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
/**
* Remove the snapshot with the given name from the given keyspaces.
@@ -127,8 +130,8 @@ private:
friend class snapshot::backup_task_impl;
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
};
}

View File

@@ -137,6 +137,8 @@ namespace {
system_keyspace::ROLE_PERMISSIONS,
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
// repair tasks
system_keyspace::REPAIR_TASKS,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
props.is_group0_table = true;
@@ -462,6 +464,24 @@ schema_ptr system_keyspace::repair_history() {
return schema;
}
schema_ptr system_keyspace::repair_tasks() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, REPAIR_TASKS);
return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
.with_column("task_uuid", uuid_type, column_kind::partition_key)
.with_column("operation", utf8_type, column_kind::clustering_key)
// First and last token of the tablet
.with_column("first_token", long_type, column_kind::clustering_key)
.with_column("last_token", long_type, column_kind::clustering_key)
.with_column("timestamp", timestamp_type)
.with_column("table_uuid", uuid_type, column_kind::static_column)
.set_comment("Record tablet repair tasks")
.with_hash_version()
.build();
}();
return schema;
}
schema_ptr system_keyspace::built_indexes() {
static thread_local auto built_indexes = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
@@ -2311,6 +2331,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
corrupt_data(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(),
repair_tasks(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),
@@ -2552,6 +2573,32 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
});
}
future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
// Repair task entries expire after 10 days by default; this should give the management tools enough time to query them
constexpr int ttl = 10 * 24 * 3600;
sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
{entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
utils::chunked_vector<canonical_mutation> cmuts = {muts.begin(), muts.end()};
co_return cmuts;
}
future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_task_entry ent;
ent.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
ent.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
ent.first_token = row.get_as<int64_t>("first_token");
ent.last_token = row.get_as<int64_t>("last_token");
ent.timestamp = row.get_as<db_clock::time_point>("timestamp");
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
co_await f(std::move(ent));
co_return stop_iteration::no;
});
}
future<gms::generation_type> system_keyspace::increment_and_get_generation() {
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
@@ -3723,4 +3770,35 @@ future<> system_keyspace::apply_mutation(mutation m) {
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
{system_keyspace::repair_task_operation::requested, "requested"},
{system_keyspace::repair_task_operation::finished, "finished"},
};
static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
for (auto&& [v, s] : repair_task_operation_to_name) {
result.emplace(s, v);
}
return result;
});
sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
auto i = repair_task_operation_to_name.find(op);
if (i == repair_task_operation_to_name.end()) {
on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
}
return i->second;
}
system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
return repair_task_operation_from_name.at(name);
}
} // namespace db
auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{}", db::system_keyspace::repair_task_operation_to_string(op));
}

View File

@@ -57,6 +57,8 @@ namespace paxos {
struct topology_request_state;
class group0_guard;
class raft_group0_client;
}
namespace netw {
@@ -184,6 +186,7 @@ public:
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static constexpr auto REPAIR_TASKS = "repair_tasks";
static constexpr auto GROUP0_HISTORY = "group0_history";
static constexpr auto DISCOVERY = "discovery";
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
@@ -260,6 +263,7 @@ public:
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr repair_history();
static schema_ptr repair_tasks();
static schema_ptr group0_history();
static schema_ptr discovery();
static schema_ptr broadcast_kv_store();
@@ -398,6 +402,22 @@ public:
int64_t range_end;
};
enum class repair_task_operation {
requested,
finished,
};
static sstring repair_task_operation_to_string(repair_task_operation op);
static repair_task_operation repair_task_operation_from_string(const sstring& name);
struct repair_task_entry {
tasks::task_id task_uuid;
repair_task_operation operation;
int64_t first_token;
int64_t last_token;
db_clock::time_point timestamp;
table_id table_uuid;
};
struct topology_requests_entry {
utils::UUID id;
utils::UUID initiating_host;
@@ -419,6 +439,10 @@ public:
using repair_history_consumer = noncopyable_function<future<>(const repair_history_entry&)>;
future<> get_repair_history(table_id, repair_history_consumer f);
future<utils::chunked_vector<canonical_mutation>> get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts);
using repair_task_consumer = noncopyable_function<future<>(const repair_task_entry&)>;
future<> get_repair_task(tasks::task_id task_uuid, repair_task_consumer f);
future<> save_truncation_record(const replica::column_family&, db_clock::time_point truncated_at, db::replay_position);
future<replay_positions> get_truncated_positions(table_id);
future<> drop_truncation_rp_records();
@@ -726,3 +750,8 @@ public:
}; // class system_keyspace
} // namespace db
template <>
struct fmt::formatter<db::system_keyspace::repair_task_operation> : fmt::formatter<string_view> {
auto format(const db::system_keyspace::repair_task_operation&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

View File

@@ -28,8 +28,7 @@ Incremental Repair is only supported for tables that use the tablets architectur
Incremental Repair Modes
------------------------
Incremental is currently disabled by default. You can control its behavior for a given repair operation using the ``incremental_mode`` parameter.
This is useful for enabling incremental repair, or in situations where you might need to force a full data validation.
While incremental repair is the default and recommended mode, you can control its behavior for a given repair operation using the ``incremental_mode`` parameter. This is useful for situations where you might need to force a full data validation.
The available modes are:

View File

@@ -53,13 +53,13 @@ ScyllaDB nodetool cluster repair command supports the following options:
nodetool cluster repair --tablet-tokens 1,10474535988
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled'.
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
For example:
::
nodetool cluster repair --incremental-mode disabled
nodetool cluster repair --incremental-mode full
- ``keyspace`` executes a repair on a specific keyspace. The default is all keyspaces.

View File

@@ -17,7 +17,7 @@ SYNOPSIS
[(-u <username> | --username <username>)] snapshot
[(-cf <table> | --column-family <table> | --table <table>)]
[(-kc <kclist> | --kc.list <kclist>)]
[(-sf | --skip-flush)] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
[(-sf | --skip-flush)] [--use-sstable-identifier] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
OPTIONS
.......
@@ -37,6 +37,8 @@ Parameter Descriptio
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
--use-sstable-identifier Use the sstable identifier UUID, if available, rather than the sstable generation.
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
-t <tag> / --tag <tag> The name of the snapshot
==================================================================== =====================================================================================

View File

@@ -143,6 +143,7 @@ public:
gms::feature tablet_incremental_repair { *this, "TABLET_INCREMENTAL_REPAIR"sv };
gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
gms::feature tablet_repair_tasks_table { *this, "TABLET_REPAIR_TASKS_TABLE"sv };
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
gms::feature tablet_rack_aware_view_pairing { *this, "TABLET_RACK_AWARE_VIEW_PAIRING"sv };

View File

@@ -200,10 +200,7 @@ enum class tablet_repair_incremental_mode : uint8_t {
disabled,
};
// FIXME: Incremental repair is disabled by default due to
// https://github.com/scylladb/scylladb/issues/26041 and
// https://github.com/scylladb/scylladb/issues/27414
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::disabled};
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);

View File

@@ -3844,3 +3844,83 @@ future<uint32_t> repair_service::get_next_repair_meta_id() {
locator::host_id repair_service::my_host_id() const noexcept {
return _gossiper.local().my_host_id();
}
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2) {
if (ranges1.empty() || ranges2.empty()) {
co_return 0;
}
auto sort = [] (utils::chunked_vector<tablet_token_range>& ranges) {
std::sort(ranges.begin(), ranges.end(), [] (const auto& a, const auto& b) {
if (a.first_token != b.first_token) {
return a.first_token < b.first_token;
}
return a.last_token < b.last_token;
});
};
// First, merge overlapping and adjacent ranges in ranges2.
sort(ranges2);
utils::chunked_vector<tablet_token_range> merged;
merged.push_back(ranges2[0]);
for (size_t i = 1; i < ranges2.size(); ++i) {
co_await coroutine::maybe_yield();
// To avoid overflow with max() + 1, we check adjacency with `a - 1 <= b` instead of `a <= b + 1`
if (ranges2[i].first_token - 1 <= merged.back().last_token) {
merged.back().last_token = std::max(merged.back().last_token, ranges2[i].last_token);
} else {
merged.push_back(ranges2[i]);
}
}
// Count covered ranges using a linear scan
size_t covered_count = 0;
auto it = merged.begin();
auto end = merged.end();
sort(ranges1);
for (const auto& r1 : ranges1) {
co_await coroutine::maybe_yield();
// Advance the merged iterator only if the current merged range ends
// before the current r1 starts.
while (it != end && it->last_token < r1.first_token) {
co_await coroutine::maybe_yield();
++it;
}
// If we have exhausted the merged ranges, no further r1 can be covered
if (it == end) {
break;
}
// Check if the current merged range covers r1.
if (it->first_token <= r1.first_token && r1.last_token <= it->last_token) {
covered_count++;
}
}
co_return covered_count;
}
future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_task_progress(tasks::task_id task_uuid) {
utils::chunked_vector<tablet_token_range> requested_tablets;
utils::chunked_vector<tablet_token_range> finished_tablets;
table_id tid;
if (!_db.local().features().tablet_repair_tasks_table) {
co_return std::nullopt;
}
co_await _sys_ks.local().get_repair_task(task_uuid, [&tid, &requested_tablets, &finished_tablets] (const db::system_keyspace::repair_task_entry& entry) -> future<> {
rlogger.debug("repair_task_progress: Get entry operation={} first_token={} last_token={}", entry.operation, entry.first_token, entry.last_token);
if (entry.operation == db::system_keyspace::repair_task_operation::requested) {
requested_tablets.push_back({entry.first_token, entry.last_token});
} else if (entry.operation == db::system_keyspace::repair_task_operation::finished) {
finished_tablets.push_back({entry.first_token, entry.last_token});
}
tid = entry.table_uuid;
co_return;
});
auto requested = requested_tablets.size();
auto finished_nomerge = finished_tablets.size();
auto finished = co_await count_finished_tablets(std::move(requested_tablets), std::move(finished_tablets));
auto progress = repair_task_progress{requested, finished, tid};
rlogger.debug("repair_task_progress: task_uuid={} table_uuid={} requested_tablets={} finished_tablets={} progress={} finished_nomerge={}",
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
co_return progress;
}
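
A synchronous, self-contained sketch of the merge-then-scan counting done by count_finished_tablets() above (coroutine yields omitted; the values in main() are hypothetical):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct range { int64_t first, last; }; // inclusive on both ends

size_t count_covered(std::vector<range> r1, std::vector<range> r2) {
    if (r1.empty() || r2.empty()) return 0;
    auto by_tokens = [](const range& a, const range& b) {
        return a.first != b.first ? a.first < b.first : a.last < b.last;
    };
    // Merge overlapping and adjacent ranges in r2.
    std::sort(r2.begin(), r2.end(), by_tokens);
    std::vector<range> merged{r2[0]};
    for (size_t i = 1; i < r2.size(); ++i) {
        if (r2[i].first - 1 <= merged.back().last) { // overflow-safe adjacency check
            merged.back().last = std::max(merged.back().last, r2[i].last);
        } else {
            merged.push_back(r2[i]);
        }
    }
    // Linear scan: count r1 ranges fully covered by a single merged range.
    std::sort(r1.begin(), r1.end(), by_tokens);
    size_t covered = 0;
    auto it = merged.begin();
    for (const auto& r : r1) {
        while (it != merged.end() && it->last < r.first) {
            ++it;
        }
        if (it == merged.end()) {
            break;
        }
        if (it->first <= r.first && r.last <= it->last) {
            ++covered;
        }
    }
    return covered;
}

int main() {
    // finished [0,5] and [6,10] merge into [0,10], covering requested [0,10];
    // [25,40] does not fully cover [20,30], so exactly one range counts.
    assert(count_covered({{0, 10}, {20, 30}}, {{0, 5}, {6, 10}, {25, 40}}) == 1);
}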

View File

@@ -99,6 +99,15 @@ public:
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
struct repair_task_progress {
size_t requested;
size_t finished;
table_id table_uuid;
float progress() const {
return requested == 0 ? 1.0 : float(finished) / requested;
}
};
class repair_service : public seastar::peering_sharded_service<repair_service> {
sharded<service::topology_state_machine>& _tsm;
sharded<gms::gossiper>& _gossiper;
@@ -222,6 +231,9 @@ private:
public:
future<gc_clock::time_point> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid, tasks::task_info global_tablet_repair_task_info, service::frozen_topology_guard topo_guard, std::optional<locator::tablet_replica_set> rebuild_replicas, locator::tablet_transition_stage stage);
future<std::optional<repair_task_progress>> get_tablet_repair_task_progress(tasks::task_id task_uuid);
private:
future<repair_update_system_table_response> repair_update_system_table_handler(
@@ -326,3 +338,12 @@ future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
schema_ptr s, uint64_t seed, repair_master is_master,
reader_permit permit, repair_hasher hasher);
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, std::optional<small_table_optimization_params> small_table_optimization = std::nullopt, repair_meta* rm = nullptr);
// A struct to hold the first and last token of a tablet.
struct tablet_token_range {
int64_t first_token;
int64_t last_token;
};
// Function to count the number of ranges in ranges1 covered by the merged ranges of ranges2.
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2);

View File

@@ -297,17 +297,17 @@ public:
const dht::token_range& token_range() const noexcept;
size_t memtable_count() const;
size_t memtable_count() const noexcept;
const compaction_group_ptr& main_compaction_group() const noexcept;
const std::vector<compaction_group_ptr>& split_ready_compaction_groups() const;
compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept;
uint64_t live_disk_space_used() const;
uint64_t live_disk_space_used() const noexcept;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const;
utils::small_vector<compaction_group_ptr, 3> compaction_groups();
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept;
utils::small_vector<compaction_group_ptr, 3> compaction_groups() noexcept;
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const noexcept;
utils::small_vector<compaction_group_ptr, 3> split_unready_groups() const;
bool split_unready_groups_are_empty() const;
@@ -430,7 +430,7 @@ public:
virtual storage_group& storage_group_for_token(dht::token) const = 0;
virtual utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept = 0;
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;

View File

@@ -2810,26 +2810,26 @@ future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& shar
});
}
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, bool skip_flush) {
if (!skip_flush) {
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, db::snapshot_options opts) {
if (!opts.skip_flush) {
co_await flush_table_on_all_shards(sharded_db, uuid);
}
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag, opts);
}
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), skip_flush] (auto& table_name) {
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), opts] (auto& table_name) {
auto uuid = sharded_db.local().find_uuid(ks_name, table_name);
return snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
return snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
});
}
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush) {
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts) {
auto& ks = sharded_db.local().find_keyspace(ks_name);
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), opts] (const auto& pair) -> future<> {
auto uuid = pair.second->id();
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
});
}
@@ -2951,7 +2951,12 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
auto truncated_at = truncated_at_opt.value_or(db_clock::now());
auto name = snapshot_name_opt.value_or(
format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()));
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name);
// Use the sstable identifier in snapshot names to allow de-duplication of sstables
// at backup time, even if they were migrated across shards or nodes and renamed
// with a new generation. We hard-code it here since we have no way to pass this
// option to auto-snapshot, and it is always safe to use the sstable identifier
// instead of the sstable generation.
auto opts = db::snapshot_options{.use_sstable_identifier = true};
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name, opts);
}
co_await sharded_db.invoke_on_all([&] (database& db) {

View File

@@ -1040,12 +1040,12 @@ public:
private:
using snapshot_file_set = foreign_ptr<std::unique_ptr<std::unordered_set<sstring>>>;
future<snapshot_file_set> take_snapshot(sstring jsondir);
future<snapshot_file_set> take_snapshot(sstring jsondir, db::snapshot_options opts);
// Writes the table schema and the manifest of all files in the snapshot directory.
future<> finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets);
static future<> seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets);
public:
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name);
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts);
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
static future<snapshot_details> get_snapshot_details(std::filesystem::path snapshot_dir, std::filesystem::path datadir);
@@ -1133,7 +1133,7 @@ public:
// The tablet filter is used to not double account migrating tablets, so it's important that
// only one of pending or leaving replica is accounted based on current migration stage.
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept;
const db::view::stats& get_view_stats() const {
return _view_stats;
@@ -2009,9 +2009,9 @@ public:
static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, bool skip_flush);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, db::snapshot_options opts);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
public:
bool update_column_family(schema_ptr s);

View File

@@ -708,7 +708,7 @@ public:
return *_single_sg;
}
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const override {
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const noexcept override {
return locator::combined_load_stats{
.table_ls = locator::table_load_stats{
.size_in_bytes = _single_sg->live_disk_space_used(),
@@ -874,7 +874,7 @@ public:
return storage_group_for_id(storage_group_of(token).first);
}
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const override;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept override;
bool all_storage_groups_split() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
@@ -922,7 +922,7 @@ compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_ran
return _main_cg;
}
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const {
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept {
action(_main_cg);
for (auto& cg : _merging_groups) {
action(cg);
@@ -932,7 +932,7 @@ void storage_group::for_each_compaction_group(std::function<void(const compactio
}
}
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() {
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() noexcept {
utils::small_vector<compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
@@ -940,7 +940,7 @@ utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups()
return cgs;
}
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const {
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const noexcept {
utils::small_vector<const_compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
@@ -1890,7 +1890,7 @@ sstables::file_size_stats compaction_group::live_disk_space_used_full_stats() co
return _main_sstables->get_file_size_stats() + _maintenance_sstables->get_file_size_stats();
}
uint64_t storage_group::live_disk_space_used() const {
uint64_t storage_group::live_disk_space_used() const noexcept {
auto cgs = const_cast<storage_group&>(*this).compaction_groups();
return std::ranges::fold_left(cgs | std::views::transform(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0), std::plus{});
}
@@ -2813,7 +2813,7 @@ void table::on_flush_timer() {
});
}
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
locator::table_load_stats table_stats;
table_stats.split_ready_seq_number = _split_ready_seq_number;
@@ -2836,7 +2836,7 @@ locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std:
};
}
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
return _sg_manager->table_load_stats(std::move(tablet_filter));
}
@@ -3268,7 +3268,7 @@ future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstrin
}
// Runs the orchestration code on an arbitrary shard to balance the load.
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts) {
auto* so = std::get_if<storage_options::local>(&table_shards->get_storage_options().value);
if (so == nullptr) {
throw std::runtime_error("Snapshotting non-local tables is not implemented");
@@ -3291,7 +3291,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
return table_shards->take_snapshot(jsondir);
return table_shards->take_snapshot(jsondir, opts);
}));
});
co_await io_check(sync_directory, jsondir);
@@ -3300,19 +3300,22 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
});
}
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir) {
tlogger.trace("take_snapshot {}", jsondir);
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir, db::snapshot_options opts) {
tlogger.trace("take_snapshot {}: use_sstable_identifier={}", jsondir, opts.use_sstable_identifier);
auto sstable_deletion_guard = co_await get_sstable_list_permit();
auto tables = *_sstables->all() | std::ranges::to<std::vector<sstables::shared_sstable>>();
auto table_names = std::make_unique<std::unordered_set<sstring>>();
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
table_names->insert(sstable->component_basename(sstables::component_type::Data));
return io_check([sstable, &dir = jsondir] {
return sstable->snapshot(dir);
auto& ks_name = schema()->ks_name();
auto& cf_name = schema()->cf_name();
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&, opts] (sstables::shared_sstable sstable) -> future<> {
auto gen = co_await io_check([sstable, &dir = jsondir, opts] {
return sstable->snapshot(dir, opts.use_sstable_identifier);
});
auto fname = sstable->component_basename(ks_name, cf_name, sstable->get_version(), gen, sstable->get_format(), sstables::component_type::Data);
table_names->insert(fname);
});
co_return make_foreign(std::move(table_names));
}
@@ -3453,7 +3456,7 @@ size_t compaction_group::memtable_count() const noexcept {
return _memtables->size();
}
size_t storage_group::memtable_count() const {
size_t storage_group::memtable_count() const noexcept {
return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{});
}

View File

@@ -6822,6 +6822,7 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
});
}
auto ts = db_clock::now();
for (const auto& token : tokens) {
auto tid = tmap.get_tablet_id(token);
auto& tinfo = tmap.get_tablet_info(tid);
@@ -6835,6 +6836,20 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
.set_repair_task_info(last_token, repair_task_info, _feature_service)
.build());
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::requested,
.first_token = dht::token::to_int64(tmap.get_first_token(tid)),
.last_token = dht::token::to_int64(tmap.get_last_token(tid)),
.timestamp = ts,
.table_uuid = table,
};
if (_feature_service.tablet_repair_tasks_table) {
auto cmuts = co_await _sys_ks.local().get_update_repair_task_mutations(entry, guard.write_timestamp());
for (auto& m : cmuts) {
updates.push_back(std::move(m));
}
}
}
sstring reason = format("Repair tablet by API request tokens={} tablet_task_id={}", tokens, repair_task_info.tablet_task_id);

View File

@@ -136,6 +136,17 @@ db::tablet_options combine_tablet_options(R&& opts) {
return combined_opts;
}
static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
auto tokens_view = s | std::views::split(delimiter)
| std::views::transform([](auto&& range) {
return std::string_view(&*range.begin(), std::ranges::distance(range));
})
| std::views::transform([](std::string_view sv) {
return locator::tablet_id(std::stoul(std::string(sv)));
});
return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
}
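A quick usage sketch (hypothetical standalone call; in this patch the input string comes from the tablet_repair_skip_sched injection parameter, as the hunk below shows):
// Parses the comma-separated injection value, e.g. "0,1,5,8".
auto skipped = split_string_to_tablet_id("0,1,5,8", ',');
// Yields tablet_ids {0, 1, 5, 8}; the scheduler below checks skipped.contains(id).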
// Used to compare different migration choices in regard to impact on load imbalance.
// There is a total order on migration_badness such that better migrations are ordered before worse ones.
struct migration_badness {
@@ -893,6 +904,8 @@ public:
co_await coroutine::maybe_yield();
auto& config = tmap.repair_scheduler_config();
auto now = db_clock::now();
auto skip = utils::get_local_injector().inject_parameter<std::string_view>("tablet_repair_skip_sched");
auto skip_tablets = skip ? split_string_to_tablet_id(*skip, ',') : std::unordered_set<locator::tablet_id>();
co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
auto gid = locator::global_tablet_id{table, id};
// Skip tablet that is in transitions.
@@ -913,6 +926,11 @@ public:
co_return;
}
if (skip_tablets.contains(id)) {
lblogger.debug("Skipped tablet repair for tablet={} by error injector", gid);
co_return;
}
// Avoid rescheduling a failed tablet repair in a loop
// TODO: Allow user to config
const auto min_reschedule_time = std::chrono::seconds(5);

View File

@@ -10,6 +10,7 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "repair/row_level.hh"
#include "service/task_manager_module.hh"
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
@@ -109,6 +110,16 @@ future<std::optional<tasks::virtual_task_hint>> tablet_virtual_task::contains(ta
tid = tmap.next_tablet(*tid);
}
}
// Check if the task id is present in the repair task table
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(task_id);
if (progress && progress->requested > 0) {
co_return tasks::virtual_task_hint{
.table_id = progress->table_uuid,
.task_type = locator::tablet_task_type::user_repair,
.tablet_id = std::nullopt,
};
}
co_return std::nullopt;
}
@@ -243,7 +254,20 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
size_t sched_nr = 0;
auto tmptr = _ss.get_token_metadata_ptr();
auto& tmap = tmptr->tablets().get_tablet_map(table);
bool repair_task_finished = false;
bool repair_task_pending = false;
if (is_repair_task(task_type)) {
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(id);
if (progress) {
res.status.progress.completed = progress->finished;
res.status.progress.total = progress->requested;
res.status.progress_units = "tablets";
if (progress->requested > 0 && progress->requested == progress->finished) {
repair_task_finished = true;
} else if (progress->requested > 0 && progress->requested > progress->finished) {
repair_task_pending = true;
}
}
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
auto& task_info = info.repair_task_info;
if (task_info.tablet_task_id.uuid() == id.uuid()) {
@@ -275,7 +299,17 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
res.status.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
co_return res;
}
// FIXME: Show finished tasks.
if (repair_task_pending) {
// When repair_task_pending is true, res.tablets will be empty iff the request was aborted by the user.
res.status.state = res.tablets.empty() ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::running;
co_return res;
}
if (repair_task_finished) {
res.status.state = tasks::task_manager::task_state::done;
co_return res;
}
co_return std::nullopt;
}

View File

@@ -1205,6 +1205,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
// Record the repair_time returned by the repair_tablet rpc call
db_clock::time_point repair_time;
// Record the repair task update mutations
utils::chunked_vector<canonical_mutation> repair_task_updates;
service::session_id session_id;
};
@@ -1737,6 +1739,14 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
dst = dst_opt.value().host;
}
// Update repair task
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(tinfo.repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::finished,
.first_token = dht::token::to_int64(tmap.get_first_token(gid.tablet)),
.last_token = dht::token::to_int64(tmap.get_last_token(gid.tablet)),
.table_uuid = gid.table,
};
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
service::session_id::create_random_id() : trinfo->session_id;
@@ -1745,6 +1755,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
if (_feature_service.tablet_repair_tasks_table) {
entry.timestamp = db_clock::now();
tablet_state.repair_task_updates = co_await _sys_ks.get_update_repair_task_mutations(entry, api::new_timestamp());
}
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
dst, tablet, duration, res.repair_time);
})) {
@@ -1763,6 +1777,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
.del_repair_task_info(last_token, _feature_service)
.del_session(last_token);
for (auto& m : tablet_state.repair_task_updates) {
updates.push_back(std::move(m));
}
// Skip updating the repair time if a hosts filter or dcs filter is set.
if (valid && is_filter_off) {
auto sched_time = tinfo.repair_task_info.sched_time;

View File

@@ -2117,11 +2117,14 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
}
sstable_id sid;
if (generation().is_uuid_based()) {
// Force a random sstable_id for testing purposes
bool random_sstable_identifier = utils::get_local_injector().is_enabled("random_sstable_identifier");
if (!random_sstable_identifier && generation().is_uuid_based()) {
sid = sstable_id(generation().as_uuid());
} else {
sid = sstable_id(utils::UUID_gen::get_time_UUID());
sstlog.info("SSTable {} has numerical generation. SSTable identifier in scylla_metadata set to {}", get_filename(), sid);
auto msg = random_sstable_identifier ? "forced random sstable_id" : "has numerical generation";
sstlog.info("SSTable {} {}. SSTable identifier in scylla_metadata set to {}", get_filename(), msg, sid);
}
_components->scylla_metadata->data.set<scylla_metadata_type::SSTableIdentifier>(scylla_metadata::sstable_identifier{sid});
@@ -2535,8 +2538,11 @@ std::vector<std::pair<component_type, sstring>> sstable::all_components() const
return all;
}
future<> sstable::snapshot(const sstring& dir) const {
return _storage->snapshot(*this, dir, storage::absolute_path::yes);
future<generation_type> sstable::snapshot(const sstring& dir, bool use_sstable_identifier) const {
// Use the sstable identifier UUID if available to enable global de-duplication of sstables in backup.
generation_type gen = (use_sstable_identifier && _sstable_identifier) ? generation_type(_sstable_identifier->uuid()) : _generation;
co_await _storage->snapshot(*this, dir, storage::absolute_path::yes, gen);
co_return gen;
}
future<> sstable::change_state(sstable_state to, delayed_commit_changes* delay_commit) {

View File

@@ -397,6 +397,10 @@ public:
return _version;
}
format_types get_format() const {
return _format;
}
// Returns the total bytes of all components.
uint64_t bytes_on_disk() const;
file_size_stats get_file_size_stats() const;
@@ -438,7 +442,10 @@ public:
std::vector<std::pair<component_type, sstring>> all_components() const;
future<> snapshot(const sstring& dir) const;
// When use_sstable_identifier is true and the sstable identifier is available,
// use it to name the sstable in the snapshot, rather than the sstable generation.
// Returns the generation used for snapshot.
future<generation_type> snapshot(const sstring& dir, bool use_sstable_identifier = false) const;
// Delete the sstable by unlinking all sstable files
// Ignores all errors.

View File

@@ -31,6 +31,7 @@
#include "replica/database.hh"
#include "utils/assert.hh"
#include "utils/lister.hh"
#include "utils/rjson.hh"
#include "partition_slice_builder.hh"
#include "mutation/frozen_mutation.hh"
#include "test/lib/mutation_source_test.hh"
@@ -38,6 +39,7 @@
#include "service/migration_manager.hh"
#include "sstables/sstables.hh"
#include "sstables/generation_type.hh"
#include "sstables/sstable_version.hh"
#include "db/config.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog.hh"
@@ -51,6 +53,7 @@
#include "db/system_keyspace.hh"
#include "db/view/view_builder.hh"
#include "replica/mutation_dump.hh"
#include "utils/error_injection.hh"
using namespace std::chrono_literals;
using namespace sstables;
@@ -612,13 +615,13 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
});
}
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", bool skip_flush = false) {
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) {
try {
auto uuid = e.db().local().find_uuid(ks_name, cf_name);
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, skip_flush);
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, opts);
} catch (...) {
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
ks_name, cf_name, snapshot_name, skip_flush, std::current_exception());
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={} use_sstable_identifier={}: {}",
ks_name, cf_name, snapshot_name, opts.skip_flush, opts.use_sstable_identifier, std::current_exception());
throw;
}
}
@@ -632,6 +635,37 @@ future<std::set<sstring>> collect_files(fs::path path) {
co_return ret;
}
static bool is_component(const sstring& fname, const sstring& suffix) {
return fname.ends_with(suffix);
}
static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, const sstring& suffix) {
// Collect the sstable components with the given suffix from the listed files
auto pred = [&suffix] (const sstring& fname) {
return is_component(fname, suffix);
};
return std::ranges::filter_view(all_files, pred) | std::ranges::to<std::set<sstring>>();
}
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
static future<> validate_manifest(const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir) {
sstring suffix = "-Data.db";
auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
std::set<sstring> sstables_in_manifest;
auto manifest_str = co_await util::read_entire_file_contiguous(snapshot_dir / "manifest.json");
auto manifest_json = rjson::parse(manifest_str);
auto& manifest_files = manifest_json["files"];
BOOST_REQUIRE(manifest_files.IsArray());
for (auto& f : manifest_files.GetArray()) {
if (is_component(f.GetString(), suffix)) {
sstables_in_manifest.insert(f.GetString());
}
}
testlog.debug("SSTables in manifest.json: {}", fmt::join(sstables_in_manifest, ", "));
BOOST_REQUIRE_EQUAL(sstables_in_snapshot, sstables_in_manifest);
}
static future<> snapshot_works(const std::string& table_name) {
return do_with_some_data({"cf"}, [table_name] (cql_test_env& e) {
take_snapshot(e, "ks", table_name).get();
@@ -651,6 +685,8 @@ static future<> snapshot_works(const std::string& table_name) {
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir, in_snapshot_dir);
validate_manifest(snapshot_dir, in_snapshot_dir).get();
return make_ready_future<>();
}, true);
}
@@ -669,7 +705,8 @@ SEASTAR_TEST_CASE(index_snapshot_works) {
SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
take_snapshot(e, "ks", "cf", "test", true /* skip_flush */).get();
db::snapshot_options opts = {.skip_flush = true};
take_snapshot(e, "ks", "cf", "test", opts).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -682,6 +719,41 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
});
}
SEASTAR_TEST_CASE(snapshot_use_sstable_identifier_works) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
return make_ready_future<>();
#endif
sstring table_name = "cf";
// Force random sstable identifiers, otherwise the initial sstable_id is equal
// to the sstable generation and the test can't distinguish between them.
utils::get_local_injector().enable("random_sstable_identifier", false);
return do_with_some_data({table_name}, [table_name] (cql_test_env& e) -> future<> {
sstring tag = "test";
db::snapshot_options opts = {.use_sstable_identifier = true};
co_await take_snapshot(e, "ks", table_name, tag, opts);
auto& cf = e.local_db().find_column_family("ks", table_name);
auto table_directory = table_dir(cf);
auto snapshot_dir = table_directory / sstables::snapshots_dir / tag;
auto in_table_dir = co_await collect_files(table_directory);
// snapshot triggered a flush and wrote the data down.
BOOST_REQUIRE_GE(in_table_dir.size(), 9);
testlog.info("Files in table dir: {}", fmt::join(in_table_dir, ", "));
auto in_snapshot_dir = co_await collect_files(snapshot_dir);
testlog.info("Files in snapshot dir: {}", fmt::join(in_snapshot_dir, ", "));
in_table_dir.insert("manifest.json");
in_table_dir.insert("schema.cql");
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir.size(), in_snapshot_dir.size());
BOOST_REQUIRE_NE(in_table_dir, in_snapshot_dir);
co_await validate_manifest(snapshot_dir, in_snapshot_dir);
}, true);
}
SEASTAR_TEST_CASE(snapshot_list_okay) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -1456,7 +1528,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
}
BOOST_REQUIRE(found);
co_await take_snapshot(e, "ks", "cf", "test", true /* skip_flush */);
co_await take_snapshot(e, "ks", "cf", "test", db::snapshot_options{.skip_flush = true});
testlog.debug("Expected: {}", expected);

View File

@@ -346,4 +346,60 @@ SEASTAR_TEST_CASE(repair_rows_size_considers_external_memory) {
});
}
SEASTAR_TEST_CASE(test_tablet_token_range_count) {
{
// Simple case: one large range covers a smaller one
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r2 ranges overlap and should merge to cover r1
// r2: [0, 50] + [40, 100] -> merges to [0, 100]
// r1: [10, 90] should be covered
utils::chunked_vector<tablet_token_range> r1 = {{10, 90}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 50}, {40, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r2 ranges are adjacent (contiguous) and should merge
// r2: [0, 10] + [11, 20] -> merges to [0, 20]
// r1: [5, 15] should be covered
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}, {11, 20}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r1 overlaps r2 but is not FULLY contained
// r2: [0, 10]
// r1: [5, 15] (Ends too late), [ -5, 5 ] (Starts too early)
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}, {-5, 5}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 0);
}
{
// A single merged range in r2 covers multiple distinct ranges in r1
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}, {30, 40}, {50, 60}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 3);
}
{
// Inputs are provided in random order, ensuring the internal sort works
utils::chunked_vector<tablet_token_range> r1 = {{50, 60}, {10, 20}};
utils::chunked_vector<tablet_token_range> r2 = {{50, 100}, {0, 40}};
// r2 merges effectively to [0, 40] and [50, 100]
// Both r1 items are covered
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 2);
}
{
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
utils::chunked_vector<tablet_token_range> r2_empty = {};
utils::chunked_vector<tablet_token_range> r1_empty = {};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2_empty) == 0);
BOOST_REQUIRE(co_await count_finished_tablets(r1_empty, r2) == 0);
}
}
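The cases above pin down the intended semantics: overlapping or adjacent ranges in r2 are merged first, and an r1 range counts only if it is fully contained in one merged range. A minimal self-contained sketch of that check, assuming plain int64 token bounds and a hypothetical count_covered helper (for illustration only, not the repair module's actual count_finished_tablets):
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>
struct token_range { int64_t first, last; };
static size_t count_covered(std::vector<token_range> r1, std::vector<token_range> r2) {
    if (r1.empty() || r2.empty()) {
        return 0;
    }
    // Merge r2 into disjoint ranges; adjacent ranges ([0, 10] + [11, 20]) merge too.
    std::sort(r2.begin(), r2.end(), [] (auto& a, auto& b) { return a.first < b.first; });
    std::vector<token_range> merged;
    for (auto& r : r2) {
        if (!merged.empty() && r.first <= merged.back().last + 1) {
            merged.back().last = std::max(merged.back().last, r.last);
        } else {
            merged.push_back(r);
        }
    }
    // An r1 range counts only if a single merged range fully contains it.
    size_t n = 0;
    for (auto& t : r1) {
        for (auto& m : merged) {
            if (m.first <= t.first && t.last <= m.last) { ++n; break; }
        }
    }
    return n;
}
int main() {
    assert(count_covered({{10, 20}, {30, 40}, {50, 60}}, {{0, 100}}) == 3);
    assert(count_covered({{5, 15}, {-5, 5}}, {{0, 10}}) == 0);
    assert(count_covered({{50, 60}, {10, 20}}, {{50, 100}, {0, 40}}) == 2);
}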
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -25,7 +25,7 @@ from typing import Any, Optional, override
import pytest
import requests
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
from cassandra.cluster import NoHostAvailable, Session
from cassandra.query import SimpleStatement, named_tuple_factory
from ccmlib.scylla_node import ScyllaNode, NodeError
@@ -1135,14 +1135,6 @@ class TestCQLAudit(AuditTester):
session.execute("DROP TABLE test1")
def _get_attempt_count(self, session: Session, *, execution_profile=EXEC_PROFILE_DEFAULT, consistency_level: ConsistencyLevel = ConsistencyLevel.ONE) -> int:
# dtest env is using FlakyRetryPolicy which has `max_retries` attribute
cl_profile = session.execution_profile_clone_update(execution_profile, consistency_level=consistency_level)
policy = cl_profile.retry_policy
retries = getattr(policy, "max_retries", None)
assert retries is not None
return 1 + retries
def _test_insert_failure_doesnt_report_success_assign_nodes(self, session: Session = None):
all_nodes: set[ScyllaNode] = set(self.cluster.nodelist())
assert len(all_nodes) == 7
@@ -1162,7 +1154,6 @@ class TestCQLAudit(AuditTester):
for i in range(256):
stmt = SimpleStatement(f"INSERT INTO ks.test1 (k, v1) VALUES ({i}, 1337)", consistency_level=ConsistencyLevel.THREE)
session.execute(stmt)
attempt_count = self._get_attempt_count(session, consistency_level=ConsistencyLevel.THREE)
token = rows_to_list(session.execute(f"SELECT token(k) FROM ks.test1 WHERE k = {i}"))[0][0]
@@ -1177,9 +1168,9 @@ class TestCQLAudit(AuditTester):
audit_partition_nodes = [address_to_node[address] for address in audit_nodes]
insert_node = address_to_node[insert_node.pop()]
kill_node = address_to_node[partitions.pop()]
return audit_partition_nodes, insert_node, kill_node, stmt.query_string, attempt_count
return audit_partition_nodes, insert_node, kill_node, stmt.query_string
return [], [], None, None, None
return [], [], None, None
@pytest.mark.exclude_errors("audit - Unexpected exception when writing log with: node_ip")
def test_insert_failure_doesnt_report_success(self):
@@ -1201,7 +1192,7 @@ class TestCQLAudit(AuditTester):
with self.assert_exactly_n_audit_entries_were_added(session, 1):
conn.execute(stmt)
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail, query_fail_count = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
# TODO: remove the loop when scylladb#24473 is fixed
# We call get_host_id only to cache host_id
@@ -1240,8 +1231,8 @@ class TestCQLAudit(AuditTester):
# If any audit mode is not done yet, continue polling.
all_modes_done = True
for mode, rows in rows_dict.items():
rows_with_error = [row for row in rows if row.error and row.operation == query_to_fail]
if len(rows_with_error) == query_fail_count:
rows_with_error = list(filter(lambda r: r.error, rows))
if len(rows_with_error) == 6:
logger.info(f"audit mode {mode} log updated after {i} iterations ({i / 10}s)")
assert rows_with_error[0].error is True
assert rows_with_error[0].consistency == "THREE"

View File

@@ -220,14 +220,14 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
await insert_keys(cql, ks, 0, 100)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
skipped_bytes = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes = get_incremental_repair_sst_read_bytes(servers[0])
# Nothing to skip. Repair all data.
assert skipped_bytes == 0
assert read_bytes > 0
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
skipped_bytes2 = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes2 = get_incremental_repair_sst_read_bytes(servers[0])
# Skip all. Nothing to repair
@@ -236,7 +236,7 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
await insert_keys(cql, ks, 200, 300)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
skipped_bytes3 = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes3 = get_incremental_repair_sst_read_bytes(servers[0])
# Both skipped and read bytes should grow
@@ -272,7 +272,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
assert get_sstables_repaired_at(map0, token) == sstables_repaired_at
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
map1 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
logging.info(f'map1={map1}')
# Check sstables_repaired_at is increased by 1
@@ -288,7 +288,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
assert len(enable) == 1
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
map2 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
logging.info(f'map2={map2}')
# Check sstables_repaired_at is increased by 1
@@ -313,7 +313,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
# Repair should not finish with error
await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
try:
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental', timeout=10)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, timeout=10)
assert False # Check the tablet repair is not supposed to finish
except TimeoutError:
logger.info("Repair timeout as expected")
@@ -329,7 +329,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
# 1 add 0 skip 1 mark
for log in logs:
sst_add, sst_skip, sst_mark = await get_sst_status("First", log)
@@ -355,7 +355,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
else:
assert False # Wrong ops
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
# 1 add 1 skip 1 mark
for log in logs:
@@ -394,7 +394,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -402,7 +402,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
current_key += nr_keys
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
# Insert more keys and flush to get 2 more sstables
for _ in range(2):
@@ -436,7 +436,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
# Insert more keys
@@ -445,7 +445,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
current_key += nr_keys
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 3
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 3
# Insert more keys and flush to get 2 more sstables
for _ in range(2):
@@ -505,7 +505,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
await manager.server_start(servers[1].server_id)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
scylla_path = get_scylla_path(cql)
@@ -521,8 +521,8 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -532,7 +532,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
# Second repair
await inject_error_on(manager, "repair_tablet_no_update_sstables_repair_at", servers)
# some sstable repaired_at = 3, but sstables_repaired_at = 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
await inject_error_off(manager, "repair_tablet_no_update_sstables_repair_at", servers)
scylla_path = get_scylla_path(cql)
@@ -561,8 +561,8 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -574,7 +574,7 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
last_tokens = [t.last_token for t in replicas]
for t in last_tokens[0::2]:
logging.info(f"Start repair for token={t}");
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t, incremental_mode='incremental') # sstables_repaired_at 3
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t) # sstables_repaired_at 3
scylla_path = get_scylla_path(cql)
@@ -595,7 +595,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -659,18 +659,13 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
assert read1 == 0
assert skip2 == 0
assert read2 > 0
await do_repair_and_check('incremental', 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
def check2(skip1, read1, skip2, read2):
assert skip1 == skip2
assert read1 == read2
await do_repair_and_check('disabled', 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
# FIXME: Incremental repair is disabled by default due to
# https://github.com/scylladb/scylladb/issues/26041 and
# https://github.com/scylladb/scylladb/issues/27414
await do_repair_and_check(None, 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
def check3(skip1, read1, skip2, read2):
assert skip1 < skip2
assert read1 == read2
@@ -682,14 +677,14 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
await do_repair_and_check('full', 1, rf'Starting tablet repair by API .* incremental_mode=full.*', check4)
@pytest.mark.asyncio
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
for s in servers:
time1 += get_repair_tablet_time_ms(s)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
for s in servers:
time2 += get_repair_tablet_time_ms(s)
@@ -699,7 +694,7 @@ async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
# Reproducer for https://github.com/scylladb/scylladb/issues/26346
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_incremental_repair_finishes_when_tablet_skips_end_repair_stage(manager):
async def test_repair_finishes_when_tablet_skips_end_repair_stage(manager):
servers = await manager.servers_add(3, auto_rack_dc="dc1")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
@@ -724,7 +719,7 @@ async def test_incremental_repair_finishes_when_tablet_skips_end_repair_stage(ma
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_incremental_repair_rejoin_do_tablet_operation(manager):
async def test_repair_rejoin_do_tablet_operation(manager):
cmdline = ['--logger-log-level', 'raft_topology=debug']
servers = await manager.servers_add(3, auto_rack_dc="dc1", cmdline=cmdline)

View File

@@ -43,6 +43,86 @@ async def guarantee_repair_time_next_second():
# different than the previous one.
await asyncio.sleep(1)
async def do_test_tablet_repair_progress_split_merge(manager: ManagerClient, do_split=False, do_merge=False):
nr_tablets = 16
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=True, tablets=nr_tablets)
token = 'all'
logs = []
for s in servers:
logs.append(await manager.server_open_log(s.server_id))
# Skip repair for the listed tablet id
nr_tablets_skipped = 4
nr_tablets_repaired = nr_tablets - nr_tablets_skipped
await inject_error_on(manager, "tablet_repair_skip_sched", servers, params={'value':"0,1,5,8"})
# Request to repair all tablets
repair_res = await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
logging.info(f'{repair_res=}')
tablet_task_id = repair_res['tablet_task_id']
async def get_task_status(desc):
task_status = await manager.api.get_task_status(servers[0].ip_addr, tablet_task_id)
completed = int(task_status['progress_completed'])
total = int(task_status['progress_total'])
logging.info(f'{desc=} {completed=} {total=} {task_status=}')
return completed, total
async def wait_task_progress(wanted_complete, wanted_total):
while True:
completed, total = await get_task_status("wait_task_progress")
if completed == wanted_complete and total == wanted_total:
break
await asyncio.sleep(1)
async def get_task_status_and_check(desc):
completed, total = await get_task_status(desc)
assert completed == nr_tablets_repaired
assert total == nr_tablets
# 12 out of 16 tablets should finish
await wait_task_progress(nr_tablets_repaired, nr_tablets)
if do_split:
await get_task_status_and_check("before_split")
s1_mark = await logs[0].mark()
await inject_error_on(manager, "tablet_force_tablet_count_increase", servers)
await logs[0].wait_for('Detected tablet split for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_increase", servers)
await get_task_status_and_check("after_split")
if do_merge:
await get_task_status_and_check("before_merge")
s1_mark = await logs[0].mark()
await inject_error_on(manager, "tablet_force_tablet_count_decrease", servers)
await logs[0].wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
await get_task_status_and_check("after_merge")
# Wait for all repairs to finish once all tablets can be scheduled to run repair
await inject_error_off(manager, "tablet_repair_skip_sched", servers)
await wait_task_progress(nr_tablets, nr_tablets)
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_tablet_repair_progress(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_split=False, do_merge=False)
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_tablet_repair_progress_split(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_split=True)
@pytest.mark.asyncio
@pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/26844")
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_progress_merge(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_merge=True)
@pytest.mark.asyncio
async def test_tablet_manual_repair(manager: ManagerClient):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)

View File

@@ -115,7 +115,7 @@ def compact_keyspace(cql, ks, flush_memtables=True):
args.extend([ks, cf])
run_nodetool(cql, "compact", *args)
def take_snapshot(cql, table, tag, skip_flush):
def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
ks, cf = table.split('.')
if has_rest_api(cql):
requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params={'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush})
@@ -123,6 +123,8 @@ def take_snapshot(cql, table, tag, skip_flush):
args = ['--tag', tag, '--table', cf]
if skip_flush:
args.append('--skip-flush')
if use_sstable_identifier:
args.append('--use-sstable-identifier')
args.append(ks)
run_nodetool(cql, "snapshot", *args)

View File

@@ -14,6 +14,7 @@ import cassandra.cluster
from contextlib import contextmanager
import re
import ssl
import time
# This function normalizes the SSL cipher suite name (a string),
@@ -66,13 +67,12 @@ def test_tls_versions(cql):
# a regression test for #9216
def test_system_clients_stores_tls_info(cql):
if not cql.cluster.ssl_context:
table_result = cql.execute(f"SELECT * FROM system.clients")
for row in table_result:
assert not row.ssl_enabled
assert row.ssl_protocol is None
assert row.ssl_cipher_suite is None
if cql.cluster.ssl_context:
table_result = cql.execute(f"SELECT * FROM system.clients")
for row in table_result:
assert not row.ssl_enabled
assert row.ssl_protocol is None
assert row.ssl_cipher_suite is None
else:
# TLS v1.2 must be supported, because this is the default version that
# "cqlsh --ssl" uses. If this fact changes in the future, we may need
# to reconsider this test.
@@ -82,7 +82,8 @@ def test_system_clients_stores_tls_info(cql):
# so we need to retry until all connections are initialized and have their TLS info recorded in system.clients,
# otherwise we'd end up with some connections e.g. having their ssl_enabled=True but other fields still None.
expected_ciphers = [normalize_cipher(cipher['name']) for cipher in ssl.create_default_context().get_ciphers()]
for _ in range(1000): # try for up to 1000 * 0.01s = 10s seconds
deadline = time.time() + 10 # 10 seconds timeout
while time.time() < deadline:
rows = session.execute(f"SELECT * FROM system.clients")
if rows and all(
row.ssl_enabled
@@ -92,7 +93,7 @@ def test_system_clients_stores_tls_info(cql):
):
return
time.sleep(0.01)
pytest.fail(f"Not all connections have TLS data set correctly in system.clients after 10s seconds")
pytest.fail(f"Not all connections have TLS data set correctly in system.clients after 10 seconds")
@contextmanager

View File

@@ -99,7 +99,7 @@ def test_listsnapshots_no_snapshots(nodetool, request):
assert res.stdout == "Snapshot Details: \nThere are no snapshots\n"
def check_snapshot_out(res, tag, ktlist, skip_flush):
def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=False):
"""Check that the output of nodetool snapshot contains the expected messages"""
if len(ktlist) == 0:
@@ -110,7 +110,7 @@ def check_snapshot_out(res, tag, ktlist, skip_flush):
pattern = re.compile("Requested creating snapshot\\(s\\)"
f" for \\[{keyspaces}\\]"
f" with snapshot name \\[(.+)\\]"
f" and options \\{{skipFlush={str(skip_flush).lower()}\\}}")
f" and options \\{{skip_flush={str(skip_flush).lower()}, use_sstable_identifier={str(use_sstable_identifier).lower()}\\}}")
print(res)
print(pattern)
@@ -138,13 +138,13 @@ def test_snapshot_keyspace(nodetool):
res = nodetool("snapshot", "--tag", tag, "ks1", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
res = nodetool("snapshot", "--tag", tag, "ks1", "ks2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
])
check_snapshot_out(res.stdout, tag, ["ks1", "ks2"], False)
@@ -155,13 +155,13 @@ def test_snapshot_keyspace_with_table(nodetool, option_name):
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl1,tbl2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
@@ -186,7 +186,7 @@ class kn_param(NamedTuple):
def test_snapshot_keyspace_table_single_arg(nodetool, param, scylla_only):
tag = "my_snapshot"
req_params = {"tag": tag, "sf": "false", "kn": param.kn}
req_params = {"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": param.kn}
if param.cf:
req_params["cf"] = param.cf
@@ -202,19 +202,19 @@ def test_snapshot_ktlist(nodetool, option_name):
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1"})
])
check_snapshot_out(res.stdout, tag, ["ks1.tbl1"], False)
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1,ks2.tbl2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1.tbl1,ks2.tbl2"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1.tbl1,ks2.tbl2"})
])
check_snapshot_out(res.stdout, tag, ["ks1.tbl1", "ks2.tbl2"], False)
res = nodetool("snapshot", "--tag", tag, option_name, "ks1,ks2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
])
check_snapshot_out(res.stdout, tag, ["ks1" ,"ks2"], False)
@@ -229,7 +229,8 @@ def test_snapshot_ktlist(nodetool, option_name):
{"ks": ["ks1", "ks2"], "tbl": []},
])
@pytest.mark.parametrize("skip_flush", [False, True])
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
@pytest.mark.parametrize("use_sstable_identifier", [False, True])
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_identifier):
cmd = ["snapshot"]
params = {}
@@ -242,8 +243,11 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
if skip_flush:
cmd.append("--skip-flush")
if use_sstable_identifier:
cmd.append("--use-sstable-identifier")
params["sf"] = str(skip_flush).lower()
params["use_sstable_identifier"] = str(use_sstable_identifier).lower()
if ktlist:
if "tbl" in ktlist:
@@ -273,7 +277,7 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
expected_request("POST", "/storage_service/snapshots", params=params)
])
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush)
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush, use_sstable_identifier)
def test_snapshot_multiple_keyspace_with_table(nodetool):

View File

@@ -654,7 +654,7 @@ void cluster_repair_operation(scylla_rest_client& client, const bpo::variables_m
for (const auto& table : tables.empty() ? ks_to_cfs[keyspace] : tables) {
repair_params["table"] = table;
try {
sstring task_id = rjson::to_sstring(client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"]);
sstring task_id = client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"].GetString();
log("Starting repair with task_id={} keyspace={} table={}", task_id, keyspace, table);
@@ -2362,16 +2362,23 @@ void snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm
params["sf"] = "false";
}
if (vm.contains("use-sstable-identifier")) {
params["use_sstable_identifier"] = "true";
} else {
params["use_sstable_identifier"] = "false";
}
client.post("/storage_service/snapshots", params);
if (kn_msg.empty()) {
kn_msg = params["kn"];
}
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}}}\n",
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skip_flush={}, use_sstable_identifier={}}}\n",
kn_msg,
params["tag"],
params["sf"]);
params["sf"],
params["use_sstable_identifier"]);
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
}
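For illustration, a hypothetical invocation and the line this format string would produce (tag and keyspace names invented):
$ nodetool snapshot --tag t1 --use-sstable-identifier ks1
Requested creating snapshot(s) for [ks1] with snapshot name [t1] and options {skip_flush=false, use_sstable_identifier=true}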
@@ -4598,6 +4605,7 @@ For more information, see: {}
typed_option<sstring>("keyspace-table-list", "The keyspace.table pair(s) to snapshot, multiple ones can be joined with ','"),
typed_option<sstring>("tag,t", "The name of the snapshot"),
typed_option<>("skip-flush", "Do not flush memtables before snapshotting (snapshot will not contain unflushed data)"),
typed_option<>("use-sstable-identifier", "Use the sstable identifier UUID, if available, rather than the sstable generation for the sstable file names within the snapshot dir and the manifest file"),
},
{
typed_option<std::vector<sstring>>("keyspaces", "The keyspaces to snapshot", -1),

View File

@@ -123,7 +123,7 @@ _get_distribution_components() {
continue
;;
esac
echo "$target"
echo $target
done
}
@@ -164,7 +164,7 @@ if [[ "${CLANG_BUILD}" = "INSTALL" ]]; then
echo "[clang-stage3] build the compiler applied CSPGO profile"
cd "${CLANG_BUILD_DIR}"
llvm-profdata merge build/profiles/csir-*.profraw -output=csir.prof
llvm-profdata merge build/csprofiles/default_*.profraw -output=csir.prof
llvm-profdata merge ir.prof csir.prof -output=combined.prof
rm -rf build
# linker flags are needed for BOLT

View File

@@ -29,8 +29,11 @@ class counted_data_source_impl : public data_source_impl {
if (_cpu_concurrency.stopped) {
return fun();
}
_cpu_concurrency.units.return_all();
return fun().finally([this] () {
return futurize_invoke([this] () {
_cpu_concurrency.units.return_all();
}).then([fun = std::move(fun)] () {
return fun();
}).finally([this] () {
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
});
};
@@ -57,8 +60,11 @@ class counted_data_sink_impl : public data_sink_impl {
if (_cpu_concurrency.stopped) {
return fun();
}
_cpu_concurrency.units.return_all();
return fun().finally([this] () {
return futurize_invoke([this] () {
_cpu_concurrency.units.return_all();
}).then([fun = std::move(fun)] () mutable {
return fun();
}).finally([this] () {
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
});
};
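The futurize_invoke wrapper matters here because an exception thrown synchronously by return_all() now surfaces as a failed future instead of escaping the lambda, so the finally() continuation still re-adopts the semaphore units. A minimal standalone sketch of that behavior (assumed illustration, not part of this patch):
#include <seastar/core/future.hh>
#include <stdexcept>
seastar::future<> guarded_step() {
    return seastar::futurize_invoke([] {
        // A synchronous throw here becomes an exceptional future...
        throw std::runtime_error("boom");
    }).finally([] {
        // ...so this cleanup continuation still runs on the failure path.
    });
}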
@@ -408,9 +414,8 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add
conn->_ssl_cipher_suite = cipher_suite;
return make_ready_future<bool>(true);
});
}).handle_exception([this, conn](std::exception_ptr ep) {
_logger.warn("Inspecting TLS connection failed: {}", ep);
return make_ready_future<bool>(false);
}).handle_exception([conn](std::exception_ptr ep) {
return seastar::make_exception_future<bool>(std::runtime_error(fmt::format("Inspecting TLS connection failed: {}", ep)));
})
: make_ready_future<bool>(true)
).then([conn] (bool ok){

View File

@@ -63,7 +63,7 @@ protected:
bool _ssl_enabled = false;
std::optional<sstring> _ssl_cipher_suite = std::nullopt;
std::optional<sstring> _ssl_protocol = std::nullopt;;
std::optional<sstring> _ssl_protocol = std::nullopt;
private:
future<> process_until_tenant_switch();

View File

@@ -832,12 +832,6 @@ to_bytes(bytes_view x) {
return bytes(x.begin(), x.size());
}
inline
bytes
to_bytes(std::string_view x) {
return to_bytes(to_bytes_view(x));
}
inline
bytes_opt
to_bytes_opt(bytes_view_opt bv) {

View File

@@ -204,7 +204,7 @@ public:
public:
template <typename Clock, typename Duration>
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr, std::source_location loc = std::source_location::current()) {
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr) {
if (!_shared_data) {
on_internal_error(errinj_logger, "injection_shared_data is not initialized");
}
@@ -234,8 +234,7 @@ public:
throw;
}
catch (const std::exception& e) {
on_internal_error(errinj_logger, fmt::format("Error injection [{}] wait_for_message timeout: Called from `{}` @ {}:{}:{:d}: {}",
_shared_data->injection_name, loc.function_name(), loc.file_name(), loc.line(), loc.column(), e.what()));
on_internal_error(errinj_logger, "Error injection wait_for_message timeout: " + std::string(e.what()));
}
++_read_messages_counter;
}
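
The dropped std::source_location parameter is the standard C++20 way to capture the caller's position: a defaulted std::source_location::current() argument is evaluated at the call site, not in the callee, which is how the removed message could name where wait_for_message() was invoked. A self-contained illustration:

#include <iostream>
#include <source_location>
#include <string_view>

// The default argument is evaluated at each call site, so `loc`
// describes the caller, not report() itself.
void report(std::string_view msg,
            std::source_location loc = std::source_location::current()) {
    std::cout << loc.file_name() << ':' << loc.line()
              << " in " << loc.function_name() << ": " << msg << '\n';
}

int main() {
    report("timed out"); // prints this file, line, and function
}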

View File

@@ -174,6 +174,13 @@ future<> print_with_extra_array(const rjson::value& value,
seastar::output_stream<char>& os,
size_t max_nested_level = default_max_nested_level);
// Returns a string_view to the string held in a JSON value (which is
// assumed to hold a string, i.e., v.IsString() == true). This is a view
// to the existing data - no copying is done.
inline std::string_view to_string_view(const rjson::value& v) {
return std::string_view(v.GetString(), v.GetStringLength());
}
// Copies given JSON value - involves allocation
rjson::value copy(const rjson::value& value);
@@ -229,27 +236,6 @@ rjson::value parse_yieldable(chunked_content&&, size_t max_nested_level = defaul
rjson::value from_string(const char* str, size_t size);
rjson::value from_string(std::string_view view);
// Returns a string_view to the string held in a JSON value (which is
// assumed to hold a string, i.e., v.IsString() == true). This is a view
// to the existing data - no copying is done.
inline std::string_view to_string_view(const rjson::value& v) {
return std::string_view(v.GetString(), v.GetStringLength());
}
// Those functions must be called on json string object.
// They make a copy of underlying data so it's safe to destroy
// rjson::value afterwards.
//
// Rapidjson's GetString method alone is not good enough
// for string conversion because it needs to scan the string
// unnecessarily and GetStringLength could be used to avoid that.
inline sstring to_sstring(const rjson::value& str) {
return sstring(str.GetString(), str.GetStringLength());
}
inline std::string to_string(const rjson::value& str) {
return std::string(str.GetString(), str.GetStringLength());
}
// Returns a pointer to JSON member if it exists, nullptr otherwise
rjson::value* find(rjson::value& value, std::string_view name);
const rjson::value* find(const rjson::value& value, std::string_view name);
@@ -391,7 +377,7 @@ rjson::value from_string_map(const std::map<sstring, sstring>& map);
sstring quote_json_string(const sstring& value);
inline bytes base64_decode(const value& v) {
return ::base64_decode(to_string_view(v));
return ::base64_decode(std::string_view(v.GetString(), v.GetStringLength()));
}
// A writer which allows writing json into an std::ostream in a streaming manner.
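
Taken together, the two hunks relocate to_string_view() earlier in the header and drop the copying helpers to_sstring()/to_string(). The removed comment gives the rationale, which is worth keeping in mind: rapidjson stores the string length inside the value, so GetString() plus GetStringLength() avoids an unnecessary rescan (and tolerates embedded NUL bytes), while building a string or view from GetString() alone costs a strlen(). The idea in miniature, standalone over plain rapidjson:

#include <rapidjson/document.h>
#include <string_view>

// Length comes from the value itself: no rescan, embedded '\0' preserved.
inline std::string_view view_of(const rapidjson::Value& v) {
    return {v.GetString(), v.GetStringLength()};
}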

View File

@@ -10,7 +10,6 @@
#include "utils/http.hh"
#include "utils/s3/client.hh"
#include "utils/s3/default_aws_retry_strategy.hh"
#include "utils/rjson.hh"
#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
#include <seastar/http/client.hh>
@@ -73,7 +72,7 @@ future<> instance_profile_credentials_provider::update_credentials() {
}
s3::aws_credentials instance_profile_credentials_provider::parse_creds(const sstring& creds_response) {
rjson::document document;
rapidjson::Document document;
document.Parse(creds_response.data());
if (document.HasParseError()) {
@@ -82,9 +81,9 @@ s3::aws_credentials instance_profile_credentials_provider::parse_creds(const sst
}
// Retrieve credentials
return {.access_key_id = rjson::to_string(document["AccessKeyId"]),
.secret_access_key = rjson::to_string(document["SecretAccessKey"]),
.session_token = rjson::to_string(document["Token"]),
return {.access_key_id = document["AccessKeyId"].GetString(),
.secret_access_key = document["SecretAccessKey"].GetString(),
.session_token = document["Token"].GetString(),
// Set the expiration to one minute earlier to ensure credentials are renewed slightly before they expire
.expires_at = seastar::lowres_clock::now() + std::chrono::seconds(session_duration - 60)};
}
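
For reference, the parse-and-check pattern parse_creds() follows, reduced to plain rapidjson (the field name matches the credentials document above; the error handling is illustrative):

#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
#include <stdexcept>
#include <string>

std::string access_key_of(const std::string& body) {
    rapidjson::Document doc;
    doc.Parse(body.data(), body.size());
    if (doc.HasParseError()) {
        // GetParseError_En turns the error code into a readable message.
        throw std::runtime_error(rapidjson::GetParseError_En(doc.GetParseError()));
    }
    return doc["AccessKeyId"].GetString();
}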

View File

@@ -152,7 +152,7 @@ seastar::future<bool> client::check_status() {
}
auto resp = co_await std::move(f);
auto json = rjson::parse(std::move(resp.content));
co_return json.IsString() && rjson::to_string_view(json) == "SERVING";
co_return json.IsString() && json.GetString() == std::string_view("SERVING");
}
seastar::future<> client::close() {
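
One subtlety in the new comparison: wrapping one side in std::string_view is what makes it compare contents. With two raw char pointers, == compares addresses. A tiny illustration:

#include <cassert>
#include <string_view>

int main() {
    char buf[] = "SERVING";          // same characters, different address
    const char* s = buf;
    assert(std::string_view(s) == "SERVING"); // true: compares contents
    // (s == "SERVING") would compare pointers and is almost never intended.
    return 0;
}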