Compare commits

...

27 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
77ee7f3417 Revert "Merge 'Add option to use sstable identifier in snapshot' from Benny Halevy"
This reverts commit 8192f45e84.

The merge exposed a bug where truncate (via drop) fails and causes Raft
errors, leading to schema inconsistencies across nodes. This results in
test_table_drop_with_auto_snapshot failures with 'Keyspace test does not exist'
errors.

The specific problematic change was in commit 19b6207f which modified
truncate_table_on_all_shards to set use_sstable_identifier = true. This
causes exceptions during truncate that are not properly handled, leading
to Raft applier fiber stopping and nodes losing schema synchronization.
2025-12-12 03:55:13 +00:00
copilot-swe-agent[bot]
0ff89a58be Initial plan 2025-12-12 03:48:12 +00:00
Yaron Kaikov
f7ffa395a8 workflows: trigger CI automatically when conflicts label is removed
Add pull_request_target event with unlabeled type to trigger-scylla-ci
workflow. This allows automatic CI triggering when the 'conflicts' label
is removed from a PR, in addition to the existing manual trigger via
comment.

The workflow now runs when:
- A user posts a comment with '@scylladbbot trigger-ci' (existing)
- The 'conflicts' label is removed from a PR (new)

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-84

Closes scylladb/scylladb#27521
2025-12-11 16:48:06 +02:00
Piotr Smaron
3fa3b920de Update CODEOWNERS to remove redundant entries
Removing myself as I have no maintainer's permissions to review the code

Closes scylladb/scylladb#27576
2025-12-11 16:47:08 +02:00
Botond Dénes
e7ca52ee79 Merge 'api: storage_service/tablets/repair: disable incremental repair by default' from Benny Halevy
Change the default incremental_mode to `disabled` due to https://github.com/scylladb/scylladb/issues/26041 and https://github.com/scylladb/scylladb/issues/27414

** Backport to 2025.4 where 611918056a was introduced **

Closes scylladb/scylladb#27530

* github.com:scylladb/scylladb:
  api: storage_service/tablets/repair: disable incremental repair by default
  docs: nodetool-commands: cluster: repair: fix incremental-mode example
2025-12-11 15:23:09 +02:00
Botond Dénes
730eca5dac Merge 'Remove noexcept from storage_group and table functions to allow exception propagation' from null
Fixed a critical bug where `storage_group::for_each_compaction_group()` was incorrectly marked `noexcept`, causing `std::terminate` when actions threw exceptions (e.g., `utils::memory_limit_reached` during memory-constrained reader creation).

**Changes made:**
1. Removed `noexcept` from `storage_group::for_each_compaction_group()` declaration and implementation
2. Removed `noexcept` from `storage_group::compaction_groups()` overloads (they call for_each_compaction_group)
3. Removed `noexcept` from `storage_group::live_disk_space_used()` and `memtable_count()` (they call compaction_groups())
4. Kept `noexcept` on `storage_group::flush()` - it's a coroutine that automatically captures exceptions and returns them as exceptional futures
5. Removed `noexcept` from `table_load_stats()` functions in base class, table, and storage group managers

**Rationale:**

As noted by reviewers, there's no reason to kill the server if these functions throw. For coroutines returning futures, `noexcept` is appropriate because Seastar automatically captures exceptions and returns them as exceptional futures. For other functions, proper exception handling allows the system to recover gracefully instead of terminating.

Fixes #27475

Closes scylladb/scylladb#27476

* github.com:scylladb/scylladb:
  replica: Remove unnecessary noexcept
  replica: Remove noexcept from compaction_groups() functions
  replica: Remove noexcept from storage_group::for_each_compaction_group
2025-12-11 15:17:35 +02:00
Benny Halevy
c8cff94a5a api: storage_service/tablets/repair: disable incremental repair by default
Change the default incremental_mode to `disabled` due to
https://github.com/scylladb/scylladb/issues/26041 and
https://github.com/scylladb/scylladb/issues/27414

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-11 14:25:21 +02:00
Benny Halevy
5fae4cdf80 docs: nodetool-commands: cluster: repair: fix incremental-mode example
There is no 'regular' incremental mode anymore.
The example seems have meant 'disabled'.

Fixes #27587

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-11 14:25:11 +02:00
Marcin Maliszkiewicz
8bbcaacba1 auth: always catch by const reference
This is best practice.

Closes scylladb/scylladb#27525
2025-12-11 12:42:30 +01:00
Yaron Kaikov
3dfa5ebd7f Add JIRA issue validation to backport PR fixes check
Extend the Fixes validation pattern to also accept JIRA issue references
(format: [A-Z]+-\d+) in addition to GitHub issue references. This allows
backport PRs to reference JIRA issues in the format 'Fixes: PROJECT-123'.

Fixes: https://github.com/scylladb/scylladb/issues/27571

Closes scylladb/scylladb#27572
2025-12-11 12:23:16 +02:00
Avi Kivity
24264e24bb Revert "repair: Add tablet repair progress report support"
This reverts commit faad0167d7. It causes
a regression in

test_two_tablets_concurrent_repair_and_migration_repair_writer_level

in debug mode (with ~5%-10% probability).

Fixes #27510.

Closes scylladb/scylladb#27560
2025-12-11 12:18:11 +02:00
Nadav Har'El
0c64e3be9a Merge 'Unify and fix rjson string and string_view conversions' from Marcin Maliszkiewicz
This patch-set consolidates and corrects rjson string conversion handling.
It removes unnecessary string copies, ensures proper length usage and
replaces ad-hoc conversions with consistent helper functions.

Overall, the changes make rjson string handling safer, faster, and more uniform across the codebase.

Backport: no, it's a refactor

Closes scylladb/scylladb#27394

* github.com:scylladb/scylladb:
  fix rjson::value to bytes conversion with missing GetStringLength call
  alternator: change type from string to string_view in should_add_capacity
  fix rjson::value to string_view conversion with missing GetStringLength call
  use rjson::to_string_view when rjson::value gets converted using GetStringLength
  use rjson::to_sstring and rjson::to_string for various string conversions
  utils: use rjson document wrapper in instance_profile_credentials_provider::parse_creds
  utils: move rjson::to_string_view func to string related place
  utils: add to_sstring and to_string rjson helper
2025-12-11 12:05:41 +02:00
Marcin Maliszkiewicz
d5b63df46e transport: remove redundant futurize_invoke from counted data sink and source
Closes scylladb/scylladb#27526
2025-12-11 10:32:16 +03:00
Dario Mirovic
f545ed37bc test: dtest: audit_test.py: fix audit error log detection
`test_insert_failure_doesnt_report_success` test in `test/cluster/dtest/audit_test.py`
has an insert statement that is expected to fail. Dtest environment uses
`FlakyRetryPolicy`, which has `max_retries = 5`. 1 initial fail and 5 retry fails
means we expect 6 error audit logs.

The test failed because `create keyspace ks` failed once, then succeeded on retry.
It allowed the test to proceed properly, but the last part of the test that expects
exactly 6 failed queries actually had 7.

The goal of this patch is to make sure there are exactly 6 = 1 + `max_retries` failed
queries, counting only the query expected to fail. If other queries fail with
successful retry, it's fine. If other queries fail without successful retry, the test
will fail, as it should in such situations. They are not related to this expected
failed insert statement.

Fixes #27322

Closes scylladb/scylladb#27378
2025-12-11 10:17:07 +03:00
Benny Halevy
5f13880a91 utils: error_injection: wait_for_message: print injection_name and caller source_location on timeout
When waiting for the condition variable times out
we call on_internal_error, but unfortunately, the backtrace
it generates is obfuscated by
`coroutine_handle<seastar::internal::coroutine_traits_base<void>::promise_type>::resume`.

To make the log more useful, print the error injection name
and the caller's source_location in the timeout error message.

Fixes #27531

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#27532
2025-12-10 23:25:54 +01:00
Yaron Kaikov
d3e199984e auto-backport.py: modify instruction for making PR ready for review
Update the comment sent when PR has conflicts with clear instrauctions how to make the PR Ready for review

Fixes: https://scylladb.atlassian.net/browse/RELENG-152

Closes scylladb/scylladb#27547
2025-12-10 14:53:38 +02:00
Nadav Har'El
8822c23ad4 Merge 'test: cqlpy: test_protocol_exceptions.py: increase cpp exceptions thr…' from Dario Mirovic
…eshold

The initial problem:

Some of the tests in test_protocol_exceptions.py started failing. The failure is on the condition that no more than `cpp_exception_threshold` happened.

Test logic:

These tests assert that specific code paths do not throw an exception anymore. Initial implementation ran a code path once, and asserted there were 0 exceptions. Sometimes an exception or several can occur, not directly related to the code paths the tests check, but those would fail the tests.

The solution was to run the tests multiple times. If there is a regression, there would be at least as many exceptions thrown as there are test runs. If there is no regression, a few exceptions might happen, up to 10 per 100 test runs. I have arbitrarily chosen `run_count = 100` and `cpp_exception_threshold = 10` values.

Note that the exceptions are counted per shard, not per code path.

The new problem:

The occassional exceptions thrown by some parts of the server now throw a bit more than before. Based on the logs linked on the issues, it is usually 12.

There are possibly multiple ways to resolve the issue. I have considered logging exceptions and parsing them. I would have to filter exception logs only for wanted exceptions. However, if a new, different exception is introduced, it might not be counted.

Another approach is to just increase the threshold a bit. The issue of throwing more exceptions than before in some other server modules should be addressed by a set of tests for that module, just like these tests check protocol exceptions, not caring who used protocol check code paths.

For those reasons, the solution implemented here is to increase `cpp_exception_threshold` to `20`. It will not make the tests unreliable, because, as mentioned, if there is a regression, there would be at least `run_count` exceptions per `run_count` test runs (1 exception per single test run).

Still, to make "background exceptions" occurence a bit more normalized, `run_count` too is doubled, from `100` to `200`. At the first glance this looks like nothing is changed, but actually doubling both run count and exception threshold here implies that the burst does not scale as much as run count, it is just that the "jitter" is bigger than the old threshold.

Also, this patch series enables debug logging for `exception` logger. This will allow us to inspect which exceptions happened if a protocol exceptions test fails again.

Fixes #27247
Fixes #27325

Issue observed on master and branch-2025.4. The tests, in the same form, exist on master, branch-2025.4, branch-2025.3, branch-2025.2, and branch-2025.1. Code change is simple, and no issue is expected with backport automation. Thus, backports for all the aforementioned versions is requested.

Closes scylladb/scylladb#27412

* github.com:scylladb/scylladb:
  test: cqlpy: test_protocol_exceptions.py: enable debug exception logging
  test: cqlpy: test_protocol_exceptions.py: increase cpp exceptions threshold
2025-12-10 10:53:30 +02:00
Marcin Maliszkiewicz
be9992cfb3 fix rjson::value to bytes conversion with missing GetStringLength call 2025-12-09 19:27:22 +01:00
Marcin Maliszkiewicz
daf00a7f24 alternator: change type from string to string_view in should_add_capacity
It avoids allocation.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
62962f33bb fix rjson::value to string_view conversion with missing GetStringLength call
In some cases we unnecessarily convert to string which
causes a copy. In other we convert without calling
GetStringLength which causes iteration to dermine length
which is already known. In some cases we do even both.
This commit fixes that.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
060c2f7c0d use rjson::to_string_view when rjson::value gets converted using GetStringLength
This commit is only cosmetics, changes calls to GetStringLength
into rjson::to_string_view with the same underlying implementation.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
64149b57c3 use rjson::to_sstring and rjson::to_string for various string conversions
In some cases we ommit size checking which is wrong
as according to rapid json documentation strings may
contain \0 byte in the middle.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
4b004fcdfc utils: use rjson document wrapper in instance_profile_credentials_provider::parse_creds
So that we can use our common utility functions.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
5e38b3071b utils: move rjson::to_string_view func to string related place 2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
225b3351fc utils: add to_sstring and to_string rjson helper
So that conversion code is common and it's easier
to avoid accidental type conversions. Additionally
according to rapid json library size must be checked
explicitly, this also avoids extra iteration in char*
to (s)string conversion.
2025-12-09 19:27:21 +01:00
Dario Mirovic
c30b326033 test: cqlpy: test_protocol_exceptions.py: enable debug exception logging
Enable debug logging for "exception" logger inside protocol exception tests.
The exceptions will be logged, and it will be possible to see which ones
occured if a protocol exceptions test fails.

Refs #27272
Refs #27325
2025-12-09 01:35:42 +01:00
Dario Mirovic
807fc68dc5 test: cqlpy: test_protocol_exceptions.py: increase cpp exceptions threshold
The initial problem:

Some of the tests in test_protocol_exceptions.py started failing. The failure is
on the condition that no more than `cpp_exception_threshold` happened.

Test logic:

These tests assert that specific code paths do not throw an exception anymore.
Initial implementation ran a code path once, and asserted there were 0 exceptions.
Sometimes an exception or several can occur, not directly related to the code paths
the tests check, but those would fail the tests.

The solution was to run the tests multiple times. If there is a regression, there
would be at least as many exceptions thrown as there are test runs. If there is no
regression, a few exceptions might happen, up to 10 per 100 test runs.
I have arbitrarily chosen `run_count = 100` and `cpp_exception_threshold = 10` values.

Note that the exceptions are counted per shard, not per code path.

The new problem:

The occassional exceptions thrown by some parts of the server now throw a bit more
than before. Based on the logs linked on the issues, it is usually 12.

There are possibly multiple ways to resolve the issue. I have considered logging
exceptions and parsing them. I would have to filter exception logs only for wanted
exceptions. However, if a new, different exception is introduced, it might not be
counted.

Another approach is to just increase the threshold a bit. The issue of throwing
more exceptions than before in some other server modules should be addressed by
a set of tests for that module, just like these tests check protocol exceptions,
not caring who used protocol check code paths.

For those reasons, the solution implemented here is to increase `cpp_exception_threshold`
to `20`. It will not make the tests unreliable, because, as mentioned, if there is a
regression, there would be at least `run_count` exceptions per `run_count` test runs
(1 exception per single test run).

Still, to make "background exceptions" occurence a bit more normalized, `run_count` too
is doubled, from `100` to `200`. At the first glance this looks like nothing is changed,
but actually doubling both run count and exception threshold here implies that the
exception burst does not scale as much as run count, it is just that the "jitter" is
bigger than the old threshold.

Fixes #27247
Fixes #27325
2025-12-09 01:34:48 +01:00
54 changed files with 312 additions and 766 deletions

8
.github/CODEOWNERS vendored
View File

@@ -1,5 +1,5 @@
# AUTH
auth/* @nuivall @ptrsmrn
auth/* @nuivall
# CACHE
row_cache* @tgrabiec
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
transport/*
# CQL QUERY LANGUAGE
cql3/* @tgrabiec @nuivall @ptrsmrn
cql3/* @tgrabiec @nuivall
# COUNTERS
counters* @nuivall @ptrsmrn
tests/counter_test* @nuivall @ptrsmrn
counters* @nuivall
tests/counter_test* @nuivall
# DOCS
docs/* @annastuchlik @tzach

View File

@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
if is_draft:
labels_to_add.append("conflicts")
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
pr_comment += "Please resolve them and mark this PR as ready for review"
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
backport_pr.create_issue_comment(pr_comment)
# Apply all labels at once if we have any

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {

View File

@@ -3,10 +3,13 @@ name: Trigger Scylla CI Route
on:
issue_comment:
types: [created]
pull_request_target:
types:
- unlabeled
jobs:
trigger-jenkins:
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI-Route Jenkins Job

View File

@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
if (!comparison_operator.IsString()) {
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
}
std::string op = comparison_operator.GetString();
std::string op = rjson::to_string(comparison_operator);
auto it = ops.find(op);
if (it == ops.end()) {
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
}
if (kv1.name == "S") {
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
return cmp(rjson::to_string_view(kv1.value),
rjson::to_string_view(kv2.value));
}
if (kv1.name == "B") {
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
}
if (kv_v.name == "S") {
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
return check_BETWEEN(rjson::to_string_view(kv_v.value),
rjson::to_string_view(kv_lb.value),
rjson::to_string_view(kv_ub.value),
bounds_from_query);
}
if (kv_v.name == "B") {

View File

@@ -8,6 +8,8 @@
#include "consumed_capacity.hh"
#include "error.hh"
#include "utils/rjson.hh"
#include <fmt/format.h>
namespace alternator {
@@ -32,12 +34,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
if (!return_consumed->IsString()) {
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
}
std::string consumed = return_consumed->GetString();
std::string_view consumed = rjson::to_string_view(*return_consumed);
if (consumed == "INDEXES") {
throw api_error::validation("INDEXES consumed capacity is not supported");
}
if (consumed != "TOTAL") {
throw api_error::validation("Unknown consumed capacity "+ consumed);
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
}
return true;
}

View File

@@ -419,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
if (!table_name_value->IsString()) {
throw api_error::validation("Non-string TableName field in request");
}
std::string table_name = table_name_value->GetString();
std::string table_name = rjson::to_string(*table_name_value);
return table_name;
}
@@ -546,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
// does exist but the index does not (ValidationException).
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
throw api_error::validation(
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
} else {
throw api_error::resource_not_found(
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
@@ -587,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
attribute_name, value));
}
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
return rjson::to_string(*attribute_value);
}
// Convenience function for getting the value of a boolean attribute, or a
@@ -1080,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
}
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (attribute_info["AttributeName"].GetString() == name) {
auto type = attribute_info["AttributeType"].GetString();
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
data_type dt = parse_key_type(type);
if (computed_column) {
// Computed column for GSI (doesn't choose a real column as-is
@@ -1116,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First element of KeySchema must be an object");
}
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
throw api_error::validation("First key in KeySchema must be a HASH key");
}
v = rjson::find((*key_schema)[0], "AttributeName");
@@ -1124,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First key in KeySchema must have string AttributeName");
}
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
std::string hash_key = v->GetString();
std::string hash_key = rjson::to_string(*v);
std::string range_key;
if (key_schema->Size() == 2) {
if (!(*key_schema)[1].IsObject()) {
throw api_error::validation("Second element of KeySchema must be an object");
}
v = rjson::find((*key_schema)[1], "KeyType");
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
throw api_error::validation("Second key in KeySchema must be a RANGE key");
}
v = rjson::find((*key_schema)[1], "AttributeName");
@@ -1887,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string def_type = type_to_string(def.type);
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
auto type = attribute_info["AttributeType"].GetString();
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (type != def_type) {
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
}
@@ -2362,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
bytes column_name = to_bytes(it->name.GetString());
bytes column_name = to_bytes(rjson::to_string_view(it->name));
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
@@ -2783,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
return;
}
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
if (!used.contains(it->name.GetString())) {
if (!used.contains(rjson::to_string(it->name))) {
throw api_error::validation(
format("{} has spurious '{}', not used in {}",
field_name, it->name.GetString(), operation));
field_name, rjson::to_string_view(it->name), operation));
}
}
}
@@ -3000,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
}
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
try {
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
} catch(data_dictionary::no_such_column_family&) {
@@ -3386,7 +3386,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
}
rjson::value newv = rjson::empty_object();
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
std::string attr = it->name.GetString();
std::string attr = rjson::to_string(it->name);
auto x = members.find(attr);
if (x != members.end()) {
if (x->second) {
@@ -3606,7 +3606,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
const rjson::value& attributes_to_get = req["AttributesToGet"];
attrs_to_get ret;
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
attribute_path_map_add("AttributesToGet", ret, it->GetString());
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
}
if (ret.empty()) {
@@ -4272,12 +4272,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
// Note that it.key() is the name of the column, *it is the operation
bytes column_name = to_bytes(it->name.GetString());
bytes column_name = to_bytes(rjson::to_string_view(it->name));
const column_definition* cdef = _schema->get_column_definition(column_name);
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
}
std::string action = (it->value)["Action"].GetString();
std::string action = rjson::to_string((it->value)["Action"]);
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -5474,7 +5474,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
std::vector<query::clustering_range> ck_bounds;
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
std::string key = it->name.GetString();
sstring key = rjson::to_sstring(it->name);
const rjson::value& condition = it->value;
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
@@ -5482,13 +5482,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
const column_definition& pk_cdef = schema->partition_key_columns().front();
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
if (sstring(key) == pk_cdef.name_as_text()) {
if (key == pk_cdef.name_as_text()) {
if (!partition_ranges.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
}
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
if (ck_cdef && key == ck_cdef->name_as_text()) {
if (!ck_bounds.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
@@ -5889,7 +5889,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
int limit = limit_json ? limit_json->GetInt() : 100;
if (limit < 1 || limit > 100) {
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");

View File

@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
return {"", nullptr};
}
auto it = v.MemberBegin();
const std::string it_key = it->name.GetString();
const std::string it_key = rjson::to_string(it->name);
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
return {std::move(it_key), nullptr};
}

View File

@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
}
sstring attribute_name(v->GetString(), v->GetStringLength());
sstring attribute_name = rjson::to_sstring(*v);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {

View File

@@ -729,14 +729,6 @@
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"use_sstable_identifier",
"description":"Use the sstable identifier UUID, if available, rather than the sstable generation.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
},
@@ -3059,7 +3051,7 @@
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
"required":false,
"allowMultiple":false,
"type":"string",

View File

@@ -2020,16 +2020,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto tag = req->get_query_param("tag");
auto column_families = split(req->get_query_param("cf"), ",");
auto sfopt = req->get_query_param("sf");
auto usiopt = req->get_query_param("use_sstable_identifier");
db::snapshot_options opts = {
.skip_flush = strcasecmp(sfopt.c_str(), "true") == 0,
.use_sstable_identifier = strcasecmp(usiopt.c_str(), "true") == 0
};
auto sf = db::snapshot_ctl::skip_flush(strcasecmp(sfopt.c_str(), "true") == 0);
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
try {
if (column_families.empty()) {
co_await snap_ctl.local().take_snapshot(tag, keynames, opts);
co_await snap_ctl.local().take_snapshot(tag, keynames, sf);
} else {
if (keynames.empty()) {
throw httpd::bad_param_exception("The keyspace of column families must be specified");
@@ -2037,7 +2033,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
}
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, sf);
}
co_return json_void();
} catch (...) {
@@ -2072,8 +2068,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
}
compaction::compaction_stats stats;

View File

@@ -146,8 +146,7 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
auto info = parse_scrub_options(ctx, std::move(req));
if (!info.snapshot_tag.empty()) {
db::snapshot_options opts = {.skip_flush = false, .use_sstable_identifier = false};
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts);
co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, db::snapshot_ctl::skip_flush::no);
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();

View File

@@ -77,9 +77,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
}
continue;
} catch (std::out_of_range&) {
} catch (const std::out_of_range&) {
// just fallthrough
} catch (boost::regex_error&) {
} catch (const boost::regex_error&) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
}
}

View File

@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
try {
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
} catch (exceptions::already_exists_exception&) {}
} catch (const exceptions::already_exists_exception&) {}
}
}

View File

@@ -256,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
} else {
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
}
} catch (exceptions::request_execution_exception& e) {
} catch (const exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
}
}
@@ -293,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
[resource](auto ep) {
try {
std::rethrow_exception(ep);
} catch (exceptions::request_execution_exception& e) {
} catch (const exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
}
});
});
} catch (exceptions::request_execution_exception& e) {
} catch (const exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
return make_ready_future();
}

View File

@@ -337,13 +337,13 @@ future<authenticated_user> password_authenticator::authenticate(
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
co_return username;
} catch (std::system_error &) {
} catch (const std::system_error &) {
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
} catch (exceptions::request_execution_exception& e) {
} catch (const exceptions::request_execution_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.what()));
} catch (exceptions::authentication_exception& e) {
} catch (const exceptions::authentication_exception& e) {
std::throw_with_nested(e);
} catch (exceptions::unavailable_exception& e) {
} catch (const exceptions::unavailable_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
} catch (...) {
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));

View File

@@ -226,7 +226,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
try {
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
} catch (::service::group0_concurrent_modification&) {
} catch (const ::service::group0_concurrent_modification&) {
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
}
}

View File

@@ -192,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
log.info("Created default superuser role '{}'.", _superuser);
} catch(const exceptions::unavailable_exception& e) {
} catch (const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were not ready; will retry");
throw e;
}

View File

@@ -81,7 +81,7 @@ public:
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (exceptions::authentication_exception&) {
} catch (const exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -126,7 +126,7 @@ public:
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (exceptions::authentication_exception&) {
} catch (const exceptions::authentication_exception&) {
_complete = true;
return {};
}
@@ -141,7 +141,7 @@ public:
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (exceptions::authentication_exception&) {
} catch (const exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}

View File

@@ -65,7 +65,7 @@ future<> snapshot_ctl::run_snapshot_modify_operation(noncopyable_function<future
});
}
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
if (tag.empty()) {
throw std::runtime_error("You must supply a snapshot name.");
}
@@ -74,21 +74,21 @@ future<> snapshot_ctl::take_snapshot(sstring tag, std::vector<sstring> keyspace_
std::ranges::copy(_db.local().get_keyspaces() | std::views::keys, std::back_inserter(keyspace_names));
};
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), opts, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), opts);
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), sf, this] () mutable {
return do_take_snapshot(std::move(tag), std::move(keyspace_names), sf);
});
}
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts) {
future<> snapshot_ctl::do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf) {
co_await coroutine::parallel_for_each(keyspace_names, [tag, this] (const auto& ks_name) {
return check_snapshot_not_exist(ks_name, tag);
});
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), opts] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, opts);
co_await coroutine::parallel_for_each(keyspace_names, [this, tag = std::move(tag), sf] (const auto& ks_name) {
return replica::database::snapshot_keyspace_on_all_shards(_db, ks_name, tag, bool(sf));
});
}
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
if (ks_name.empty()) {
throw std::runtime_error("You must supply a keyspace name");
}
@@ -99,14 +99,14 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
throw std::runtime_error("You must supply a snapshot name.");
}
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), opts] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), opts);
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag), sf] () mutable {
return do_take_column_family_snapshot(std::move(ks_name), std::move(tables), std::move(tag), sf);
});
}
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf) {
co_await check_snapshot_not_exist(ks_name, tag, tables);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), bool(sf));
}
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {

View File

@@ -38,13 +38,10 @@ class backup_task_impl;
} // snapshot namespace
struct snapshot_options {
bool skip_flush = false;
bool use_sstable_identifier = false;
};
class snapshot_ctl : public peering_sharded_service<snapshot_ctl> {
public:
using skip_flush = bool_class<class skip_flush_tag>;
struct table_snapshot_details {
int64_t total;
int64_t live;
@@ -73,8 +70,8 @@ public:
*
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_snapshot(sstring tag, snapshot_options opts = {}) {
return take_snapshot(tag, {}, opts);
future<> take_snapshot(sstring tag, skip_flush sf = skip_flush::no) {
return take_snapshot(tag, {}, sf);
}
/**
@@ -83,7 +80,7 @@ public:
* @param tag the tag given to the snapshot; may not be null or empty
* @param keyspace_names the names of the keyspaces to snapshot; empty means "all"
*/
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {});
future<> take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
/**
* Takes the snapshot of multiple tables. A snapshot name must be specified.
@@ -92,7 +89,7 @@ public:
* @param tables a vector of tables names to snapshot
* @param tag the tag given to the snapshot; may not be null or empty
*/
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
/**
* Remove the snapshot with the given name from the given keyspaces.
@@ -130,8 +127,8 @@ private:
friend class snapshot::backup_task_impl;
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, snapshot_options opts = {} );
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
future<> do_take_snapshot(sstring tag, std::vector<sstring> keyspace_names, skip_flush sf = skip_flush::no);
future<> do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, skip_flush sf = skip_flush::no);
};
}

View File

@@ -137,8 +137,6 @@ namespace {
system_keyspace::ROLE_PERMISSIONS,
system_keyspace::DICTS,
system_keyspace::VIEW_BUILDING_TASKS,
// repair tasks
system_keyspace::REPAIR_TASKS,
};
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
props.is_group0_table = true;
@@ -464,24 +462,6 @@ schema_ptr system_keyspace::repair_history() {
return schema;
}
schema_ptr system_keyspace::repair_tasks() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, REPAIR_TASKS);
return schema_builder(NAME, REPAIR_TASKS, std::optional(id))
.with_column("task_uuid", uuid_type, column_kind::partition_key)
.with_column("operation", utf8_type, column_kind::clustering_key)
// First and last token for of the tablet
.with_column("first_token", long_type, column_kind::clustering_key)
.with_column("last_token", long_type, column_kind::clustering_key)
.with_column("timestamp", timestamp_type)
.with_column("table_uuid", uuid_type, column_kind::static_column)
.set_comment("Record tablet repair tasks")
.with_hash_version()
.build();
}();
return schema;
}
schema_ptr system_keyspace::built_indexes() {
static thread_local auto built_indexes = [] {
schema_builder builder(generate_legacy_id(NAME, BUILT_INDEXES), NAME, BUILT_INDEXES,
@@ -2331,7 +2311,6 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
corrupt_data(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(),
repair_tasks(),
v3::views_builds_in_progress(), v3::built_views(),
v3::scylla_views_builds_in_progress(),
v3::truncated(),
@@ -2573,32 +2552,6 @@ future<> system_keyspace::get_repair_history(::table_id table_id, repair_history
});
}
future<utils::chunked_vector<canonical_mutation>> system_keyspace::get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts) {
// Default to timeout the repair task entries in 10 days, this should be enough time for the management tools to query
constexpr int ttl = 10 * 24 * 3600;
sstring req = format("INSERT INTO system.{} (task_uuid, operation, first_token, last_token, timestamp, table_uuid) VALUES (?, ?, ?, ?, ?, ?) USING TTL {}", REPAIR_TASKS, ttl);
auto muts = co_await _qp.get_mutations_internal(req, internal_system_query_state(), ts,
{entry.task_uuid.uuid(), repair_task_operation_to_string(entry.operation),
entry.first_token, entry.last_token, entry.timestamp, entry.table_uuid.uuid()});
utils::chunked_vector<canonical_mutation> cmuts = {muts.begin(), muts.end()};
co_return cmuts;
}
future<> system_keyspace::get_repair_task(tasks::task_id task_uuid, repair_task_consumer f) {
sstring req = format("SELECT * from system.{} WHERE task_uuid = {}", REPAIR_TASKS, task_uuid);
co_await _qp.query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_task_entry ent;
ent.task_uuid = tasks::task_id(row.get_as<utils::UUID>("task_uuid"));
ent.operation = repair_task_operation_from_string(row.get_as<sstring>("operation"));
ent.first_token = row.get_as<int64_t>("first_token");
ent.last_token = row.get_as<int64_t>("last_token");
ent.timestamp = row.get_as<db_clock::time_point>("timestamp");
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
co_await f(std::move(ent));
co_return stop_iteration::no;
});
}
future<gms::generation_type> system_keyspace::increment_and_get_generation() {
auto req = format("SELECT gossip_generation FROM system.{} WHERE key='{}'", LOCAL, LOCAL);
auto rs = co_await _qp.execute_internal(req, cql3::query_processor::cache_internal::yes);
@@ -3770,35 +3723,4 @@ future<> system_keyspace::apply_mutation(mutation m) {
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
}
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<system_keyspace::repair_task_operation, sstring> repair_task_operation_to_name = {
{system_keyspace::repair_task_operation::requested, "requested"},
{system_keyspace::repair_task_operation::finished, "finished"},
};
static const std::unordered_map<sstring, system_keyspace::repair_task_operation> repair_task_operation_from_name = std::invoke([] {
std::unordered_map<sstring, system_keyspace::repair_task_operation> result;
for (auto&& [v, s] : repair_task_operation_to_name) {
result.emplace(s, v);
}
return result;
});
sstring system_keyspace::repair_task_operation_to_string(system_keyspace::repair_task_operation op) {
auto i = repair_task_operation_to_name.find(op);
if (i == repair_task_operation_to_name.end()) {
on_internal_error(slogger, format("Invalid repair task operation: {}", static_cast<int>(op)));
}
return i->second;
}
system_keyspace::repair_task_operation system_keyspace::repair_task_operation_from_string(const sstring& name) {
return repair_task_operation_from_name.at(name);
}
} // namespace db
auto fmt::formatter<db::system_keyspace::repair_task_operation>::format(const db::system_keyspace::repair_task_operation& op, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{}", db::system_keyspace::repair_task_operation_to_string(op));
}

View File

@@ -57,8 +57,6 @@ namespace paxos {
struct topology_request_state;
class group0_guard;
class raft_group0_client;
}
namespace netw {
@@ -186,7 +184,6 @@ public:
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
static constexpr auto RAFT_SNAPSHOT_CONFIG = "raft_snapshot_config";
static constexpr auto REPAIR_HISTORY = "repair_history";
static constexpr auto REPAIR_TASKS = "repair_tasks";
static constexpr auto GROUP0_HISTORY = "group0_history";
static constexpr auto DISCOVERY = "discovery";
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
@@ -263,7 +260,6 @@ public:
static schema_ptr raft();
static schema_ptr raft_snapshots();
static schema_ptr repair_history();
static schema_ptr repair_tasks();
static schema_ptr group0_history();
static schema_ptr discovery();
static schema_ptr broadcast_kv_store();
@@ -402,22 +398,6 @@ public:
int64_t range_end;
};
enum class repair_task_operation {
requested,
finished,
};
static sstring repair_task_operation_to_string(repair_task_operation op);
static repair_task_operation repair_task_operation_from_string(const sstring& name);
struct repair_task_entry {
tasks::task_id task_uuid;
repair_task_operation operation;
int64_t first_token;
int64_t last_token;
db_clock::time_point timestamp;
table_id table_uuid;
};
struct topology_requests_entry {
utils::UUID id;
utils::UUID initiating_host;
@@ -439,10 +419,6 @@ public:
using repair_history_consumer = noncopyable_function<future<>(const repair_history_entry&)>;
future<> get_repair_history(table_id, repair_history_consumer f);
future<utils::chunked_vector<canonical_mutation>> get_update_repair_task_mutations(const repair_task_entry& entry, api::timestamp_type ts);
using repair_task_consumer = noncopyable_function<future<>(const repair_task_entry&)>;
future<> get_repair_task(tasks::task_id task_uuid, repair_task_consumer f);
future<> save_truncation_record(const replica::column_family&, db_clock::time_point truncated_at, db::replay_position);
future<replay_positions> get_truncated_positions(table_id);
future<> drop_truncation_rp_records();
@@ -750,8 +726,3 @@ public:
}; // class system_keyspace
} // namespace db
template <>
struct fmt::formatter<db::system_keyspace::repair_task_operation> : fmt::formatter<string_view> {
auto format(const db::system_keyspace::repair_task_operation&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

View File

@@ -28,7 +28,8 @@ Incremental Repair is only supported for tables that use the tablets architectur
Incremental Repair Modes
------------------------
While incremental repair is the default and recommended mode, you can control its behavior for a given repair operation using the ``incremental_mode`` parameter. This is useful for situations where you might need to force a full data validation.
Incremental is currently disabled by default. You can control its behavior for a given repair operation using the ``incremental_mode`` parameter.
This is useful for enabling incremental repair, or in situations where you might need to force a full data validation.
The available modes are:

View File

@@ -53,13 +53,13 @@ ScyllaDB nodetool cluster repair command supports the following options:
nodetool cluster repair --tablet-tokens 1,10474535988
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled'.
For example:
::
nodetool cluster repair --incremental-mode regular
nodetool cluster repair --incremental-mode disabled
- ``keyspace`` executes a repair on a specific keyspace. The default is all keyspaces.

View File

@@ -17,7 +17,7 @@ SYNOPSIS
[(-u <username> | --username <username>)] snapshot
[(-cf <table> | --column-family <table> | --table <table>)]
[(-kc <kclist> | --kc.list <kclist>)]
[(-sf | --skip-flush)] [--use-sstable-identifier] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
[(-sf | --skip-flush)] [(-t <tag> | --tag <tag>)] [--] [<keyspaces...>]
OPTIONS
.......
@@ -37,8 +37,6 @@ Parameter Descriptio
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
-sf / --skip-flush Do not flush memtables before snapshotting (snapshot will not contain unflushed data)
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
--use-sstable-identifier Use the sstable identifier UUID, if available, rather than the sstable generation.
-------------------------------------------------------------------- -------------------------------------------------------------------------------------
-t <tag> / --tag <tag> The name of the snapshot
==================================================================== =====================================================================================

View File

@@ -143,7 +143,6 @@ public:
gms::feature tablet_incremental_repair { *this, "TABLET_INCREMENTAL_REPAIR"sv };
gms::feature tablet_repair_scheduler { *this, "TABLET_REPAIR_SCHEDULER"sv };
gms::feature tablet_repair_tasks_table { *this, "TABLET_REPAIR_TASKS_TABLE"sv };
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
gms::feature tablet_rack_aware_view_pairing { *this, "TABLET_RACK_AWARE_VIEW_PAIRING"sv };

View File

@@ -200,7 +200,10 @@ enum class tablet_repair_incremental_mode : uint8_t {
disabled,
};
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
// FIXME: Incremental repair is disabled by default due to
// https://github.com/scylladb/scylladb/issues/26041 and
// https://github.com/scylladb/scylladb/issues/27414
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::disabled};
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);

View File

@@ -3844,83 +3844,3 @@ future<uint32_t> repair_service::get_next_repair_meta_id() {
locator::host_id repair_service::my_host_id() const noexcept {
return _gossiper.local().my_host_id();
}
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2) {
if (ranges1.empty() || ranges2.empty()) {
co_return 0;
}
auto sort = [] (utils::chunked_vector<tablet_token_range>& ranges) {
std::sort(ranges.begin(), ranges.end(), [] (const auto& a, const auto& b) {
if (a.first_token != b.first_token) {
return a.first_token < b.first_token;
}
return a.last_token < b.last_token;
});
};
// First, merge overlapping and adjacent ranges in ranges2.
sort(ranges2);
utils::chunked_vector<tablet_token_range> merged;
merged.push_back(ranges2[0]);
for (size_t i = 1; i < ranges2.size(); ++i) {
co_await coroutine::maybe_yield();
// To avoid overflow with max() + 1, we check adjacency with `a - 1 <= b` instead of `a <= b + 1`
if (ranges2[i].first_token - 1 <= merged.back().last_token) {
merged.back().last_token = std::max(merged.back().last_token, ranges2[i].last_token);
} else {
merged.push_back(ranges2[i]);
}
}
// Count covered ranges using a linear scan
size_t covered_count = 0;
auto it = merged.begin();
auto end = merged.end();
sort(ranges1);
for (const auto& r1 : ranges1) {
co_await coroutine::maybe_yield();
// Advance the merged iterator only if the current merged range ends
// before the current r1 starts.
while (it != end && it->last_token < r1.first_token) {
co_await coroutine::maybe_yield();
++it;
}
// If we have exhausted the merged ranges, no further r1 can be covered
if (it == end) {
break;
}
// Check if the current merged range covers r1.
if (it->first_token <= r1.first_token && r1.last_token <= it->last_token) {
covered_count++;
}
}
co_return covered_count;
}
future<std::optional<repair_task_progress>> repair_service::get_tablet_repair_task_progress(tasks::task_id task_uuid) {
utils::chunked_vector<tablet_token_range> requested_tablets;
utils::chunked_vector<tablet_token_range> finished_tablets;
table_id tid;
if (!_db.local().features().tablet_repair_tasks_table) {
co_return std::nullopt;
}
co_await _sys_ks.local().get_repair_task(task_uuid, [&tid, &requested_tablets, &finished_tablets] (const db::system_keyspace::repair_task_entry& entry) -> future<> {
rlogger.debug("repair_task_progress: Get entry operation={} first_token={} last_token={}", entry.operation, entry.first_token, entry.last_token);
if (entry.operation == db::system_keyspace::repair_task_operation::requested) {
requested_tablets.push_back({entry.first_token, entry.last_token});
} else if (entry.operation == db::system_keyspace::repair_task_operation::finished) {
finished_tablets.push_back({entry.first_token, entry.last_token});
}
tid = entry.table_uuid;
co_return;
});
auto requested = requested_tablets.size();
auto finished_nomerge = finished_tablets.size();
auto finished = co_await count_finished_tablets(std::move(requested_tablets), std::move(finished_tablets));
auto progress = repair_task_progress{requested, finished, tid};
rlogger.debug("repair_task_progress: task_uuid={} table_uuid={} requested_tablets={} finished_tablets={} progress={} finished_nomerge={}",
task_uuid, tid, requested, finished, progress.progress(), finished_nomerge);
co_return progress;
}

View File

@@ -99,15 +99,6 @@ public:
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
struct repair_task_progress {
size_t requested;
size_t finished;
table_id table_uuid;
float progress() const {
return requested == 0 ? 1.0 : float(finished) / requested;
}
};
class repair_service : public seastar::peering_sharded_service<repair_service> {
sharded<service::topology_state_machine>& _tsm;
sharded<gms::gossiper>& _gossiper;
@@ -231,9 +222,6 @@ private:
public:
future<gc_clock::time_point> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid, tasks::task_info global_tablet_repair_task_info, service::frozen_topology_guard topo_guard, std::optional<locator::tablet_replica_set> rebuild_replicas, locator::tablet_transition_stage stage);
future<std::optional<repair_task_progress>> get_tablet_repair_task_progress(tasks::task_id task_uuid);
private:
future<repair_update_system_table_response> repair_update_system_table_handler(
@@ -338,12 +326,3 @@ future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
schema_ptr s, uint64_t seed, repair_master is_master,
reader_permit permit, repair_hasher hasher);
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, std::optional<small_table_optimization_params> small_table_optimization = std::nullopt, repair_meta* rm = nullptr);
// A struct to hold the first and last token of a tablet.
struct tablet_token_range {
int64_t first_token;
int64_t last_token;
};
// Function to count the number of ranges in ranges1 covered by the merged ranges of ranges2.
future<size_t> count_finished_tablets(utils::chunked_vector<tablet_token_range> ranges1, utils::chunked_vector<tablet_token_range> ranges2);

View File

@@ -2810,26 +2810,26 @@ future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& shar
});
}
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, db::snapshot_options opts) {
if (!opts.skip_flush) {
future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id uuid, sstring tag, bool skip_flush) {
if (!skip_flush) {
co_await flush_table_on_all_shards(sharded_db, uuid);
}
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag, opts);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, tag);
}
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), opts] (auto& table_name) {
future<> database::snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush) {
return parallel_for_each(table_names, [&sharded_db, ks_name, tag = std::move(tag), skip_flush] (auto& table_name) {
auto uuid = sharded_db.local().find_uuid(ks_name, table_name);
return snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
return snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
});
}
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts) {
future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush) {
auto& ks = sharded_db.local().find_keyspace(ks_name);
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), opts] (const auto& pair) -> future<> {
co_await coroutine::parallel_for_each(ks.metadata()->cf_meta_data(), [&, tag = std::move(tag), skip_flush] (const auto& pair) -> future<> {
auto uuid = pair.second->id();
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, opts);
co_await snapshot_table_on_all_shards(sharded_db, uuid, tag, skip_flush);
});
}
@@ -2951,12 +2951,7 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
auto truncated_at = truncated_at_opt.value_or(db_clock::now());
auto name = snapshot_name_opt.value_or(
format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name()));
// Use the sstable identifier in snapshot names to allow de-duplication of sstables
// at backup time even if they were migrated across shards or nodes and were renamed a given a new generation.
// We hard-code that here since we have no way to pass this option to auto-snapshot and
// it is always safe to use the sstable identifier for the sstable generation.
auto opts = db::snapshot_options{.use_sstable_identifier = true};
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name, opts);
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name);
}
co_await sharded_db.invoke_on_all([&] (database& db) {

View File

@@ -1040,12 +1040,12 @@ public:
private:
using snapshot_file_set = foreign_ptr<std::unique_ptr<std::unordered_set<sstring>>>;
future<snapshot_file_set> take_snapshot(sstring jsondir, db::snapshot_options opts);
future<snapshot_file_set> take_snapshot(sstring jsondir);
// Writes the table schema and the manifest of all files in the snapshot directory.
future<> finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets);
static future<> seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets);
public:
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts);
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name);
future<std::unordered_map<sstring, snapshot_details>> get_snapshot_details();
static future<snapshot_details> get_snapshot_details(std::filesystem::path snapshot_dir, std::filesystem::path datadir);
@@ -2009,9 +2009,9 @@ public:
static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, db::snapshot_options opts);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_options opts);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, db::snapshot_options opts);
static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, table_id id, sstring tag, bool skip_flush);
static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, bool skip_flush);
static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
public:
bool update_column_family(schema_ptr s);

View File

@@ -3268,7 +3268,7 @@ future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstrin
}
// Runs the orchestration code on an arbitrary shard to balance the load.
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name, db::snapshot_options opts) {
future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name) {
auto* so = std::get_if<storage_options::local>(&table_shards->get_storage_options().value);
if (so == nullptr) {
throw std::runtime_error("Snapshotting non-local tables is not implemented");
@@ -3291,7 +3291,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
co_await io_check([&jsondir] { return recursive_touch_directory(jsondir); });
co_await coroutine::parallel_for_each(smp::all_cpus(), [&] (unsigned shard) -> future<> {
file_sets.emplace_back(co_await smp::submit_to(shard, [&] {
return table_shards->take_snapshot(jsondir, opts);
return table_shards->take_snapshot(jsondir);
}));
});
co_await io_check(sync_directory, jsondir);
@@ -3300,22 +3300,19 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
});
}
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir, db::snapshot_options opts) {
tlogger.trace("take_snapshot {}: use_sstable_identifier={}", jsondir, opts.use_sstable_identifier);
future<table::snapshot_file_set> table::take_snapshot(sstring jsondir) {
tlogger.trace("take_snapshot {}", jsondir);
auto sstable_deletion_guard = co_await get_sstable_list_permit();
auto tables = *_sstables->all() | std::ranges::to<std::vector<sstables::shared_sstable>>();
auto table_names = std::make_unique<std::unordered_set<sstring>>();
auto& ks_name = schema()->ks_name();
auto& cf_name = schema()->cf_name();
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&, opts] (sstables::shared_sstable sstable) -> future<> {
auto gen = co_await io_check([sstable, &dir = jsondir, opts] {
return sstable->snapshot(dir, opts.use_sstable_identifier);
co_await _sstables_manager.dir_semaphore().parallel_for_each(tables, [&jsondir, &table_names] (sstables::shared_sstable sstable) {
table_names->insert(sstable->component_basename(sstables::component_type::Data));
return io_check([sstable, &dir = jsondir] {
return sstable->snapshot(dir);
});
auto fname = sstable->component_basename(ks_name, cf_name, sstable->get_version(), gen, sstable->get_format(), sstables::component_type::Data);
table_names->insert(fname);
});
co_return make_foreign(std::move(table_names));
}

View File

@@ -6822,7 +6822,6 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
});
}
auto ts = db_clock::now();
for (const auto& token : tokens) {
auto tid = tmap.get_tablet_id(token);
auto& tinfo = tmap.get_tablet_info(tid);
@@ -6836,20 +6835,6 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
tablet_mutation_builder_for_base_table(guard.write_timestamp(), table)
.set_repair_task_info(last_token, repair_task_info, _feature_service)
.build());
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::requested,
.first_token = dht::token::to_int64(tmap.get_first_token(tid)),
.last_token = dht::token::to_int64(tmap.get_last_token(tid)),
.timestamp = ts,
.table_uuid = table,
};
if (_feature_service.tablet_repair_tasks_table) {
auto cmuts = co_await _sys_ks.local().get_update_repair_task_mutations(entry, guard.write_timestamp());
for (auto& m : cmuts) {
updates.push_back(std::move(m));
}
}
}
sstring reason = format("Repair tablet by API request tokens={} tablet_task_id={}", tokens, repair_task_info.tablet_task_id);

View File

@@ -136,17 +136,6 @@ db::tablet_options combine_tablet_options(R&& opts) {
return combined_opts;
}
static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
auto tokens_view = s | std::views::split(delimiter)
| std::views::transform([](auto&& range) {
return std::string_view(&*range.begin(), std::ranges::distance(range));
})
| std::views::transform([](std::string_view sv) {
return locator::tablet_id(std::stoul(std::string(sv)));
});
return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
}
// Used to compare different migration choices in regard to impact on load imbalance.
// There is a total order on migration_badness such that better migrations are ordered before worse ones.
struct migration_badness {
@@ -904,8 +893,6 @@ public:
co_await coroutine::maybe_yield();
auto& config = tmap.repair_scheduler_config();
auto now = db_clock::now();
auto skip = utils::get_local_injector().inject_parameter<std::string_view>("tablet_repair_skip_sched");
auto skip_tablets = skip ? split_string_to_tablet_id(*skip, ',') : std::unordered_set<locator::tablet_id>();
co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
auto gid = locator::global_tablet_id{table, id};
// Skip tablet that is in transitions.
@@ -926,11 +913,6 @@ public:
co_return;
}
if (skip_tablets.contains(id)) {
lblogger.debug("Skipped tablet repair for tablet={} by error injector", gid);
co_return;
}
// Avoid rescheduling a failed tablet repair in a loop
// TODO: Allow user to config
const auto min_reschedule_time = std::chrono::seconds(5);

View File

@@ -10,7 +10,6 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
#include "repair/row_level.hh"
#include "service/task_manager_module.hh"
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
@@ -110,16 +109,6 @@ future<std::optional<tasks::virtual_task_hint>> tablet_virtual_task::contains(ta
tid = tmap.next_tablet(*tid);
}
}
// Check if the task id is present in the repair task table
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(task_id);
if (progress && progress->requested > 0) {
co_return tasks::virtual_task_hint{
.table_id = progress->table_uuid,
.task_type = locator::tablet_task_type::user_repair,
.tablet_id = std::nullopt,
};
}
co_return std::nullopt;
}
@@ -254,20 +243,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
size_t sched_nr = 0;
auto tmptr = _ss.get_token_metadata_ptr();
auto& tmap = tmptr->tablets().get_tablet_map(table);
bool repair_task_finished = false;
bool repair_task_pending = false;
if (is_repair_task(task_type)) {
auto progress = co_await _ss._repair.local().get_tablet_repair_task_progress(id);
if (progress) {
res.status.progress.completed = progress->finished;
res.status.progress.total = progress->requested;
res.status.progress_units = "tablets";
if (progress->requested > 0 && progress->requested == progress->finished) {
repair_task_finished = true;
} if (progress->requested > 0 && progress->requested > progress->finished) {
repair_task_pending = true;
}
}
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
auto& task_info = info.repair_task_info;
if (task_info.tablet_task_id.uuid() == id.uuid()) {
@@ -299,17 +275,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
res.status.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
co_return res;
}
if (repair_task_pending) {
// When repair_task_pending is true, the res.tablets will be empty iff the request is aborted by user.
res.status.state = res.tablets.empty() ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::running;
co_return res;
}
if (repair_task_finished) {
res.status.state = tasks::task_manager::task_state::done;
co_return res;
}
// FIXME: Show finished tasks.
co_return std::nullopt;
}

View File

@@ -1205,8 +1205,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
// Record the repair_time returned by the repair_tablet rpc call
db_clock::time_point repair_time;
// Record the repair task update muations
utils::chunked_vector<canonical_mutation> repair_task_updates;
service::session_id session_id;
};
@@ -1739,14 +1737,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
dst = dst_opt.value().host;
}
// Update repair task
db::system_keyspace::repair_task_entry entry{
.task_uuid = tasks::task_id(tinfo.repair_task_info.tablet_task_id.uuid()),
.operation = db::system_keyspace::repair_task_operation::finished,
.first_token = dht::token::to_int64(tmap.get_first_token(gid.tablet)),
.last_token = dht::token::to_int64(tmap.get_last_token(gid.tablet)),
.table_uuid = gid.table,
};
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
service::session_id::create_random_id() : trinfo->session_id;
@@ -1755,10 +1745,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
if (_feature_service.tablet_repair_tasks_table) {
entry.timestamp = db_clock::now();
tablet_state.repair_task_updates = co_await _sys_ks.get_update_repair_task_mutations(entry, api::new_timestamp());
}
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
dst, tablet, duration, res.repair_time);
})) {
@@ -1777,9 +1763,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
.del_repair_task_info(last_token, _feature_service)
.del_session(last_token);
for (auto& m : tablet_state.repair_task_updates) {
updates.push_back(std::move(m));
}
// Skip update repair time in case hosts filter or dcs filter is set.
if (valid && is_filter_off) {
auto sched_time = tinfo.repair_task_info.sched_time;

View File

@@ -2117,14 +2117,11 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
}
sstable_id sid;
// Force a random sstable_id for testing purposes
bool random_sstable_identifier = utils::get_local_injector().is_enabled("random_sstable_identifier");
if (!random_sstable_identifier && generation().is_uuid_based()) {
if (generation().is_uuid_based()) {
sid = sstable_id(generation().as_uuid());
} else {
sid = sstable_id(utils::UUID_gen::get_time_UUID());
auto msg = random_sstable_identifier ? "forced random sstable_id" : "has numerical generation";
sstlog.info("SSTable {} {}. SSTable identifier in scylla_metadata set to {}", get_filename(), msg, sid);
sstlog.info("SSTable {} has numerical generation. SSTable identifier in scylla_metadata set to {}", get_filename(), sid);
}
_components->scylla_metadata->data.set<scylla_metadata_type::SSTableIdentifier>(scylla_metadata::sstable_identifier{sid});
@@ -2538,11 +2535,8 @@ std::vector<std::pair<component_type, sstring>> sstable::all_components() const
return all;
}
future<generation_type> sstable::snapshot(const sstring& dir, bool use_sstable_identifier) const {
// Use the sstable identifier UUID if available to enable global de-duplication of sstables in backup.
generation_type gen = (use_sstable_identifier && _sstable_identifier) ? generation_type(_sstable_identifier->uuid()) : _generation;
co_await _storage->snapshot(*this, dir, storage::absolute_path::yes, gen);
co_return gen;
future<> sstable::snapshot(const sstring& dir) const {
return _storage->snapshot(*this, dir, storage::absolute_path::yes);
}
future<> sstable::change_state(sstable_state to, delayed_commit_changes* delay_commit) {

View File

@@ -397,10 +397,6 @@ public:
return _version;
}
format_types get_format() const {
return _format;
}
// Returns the total bytes of all components.
uint64_t bytes_on_disk() const;
file_size_stats get_file_size_stats() const;
@@ -442,10 +438,7 @@ public:
std::vector<std::pair<component_type, sstring>> all_components() const;
// When use_sstable_identifier is true and the sstable identifier is available,
// use it to name the sstable in the snapshot, rather than the sstable generation.
// Returns the generation used for snapshot.
future<generation_type> snapshot(const sstring& dir, bool use_sstable_identifier = false) const;
future<> snapshot(const sstring& dir) const;
// Delete the sstable by unlinking all sstable files
// Ignores all errors.

View File

@@ -31,7 +31,6 @@
#include "replica/database.hh"
#include "utils/assert.hh"
#include "utils/lister.hh"
#include "utils/rjson.hh"
#include "partition_slice_builder.hh"
#include "mutation/frozen_mutation.hh"
#include "test/lib/mutation_source_test.hh"
@@ -39,7 +38,6 @@
#include "service/migration_manager.hh"
#include "sstables/sstables.hh"
#include "sstables/generation_type.hh"
#include "sstables/sstable_version.hh"
#include "db/config.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog.hh"
@@ -53,7 +51,6 @@
#include "db/system_keyspace.hh"
#include "db/view/view_builder.hh"
#include "replica/mutation_dump.hh"
#include "utils/error_injection.hh"
using namespace std::chrono_literals;
using namespace sstables;
@@ -615,13 +612,13 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
});
}
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) {
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", bool skip_flush = false) {
try {
auto uuid = e.db().local().find_uuid(ks_name, cf_name);
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, opts);
co_await replica::database::snapshot_table_on_all_shards(e.db(), uuid, snapshot_name, skip_flush);
} catch (...) {
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={} use_sstable_identifier={}: {}",
ks_name, cf_name, snapshot_name, opts.skip_flush, opts.use_sstable_identifier, std::current_exception());
testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
ks_name, cf_name, snapshot_name, skip_flush, std::current_exception());
throw;
}
}
@@ -635,37 +632,6 @@ future<std::set<sstring>> collect_files(fs::path path) {
co_return ret;
}
static bool is_component(const sstring& fname, const sstring& suffix) {
return fname.ends_with(suffix);
}
static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, const sstring& suffix) {
// Verify manifest against the files in the snapshots dir
auto pred = [&suffix] (const sstring& fname) {
return is_component(fname, suffix);
};
return std::ranges::filter_view(all_files, pred) | std::ranges::to<std::set<sstring>>();
}
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
static future<> validate_manifest(const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir) {
sstring suffix = "-Data.db";
auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
std::set<sstring> sstables_in_manifest;
auto manifest_str = co_await util::read_entire_file_contiguous(snapshot_dir / "manifest.json");
auto manifest_json = rjson::parse(manifest_str);
auto& manifest_files = manifest_json["files"];
BOOST_REQUIRE(manifest_files.IsArray());
for (auto& f : manifest_files.GetArray()) {
if (is_component(f.GetString(), suffix)) {
sstables_in_manifest.insert(f.GetString());
}
}
testlog.debug("SSTables in manifest.json: {}", fmt::join(sstables_in_manifest, ", "));
BOOST_REQUIRE_EQUAL(sstables_in_snapshot, sstables_in_manifest);
}
static future<> snapshot_works(const std::string& table_name) {
return do_with_some_data({"cf"}, [table_name] (cql_test_env& e) {
take_snapshot(e, "ks", table_name).get();
@@ -685,8 +651,6 @@ static future<> snapshot_works(const std::string& table_name) {
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir, in_snapshot_dir);
validate_manifest(snapshot_dir, in_snapshot_dir).get();
return make_ready_future<>();
}, true);
}
@@ -705,8 +669,7 @@ SEASTAR_TEST_CASE(index_snapshot_works) {
SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
db::snapshot_options opts = {.skip_flush = true};
take_snapshot(e, "ks", "cf", "test", opts).get();
take_snapshot(e, "ks", "cf", "test", true /* skip_flush */).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -719,41 +682,6 @@ SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
});
}
SEASTAR_TEST_CASE(snapshot_use_sstable_identifier_works) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
return make_ready_future<>();
#endif
sstring table_name = "cf";
// Force random sstable identifiers, otherwise the initial sstable_id is equal
// to the sstable generation and the test can't distinguish between them.
utils::get_local_injector().enable("random_sstable_identifier", false);
return do_with_some_data({table_name}, [table_name] (cql_test_env& e) -> future<> {
sstring tag = "test";
db::snapshot_options opts = {.use_sstable_identifier = true};
co_await take_snapshot(e, "ks", table_name, tag, opts);
auto& cf = e.local_db().find_column_family("ks", table_name);
auto table_directory = table_dir(cf);
auto snapshot_dir = table_directory / sstables::snapshots_dir / tag;
auto in_table_dir = co_await collect_files(table_directory);
// snapshot triggered a flush and wrote the data down.
BOOST_REQUIRE_GE(in_table_dir.size(), 9);
testlog.info("Files in table dir: {}", fmt::join(in_table_dir, ", "));
auto in_snapshot_dir = co_await collect_files(snapshot_dir);
testlog.info("Files in snapshot dir: {}", fmt::join(in_snapshot_dir, ", "));
in_table_dir.insert("manifest.json");
in_table_dir.insert("schema.cql");
// all files were copied and manifest was generated
BOOST_REQUIRE_EQUAL(in_table_dir.size(), in_snapshot_dir.size());
BOOST_REQUIRE_NE(in_table_dir, in_snapshot_dir);
co_await validate_manifest(snapshot_dir, in_snapshot_dir);
}, true);
}
SEASTAR_TEST_CASE(snapshot_list_okay) {
return do_with_some_data({"cf"}, [] (cql_test_env& e) {
auto& cf = e.local_db().find_column_family("ks", "cf");
@@ -1528,7 +1456,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
}
BOOST_REQUIRE(found);
co_await take_snapshot(e, "ks", "cf", "test", db::snapshot_options{.skip_flush = true});
co_await take_snapshot(e, "ks", "cf", "test", true /* skip_flush */);
testlog.debug("Expected: {}", expected);

View File

@@ -346,60 +346,4 @@ SEASTAR_TEST_CASE(repair_rows_size_considers_external_memory) {
});
}
SEASTAR_TEST_CASE(test_tablet_token_range_count) {
{
// Simple case: one large range covers a smaller one
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r2 ranges overlap and should merge to cover r1
// r2: [0, 50] + [40, 100] -> merges to [0, 100]
// r1: [10, 90] should be covered
utils::chunked_vector<tablet_token_range> r1 = {{10, 90}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 50}, {40, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r2 ranges are adjacent (contiguous) and should merge
// r2: [0, 10] + [11, 20] -> merges to [0, 20]
// r1: [5, 15] should be covered
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}, {11, 20}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 1);
}
{
// r1 overlaps r2 but is not FULLY contained
// r2: [0, 10]
// r1: [5, 15] (Ends too late), [ -5, 5 ] (Starts too early)
utils::chunked_vector<tablet_token_range> r1 = {{5, 15}, {-5, 5}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 10}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 0);
}
{
// A single merged range in r2 covers multiple distinct ranges in r1
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}, {30, 40}, {50, 60}};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 3);
}
{
// Inputs are provided in random order, ensuring the internal sort works
utils::chunked_vector<tablet_token_range> r1 = {{50, 60}, {10, 20}};
utils::chunked_vector<tablet_token_range> r2 = {{50, 100}, {0, 40}};
// r2 merges effectively to [0, 40] and [50, 100]
// Both r1 items are covered
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2) == 2);
}
{
utils::chunked_vector<tablet_token_range> r1 = {{10, 20}};
utils::chunked_vector<tablet_token_range> r2_empty = {};
utils::chunked_vector<tablet_token_range> r1_empty = {};
utils::chunked_vector<tablet_token_range> r2 = {{0, 100}};
BOOST_REQUIRE(co_await count_finished_tablets(r1, r2_empty) == 0);
BOOST_REQUIRE(co_await count_finished_tablets(r1_empty, r2) == 0);
}
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -25,7 +25,7 @@ from typing import Any, Optional, override
import pytest
import requests
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
from cassandra.cluster import NoHostAvailable, Session
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
from cassandra.query import SimpleStatement, named_tuple_factory
from ccmlib.scylla_node import ScyllaNode, NodeError
@@ -1135,6 +1135,14 @@ class TestCQLAudit(AuditTester):
session.execute("DROP TABLE test1")
def _get_attempt_count(self, session: Session, *, execution_profile=EXEC_PROFILE_DEFAULT, consistency_level: ConsistencyLevel = ConsistencyLevel.ONE) -> int:
# dtest env is using FlakyRetryPolicy which has `max_retries` attribute
cl_profile = session.execution_profile_clone_update(execution_profile, consistency_level=consistency_level)
policy = cl_profile.retry_policy
retries = getattr(policy, "max_retries", None)
assert retries is not None
return 1 + retries
def _test_insert_failure_doesnt_report_success_assign_nodes(self, session: Session = None):
all_nodes: set[ScyllaNode] = set(self.cluster.nodelist())
assert len(all_nodes) == 7
@@ -1154,6 +1162,7 @@ class TestCQLAudit(AuditTester):
for i in range(256):
stmt = SimpleStatement(f"INSERT INTO ks.test1 (k, v1) VALUES ({i}, 1337)", consistency_level=ConsistencyLevel.THREE)
session.execute(stmt)
attempt_count = self._get_attempt_count(session, consistency_level=ConsistencyLevel.THREE)
token = rows_to_list(session.execute(f"SELECT token(k) FROM ks.test1 WHERE k = {i}"))[0][0]
@@ -1168,9 +1177,9 @@ class TestCQLAudit(AuditTester):
audit_partition_nodes = [address_to_node[address] for address in audit_nodes]
insert_node = address_to_node[insert_node.pop()]
kill_node = address_to_node[partitions.pop()]
return audit_partition_nodes, insert_node, kill_node, stmt.query_string
return audit_partition_nodes, insert_node, kill_node, stmt.query_string, attempt_count
return [], [], None, None
return [], [], None, None, None
@pytest.mark.exclude_errors("audit - Unexpected exception when writing log with: node_ip")
def test_insert_failure_doesnt_report_success(self):
@@ -1192,7 +1201,7 @@ class TestCQLAudit(AuditTester):
with self.assert_exactly_n_audit_entries_were_added(session, 1):
conn.execute(stmt)
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail, query_fail_count = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
# TODO: remove the loop when scylladb#24473 is fixed
# We call get_host_id only to cache host_id
@@ -1231,8 +1240,8 @@ class TestCQLAudit(AuditTester):
# If any audit mode is not done yet, continue polling.
all_modes_done = True
for mode, rows in rows_dict.items():
rows_with_error = list(filter(lambda r: r.error, rows))
if len(rows_with_error) == 6:
rows_with_error = [row for row in rows if row.error and row.operation == query_to_fail]
if len(rows_with_error) == query_fail_count:
logger.info(f"audit mode {mode} log updated after {i} iterations ({i / 10}s)")
assert rows_with_error[0].error is True
assert rows_with_error[0].consistency == "THREE"

View File

@@ -220,14 +220,14 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
await insert_keys(cql, ks, 0, 100)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
skipped_bytes = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes = get_incremental_repair_sst_read_bytes(servers[0])
# Nothing to skip. Repair all data.
assert skipped_bytes == 0
assert read_bytes > 0
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
skipped_bytes2 = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes2 = get_incremental_repair_sst_read_bytes(servers[0])
# Skip all. Nothing to repair
@@ -236,7 +236,7 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
await insert_keys(cql, ks, 200, 300)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
skipped_bytes3 = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes3 = get_incremental_repair_sst_read_bytes(servers[0])
# Both skipped and read bytes should grow
@@ -272,7 +272,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
assert get_sstables_repaired_at(map0, token) == sstables_repaired_at
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
map1 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
logging.info(f'map1={map1}')
# Check sstables_repaired_at is increased by 1
@@ -288,7 +288,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
assert len(enable) == 1
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
map2 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
logging.info(f'map2={map2}')
# Check sstables_repaired_at is increased by 1
@@ -313,7 +313,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
# Repair should not finish with error
await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
try:
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, timeout=10)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental', timeout=10)
assert False # Check the tablet repair is not supposed to finish
except TimeoutError:
logger.info("Repair timeout as expected")
@@ -329,7 +329,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# 1 add 0 skip 1 mark
for log in logs:
sst_add, sst_skip, sst_mark = await get_sst_status("First", log)
@@ -355,7 +355,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
else:
assert False # Wrong ops
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# 1 add 1 skip 1 mark
for log in logs:
@@ -394,7 +394,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -402,7 +402,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
current_key += nr_keys
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert more keys and flush to get 2 more sstables
for _ in range(2):
@@ -436,7 +436,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
# Insert more keys
@@ -445,7 +445,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
current_key += nr_keys
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 3
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 3
# Insert more keys and flush to get 2 more sstables
for _ in range(2):
@@ -505,7 +505,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
await manager.server_start(servers[1].server_id)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
scylla_path = get_scylla_path(cql)
@@ -521,8 +521,8 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -532,7 +532,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
# Second repair
await inject_error_on(manager, "repair_tablet_no_update_sstables_repair_at", servers)
# some sstable repaired_at = 3, but sstables_repaired_at = 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
await inject_error_off(manager, "repair_tablet_no_update_sstables_repair_at", servers)
scylla_path = get_scylla_path(cql)
@@ -561,8 +561,8 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -574,7 +574,7 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
last_tokens = [t.last_token for t in replicas]
for t in last_tokens[0::2]:
logging.info(f"Start repair for token={t}");
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t) # sstables_repaired_at 3
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t, incremental_mode='incremental') # sstables_repaired_at 3
scylla_path = get_scylla_path(cql)
@@ -595,7 +595,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -659,13 +659,18 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
assert read1 == 0
assert skip2 == 0
assert read2 > 0
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
await do_repair_and_check('incremental', 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
def check2(skip1, read1, skip2, read2):
assert skip1 == skip2
assert read1 == read2
await do_repair_and_check('disabled', 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
# FIXME: Incremental repair is disabled by default due to
# https://github.com/scylladb/scylladb/issues/26041 and
# https://github.com/scylladb/scylladb/issues/27414
await do_repair_and_check(None, 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
def check3(skip1, read1, skip2, read2):
assert skip1 < skip2
assert read1 == read2
@@ -677,14 +682,14 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
await do_repair_and_check('full', 1, rf'Starting tablet repair by API .* incremental_mode=full.*', check4)
@pytest.mark.asyncio
async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
for s in servers:
time1 += get_repair_tablet_time_ms(s)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
for s in servers:
time2 += get_repair_tablet_time_ms(s)
@@ -694,7 +699,7 @@ async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
# Reproducer for https://github.com/scylladb/scylladb/issues/26346
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_finishes_when_tablet_skips_end_repair_stage(manager):
async def test_incremental_repair_finishes_when_tablet_skips_end_repair_stage(manager):
servers = await manager.servers_add(3, auto_rack_dc="dc1")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
@@ -719,7 +724,7 @@ async def test_repair_finishes_when_tablet_skips_end_repair_stage(manager):
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_rejoin_do_tablet_operation(manager):
async def test_incremental_repair_rejoin_do_tablet_operation(manager):
cmdline = ['--logger-log-level', 'raft_topology=debug']
servers = await manager.servers_add(3, auto_rack_dc="dc1", cmdline=cmdline)

View File

@@ -43,86 +43,6 @@ async def guarantee_repair_time_next_second():
# different than the previous one.
await asyncio.sleep(1)
async def do_test_tablet_repair_progress_split_merge(manager: ManagerClient, do_split=False, do_merge=False):
nr_tablets = 16
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=True, tablets=nr_tablets)
token = 'all'
logs = []
for s in servers:
logs.append(await manager.server_open_log(s.server_id))
# Skip repair for the listed tablet id
nr_tablets_skipped = 4
nr_tablets_repaired = nr_tablets - nr_tablets_skipped
await inject_error_on(manager, "tablet_repair_skip_sched", servers, params={'value':"0,1,5,8"})
# Request to repair all tablets
repair_res = await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
logging.info(f'{repair_res=}')
tablet_task_id = repair_res['tablet_task_id']
async def get_task_status(desc):
task_status = await manager.api.get_task_status(servers[0].ip_addr, tablet_task_id)
completed = int(task_status['progress_completed'])
total = int(task_status['progress_total'])
logging.info(f'{desc=} {completed=} {total=} {task_status=}')
return completed, total
async def wait_task_progress(wanted_complete, wanted_total):
while True:
completed, total = await get_task_status("wait_task_progress")
if completed == wanted_complete and total == wanted_total:
break
await asyncio.sleep(1)
async def get_task_status_and_check(desc):
completed, total = await get_task_status(desc)
assert completed == nr_tablets_repaired
assert total == nr_tablets
# 12 out of 16 tablets should finish
await wait_task_progress(nr_tablets_repaired, nr_tablets)
if do_split:
await get_task_status_and_check("before_split")
s1_mark = await logs[0].mark()
await inject_error_on(manager, "tablet_force_tablet_count_increase", servers)
await logs[0].wait_for('Detected tablet split for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_increase", servers)
await get_task_status_and_check("after_split")
if do_merge:
await get_task_status_and_check("before_merge")
s1_mark = await logs[0].mark()
await inject_error_on(manager, "tablet_force_tablet_count_decrease", servers)
await logs[0].wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
await get_task_status_and_check("after_merge")
# Wait for all repair to finish after all tablets can be scheduled to run repair
await inject_error_off(manager, "tablet_repair_skip_sched", servers)
await wait_task_progress(nr_tablets, nr_tablets)
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_tablet_repair_progress(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_split=False, do_merge=False)
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.asyncio
async def test_tablet_repair_progress_split(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_split=True)
@pytest.mark.asyncio
@pytest.mark.skip(reason="https://github.com/scylladb/scylladb/issues/26844")
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_progress_merge(manager: ManagerClient):
await do_test_tablet_repair_progress_split_merge(manager, do_merge=True)
@pytest.mark.asyncio
async def test_tablet_manual_repair(manager: ManagerClient):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)

View File

@@ -67,11 +67,11 @@ nodetool_cmd.conf = False
# Run the external "nodetool" executable (can be overridden by the NODETOOL
# environment variable). Only call this if the REST API doesn't work.
def run_nodetool(cql, *args):
def run_nodetool(cql, *args, **subprocess_kwargs):
# TODO: We may need to change this function or its callers to add proper
# support for testing on multi-node clusters.
host = cql.cluster.contact_points[0]
subprocess.run([nodetool_cmd(), '-h', host, *args])
return subprocess.run([nodetool_cmd(), '-h', host, *args], **subprocess_kwargs)
def flush(cql, table):
ks, cf = table.split('.')
@@ -115,7 +115,7 @@ def compact_keyspace(cql, ks, flush_memtables=True):
args.extend([ks, cf])
run_nodetool(cql, "compact", *args)
def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
def take_snapshot(cql, table, tag, skip_flush):
ks, cf = table.split('.')
if has_rest_api(cql):
requests.post(f'{rest_api_url(cql)}/storage_service/snapshots/', params={'kn': ks, 'cf' : cf, 'tag': tag, 'sf': skip_flush})
@@ -123,8 +123,6 @@ def take_snapshot(cql, table, tag, skip_flush, use_sstable_identifier=False):
args = ['--tag', tag, '--table', cf]
if skip_flush:
args.append('--skip-flush')
if use_sstable_identifier:
args.append('--use-sstable-identifier')
args.append(ks)
run_nodetool(cql, "snapshot", *args)
@@ -159,6 +157,28 @@ def disablebinary(cql):
else:
run_nodetool(cql, "disablebinary")
def getlogginglevel(cql, logger):
if has_rest_api(cql):
resp = requests.get(f'{rest_api_url(cql)}/system/logger/{logger}')
if resp.ok:
return resp.text.strip()
raise RuntimeError(f"failed to fetch logging level for {logger}: {resp.status_code} {resp.text}")
result = run_nodetool(
cql,
"getlogginglevels",
capture_output=True,
text=True,
check=True,
)
for line in result.stdout.splitlines():
stripped = line.strip()
parts = stripped.split()
if len(parts) >= 2 and parts[0] == logger:
return parts[-1]
raise RuntimeError(f"logger {logger} not found in getlogginglevels output")
def setlogginglevel(cql, logger, level):
if has_rest_api(cql):
requests.post(f'{rest_api_url(cql)}/system/logger/{logger}', params={'level': level})

View File

@@ -10,6 +10,7 @@ import re
import requests
import socket
import struct
from test.cqlpy import nodetool
from test.cqlpy.util import cql_session
def get_protocol_error_metrics(host) -> int:
@@ -58,11 +59,50 @@ def try_connect(host, port, creds, protocol_version):
with cql_with_protocol(host, port, creds, protocol_version) as session:
return 1 if session else 0
@pytest.fixture
def debug_exceptions_logging(request, cql):
def _read_level() -> str | None:
try:
level = nodetool.getlogginglevel(cql, "exception")
if level:
level = level.strip().strip('"').lower()
return level
except Exception as exc:
print(f"Failed to read exception logger level: {exc}")
return None
def _set_and_verify(level: str) -> bool:
try:
nodetool.setlogginglevel(cql, "exception", level)
except Exception as exc:
print(f"Failed to set exception logger level to '{level}': {exc}")
return False
observed = _read_level()
if observed == level:
return True
print(f"Exception logger level observed as '{observed}' while expecting '{level}'")
return False
def _restore_logging():
if not enabled and previous_level is None:
return
target_level = previous_level or "info"
_set_and_verify(target_level)
previous_level = _read_level()
enabled = _set_and_verify("debug")
yield
_restore_logging()
# If there is a protocol version mismatch, the server should
# raise a protocol error, which is counted in the metrics.
def test_protocol_version_mismatch(scylla_only, request, host):
run_count = 100
cpp_exception_threshold = 10
def test_protocol_version_mismatch(scylla_only, debug_exceptions_logging, request, host):
run_count = 200
cpp_exception_threshold = 20
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)
@@ -244,8 +284,8 @@ def _protocol_error_impl(
s.close()
def _test_impl(host, flag):
run_count = 100
cpp_exception_threshold = 10
run_count = 200
cpp_exception_threshold = 20
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)
@@ -267,47 +307,47 @@ def no_ssl(request):
yield
# Malformed BATCH with an invalid kind triggers a protocol error.
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, host):
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_bad_batch")
# Send OPTIONS during AUTHENTICATE to trigger auth-state error.
def test_unexpected_message_during_auth(scylla_only, no_ssl, host):
def test_unexpected_message_during_auth(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_unexpected_auth")
# STARTUP with an invalid/missing string-map entry should produce a protocol error.
def test_process_startup_invalid_string_map(scylla_only, no_ssl, host):
def test_process_startup_invalid_string_map(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_startup_invalid_string_map")
# STARTUP with unknown COMPRESSION option should produce a protocol error.
def test_unknown_compression_algorithm(scylla_only, no_ssl, host):
def test_unknown_compression_algorithm(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_unknown_compression")
# QUERY long-string truncation: declared length > provided bytes triggers protocol error.
def test_process_query_internal_malformed_query(scylla_only, no_ssl, host):
def test_process_query_internal_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_query_internal_malformed_query")
# QUERY options malformed: PAGE_SIZE flag set but page_size truncated triggers protocol error.
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, host):
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_query_internal_fail_read_options")
# PREPARE long-string truncation: declared length > provided bytes triggers protocol error.
def test_process_prepare_malformed_query(scylla_only, no_ssl, host):
def test_process_prepare_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_prepare_malformed_query")
# EXECUTE cache-key malformed: short-bytes length > provided bytes triggers protocol error.
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, host):
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_execute_internal_malformed_cache_key")
# REGISTER malformed string list: declared string length > provided bytes triggers protocol error.
def test_process_register_malformed_string_list(scylla_only, no_ssl, host):
def test_process_register_malformed_string_list(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_register_malformed_string_list")
# Test if the protocol exceptions do not decrease after running the test happy path.
# This is to ensure that the protocol exceptions are not cleared or reset
# during the test execution.
def test_no_protocol_exceptions(scylla_only, no_ssl, host):
run_count = 100
cpp_exception_threshold = 10
def test_no_protocol_exceptions(scylla_only, no_ssl, debug_exceptions_logging, host):
run_count = 200
cpp_exception_threshold = 20
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)

View File

@@ -99,7 +99,7 @@ def test_listsnapshots_no_snapshots(nodetool, request):
assert res.stdout == "Snapshot Details: \nThere are no snapshots\n"
def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=False):
def check_snapshot_out(res, tag, ktlist, skip_flush):
"""Check that the output of nodetool snapshot contains the expected messages"""
if len(ktlist) == 0:
@@ -110,7 +110,7 @@ def check_snapshot_out(res, tag, ktlist, skip_flush, use_sstable_identifier=Fals
pattern = re.compile("Requested creating snapshot\\(s\\)"
f" for \\[{keyspaces}\\]"
f" with snapshot name \\[(.+)\\]"
f" and options \\{{skip_flush={str(skip_flush).lower()}, use_sstable_identifier={str(use_sstable_identifier).lower()}\\}}")
f" and options \\{{skipFlush={str(skip_flush).lower()}\\}}")
print(res)
print(pattern)
@@ -138,13 +138,13 @@ def test_snapshot_keyspace(nodetool):
res = nodetool("snapshot", "--tag", tag, "ks1", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1"})
params={"tag": tag, "sf": "false", "kn": "ks1"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
res = nodetool("snapshot", "--tag", tag, "ks1", "ks2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
])
check_snapshot_out(res.stdout, tag, ["ks1", "ks2"], False)
@@ -155,13 +155,13 @@ def test_snapshot_keyspace_with_table(nodetool, option_name):
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl"})
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
res = nodetool("snapshot", "--tag", tag, "ks1", option_name, "tbl1,tbl2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1,tbl2"})
])
check_snapshot_out(res.stdout, tag, ["ks1"], False)
@@ -186,7 +186,7 @@ class kn_param(NamedTuple):
def test_snapshot_keyspace_table_single_arg(nodetool, param, scylla_only):
tag = "my_snapshot"
req_params = {"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": param.kn}
req_params = {"tag": tag, "sf": "false", "kn": param.kn}
if param.cf:
req_params["cf"] = param.cf
@@ -202,19 +202,19 @@ def test_snapshot_ktlist(nodetool, option_name):
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1", "cf": "tbl1"})
params={"tag": tag, "sf": "false", "kn": "ks1", "cf": "tbl1"})
])
check_snapshot_out(res.stdout, tag, ["ks1.tbl1"], False)
res = nodetool("snapshot", "--tag", tag, option_name, "ks1.tbl1,ks2.tbl2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1.tbl1,ks2.tbl2"})
params={"tag": tag, "sf": "false", "kn": "ks1.tbl1,ks2.tbl2"})
])
check_snapshot_out(res.stdout, tag, ["ks1.tbl1", "ks2.tbl2"], False)
res = nodetool("snapshot", "--tag", tag, option_name, "ks1,ks2", expected_requests=[
expected_request("POST", "/storage_service/snapshots",
params={"tag": tag, "sf": "false", "use_sstable_identifier": "false", "kn": "ks1,ks2"})
params={"tag": tag, "sf": "false", "kn": "ks1,ks2"})
])
check_snapshot_out(res.stdout, tag, ["ks1" ,"ks2"], False)
@@ -229,8 +229,7 @@ def test_snapshot_ktlist(nodetool, option_name):
{"ks": ["ks1", "ks2"], "tbl": []},
])
@pytest.mark.parametrize("skip_flush", [False, True])
@pytest.mark.parametrize("use_sstable_identifier", [False, True])
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_identifier):
def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush):
cmd = ["snapshot"]
params = {}
@@ -243,11 +242,8 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_
if skip_flush:
cmd.append("--skip-flush")
if use_sstable_identifier:
cmd.append("--use-sstable-identifier")
params["sf"] = str(skip_flush).lower()
params["use_sstable_identifier"] = str(use_sstable_identifier).lower()
if ktlist:
if "tbl" in ktlist:
@@ -277,7 +273,7 @@ def test_snapshot_options_matrix(nodetool, tag, ktlist, skip_flush, use_sstable_
expected_request("POST", "/storage_service/snapshots", params=params)
])
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush, use_sstable_identifier)
check_snapshot_out(res.stdout, tag, keyspaces, skip_flush)
def test_snapshot_multiple_keyspace_with_table(nodetool):

View File

@@ -654,7 +654,7 @@ void cluster_repair_operation(scylla_rest_client& client, const bpo::variables_m
for (const auto& table : tables.empty() ? ks_to_cfs[keyspace] : tables) {
repair_params["table"] = table;
try {
sstring task_id = client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"].GetString();
sstring task_id = rjson::to_sstring(client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"]);
log("Starting repair with task_id={} keyspace={} table={}", task_id, keyspace, table);
@@ -2362,23 +2362,16 @@ void snapshot_operation(scylla_rest_client& client, const bpo::variables_map& vm
params["sf"] = "false";
}
if (vm.contains("use-sstable-identifier")) {
params["use_sstable_identifier"] = "true";
} else {
params["use_sstable_identifier"] = "false";
}
client.post("/storage_service/snapshots", params);
if (kn_msg.empty()) {
kn_msg = params["kn"];
}
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skip_flush={}, use_sstable_identifier={}}}\n",
fmt::print(std::cout, "Requested creating snapshot(s) for [{}] with snapshot name [{}] and options {{skipFlush={}}}\n",
kn_msg,
params["tag"],
params["sf"],
params["use_sstable_identifier"]);
params["sf"]);
fmt::print(std::cout, "Snapshot directory: {}\n", params["tag"]);
}
@@ -4605,7 +4598,6 @@ For more information, see: {}
typed_option<sstring>("keyspace-table-list", "The keyspace.table pair(s) to snapshot, multiple ones can be joined with ','"),
typed_option<sstring>("tag,t", "The name of the snapshot"),
typed_option<>("skip-flush", "Do not flush memtables before snapshotting (snapshot will not contain unflushed data)"),
typed_option<>("use-sstable-identifier", "Use the sstable identifier UUID, if available, rather than the sstable generation for the sstable file names within the snapshot dir and the manifest file"),
},
{
typed_option<std::vector<sstring>>("keyspaces", "The keyspaces to snapshot", -1),

View File

@@ -29,11 +29,8 @@ class counted_data_source_impl : public data_source_impl {
if (_cpu_concurrency.stopped) {
return fun();
}
return futurize_invoke([this] () {
_cpu_concurrency.units.return_all();
}).then([fun = std::move(fun)] () {
return fun();
}).finally([this] () {
_cpu_concurrency.units.return_all();
return fun().finally([this] () {
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
});
};
@@ -60,11 +57,8 @@ class counted_data_sink_impl : public data_sink_impl {
if (_cpu_concurrency.stopped) {
return fun();
}
return futurize_invoke([this] () {
_cpu_concurrency.units.return_all();
}).then([fun = std::move(fun)] () mutable {
return fun();
}).finally([this] () {
_cpu_concurrency.units.return_all();
return fun().finally([this] () {
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
});
};

View File

@@ -832,6 +832,12 @@ to_bytes(bytes_view x) {
return bytes(x.begin(), x.size());
}
inline
bytes
to_bytes(std::string_view x) {
return to_bytes(to_bytes_view(x));
}
inline
bytes_opt
to_bytes_opt(bytes_view_opt bv) {

View File

@@ -204,7 +204,7 @@ public:
public:
template <typename Clock, typename Duration>
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr) {
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr, std::source_location loc = std::source_location::current()) {
if (!_shared_data) {
on_internal_error(errinj_logger, "injection_shared_data is not initialized");
}
@@ -234,7 +234,8 @@ public:
throw;
}
catch (const std::exception& e) {
on_internal_error(errinj_logger, "Error injection wait_for_message timeout: " + std::string(e.what()));
on_internal_error(errinj_logger, fmt::format("Error injection [{}] wait_for_message timeout: Called from `{}` @ {}:{}:{:d}: {}",
_shared_data->injection_name, loc.function_name(), loc.file_name(), loc.line(), loc.column(), e.what()));
}
++_read_messages_counter;
}

View File

@@ -174,13 +174,6 @@ future<> print_with_extra_array(const rjson::value& value,
seastar::output_stream<char>& os,
size_t max_nested_level = default_max_nested_level);
// Returns a string_view to the string held in a JSON value (which is
// assumed to hold a string, i.e., v.IsString() == true). This is a view
// to the existing data - no copying is done.
inline std::string_view to_string_view(const rjson::value& v) {
return std::string_view(v.GetString(), v.GetStringLength());
}
// Copies given JSON value - involves allocation
rjson::value copy(const rjson::value& value);
@@ -236,6 +229,27 @@ rjson::value parse_yieldable(chunked_content&&, size_t max_nested_level = defaul
rjson::value from_string(const char* str, size_t size);
rjson::value from_string(std::string_view view);
// Returns a string_view to the string held in a JSON value (which is
// assumed to hold a string, i.e., v.IsString() == true). This is a view
// to the existing data - no copying is done.
inline std::string_view to_string_view(const rjson::value& v) {
return std::string_view(v.GetString(), v.GetStringLength());
}
// Those functions must be called on json string object.
// They make a copy of underlying data so it's safe to destroy
// rjson::value afterwards.
//
// Rapidjson's GetString method alone is not good enough
// for string conversion because it needs to scan the string
// unnecessarily and GetStringLength could be used to avoid that.
inline sstring to_sstring(const rjson::value& str) {
return sstring(str.GetString(), str.GetStringLength());
}
inline std::string to_string(const rjson::value& str) {
return std::string(str.GetString(), str.GetStringLength());
}
// Returns a pointer to JSON member if it exists, nullptr otherwise
rjson::value* find(rjson::value& value, std::string_view name);
const rjson::value* find(const rjson::value& value, std::string_view name);
@@ -377,7 +391,7 @@ rjson::value from_string_map(const std::map<sstring, sstring>& map);
sstring quote_json_string(const sstring& value);
inline bytes base64_decode(const value& v) {
return ::base64_decode(std::string_view(v.GetString(), v.GetStringLength()));
return ::base64_decode(to_string_view(v));
}
// A writer which allows writing json into an std::ostream in a streaming manner.

View File

@@ -10,6 +10,7 @@
#include "utils/http.hh"
#include "utils/s3/client.hh"
#include "utils/s3/default_aws_retry_strategy.hh"
#include "utils/rjson.hh"
#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
#include <seastar/http/client.hh>
@@ -72,7 +73,7 @@ future<> instance_profile_credentials_provider::update_credentials() {
}
s3::aws_credentials instance_profile_credentials_provider::parse_creds(const sstring& creds_response) {
rapidjson::Document document;
rjson::document document;
document.Parse(creds_response.data());
if (document.HasParseError()) {
@@ -81,9 +82,9 @@ s3::aws_credentials instance_profile_credentials_provider::parse_creds(const sst
}
// Retrieve credentials
return {.access_key_id = document["AccessKeyId"].GetString(),
.secret_access_key = document["SecretAccessKey"].GetString(),
.session_token = document["Token"].GetString(),
return {.access_key_id = rjson::to_string(document["AccessKeyId"]),
.secret_access_key = rjson::to_string(document["SecretAccessKey"]),
.session_token = rjson::to_string(document["Token"]),
// Set the expiration to one minute earlier to ensure credentials are renewed slightly before they expire
.expires_at = seastar::lowres_clock::now() + std::chrono::seconds(session_duration - 60)};
}

View File

@@ -152,7 +152,7 @@ seastar::future<bool> client::check_status() {
}
auto resp = co_await std::move(f);
auto json = rjson::parse(std::move(resp.content));
co_return json.IsString() && json.GetString() == std::string_view("SERVING");
co_return json.IsString() && rjson::to_string_view(json) == "SERVING";
}
seastar::future<> client::close() {