Compare commits

...

98 Commits

Author SHA1 Message Date
Nadav Har'El
a4968f0342 Fix for Statement has no effect
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
2025-12-15 16:05:05 +02:00
Jenkins Promoter
d5641398f5 Update pgo profiles - aarch64 2025-12-15 05:16:31 +02:00
Nadav Har'El
c06e63daed Merge 'auth: start using SHA 512 hashing originated from musl with added yielding' from Andrzej Jackowski
This patch series contains the following changes:
 - Incorporation of `crypt_sha512.c` from musl to out codebase
 - Conversion of `crypt_sha512.c` to C++ and coroutinization
 - Coroutinization of `auth::passwords::check`
 - Enabling use of `__crypt_sha512` orignated from `crypt_sha512.c` for
   computing SHA 512 passwords of length <=255
 - Addition of yielding in the aforementioned hashing implementation.

The alien thread was a solution for reactor stalls caused by indivisible
password‑hashing tasks (https://github.com/scylladb/scylladb/issues/24524).
However, because there is only one alien thread, overall hashing throughput was reduced
(see, e.g., https://github.com/scylladb/scylla-enterprise/issues/5711). To address this,
the alien‑thread solution is reverted, and a hashing implementation
with yielding is introduced in this patch series.

Before this patch series, ScyllaDB used SHA-512 hashing provided
by the `crypt_r` function, which in our case meant using the implementation
from the `libxcrypt` library. Adding yielding to this `libxcrypt`
implementation is problematic, both due to licensing (LGPL) and because the
implementation is split into many functions across multiple files. In
contrast, the SHA-512 implementation from `musl libc` has a more
permissive license and is concise, which makes it easier to incorporate
into the ScyllaDB codebase.

The performance of this solution was compared with the previous
implementation that used one alien thread and the implementation
after the alien thread was reverted. The results (median) of
`perf-cql-raw` with `--connection-per-request 1 --smp 10` parameters
are as follows:
 - Alien thread: 41.5 new connections/s per shard
 - Reverted alien thread: 244.1 new connections/s per shard
 - This commit (yielding in hashing): 198.4 new connections/s per shard

The roughly 20% performance deterioration compared to
the old implementation without the alien thread comes from the fact
that the new hashing algorithm implemented in `utils/crypt_sha512.cc`
performs an expensive self-verification and stack cleanup.

On the other hand, with smp=10 the current implementation achieves
roughly 5x higher throughput than the alien thread. In addition,
due to yielding added in this commit, the algorithm is expected
to provide similar protection from stalls as the alien thread did.
In a test that in parallel started a cassandra-stress workload and
created thousands of new connections using python-driver, the values of
`scylla_reactor_stalls_count` metric were as follows:
 - Alien thread: 109 stalls/shard total
 - Reverted alien thread: 13186 stalls/shard total
 - This commit (yielding in hashing): 149 stalls/shard total

Similarly, the `scylla_scheduler_time_spent_on_task_quota_violations_ms`
values were:
 - Alien thread: 1087 ms/shard total
 - Reverted alien thread: 72839 ms/shard total
 - This commit (yielding in hashing): 1623 ms/shard total

To summarize, yielding during hashing computations achieves similar
throughput to the old solution without the alien thread but also
prevents stalls similarly to the alien thread.

Fixes: scylladb/scylladb#26859
Refs: scylladb/scylla-enterprise#5711

No automatic backport. After this PR is completed, the alien thread should be rather reverted from older branches (2025.2-2025.4 because on 2025.1 it's already removed). Backporting of the other commits needs further discussion.

Closes scylladb/scylladb#26860

* github.com:scylladb/scylladb:
  test/boost: add too_long_password to auth_passwords_test
  test/boost: add same_hashes_as_crypt_r to auth_passwords_test
  auth: utils: add yielding to crypt_sha512
  auth: change return type of passwords::check to future
  auth: remove code duplication in verify_scheme
  test/boost: coroutinize auth_passwords_test
  utils: coroutinize crypt_sha512
  utils: make crypt_sha512.cc to compile
  utils: license: import crypt_sha512.c from musl to the project
  Revert "auth: move passwords::check call to alien thread"
2025-12-14 14:01:01 +02:00
David Garcia
c1c3b2c5bb docs: fix local build
prevents early exits in metrics docs generation to break the local build.

Fixes #27497

Closes scylladb/scylladb#27615
2025-12-14 11:48:48 +02:00
Botond Dénes
7e7e378a4b Merge 'Revert "Merge 'Add option to use sstable identifier in snapshot' from Benny Halevy"' from null
Reverts commit 8192f45e84.

The merge exposed a critical bug where truncate operations during table drop with auto-snapshot fail, causing Raft applier fiber to stop with unhandled exceptions. This leads to schema inconsistencies across nodes and test failures with "Keyspace does not exist" errors.

**Root Cause**

Commit 19b6207f modified `truncate_table_on_all_shards` to set `use_sstable_identifier = true`:

```cpp
// Before (working)
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name);

// After (broken)
auto opts = db::snapshot_options{.use_sstable_identifier = true};
co_await table::snapshot_on_all_shards(sharded_db, table_shards, name, opts);
```

This triggers exceptions during snapshot that propagate through Raft state machine, causing:
- Raft applier stops: `raft::state_machine_error` at `raft/server.cc:1369`
- Schema changes fail to propagate
- Nodes report non-existent keyspaces for valid schemas

**Changes**

Reverts 15 files (200 deletions, 74 insertions):
- Removes `use_sstable_identifier` from truncate/snapshot code paths
- Reverts `snapshot_options` struct back to simple `skip_flush` boolean
- Removes REST API and nodetool `--use-sstable-identifier` parameter
- Removes feature tests from `test/boost/database_test.cc`

No backport required - the original feature was merged to master only and never released.

<!-- START COPILOT ORIGINAL PROMPT -->

<details>

<summary>Original prompt</summary>

----

*This section details on the original issue you should resolve*

<issue_title>test_table_drop_with_auto_snapshot failed with InvalidRequest</issue_title>
<issue_description>Seen in: https://jenkins.scylladb.com/job/scylla-master/job/next/9968//testReport
Logs: [download](https://downloads.scylladb.com/unstable/scylla/master/testLogs/2025-12-08T15:05:42Z/)

Error message:
```
cassandra.InvalidRequest: Error from server: code=2200 [Invalid query] message="Keyspace test does not exist"
```

Stacktrace:
```
manager = <test.pylib.manager_client.ManagerClient object at 0xffff734e6c10>

    @pytest.mark.asyncio
    async def test_table_drop_with_auto_snapshot(manager: ManagerClient):
        logger.info("Bootstrapping cluster")
        cfg = { 'auto_snapshot': True }
        servers = await manager.servers_add(3, config = cfg)

        cql = manager.get_cql()

        # Increases the chance of tablet migration concurrent with schema change
        await inject_error_on(manager, "tablet_allocator_shuffle", servers)

        for i in range(3):
            await cql.run_async("DROP KEYSPACE IF EXISTS test;")
            await cql.run_async("CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8 };")
            await cql.run_async("CREATE TABLE IF NOT EXISTS test.tbl_sample_kv (id int, value text, PRIMARY KEY (id));")
>           await cql.run_async("INSERT INTO test.tbl_sample_kv (id, value) VALUES (1, 'ala');")
E           cassandra.InvalidRequest: Error from server: code=2200 [Invalid query] message="Keyspace test does not exist"

test/cluster/test_tablets2.py:173: InvalidRequest
```
</issue_description>

<agent_instructions>this issue was exposed by commit 8192f45e84, please send a pull request reverting that merge commit and mark it as fixing this github issue.</agent_instructions>

<comments>
<comment_new><author>@yaronkaikov</author><body>
@denesb is this something in your team area? if not , please feel free to delegate it or un-assign yourself :-)</body></comment_new>
<comment_new><author>@nyh</author><body>
This is very strange. Clearly the keyspace `test` does exist at this point, because we created it two lines above and also we ran `CREATE TABLE .. test.tbl_sample_kv` which would have failed if the keyspace `test` didn't exist - so it must exit, no?

In the past, we had a bug where the running `CREATE KEYSPACE IF NOT EXISTS` forgot to set the "schema modified" event in the response so it failed to wait for schema agreement, but 1. we fixed this bug (https://github.com/scylladb/scylladb/pull/18819 by @nuivall ) and 2. this bug didn't happen in this case, where CREATE TABLE deed had work to do.

But I just realized something... Our fix in https://github.com/scylladb/scylladb/pull/18819 only applies to CREATE KEYSPACE / TABLE / VIEW / TYPE statements. It wasn't applied to `DROP KEYSPACE` - and it should have been....

But I don't have a good theory how a bug like https://github.com/scylladb/scylladb/pull/18819 can explain this specific test failure. Different schema operations are already linearized, so if a `CREATE TABLE test.tbl_sample_kv` succeeded, I don't see how there could possibly be any earlier `DROP KEYSPACE test` that suddenly springs to life. Unless we have a serious bug in our raft-based schema operations.</body></comment_new>
<comment_new><author>@nyh</author><body>
Another bug we could have in theory is that the Python driver's async `cql.run_async` might have a bug where it is not waiting for the schema agreement despite being told to wait. If it doesn't wait for schema agreement, this can easily explain this bug:
1. the CREATE KEYSPACE, CREATE TABLE both are sent to node A, but
2. the last INSERT INTO is sent to node B which is not yet aware of this new keyspace and table, and fails.

Copilot claims that **execute_async() does have this bug!**

> For schema-altering statements, schema agreement (meaning all nodes agree on the new schema) is important before running follow-up operations, but this is enforced only by synchronous helpers like Session.execute(), not the asynchronous version.
> If you use execute_async() for schema operations, you are responsible for checking schema agreement yourself, using [Session.check_schema_agreement()](https://docs.datastax.com/en/developer/python-driver/latest/api/cassandra/cluster/#cassandra.cluster.Session.check_schema_agreement) or (in newer code) ResponseFuture.check_schema_agreement.
> According to [a discussion on the DataStax support forum](https://support.datastax.com/s/article/Does-the-Python-Driver-for-Cassandra-Wait-for-Schema-Agreement-after-a-Schema-Change?language=en_US) and the [driver’s source code](7f12a5e1c6/cassandra/cluster.py (L487)), schema agreement is not ch...

</details>

<!-- START COPILOT CODING AGENT SUFFIX -->

- Fixes scylladb/scylladb#27501

<!-- START COPILOT CODING AGENT TIPS -->
---

Closes scylladb/scylladb#27604

* github.com:scylladb/scylladb:
  Revert "Merge 'Add option to use sstable identifier in snapshot' from Benny Halevy"
  Initial plan
2025-12-12 13:20:49 +02:00
copilot-swe-agent[bot]
77ee7f3417 Revert "Merge 'Add option to use sstable identifier in snapshot' from Benny Halevy"
This reverts commit 8192f45e84.

The merge exposed a bug where truncate (via drop) fails and causes Raft
errors, leading to schema inconsistencies across nodes. This results in
test_table_drop_with_auto_snapshot failures with 'Keyspace test does not exist'
errors.

The specific problematic change was in commit 19b6207f which modified
truncate_table_on_all_shards to set use_sstable_identifier = true. This
causes exceptions during truncate that are not properly handled, leading
to Raft applier fiber stopping and nodes losing schema synchronization.
2025-12-12 03:55:13 +00:00
copilot-swe-agent[bot]
0ff89a58be Initial plan 2025-12-12 03:48:12 +00:00
Yaron Kaikov
f7ffa395a8 workflows: trigger CI automatically when conflicts label is removed
Add pull_request_target event with unlabeled type to trigger-scylla-ci
workflow. This allows automatic CI triggering when the 'conflicts' label
is removed from a PR, in addition to the existing manual trigger via
comment.

The workflow now runs when:
- A user posts a comment with '@scylladbbot trigger-ci' (existing)
- The 'conflicts' label is removed from a PR (new)

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-84

Closes scylladb/scylladb#27521
2025-12-11 16:48:06 +02:00
Piotr Smaron
3fa3b920de Update CODEOWNERS to remove redundant entries
Removing myself as I have no maintainer's permissions to review the code

Closes scylladb/scylladb#27576
2025-12-11 16:47:08 +02:00
Botond Dénes
e7ca52ee79 Merge 'api: storage_service/tablets/repair: disable incremental repair by default' from Benny Halevy
Change the default incremental_mode to `disabled` due to https://github.com/scylladb/scylladb/issues/26041 and https://github.com/scylladb/scylladb/issues/27414

** Backport to 2025.4 where 611918056a was introduced **

Closes scylladb/scylladb#27530

* github.com:scylladb/scylladb:
  api: storage_service/tablets/repair: disable incremental repair by default
  docs: nodetool-commands: cluster: repair: fix incremental-mode example
2025-12-11 15:23:09 +02:00
Botond Dénes
730eca5dac Merge 'Remove noexcept from storage_group and table functions to allow exception propagation' from null
Fixed a critical bug where `storage_group::for_each_compaction_group()` was incorrectly marked `noexcept`, causing `std::terminate` when actions threw exceptions (e.g., `utils::memory_limit_reached` during memory-constrained reader creation).

**Changes made:**
1. Removed `noexcept` from `storage_group::for_each_compaction_group()` declaration and implementation
2. Removed `noexcept` from `storage_group::compaction_groups()` overloads (they call for_each_compaction_group)
3. Removed `noexcept` from `storage_group::live_disk_space_used()` and `memtable_count()` (they call compaction_groups())
4. Kept `noexcept` on `storage_group::flush()` - it's a coroutine that automatically captures exceptions and returns them as exceptional futures
5. Removed `noexcept` from `table_load_stats()` functions in base class, table, and storage group managers

**Rationale:**

As noted by reviewers, there's no reason to kill the server if these functions throw. For coroutines returning futures, `noexcept` is appropriate because Seastar automatically captures exceptions and returns them as exceptional futures. For other functions, proper exception handling allows the system to recover gracefully instead of terminating.

Fixes #27475

Closes scylladb/scylladb#27476

* github.com:scylladb/scylladb:
  replica: Remove unnecessary noexcept
  replica: Remove noexcept from compaction_groups() functions
  replica: Remove noexcept from storage_group::for_each_compaction_group
2025-12-11 15:17:35 +02:00
Benny Halevy
c8cff94a5a api: storage_service/tablets/repair: disable incremental repair by default
Change the default incremental_mode to `disabled` due to
https://github.com/scylladb/scylladb/issues/26041 and
https://github.com/scylladb/scylladb/issues/27414

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-11 14:25:21 +02:00
Benny Halevy
5fae4cdf80 docs: nodetool-commands: cluster: repair: fix incremental-mode example
There is no 'regular' incremental mode anymore.
The example seems have meant 'disabled'.

Fixes #27587

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-11 14:25:11 +02:00
Marcin Maliszkiewicz
8bbcaacba1 auth: always catch by const reference
This is best practice.

Closes scylladb/scylladb#27525
2025-12-11 12:42:30 +01:00
Yaron Kaikov
3dfa5ebd7f Add JIRA issue validation to backport PR fixes check
Extend the Fixes validation pattern to also accept JIRA issue references
(format: [A-Z]+-\d+) in addition to GitHub issue references. This allows
backport PRs to reference JIRA issues in the format 'Fixes: PROJECT-123'.

Fixes: https://github.com/scylladb/scylladb/issues/27571

Closes scylladb/scylladb#27572
2025-12-11 12:23:16 +02:00
Avi Kivity
24264e24bb Revert "repair: Add tablet repair progress report support"
This reverts commit faad0167d7. It causes
a regression in

test_two_tablets_concurrent_repair_and_migration_repair_writer_level

in debug mode (with ~5%-10% probability).

Fixes #27510.

Closes scylladb/scylladb#27560
2025-12-11 12:18:11 +02:00
Nadav Har'El
0c64e3be9a Merge 'Unify and fix rjson string and string_view conversions' from Marcin Maliszkiewicz
This patch-set consolidates and corrects rjson string conversion handling.
It removes unnecessary string copies, ensures proper length usage and
replaces ad-hoc conversions with consistent helper functions.

Overall, the changes make rjson string handling safer, faster, and more uniform across the codebase.

Backport: no, it's a refactor

Closes scylladb/scylladb#27394

* github.com:scylladb/scylladb:
  fix rjson::value to bytes conversion with missing GetStringLength call
  alternator: change type from string to string_view in should_add_capacity
  fix rjson::value to string_view conversion with missing GetStringLength call
  use rjson::to_string_view when rjson::value gets converted using GetStringLength
  use rjson::to_sstring and rjson::to_string for various string conversions
  utils: use rjson document wrapper in instance_profile_credentials_provider::parse_creds
  utils: move rjson::to_string_view func to string related place
  utils: add to_sstring and to_string rjson helper
2025-12-11 12:05:41 +02:00
Marcin Maliszkiewicz
d5b63df46e transport: remove redundant futurize_invoke from counted data sink and source
Closes scylladb/scylladb#27526
2025-12-11 10:32:16 +03:00
Dario Mirovic
f545ed37bc test: dtest: audit_test.py: fix audit error log detection
`test_insert_failure_doesnt_report_success` test in `test/cluster/dtest/audit_test.py`
has an insert statement that is expected to fail. Dtest environment uses
`FlakyRetryPolicy`, which has `max_retries = 5`. 1 initial fail and 5 retry fails
means we expect 6 error audit logs.

The test failed because `create keyspace ks` failed once, then succeeded on retry.
It allowed the test to proceed properly, but the last part of the test that expects
exactly 6 failed queries actually had 7.

The goal of this patch is to make sure there are exactly 6 = 1 + `max_retries` failed
queries, counting only the query expected to fail. If other queries fail with
successful retry, it's fine. If other queries fail without successful retry, the test
will fail, as it should in such situations. They are not related to this expected
failed insert statement.

Fixes #27322

Closes scylladb/scylladb#27378
2025-12-11 10:17:07 +03:00
Benny Halevy
5f13880a91 utils: error_injection: wait_for_message: print injection_name and caller source_location on timeout
When waiting for the condition variable times out
we call on_internal_error, but unfortunately, the backtrace
it generates is obfuscated by
`coroutine_handle<seastar::internal::coroutine_traits_base<void>::promise_type>::resume`.

To make the log more useful, print the error injection name
and the caller's source_location in the timeout error message.

Fixes #27531

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#27532
2025-12-10 23:25:54 +01:00
Andrzej Jackowski
11ad32c85e test/boost: add too_long_password to auth_passwords_test
The test documents the current behavior of hashing algorithms that
fail if the passphrase has 512 bytes or more.

Moreover, it documents the behavior of the current bcrypt
implementation that compares only the first 72 bytes of the password.
Although we don't typically use bcrypt for password hashing, it is
possible to insert such a hash using
`CREATE ROLE ... WITH HASHED PASSWORD ...`.

Refs: scylladb/scylladb#26842
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
4c8c9cd548 test/boost: add same_hashes_as_crypt_r to auth_passwords_test
The test verifies that the old and new implementation of SHA-512
hashing returns exactly the same values.

Refs: scylladb/scylladb#26859
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
98f431dd81 auth: utils: add yielding to crypt_sha512
This change allows yielding during hashing computations to prevent
stalls.

The performance of this solution was compared with the previous
implementation that used one alien thread and the implementation
after the alien thread was reverted. The results (median) of
`perf-cql-raw` with `--connection-per-request 1 --smp 10` parameters
are as follows:
 - Alien thread: 41.5 new connections/s per shard
 - Reverted alien thread: 244.1 new connections/s per shard
 - This commit (yielding in hashing): 198.4 new connections/s per shard

The alien thread is limited by a single-core hashing throughput,
which is roughly 400-500 hashes/s in the test environment. Therefore,
with smp=10, the throughput is below 50 hashes/s, and the difference
between the alien thread and other solutions further increases with
higer smp.

The roughly 20% performance deterioration compared to
the old implementation without the alien thread comes from the fact
that the new hashing algorithm implemented in `utils/crypt_sha512.cc`
performs an expensive self-verification and stack cleanup.

On the other hand, with smp=10 the current implementation achieves
roughly 5x higher throughput than the alien thread. In addition,
due to yielding added in this commit, the algorithm is expected
to provide similar protection from stalls as the alien thread did.
In a test that in parallel started a cassandra-stress workload and
created thousands of new connections using python-driver, the values of
`scylla_reactor_stalls_count` metric were as follows:
 - Alien thread: 109 stalls/shard total
 - Reverted alien thread: 13186 stalls/shard total
 - This commit (yielding in hashing): 149 stalls/shard total

Similarly, the `scylla_scheduler_time_spent_on_task_quota_violations_ms`
values were:
 - Alien thread: 1087 ms/shard total
 - Reverted alien thread: 72839 ms/shard total
 - This commit (yielding in hashing): 1623 ms/shard total

To summarize, yielding during hashing computations achieves similar
throughput to the old solution without the alien thread but also
prevents stalls similarly to the alien thread.

Fixes: scylladb/scylladb#26859
Refs: scylladb/scylla-enterprise#5711
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
4ffdb0721f auth: change return type of passwords::check to future
Introduce a new `passwords::hash_with_salt_async` and change the return
type of `passwords::check` to `future<bool>`. This enables yielding
during password computations later in this patch series.

The old method, `hash_with_salt`, is marked as deprecated because
new code should use the new `hash_with_salt_async` function.
We are not removing `hash_with_salt` now to reduce the regression risk
of changing the hashing implementation—at least the methods that change
persistent hashes (CREATE, ALTER) will continue to use the old hashing
method. However, in the future, `hash_with_salt` should be entirely
removed.

Refs: scylladb/scylladb#26859
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
775906d749 auth: remove code duplication in verify_scheme
Refactoring: create a new function `verify_hashing_output` to reuse
code in `hash_with_salt` and `verify_scheme`. The change is introduced
to facilitate verification of hashing output when the implementation
is extended later in this patch series.

Refs: scylladb/scylladb#26859
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
11eca621b0 test/boost: coroutinize auth_passwords_test
This commit prepares `auth_passwords_test` for using coroutines,
because later in this patch series `auth::passwords::check` and other
similar functions will return Seastar futures.

Refs: scylladb/scylladb#26859
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
d7818b56df utils: coroutinize crypt_sha512
Change `sha512crypt` and `__crypt_sha512` to coroutines to allow
yielding during hash computations later in this patch series.

Refs: scylladb/scylladb#26859
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
033fed5734 utils: make crypt_sha512.cc to compile
The purpose of this change is to allow the usage of Seastar futures in
crypt_sha512 later in this patch series.

Refs: scylladb/scylladb#26859
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
c6c30b7d0a utils: license: import crypt_sha512.c from musl to the project
This patch imports the `crypt_sha512.c` file from the musl library.
We need it to incorporate yielding in the `crypt_r` function to avoid
reactor stalls during long hashing computations.

Before this patch series, ScyllaDB used SHA-512 hashing provided
by the `crypt_r` function, which in our case meant using the implementation
from the `libxcrypt` library. Adding yielding to this `libxcrypt`
implementation is problematic, both due to licensing (LGPL) and because the
implementation is split into many functions across multiple files. In
contrast, the SHA-512 implementation from `musl libc` has a more
permissive license and is concise, which makes it easier to incorporate
into the ScyllaDB codebase.

Both `crypt_sha512.c` and musl license are obtained from
git.musl-libc.org:
 - https://git.musl-libc.org/cgit/musl/tree/src/crypt/crypt_sha512.c
 - https://git.musl-libc.org/cgit/musl/tree/COPYRIGHT

Import commit:
  commit 1b76ff0767d01df72f692806ee5adee13c67ef88
  Author: Alex Rønne Petersen <alex@alexrp.com>
  Date:   Sun Oct 12 05:35:19 2025 +0200

  s390x: shuffle register usage in __tls_get_offset to avoid r0 as address

Refs: scylladb/scylladb#26859
2025-12-10 15:36:18 +01:00
Andrzej Jackowski
5afcec4a3d Revert "auth: move passwords::check call to alien thread"
The alien thread was a solution for reactor stalls caused by indivisible
password‑hashing tasks (scylladb/scylladb#24524). However, because
there is only one alien thread, overall hashing throughput was reduced
(see, e.g., scylladb/scylla-enterprise#5711). To address this,
the alien‑thread solution is reverted, and a hashing implementation
with yielding will be introduced later in this patch series.

This reverts commit 9574513ec1.
2025-12-10 15:36:09 +01:00
Tomasz Grabiec
0e51a1f812 replica: Remove unnecessary noexcept
Can potentially lead to unnecessary abort.

compaction_groups() and for_each_compaction_group() can throw.

Co-authored-by: bhalevy <20910904+bhalevy@users.noreply.github.com>
2025-12-10 14:51:35 +01:00
Tomasz Grabiec
8b807b299e replica: Remove noexcept from compaction_groups() functions
They can throw during merge, when the number of compaction groups
is higher than 3.

Callers can deal with that, so we shouldn't abort.
2025-12-10 14:48:23 +01:00
Tomasz Grabiec
07ff659849 replica: Remove noexcept from storage_group::for_each_compaction_group
They don't really have to be noexcept.

And "action" may actually throw, leading to abort.

It was observed to throw when creating memtable readers:

terminate called after throwing an instance of 'utils::memory_limit_reached'
   what():  kill limit triggered on semaphore sl:users by permit xxx
Aborting on shard 4, in scheduling group sl:users.

std::terminate() at ??:0
__clang_call_terminate at main.cc:0
replica::storage_group::for_each_compaction_group(std::function<void (seastar::lw_shared_ptr<replica::compaction_group> const&)>) const at ./replica/table.cc:920
 (inlined by) replica::table::add_memtables_to_reader_list(std::vector<mutation_reader, std::allocator<mutation_reader>>&, seastar::lw_shared_ptr<schema const> const&, reader_permit const&, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr const&, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>, std::function<void (unsigned long)>) const at ./replica/table.cc:196
 (inlined by) replica::table::make_reader_v2(seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>) const at ./replica/table.cc:243
 (inlined by) replica::table::as_mutation_source() const::$_0::operator()(seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>) const at ./replica/table.cc:3673
 (inlined by) mutation_reader std::__invoke_impl<mutation_reader, replica::table::as_mutation_source() const::$_0&, seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>>(std::__invoke_other, replica::table::as_mutation_source() const::$_0&, seastar::lw_shared_ptr<schema const>&&, reader_permit&&, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr&&, seastar::bool_class<streamed_mutation::forwarding_tag>&&, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>&&) at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/bits/invoke.h:61
 (inlined by) std::enable_if<is_invocable_r_v<mutation_reader, replica::table::as_mutation_source() const::$_0&, seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>>, mutation_reader>::type std::__invoke_r<mutation_reader, replica::table::as_mutation_source() const::$_0&, seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>>(replica::table::as_mutation_source() const::$_0&, seastar::lw_shared_ptr<schema const>&&, reader_permit&&, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr&&, seastar::bool_class<streamed_mutation::forwarding_tag>&&, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>&&) at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/bits/invoke.h:114
 (inlined by) std::_Function_handler<mutation_reader (seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>), replica::table::as_mutation_source() const::$_0>::_M_invoke(std::_Any_data const&, seastar::lw_shared_ptr<schema const>&&, reader_permit&&, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr&&, seastar::bool_class<streamed_mutation::forwarding_tag>&&, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>&&) at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/bits/std_function.h:290
 (inlined by) std::function<mutation_reader (seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>)>::operator()(seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>) const at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/bits/std_function.h:591
 (inlined by) mutation_source::make_reader_v2(seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position> const&, query::partition_slice const&, tracing::trace_state_ptr, seastar::bool_class<streamed_mutation::forwarding_tag>, seastar::bool_class<mutation_reader::partition_range_forwarding_tag>) const at ././readers/mutation_source.hh:143
query::querier_base::querier_base(seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position>, query::partition_slice, mutation_source const&, tracing::trace_state_ptr, query::querier_base::querier_config) at ././querier.hh:91
 (inlined by) query::querier::querier(mutation_source const&, seastar::lw_shared_ptr<schema const>, reader_permit, interval<dht::ring_position>, query::partition_slice, tracing::trace_state_ptr, query::querier_base::querier_config) at ././querier.hh:164
 (inlined by) replica::table::query(seastar::lw_shared_ptr<schema const>, reader_permit, query::read_command const&, query::result_options, std::vector<interval<dht::ring_position>, std::allocator<interval<dht::ring_position>>> const&, tracing::trace_state_ptr, query::result_memory_limiter&, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l>>>, std::optional<query::querier>*) at ./replica/table.cc:3583
replica::database::query(seastar::lw_shared_ptr<schema const>, query::read_command const&, query::result_options, std::vector<interval<dht::ring_position>, std::allocator<interval<dht::ring_position>>> const&, tracing::trace_state_ptr, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l>>>, std::variant<std::monostate, db::per_partition_rate_limit::account_only, db::per_partition_rate_limit::account_and_enforce>)::$_0::operator()(reader_permit) const at ./replica/database.cc:1533
 (inlined by) seastar::noncopyable_function<seastar::future<void> (reader_permit)>::indirect_vtable_for<replica::database::query(seastar::lw_shared_ptr<schema const>, query::read_command const&, query::result_options, std::vector<interval<dht::ring_position>, std::allocator<interval<dht::ring_position>>> const&, tracing::trace_state_ptr, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l>>>, std::variant<std::monostate, db::per_partition_rate_limit::account_only, db::per_partition_rate_limit::account_and_enforce>)::$_0>::call(seastar::noncopyable_function<seastar::future<void> (reader_permit)> const*, reader_permit) (.llvm.13537529942037499926) at ././seastar/include/seastar/util/noncopyable_function.hh:158
seastar::noncopyable_function<seastar::future<void> (reader_permit)>::operator()(reader_permit) const at ././seastar/include/seastar/util/noncopyable_function.hh:215
 (inlined by) reader_concurrency_semaphore::execution_loop() (.resume) at ./reader_concurrency_semaphore.cc:980
std::__n4861::coroutine_handle<seastar::internal::coroutine_traits_base<void>::promise_type>::resume() const at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/coroutine:242
 (inlined by) seastar::internal::coroutine_traits_base<void>::promise_type::run_and_dispose() at ./build/release/seastar/./seastar/include/seastar/core/coroutine.hh:122
 (inlined by) seastar::reactor::run_tasks(seastar::reactor::task_queue&) at ./build/release/seastar/./seastar/src/core/reactor.cc:2627
 (inlined by) seastar::reactor::run_some_tasks() at ./build/release/seastar/./seastar/src/core/reactor.cc:3099
seastar::reactor::do_run() at ./build/release/seastar/./seastar/src/core/reactor.cc:3267
seastar::smp::configure(seastar::smp_options const&, seastar::reactor_options const&)::$_0::operator()() const at ./build/release/seastar/./seastar/src/core/reactor.cc:4591
 (inlined by) void std::__invoke_impl<void, seastar::smp::configure(seastar::smp_options const&, seastar::reactor_options const&)::$_0&>(std::__invoke_other, seastar::smp::configure(seastar::smp_options const&, seastar::reactor_options const&)::$_0&) at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/bits/invoke.h:61
 (inlined by) std::enable_if<is_invocable_r_v<void, seastar::smp::configure(seastar::smp_options const&, seastar::reactor_options const&)::$_0&>, void>::type std::__invoke_r<void, seastar::smp::configure(seastar::smp_options const&, seastar::reactor_options const&)::$_0&>(seastar::smp::configure(seastar::smp_options const&, seastar::reactor_options const&)::$_0&) at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/bits/invoke.h:111
 (inlined by) std::_Function_handler<void (), seastar::smp::configure(seastar::smp_options const&, seastar::reactor_options const&)::$_0>::_M_invoke(std::_Any_data const&) at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/bits/std_function.h:290
std::function<void ()>::operator()() const at /usr/lib/gcc/x86_64-redhat-linux/14/../../../../include/c++/14/bits/std_function.h:591

Fixes #27475

Co-authored-by: bhalevy <20910904+bhalevy@users.noreply.github.com>
2025-12-10 14:48:11 +01:00
Yaron Kaikov
d3e199984e auto-backport.py: modify instruction for making PR ready for review
Update the comment sent when PR has conflicts with clear instrauctions how to make the PR Ready for review

Fixes: https://scylladb.atlassian.net/browse/RELENG-152

Closes scylladb/scylladb#27547
2025-12-10 14:53:38 +02:00
Nadav Har'El
8822c23ad4 Merge 'test: cqlpy: test_protocol_exceptions.py: increase cpp exceptions thr…' from Dario Mirovic
…eshold

The initial problem:

Some of the tests in test_protocol_exceptions.py started failing. The failure is on the condition that no more than `cpp_exception_threshold` happened.

Test logic:

These tests assert that specific code paths do not throw an exception anymore. Initial implementation ran a code path once, and asserted there were 0 exceptions. Sometimes an exception or several can occur, not directly related to the code paths the tests check, but those would fail the tests.

The solution was to run the tests multiple times. If there is a regression, there would be at least as many exceptions thrown as there are test runs. If there is no regression, a few exceptions might happen, up to 10 per 100 test runs. I have arbitrarily chosen `run_count = 100` and `cpp_exception_threshold = 10` values.

Note that the exceptions are counted per shard, not per code path.

The new problem:

The occassional exceptions thrown by some parts of the server now throw a bit more than before. Based on the logs linked on the issues, it is usually 12.

There are possibly multiple ways to resolve the issue. I have considered logging exceptions and parsing them. I would have to filter exception logs only for wanted exceptions. However, if a new, different exception is introduced, it might not be counted.

Another approach is to just increase the threshold a bit. The issue of throwing more exceptions than before in some other server modules should be addressed by a set of tests for that module, just like these tests check protocol exceptions, not caring who used protocol check code paths.

For those reasons, the solution implemented here is to increase `cpp_exception_threshold` to `20`. It will not make the tests unreliable, because, as mentioned, if there is a regression, there would be at least `run_count` exceptions per `run_count` test runs (1 exception per single test run).

Still, to make "background exceptions" occurence a bit more normalized, `run_count` too is doubled, from `100` to `200`. At the first glance this looks like nothing is changed, but actually doubling both run count and exception threshold here implies that the burst does not scale as much as run count, it is just that the "jitter" is bigger than the old threshold.

Also, this patch series enables debug logging for `exception` logger. This will allow us to inspect which exceptions happened if a protocol exceptions test fails again.

Fixes #27247
Fixes #27325

Issue observed on master and branch-2025.4. The tests, in the same form, exist on master, branch-2025.4, branch-2025.3, branch-2025.2, and branch-2025.1. Code change is simple, and no issue is expected with backport automation. Thus, backports for all the aforementioned versions is requested.

Closes scylladb/scylladb#27412

* github.com:scylladb/scylladb:
  test: cqlpy: test_protocol_exceptions.py: enable debug exception logging
  test: cqlpy: test_protocol_exceptions.py: increase cpp exceptions threshold
2025-12-10 10:53:30 +02:00
Marcin Maliszkiewicz
be9992cfb3 fix rjson::value to bytes conversion with missing GetStringLength call 2025-12-09 19:27:22 +01:00
Marcin Maliszkiewicz
daf00a7f24 alternator: change type from string to string_view in should_add_capacity
It avoids allocation.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
62962f33bb fix rjson::value to string_view conversion with missing GetStringLength call
In some cases we unnecessarily convert to string which
causes a copy. In other we convert without calling
GetStringLength which causes iteration to dermine length
which is already known. In some cases we do even both.
This commit fixes that.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
060c2f7c0d use rjson::to_string_view when rjson::value gets converted using GetStringLength
This commit is only cosmetics, changes calls to GetStringLength
into rjson::to_string_view with the same underlying implementation.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
64149b57c3 use rjson::to_sstring and rjson::to_string for various string conversions
In some cases we ommit size checking which is wrong
as according to rapid json documentation strings may
contain \0 byte in the middle.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
4b004fcdfc utils: use rjson document wrapper in instance_profile_credentials_provider::parse_creds
So that we can use our common utility functions.
2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
5e38b3071b utils: move rjson::to_string_view func to string related place 2025-12-09 19:27:21 +01:00
Marcin Maliszkiewicz
225b3351fc utils: add to_sstring and to_string rjson helper
So that conversion code is common and it's easier
to avoid accidental type conversions. Additionally
according to rapid json library size must be checked
explicitly, this also avoids extra iteration in char*
to (s)string conversion.
2025-12-09 19:27:21 +01:00
Avi Kivity
80c6718ea8 build: update toolchain to Fedora 43 with clang 21.1.6
Rebase to Fedora 43 with clang 21.1 and libstdc++ 15.

Fedora container image registry moved to registry.fedoraproject.org as
it seems to be updated more regularly.

Added python3-devel to the dependencies as some packages scylla-cqlsh
depends on aren't yet available in the form of wheels for Python 3.14,
and so have to be built locally. In any case it's better to reduce
dependency on those wheels even if the ones currently missing appear
eventually.

Added libev-devel to the dependencies so that the python driver
builds correctly even if "wheels" are not published. This reduces
our dependency on the python driver's binary release schedule.
Without libev-devel, TLS does not work correctly.

We no long remove the clang and clang-libs packages. Doxygen
started depending on clang-libs, and removing them removes
doxygen, breaking the build when it looks for that. The build
will still pick up the optimized clang, since /usr/local/bin
is earlier in the path. We keep the clang package, since it allows
us to mess a little less with the directory structure.

Optimized clang binaries generates and stored in

  https://devpkg.scylladb.com/clang/clang-21.1.6-Fedora-43-aarch64.tar.gz
  https://devpkg.scylladb.com/clang/clang-21.1.6-Fedora-43-x86_64.tar.gz

With ./scripts/refresh-pgo-profiles.sh, the new compiler shows a small
performance improvement (instructions_per_op) in perf-simple-query:

clang 21:

259353.60 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35720 insns/op,   17427 cycles/op,        0 errors)
265940.08 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35725 insns/op,   17042 cycles/op,        0 errors)
262650.01 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35720 insns/op,   17240 cycles/op,        0 errors)
262881.22 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35675 insns/op,   17222 cycles/op,        0 errors)
264898.68 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35732 insns/op,   17070 cycles/op,        0 errors)
throughput:
	mean=   263144.72 standard-deviation=2528.69
	median= 262881.22 median-absolute-deviation=1753.96
	maximum=265940.08 minimum=259353.60
instructions_per_op:
	mean=   35714.47 standard-deviation=22.34
	median= 35720.38 median-absolute-deviation=10.20
	maximum=35732.14 minimum=35675.50
cpu_cycles_per_op:
	mean=   17200.12 standard-deviation=154.62
	median= 17221.70 median-absolute-deviation=129.77
	maximum=17427.33 minimum=17041.57

clang 20:

254431.39 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35883 insns/op,   17708 cycles/op,        0 errors)
259701.02 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35883 insns/op,   17351 cycles/op,        0 errors)
261166.92 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35912 insns/op,   17270 cycles/op,        0 errors)
260656.31 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35869 insns/op,   17289 cycles/op,        0 errors)
259628.13 tps ( 64.1 allocs/op,   0.0 logallocs/op,  14.1 tasks/op,   35946 insns/op,   17370 cycles/op,        0 errors)
throughput:
	mean=   259116.75 standard-deviation=2698.56
	median= 259701.02 median-absolute-deviation=1539.55
	maximum=261166.92 minimum=254431.39
instructions_per_op:
	mean=   35898.42 standard-deviation=30.69
	median= 35882.97 median-absolute-deviation=15.90
	maximum=35945.63 minimum=35869.02
cpu_cycles_per_op:
	mean=   17397.49 standard-deviation=178.35
	median= 17351.35 median-absolute-deviation=108.79
	maximum=17707.63 minimum=17269.68

Closes scylladb/scylladb#26773
2025-12-09 15:16:31 +02:00
Pavel Emelyanov
855b91ec20 scripts: Make PR merging check more granular
Currently we have 3 explicit checks, and some of them are configurable:
- Jenkins job being stable. Can be disabled with --force
- Whether submodule update is happenning. It's not allowed by default, and
  should be enabled with --allow-submodule option
- Target branch checking (recently merged #27249). Happens unconditionally

This PR unifies all checks in two ways.

First, each restriction can be lifted with --allow-foo options. The existing
--allow-submodule stays and two options are added:

- --allow-unstable to skip jenkins job check (like --force works now)
- --allow-any-branch to skip target branch check

Second, the --force option lifts all the known restrictions.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#27294
2025-12-09 13:58:21 +02:00
Nadav Har'El
95e303faf3 Merge 'Refactor get_view_natural_endpoint' from Wojciech Mitros
With the introduction of rack-lists and the reliance of materialized views on them, the `get_view_natural_endpoint` function can be greatly simplified. When using tablets, instead of doing any index-matching, we can now pair base tables with views only in the same rack.
In this series we remove no longer needed code and reorganize the needed code for better clarity.
After the changes, the `get_view_natural_endpoint` function goes down from 245 lines to 85 lines, while the whole pairing-related text goes down from 346 lines to 239 lines.

Fixes https://github.com/scylladb/scylladb/issues/26313

Closes scylladb/scylladb#27383

* github.com:scylladb/scylladb:
  mv: replace the simple/complex rack-aware pairing with exact rack matching
  mv: split out vnode pairing code from get_view_natural_endpoint
  mv: unify self-pairing and rack-aware pairing into one bool
  mv: remove the workaround for left nodes when sending view updates
2025-12-09 13:19:13 +02:00
Nadav Har'El
8ba595e472 Merge 'alternator: fix batch writes during intranode tablet migrations' from Petr Gusev
Scylla implements `LWT` in the` storage_proxy::cas` method. This method expects to be called on a specific shard, represented by the `cas_shard` parameter. Clients must create this object before calling `storage_proxy::cas`, check its `this_shard()` method, and jump to `cas_shard.shard()` if it returns false.

The nuance is that by the time the request reaches the destination shard, the tablet may have already advanced in its migration state machine. For example, a client may acquire a `cas_shard` at the `streaming` tablet state, then submit a request to another shard via `smp::submit_to(cas_shard.shard())`. However, the new `cas_shard` created on that other shard might already be in the `write_both_read_new` state, and its `cas_shard.shard()` would not be equal to `this_shard_id()`. Such broken invariant results in an `on_internal_error` in `storage_proxy::cas`.

Clients of `storage_proxy::cas` are expected to check` cas_shard.this_shard()` and recursively jump to another shard if it returns false. Most calls to `storage_proxy::cas` already implement this logic. The only exception is `executor::do_batch_write`, which currently checks `cas_shard.this_shard()` only once. This can break the invariant if the tablet state changes more than once during the operation.

This PR fixes the issue by implementing recursive `cas_shard.this_shard()` checks in `executor::do_batch_write`. It also adds a test that reproduces the problem.

Fixes: scylladb/scylladb#27353

backport: need to be backported to 2025.4

Closes scylladb/scylladb#27396

* github.com:scylladb/scylladb:
  alternator/executor.cc: eliminate redundant dk copy
  alternator/executor.cc: release cas_shard on the original shard
  alternator/executor.cc: move shard check into cas_write
  alternator/executor.cc: make cas_write a private method
  alternator/executor.cc: make do_batch_write a private method
  alternator/executor.cc: fix indent
  test_alternator: add test_alternator_invalid_shard_for_lwt
2025-12-09 11:25:15 +02:00
Petr Gusev
608eee0357 alternator/executor.cc: eliminate redundant dk copy
A small refactoring/optimization.
2025-12-09 10:21:06 +01:00
Petr Gusev
0bcc2977bb alternator/executor.cc: release cas_shard on the original shard
Before this series, we kept the cas_shard on the original shard to
guard against tablet movements running in parallel with
storage_proxy::cas.

The bug addressed by this PR shows that this approach is flawed:
keeping the cas_shard on the original shard does not guarantee that
a new cas_shard acquired on the target shard won’t require another
jump.

We fixed this in the previous commit by checking cas_shard.this_shard()
on the target shard and continuing to jump to another shard if
necessary. Once cas_shard.this_shard() on the target shard returns
true, the storage_proxy::cas invariants are satisfied, and no other
cas_shard instances need to remain alive except the one passed
into storage_proxy::cas.
2025-12-09 10:21:06 +01:00
Petr Gusev
3a865fe991 alternator/executor.cc: move shard check into cas_write
This change ensures that if cas_shard points to a different shard,
the executor will continue issuing shard jumps until
cas_shard.this_shard() returns true. The commit simply moves the
this_shard() check from the parallel_for_each lambda into cas_write,
with minimal functional changes.

We enable test_alternator_invalid_shard_for_lwt since now it should
pass.

Fixes scylladb/scylladb#27353
2025-12-09 10:21:01 +01:00
Pavel Emelyanov
fb32e1c7ee Merge 'streaming: tablet_sstable_streamer::stream refactoring' from Ernest Zaslavsky
Refactor the way we decide the sstable belong to a tablet, fully or partially to simplify the flow and make it more readable. Also extract the logic and make it testable, add tests to cover changes

The change is purely aesthetic, no need to backport

Closes scylladb/scylladb#27101

* github.com:scylladb/scylladb:
  streaming: remove unnecessary lambda creating sstable token range
  streaming: simplify get_sstables_for_tablets logic
  streaming: switch to range-based for loop
  streaming: drop sstable skip microoptimization in tablet loop
  streaming: replace reverse iterators with reverse view in sstables scan
  streaming: return from get_sstables_for_tablets earlier
  streaming: add get_sstables_by_tablet_range tests
  test,sstables: add helper to set sstable first and last keys
  streaming: refactor get_sstables_for_tablets to make it accessible
  streaming: refactor get_sstables_for_tablets to make it testable
  streaming: refactor tablet_sstable_streamer::stream by extracting SST filtering logic
2025-12-09 10:53:57 +03:00
Patryk Jędrzejczak
b6895f0fa7 test: make test_broken_bootstrap faster
This change makes the test ~20 s faster. It's a forgotten follow-up:
https://github.com/scylladb/scylladb/pull/18927#discussion_r1627331946

Closes scylladb/scylladb#27445
2025-12-09 09:25:42 +02:00
Dario Mirovic
c30b326033 test: cqlpy: test_protocol_exceptions.py: enable debug exception logging
Enable debug logging for "exception" logger inside protocol exception tests.
The exceptions will be logged, and it will be possible to see which ones
occured if a protocol exceptions test fails.

Refs #27272
Refs #27325
2025-12-09 01:35:42 +01:00
Dario Mirovic
807fc68dc5 test: cqlpy: test_protocol_exceptions.py: increase cpp exceptions threshold
The initial problem:

Some of the tests in test_protocol_exceptions.py started failing. The failure is
on the condition that no more than `cpp_exception_threshold` happened.

Test logic:

These tests assert that specific code paths do not throw an exception anymore.
Initial implementation ran a code path once, and asserted there were 0 exceptions.
Sometimes an exception or several can occur, not directly related to the code paths
the tests check, but those would fail the tests.

The solution was to run the tests multiple times. If there is a regression, there
would be at least as many exceptions thrown as there are test runs. If there is no
regression, a few exceptions might happen, up to 10 per 100 test runs.
I have arbitrarily chosen `run_count = 100` and `cpp_exception_threshold = 10` values.

Note that the exceptions are counted per shard, not per code path.

The new problem:

The occassional exceptions thrown by some parts of the server now throw a bit more
than before. Based on the logs linked on the issues, it is usually 12.

There are possibly multiple ways to resolve the issue. I have considered logging
exceptions and parsing them. I would have to filter exception logs only for wanted
exceptions. However, if a new, different exception is introduced, it might not be
counted.

Another approach is to just increase the threshold a bit. The issue of throwing
more exceptions than before in some other server modules should be addressed by
a set of tests for that module, just like these tests check protocol exceptions,
not caring who used protocol check code paths.

For those reasons, the solution implemented here is to increase `cpp_exception_threshold`
to `20`. It will not make the tests unreliable, because, as mentioned, if there is a
regression, there would be at least `run_count` exceptions per `run_count` test runs
(1 exception per single test run).

Still, to make "background exceptions" occurence a bit more normalized, `run_count` too
is doubled, from `100` to `200`. At the first glance this looks like nothing is changed,
but actually doubling both run count and exception threshold here implies that the
exception burst does not scale as much as run count, it is just that the "jitter" is
bigger than the old threshold.

Fixes #27247
Fixes #27325
2025-12-09 01:34:48 +01:00
Michał Jadwiszczak
51843195f7 test/boost/view_build_test: increase number of retires
Default number of retires in `eventually()` in `test_builder_with_concurrent_drop`
sometimes is not enough to observe changes in system tables on aarch64
builds.

This patch increases the number of retries to 30.

Fixes scylladb/scylladb#27370

Closes scylladb/scylladb#27493
2025-12-08 23:14:01 +02:00
Gleb Natapov
7038b8b544 test/scylla_cluster: fix the check that a process failed to start
If the process is running returncode will be Node, otherwise it will
have some value (which can be 0 s well) and the current code treats 0
as if the process is still running.

Closes scylladb/scylladb#27490
2025-12-08 18:23:29 +01:00
Tomasz Grabiec
7df610b73d sstables: Remove host id mismatch warning for sstable streaming
Tablet migration transfers sstable files without changing origin
host-id.  As it should, becuase those sstables were not written on the
destination host, and should be ignored by commit log replay.

So it's a normal situation, and it's confusing to see this warning in
logs.

Fixes #26957

Closes scylladb/scylladb#27433
2025-12-08 18:39:22 +02:00
Piotr Dulikowski
386309d6a0 Merge 'Improve the way distributed-loader constructs storage_options for backup sstables' from Pavel Emelyanov
The distributed_loader::get_sstables_from_object_store() method accepts an endpoint parameter and internally wants to get storage type for that endpoint (s3 or gcs). This is needed to construct storage_options object to create an sstable object.

To get the type, the method scans db::config option, but there's much simpler way to get one.

Code cleanup, no need to backport

Closes scylladb/scylladb#27381

* github.com:scylladb/scylladb:
  sstables_loader: Provide endpoint type for get_sstables_from_object_store()
  storage_manager: Introduce get_endpoint_type() method
  storage_manager: Split get_endpoint_client()
2025-12-08 16:55:20 +01:00
Amnon Heiman
a213e41250 scylla-node-exporter: Add ethtool to node exporter
AWS suggests following multiple network performance metrics:
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/monitoring-network-performance-ena.html#network-performance-metrics

This patch enables the ethtool collector with the specific list of
metrics

Ater this patch the relevant metris looks like:

$ curl http://localhost:9100/metrics |& grep ethtool
node_ethtool_bw_in_allowance_exceeded{device="ens5"} 0
node_ethtool_bw_out_allowance_exceeded{device="ens5"} 0
node_ethtool_conntrack_allowance_available{device="ens5"} 51303
node_ethtool_conntrack_allowance_exceeded{device="ens5"} 0
node_ethtool_info{bus_info="0000:00:05.0",device="ens5",driver="ena",expansion_rom_version="",firmware_version="",version="6.14.0-1015-aws"} 1
node_ethtool_linklocal_allowance_exceeded{device="ens5"} 0
node_scrape_collector_duration_seconds{collector="ethtool"} 0.001091436
node_scrape_collector_success{collector="ethtool"} 1

Signed-off-by: Amnon Heiman <amnon@scylladb.com>

Closes scylladb/scylladb#27358
2025-12-08 14:27:10 +02:00
Dawid Mędrek
58dc414912 test/cluster/mv: Rewrite test_view_building_scheduling_group
We rewrite the test to avoid flakiness. Instead of looking at the
metrics, we make a trade-off and start depending on a less reliable
mechanism -- logs. We grep all relevant messages printed by Scylla
in TRACE mode and make sure that they were all printed from a context
using the streaming scheduling group.

Although it's a "less proper" way of testing, it should be much more
dependable and avoid flakiness.

Fixes scylladb/scylladb#25957

Closes scylladb/scylladb#26656
2025-12-08 14:24:25 +02:00
Ferenc Szili
d883ff2317 test: fix flakyness caused by TRUNCATE retries
The test test_truncate_during_topology_change tests TRUNCATE TABLE while
bootstrapping a new node. With tablets enabled TRUNCATE is a global
topology operation which needs to serialize with boostrap.

When TRUNCATE TABLE is issued, it first checks if there is an already
queued truncate for the same table. This can happen if a previous
TRUNCATE operation has timed out, and the client retried. The newly
issued truncate will only join the queued one if it is waiting to be
processed, and will fail immediatelly if the TRUNCATE is already being
processed.

In this test, TRUNCATE will be retried after a timeout (1 minute) due to
the default retry policy, and will be retried up to 3 times, while the
bootstrap is delayed by 2 minutes. This means that the test can validate
the result of a truncate which was started after bootstrap was
completed.

Because of the way truncate joins existing truncate operations, we can
also have the following scenario:
- TRUNCATE times out after one minute because the new node is being
  bootstrapped
- the client retries the TRUNCATE command which also times out after 1m
- the third attempt is received during TRUNCATE being processed which
  fails the test

This patch changes the retry policy of the TRUNCATE operation to
FallthroughRetryPolicy which guarantees that TRUNCATE will not be
retried on timeout. It also increases the timeout of the TRUNCATE from 1
to 4 minutes. This way the test will actually validate the performance
of the TRUNCATE operation which was issued during bootstrap, instead of
the subsequent, retried TRUNCATEs which could have been issued after the
bootstrap was complete.

Fixes: #26347

Closes scylladb/scylladb#27245
2025-12-08 14:13:26 +02:00
dependabot[bot]
1f777da863 build(deps): bump sphinx-scylladb-theme from 1.8.9 to 1.8.10 in /docs
Bumps [sphinx-scylladb-theme](https://github.com/scylladb/sphinx-scylladb-theme) from 1.8.9 to 1.8.10.
- [Release notes](https://github.com/scylladb/sphinx-scylladb-theme/releases)
- [Commits](https://github.com/scylladb/sphinx-scylladb-theme/commits)

---
updated-dependencies:
- dependency-name: sphinx-scylladb-theme
  dependency-version: 1.8.10
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Closes scylladb/scylladb#27468
2025-12-08 13:40:51 +02:00
Asias He
faad0167d7 repair: Add tablet repair progress report support
This patch adds tablet repair progress report support so that the user
could use the /task_manager/task_status API to query the progress.

In order to support this, a new system table is introduced to record the
user request related info, i.e, start of the request and end of the
request.

The progress is accurate when tablet split or merge happens in the
middle of the request, since the tokens of the tablet are recorded when
the request is started and when repair of each tablet is finished. The
original tablet repair is considered as finished when the finished
ranges cover the original tablet token ranges.

After this patch, the /task_manager/task_status API will report correct
progress_total and progress_completed.

Fixes #22564
Fixes #26896

Closes scylladb/scylladb#26924
2025-12-08 13:35:19 +02:00
Andrei Chekun
0115a21b9a test.py: fail test when timeout reached for boost test
There is a bug in current pytest's boost implementation. When timeout
reached process will be killed, but it was not correctly propagated,
that lead to a false positive result. This will fail test case when
timeout for the process is reached.
This is to prevent issues like this https://github.com/scylladb/scylladb/issues/27237

Closes scylladb/scylladb#27463
2025-12-08 11:49:46 +01:00
Ernest Zaslavsky
71834ce7dd streaming: remove unnecessary lambda creating sstable token range
The `sstable_token_range` lambda was only used once to create a token
range for an SSTable. Inline the construction directly where needed,
removing the extra lambda. This simplifies the code without changing
behavior.
2025-12-08 12:30:24 +02:00
Ernest Zaslavsky
df21112c39 streaming: simplify get_sstables_for_tablets logic
Remove the use of the `overlaps` helper and unnest nested conditionals
in get_sstables_for_tablets. Straightforward `before` and `after` checks
are sufficient to decide how each SSTable should be handled.
2025-12-08 12:30:24 +02:00
Ernest Zaslavsky
bd339cc4d8 streaming: switch to range-based for loop
Replace the explicit iterator loop with a range-based for loop. This
simplifies the code, enforces constness, and avoids the unnecessary
use of postfix increment. The behavior remains unchanged,but
readability and maintainability are improved.
2025-12-08 12:30:23 +02:00
Ernest Zaslavsky
91bf23eea1 streaming: drop sstable skip microoptimization in tablet loop
Remove the microoptimization that advanced over SSTables ending before a
tablet range. This approach is misleading since SSTables are not sorted
by their end token, and the extra logic adds complexity with little to
no benefit. The streaming path here is not performance‑critical, so the
simpler loop is preferable.
2025-12-08 12:30:23 +02:00
Ernest Zaslavsky
f925ed176b streaming: replace reverse iterators with reverse view in sstables scan
Use a reverse view over the SSTables vector instead of reverse iterators.
This avoids awkward rbegin/rend usage and the mental overhead of tracking
inverted sort order. With a view, we can use standard begin/end iteration
while preserving the intended scan direction.
2025-12-08 12:30:23 +02:00
Ernest Zaslavsky
68dcd1b1b2 streaming: return from get_sstables_for_tablets earlier
Check if tablets or sstables list is empty and if so, return immediately
2025-12-08 12:30:23 +02:00
Ernest Zaslavsky
6fd5160947 streaming: add get_sstables_by_tablet_range tests
Add a comprehensive test suite that exercises various combinations of
SSTable containment within tablet ranges. These cases cover boundary
conditions, partial overlaps, and full containment to validate all
recent changes made to `get_sstables_by_tablet_range`.
2025-12-08 12:30:23 +02:00
Ernest Zaslavsky
3fc914ca59 test,sstables: add helper to set sstable first and last keys
Introduce a utility helper to set the first and last decorated keys on
an SSTable. This is intended for testing purposes, making it easier to
construct SSTables with defined boundaries in unit tests.
2025-12-08 12:30:23 +02:00
Ernest Zaslavsky
6ef7ad9b5a streaming: refactor get_sstables_for_tablets to make it accessible
Create `get_sstables_for_tablets_for_tests` friend free function
for testing purposes. Adding this free function allows
direct testing without requiring the full streamer context.
2025-12-08 12:30:23 +02:00
Ernest Zaslavsky
581b8ace83 streaming: refactor get_sstables_for_tablets to make it testable
Make the `get_sstables_for_tablets` member function `static`. This
is a step toward improved testability, allowing the function to be
invoked directly without requiring a full instance of the streamer.
2025-12-08 12:30:23 +02:00
Pavel Emelyanov
8192f45e84 Merge 'Add option to use sstable identifier in snapshot' from Benny Halevy
This change adds a new option to the REST api and correspondingly, to scylla nodetool: use_sstable_identifier.
When set, we use the sstable identifier, if available, to name each sstable in the snapshots directory
and the manifest.json file, rather than using the sstable generation.

This can be used by the user (e.g. Scylla Manager) for global deduplication with tablets, where an sstable
may be migrated across shards or across nodes, and in this case, its generation may change, but its
sstable identifier remains sstable.

Currently, Scylla manager uses the sstable generation to detect sstables that are already backed up to
object storage and exist in previous backed up snapshots.
Historically, the sstable generation was guaranteed to be unique only per table per node,
so the dedup code currently checks for deduplication in the node scope.

However, with tablet migration, sstables are renamed when migrated to a different shard,
i.e. their generation changes, and they may be renamed when migrated to another node,
but even if they are not, the dedup logic still assumes uniqueness only within a node.

To address both cases, we keep the sstable_id stable throughout the sstable life cycle (since 3a12ad96c7).
Given the globally unique sstable identifier, scylla manager can now detect duplicate sstables
in a wider scope.  This can be cluster-wide, but we practically need only rack-wide deduplication
or dc-wide, as tablets are migrated across racks only in rare occasions (like when converting from a
numerical replication factor to a rack list containing a subset of the available racks in a datacenter).

Fixes #27181

* New feature, no backport required

Closes scylladb/scylladb#27184

* github.com:scylladb/scylladb:
  database: truncate_table_on_all_shards: set use_sstable_identifier to true
  nodetool: snapshot: add --use-sstable-identifier option
  api: storage_service: take_snapshot: add use_sstable_identifier option
  test: database_test: add snapshot_use_sstable_identifier_works
  test: database_test: snapshot_works: add validate_manifest
  sstable: write_scylla_metadata: add random_sstable_identifier error injection
  table: snapshot_on_all_shards: take snapshot_options
  sstable: add get_format getter
  sstable: snapshot: add use_sstable_identifier option
  db: snapshot_ctl: snapshot_options: add use_sstable_identifier options
  db: snapshot_ctl: move skip_flush to struct snapshot_options
2025-12-08 12:56:12 +03:00
Petr Gusev
c6eec4eeef alternator/executor.cc: make cas_write a private method
We will need to access executor::_stats field from cas_write. We could
pass it as a paramter, but it seems simpler to just make cas_write
and instance method too.
2025-12-08 10:29:54 +01:00
Petr Gusev
9bef142328 alternator/executor.cc: make do_batch_write a private method
We will need to access executor::_stats field on other shards.
2025-12-08 10:29:54 +01:00
Petr Gusev
74bf24a4a7 alternator/executor.cc: fix indent 2025-12-08 10:29:28 +01:00
Petr Gusev
e60bcd0011 test_alternator: add test_alternator_invalid_shard_for_lwt
This test reproduces scylladb/scylladb#27353 using two injection
points. First, the test triggers an intra-node tablet migration and
suspends it at the streaming stage using the
intranode_migration_streaming_wait injection. Next, it enables the
alternator_executor_batch_write_wait injection, which suspends a
batch write after its cas_shard has already been created.
The test then issues several batch writes and waits until one of them
hits this injection on the destination shard. At this point, the
cas_shard.erm for that write is still in the streaming state,
meaning the executor would need to jump back to the source shard.
The test then resumes the suspended tablet migration, allowing it to
update the ERM on the source shard to write_both_read_new. After that,
the test releases the suspended batch write and expects it to perform
two shard jumps: first from the destination to the source shard, and
then again back to the source shard.

This commit adds the alternator_executor_batch_write_wait injection to
alternator/executor.cc. Coroutines are intentionally avoided in the
parallel_for_each lambda to prevent unnecessary coroutine-frame
allocations.
2025-12-08 10:29:28 +01:00
Benny Halevy
19b6207f17 database: truncate_table_on_all_shards: set use_sstable_identifier to true
To facilitate global sstable deduplication on backup.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:57:39 +02:00
Benny Halevy
ff52550739 nodetool: snapshot: add --use-sstable-identifier option
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:57:39 +02:00
Benny Halevy
e654045755 api: storage_service: take_snapshot: add use_sstable_identifier option
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:57:39 +02:00
Benny Halevy
07b92a1ee8 test: database_test: add snapshot_use_sstable_identifier_works
Test that taking a snapshot with the use_sstable_identifier
option (and injecting `random_sstable_identifier`) produces
different file names in the snapshot than the original
sstable names and validate te manifest.json file respectively.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:57:38 +02:00
Benny Halevy
7504d10d9e test: database_test: snapshot_works: add validate_manifest
Validate the manifest.json format by loading it using rjson::parse
and then validate its contents to ensure it lists exactly the
SSTables present in the snapshot directory.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:55:50 +02:00
Benny Halevy
28cb300d0a sstable: write_scylla_metadata: add random_sstable_identifier error injection
To be used by a unit test in the following patch for testing
the snapshot use_sstable_identifier option.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:55:50 +02:00
Benny Halevy
9b3fbedc8c table: snapshot_on_all_shards: take snapshot_options
And pass the use_sstable_identifier down the stack
to the sstables layer.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:55:50 +02:00
Benny Halevy
420fb1fd53 sstable: add get_format getter
To be used by the snapshot code in te following patch
for manufacturing a basename using the sstable_id rather
than its generation.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:55:50 +02:00
Benny Halevy
7c62417b54 sstable: snapshot: add use_sstable_identifier option
When set to true, use the sstable_identifier as the sstable name
in the snapshot rather than its generation.

sstable::snapshot now returns the generation it used
for the sstable in the snapshot, based on the `use_sstable_identifier`
option, to be used by the upper layer generating the manifest.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 11:53:32 +02:00
Benny Halevy
1c45ad7cee db: snapshot_ctl: snapshot_options: add use_sstable_identifier options
To be used for naming sstables in the snapshot by their
sstable identifiers rather than their generation, to
facilitate global deduplication of sstables in backup.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 09:46:35 +02:00
Benny Halevy
c18133b6cb db: snapshot_ctl: move skip_flush to struct snapshot_options
Prepare for adding another option: use_sstable_identifer.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-04 09:46:35 +02:00
Wojciech Mitros
6221c58325 mv: replace the simple/complex rack-aware pairing with exact rack matching
When the initial version of rack-aware pairing was introduced, materialized
views with tablets were still experimental. Since then, we decided that
we'll only allow materialized views in clusters where the base table and
the view are replicated on the same racks, with one replica of each tablet
on each rack.
This allows us to remove almost all logic from our base-view pairing. The
only check for the paired view replica is now whether it's in the same
rack as the base replica sending the update.
In this patch we replace the simple and complex rack-aware pairing with
the simple check above.
Because of this, we have to remove a test case from network_topology_strategy_test
which was testing complex pairing. The tested topology is not supported
for views with tablets (or is unlikely to be supported, as it's a random test),
so there's no use keeping the test.
The test case for simple rack aware pairing was kept, but now we only test
the case where each rack has one replica, not multiple.
Additionally, we split finding of an unpaired replica to a separate function
and partially rewrite it without reusing the helper stuctures that were
present when calculating the simple and complex rack-aware pairing.
We only look for an unpaired replica if we couldn't find a paired replica
ourselves or if the number of view replicas didn't match the base replicas.
If an unpaired replica appears while these conditions pass, we won't send
an extra update, but that would be a new bug altogether, because we only
expect the unpaired replica to appear during RF changes, so when these
conditions aren't fulfilled.

Fixes https://github.com/scylladb/scylladb/issues/26313
2025-12-02 10:52:36 +01:00
Pavel Emelyanov
6c115c691f sstables_loader: Provide endpoint type for get_sstables_from_object_store()
Currently the method scans db::config to find one. It has some
drawbacks. First, it's not very nice. Second, it needs to handle the
case when the endpoint is missing, while it relally never is. Third, the
type in config entry is not necessarily set.

It's nicer to get the type from storage manager.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-12-02 11:18:32 +03:00
Pavel Emelyanov
5924c36b50 storage_manager: Introduce get_endpoint_type() method
So that other code (spoiler: see next patch) have simple API to get one.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-12-02 11:18:27 +03:00
Pavel Emelyanov
ad6a73c29b storage_manager: Split get_endpoint_client()
To get the get_endpoint() internal helper for future use.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-12-02 11:18:23 +03:00
Wojciech Mitros
4ec0fa6eb5 mv: split out vnode pairing code from get_view_natural_endpoint
To avoid repeatedly checking whether we're using tablets and having
to use unnecesarily flexible code fitting both cases, we split out
the base-view pairing code for the case of vnodes to another function.
The get_view_natural_endpoint will now have only common steps,
a call to that function, and steps specific to tablets.
2025-12-02 03:32:36 +01:00
Wojciech Mitros
c313b215e4 mv: unify self-pairing and rack-aware pairing into one bool
We always use "legacy self pairing" when not using tablets, and
the "rack aware pairing" has been enabled in every version where
views with tablets isn't experimental. So in practice, instead
of checking these variables we can just look at whether the
table uses tablets.
2025-12-02 03:32:32 +01:00
Wojciech Mitros
7c612e1789 mv: remove the workaround for left nodes when sending view updates
At one point, the get_view_natural_endpoint was using IP for the
view update (and hint) destinations, but the hint code was using
host_id for the destinations. When a node left, we could no longer
have a mapping for a IP to host_id and when trying to store a hint
for this IP, we'd crash.
We worked around this issue by dropping the view update completely
if the target is in the "left" state.
Since then, we also moved to host_id's in the view update code, so
there's no longer any translation needed when storing the hints.
Additionally, we now drain hints not when entering the "left" state,
but when the node actually stops owning tokens.
Because of that, the workaround is not needed anymore, so we remove
it in this commit.
The existing test_mv_tablets_empty_ip case verifies that indeed, we
do not crash in the original problematic scenario.
2025-12-01 12:27:28 +01:00
Ernest Zaslavsky
f0e2941e34 streaming: refactor tablet_sstable_streamer::stream by extracting SST filtering logic
Extract the SST filtering logic into a dedicated member function. This
prepares the code for independent testing without requiring the entire
streamer to be initialized.
2025-11-30 18:27:15 +02:00
87 changed files with 1937 additions and 852 deletions

8
.github/CODEOWNERS vendored
View File

@@ -1,5 +1,5 @@
# AUTH
auth/* @nuivall @ptrsmrn
auth/* @nuivall
# CACHE
row_cache* @tgrabiec
@@ -25,11 +25,11 @@ compaction/* @raphaelsc
transport/*
# CQL QUERY LANGUAGE
cql3/* @tgrabiec @nuivall @ptrsmrn
cql3/* @tgrabiec @nuivall
# COUNTERS
counters* @nuivall @ptrsmrn
tests/counter_test* @nuivall @ptrsmrn
counters* @nuivall
tests/counter_test* @nuivall
# DOCS
docs/* @annastuchlik @tzach

View File

@@ -62,7 +62,7 @@ def create_pull_request(repo, new_branch_name, base_branch_name, pr, backport_pr
if is_draft:
labels_to_add.append("conflicts")
pr_comment = f"@{pr.user.login} - This PR was marked as draft because it has conflicts\n"
pr_comment += "Please resolve them and mark this PR as ready for review"
pr_comment += "Please resolve them and remove the 'conflicts' label. The PR will be made ready for review automatically."
backport_pr.create_issue_comment(pr_comment)
# Apply all labels at once if we have any

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? (?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)`;
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {

View File

@@ -7,7 +7,7 @@ on:
- enterprise
paths:
- '**/*.cc'
- 'scripts/metrics-config.yml'
- 'scripts/metrics-config.yml'
- 'scripts/get_description.py'
- 'docs/_ext/scylladb_metrics.py'
@@ -15,20 +15,20 @@ jobs:
validate-metrics:
runs-on: ubuntu-latest
name: Check metrics documentation coverage
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: '3.10'
- name: Install dependencies
run: pip install PyYAML
- name: Validate metrics
run: python3 scripts/get_description.py --validate -c scripts/metrics-config.yml

View File

@@ -3,10 +3,13 @@ name: Trigger Scylla CI Route
on:
issue_comment:
types: [created]
pull_request_target:
types:
- unlabeled
jobs:
trigger-jenkins:
if: github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')
if: (github.event.comment.user.login != 'scylladbbot' && contains(github.event.comment.body, '@scylladbbot') && contains(github.event.comment.body, 'trigger-ci')) || github.event.label.name == 'conflicts'
runs-on: ubuntu-latest
steps:
- name: Trigger Scylla-CI-Route Jenkins Job

View File

@@ -42,7 +42,7 @@ comparison_operator_type get_comparison_operator(const rjson::value& comparison_
if (!comparison_operator.IsString()) {
throw api_error::validation(fmt::format("Invalid comparison operator definition {}", rjson::print(comparison_operator)));
}
std::string op = comparison_operator.GetString();
std::string op = rjson::to_string(comparison_operator);
auto it = ops.find(op);
if (it == ops.end()) {
throw api_error::validation(fmt::format("Unsupported comparison operator {}", op));
@@ -377,8 +377,8 @@ bool check_compare(const rjson::value* v1, const rjson::value& v2, const Compara
return cmp(unwrap_number(*v1, cmp.diagnostic), unwrap_number(v2, cmp.diagnostic));
}
if (kv1.name == "S") {
return cmp(std::string_view(kv1.value.GetString(), kv1.value.GetStringLength()),
std::string_view(kv2.value.GetString(), kv2.value.GetStringLength()));
return cmp(rjson::to_string_view(kv1.value),
rjson::to_string_view(kv2.value));
}
if (kv1.name == "B") {
auto d_kv1 = unwrap_bytes(kv1.value, v1_from_query);
@@ -470,9 +470,9 @@ static bool check_BETWEEN(const rjson::value* v, const rjson::value& lb, const r
return check_BETWEEN(unwrap_number(*v, diag), unwrap_number(lb, diag), unwrap_number(ub, diag), bounds_from_query);
}
if (kv_v.name == "S") {
return check_BETWEEN(std::string_view(kv_v.value.GetString(), kv_v.value.GetStringLength()),
std::string_view(kv_lb.value.GetString(), kv_lb.value.GetStringLength()),
std::string_view(kv_ub.value.GetString(), kv_ub.value.GetStringLength()),
return check_BETWEEN(rjson::to_string_view(kv_v.value),
rjson::to_string_view(kv_lb.value),
rjson::to_string_view(kv_ub.value),
bounds_from_query);
}
if (kv_v.name == "B") {

View File

@@ -8,6 +8,8 @@
#include "consumed_capacity.hh"
#include "error.hh"
#include "utils/rjson.hh"
#include <fmt/format.h>
namespace alternator {
@@ -32,12 +34,12 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
if (!return_consumed->IsString()) {
throw api_error::validation("Non-string ReturnConsumedCapacity field in request");
}
std::string consumed = return_consumed->GetString();
std::string_view consumed = rjson::to_string_view(*return_consumed);
if (consumed == "INDEXES") {
throw api_error::validation("INDEXES consumed capacity is not supported");
}
if (consumed != "TOTAL") {
throw api_error::validation("Unknown consumed capacity "+ consumed);
throw api_error::validation(fmt::format("Unknown consumed capacity {}", consumed));
}
return true;
}

View File

@@ -419,7 +419,7 @@ static std::optional<std::string> find_table_name(const rjson::value& request) {
if (!table_name_value->IsString()) {
throw api_error::validation("Non-string TableName field in request");
}
std::string table_name = table_name_value->GetString();
std::string table_name = rjson::to_string(*table_name_value);
return table_name;
}
@@ -546,7 +546,7 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
// does exist but the index does not (ValidationException).
if (proxy.data_dictionary().has_schema(keyspace_name, orig_table_name)) {
throw api_error::validation(
fmt::format("Requested resource not found: Index '{}' for table '{}'", index_name->GetString(), orig_table_name));
fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
} else {
throw api_error::resource_not_found(
fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
@@ -587,7 +587,7 @@ static std::string get_string_attribute(const rjson::value& value, std::string_v
throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
attribute_name, value));
}
return std::string(attribute_value->GetString(), attribute_value->GetStringLength());
return rjson::to_string(*attribute_value);
}
// Convenience function for getting the value of a boolean attribute, or a
@@ -1080,8 +1080,8 @@ static void add_column(schema_builder& builder, const std::string& name, const r
}
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (attribute_info["AttributeName"].GetString() == name) {
auto type = attribute_info["AttributeType"].GetString();
if (rjson::to_string_view(attribute_info["AttributeName"]) == name) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
data_type dt = parse_key_type(type);
if (computed_column) {
// Computed column for GSI (doesn't choose a real column as-is
@@ -1116,7 +1116,7 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First element of KeySchema must be an object");
}
const rjson::value *v = rjson::find((*key_schema)[0], "KeyType");
if (!v || !v->IsString() || v->GetString() != std::string("HASH")) {
if (!v || !v->IsString() || rjson::to_string_view(*v) != "HASH") {
throw api_error::validation("First key in KeySchema must be a HASH key");
}
v = rjson::find((*key_schema)[0], "AttributeName");
@@ -1124,14 +1124,14 @@ static std::pair<std::string, std::string> parse_key_schema(const rjson::value&
throw api_error::validation("First key in KeySchema must have string AttributeName");
}
validate_attr_name_length(supplementary_context, v->GetStringLength(), true, "HASH key in KeySchema - ");
std::string hash_key = v->GetString();
std::string hash_key = rjson::to_string(*v);
std::string range_key;
if (key_schema->Size() == 2) {
if (!(*key_schema)[1].IsObject()) {
throw api_error::validation("Second element of KeySchema must be an object");
}
v = rjson::find((*key_schema)[1], "KeyType");
if (!v || !v->IsString() || v->GetString() != std::string("RANGE")) {
if (!v || !v->IsString() || rjson::to_string_view(*v) != "RANGE") {
throw api_error::validation("Second key in KeySchema must be a RANGE key");
}
v = rjson::find((*key_schema)[1], "AttributeName");
@@ -1887,8 +1887,8 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string def_type = type_to_string(def.type);
for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) {
const rjson::value& attribute_info = *it;
if (attribute_info["AttributeName"].GetString() == def.name_as_text()) {
auto type = attribute_info["AttributeType"].GetString();
if (rjson::to_string_view(attribute_info["AttributeName"]) == def.name_as_text()) {
std::string_view type = rjson::to_string_view(attribute_info["AttributeType"]);
if (type != def_type) {
throw api_error::validation(fmt::format("AttributeDefinitions redefined {} to {} already a key attribute of type {} in this table", def.name_as_text(), type, def_type));
}
@@ -2362,7 +2362,7 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
bytes column_name = to_bytes(it->name.GetString());
bytes column_name = to_bytes(rjson::to_string_view(it->name));
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
@@ -2783,10 +2783,10 @@ static void verify_all_are_used(const rjson::value* field,
return;
}
for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
if (!used.contains(it->name.GetString())) {
if (!used.contains(rjson::to_string(it->name))) {
throw api_error::validation(
format("{} has spurious '{}', not used in {}",
field_name, it->name.GetString(), operation));
field_name, rjson::to_string_view(it->name), operation));
}
}
}
@@ -3000,7 +3000,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
}
static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
sstring table_name = batch_request->name.GetString(); // JSON keys are always strings
sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
try {
return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
} catch(data_dictionary::no_such_column_family&) {
@@ -3055,17 +3055,44 @@ public:
}
};
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk, const std::vector<put_or_delete_item>& mutation_builders,
service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit) {
future<> executor::cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit)
{
if (!cas_shard.this_shard()) {
_stats.shard_bounce_for_lwt++;
return container().invoke_on(cas_shard.shard(), _ssg,
[cs = client_state.move_to_other_shard(),
&mb = mutation_builders,
&dk,
ks = schema->ks_name(),
cf = schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(executor& self) mutable {
return do_with(cs.get(), [&mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt), &self]
(service::client_state& client_state) mutable {
auto schema = self._proxy.data_dictionary().find_schema(ks, cf);
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return self.cas_write(schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
});
}
auto timeout = executor::default_timeout();
auto op = std::make_unique<put_or_delete_item_cas_request>(schema, mutation_builders);
auto* op_ptr = op.get();
auto cdc_opts = cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility =
schema->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
schema->cdc_options().enabled() && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
return proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
return _proxy.cas(schema, std::move(cas_shard), *op_ptr, nullptr, to_partition_ranges(dk),
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).finally([op = std::move(op)]{}).discard_result();
@@ -3091,13 +3118,11 @@ struct schema_decorated_key_equal {
// FIXME: if we failed writing some of the mutations, need to return a list
// of these failed mutations rather than fail the whole write (issue #5650).
static future<> do_batch_write(service::storage_proxy& proxy,
smp_service_group ssg,
future<> executor::do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit,
stats& stats) {
service_permit permit) {
if (mutation_builders.empty()) {
return make_ready_future<>();
}
@@ -3119,7 +3144,7 @@ static future<> do_batch_write(service::storage_proxy& proxy,
mutations.push_back(b.second.build(b.first, now));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return proxy.mutate(std::move(mutations),
return _proxy.mutate(std::move(mutations),
db::consistency_level::LOCAL_QUORUM,
executor::default_timeout(),
trace_state,
@@ -3128,7 +3153,7 @@ static future<> do_batch_write(service::storage_proxy& proxy,
false,
cdc::per_request_options{
.alternator = true,
.alternator_streams_increased_compatibility = any_cdc_enabled && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
.alternator_streams_increased_compatibility = any_cdc_enabled && _proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
});
} else {
// Do the write via LWT:
@@ -3140,46 +3165,35 @@ static future<> do_batch_write(service::storage_proxy& proxy,
schema_decorated_key_hash,
schema_decorated_key_equal>;
auto key_builders = std::make_unique<map_type>(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
for (auto& b : mutation_builders) {
auto dk = dht::decorate_key(*b.first, b.second.pk());
auto [it, added] = key_builders->try_emplace(schema_decorated_key{b.first, dk});
for (auto&& b : std::move(mutation_builders)) {
auto [it, added] = key_builders->try_emplace(schema_decorated_key {
.schema = b.first,
.dk = dht::decorate_key(*b.first, b.second.pk())
});
it->second.push_back(std::move(b.second));
}
auto* key_builders_ptr = key_builders.get();
return parallel_for_each(*key_builders_ptr, [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (const auto& e) {
stats.write_using_lwt++;
return parallel_for_each(*key_builders_ptr, [this, &client_state, trace_state, permit = std::move(permit)] (const auto& e) {
_stats.write_using_lwt++;
auto desired_shard = service::cas_shard(*e.first.schema, e.first.dk.token());
if (desired_shard.this_shard()) {
return cas_write(proxy, e.first.schema, std::move(desired_shard), e.first.dk, e.second, client_state, trace_state, permit);
} else {
stats.shard_bounce_for_lwt++;
return proxy.container().invoke_on(desired_shard.shard(), ssg,
[cs = client_state.move_to_other_shard(),
&mb = e.second,
&dk = e.first.dk,
ks = e.first.schema->ks_name(),
cf = e.first.schema->cf_name(),
gt = tracing::global_trace_state_ptr(trace_state),
permit = std::move(permit)]
(service::storage_proxy& proxy) mutable {
return do_with(cs.get(), [&proxy, &mb, &dk, ks = std::move(ks), cf = std::move(cf),
trace_state = tracing::trace_state_ptr(gt)]
(service::client_state& client_state) mutable {
auto schema = proxy.data_dictionary().find_schema(ks, cf);
auto s = e.first.schema;
// The desired_shard on the original shard remains alive for the duration
// of cas_write on this shard and prevents any tablet operations.
// However, we need a local instance of cas_shard on this shard
// to pass it to sp::cas, so we just create a new one.
service::cas_shard cas_shard(*schema, dk.token());
//FIXME: Instead of passing empty_service_permit() to the background operation,
// the current permit's lifetime should be prolonged, so that it's destructed
// only after all background operations are finished as well.
return cas_write(proxy, schema, std::move(cas_shard), dk, mb, client_state, std::move(trace_state), empty_service_permit());
});
}).finally([desired_shard = std::move(desired_shard)]{});
}
static const auto* injection_name = "alternator_executor_batch_write_wait";
return utils::get_local_injector().inject(injection_name, [s = std::move(s)] (auto& handler) -> future<> {
const auto ks = handler.get("keyspace");
const auto cf = handler.get("table");
const auto shard = std::atoll(handler.get("shard")->data());
if (ks == s->ks_name() && cf == s->cf_name() && shard == this_shard_id()) {
elogger.info("{}: hit", injection_name);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
elogger.info("{}: continue", injection_name);
}
}).then([&e, desired_shard = std::move(desired_shard),
&client_state, trace_state = std::move(trace_state), permit = std::move(permit), this]() mutable
{
return cas_write(e.first.schema, std::move(desired_shard), e.first.dk,
std::move(e.second), client_state, std::move(trace_state), std::move(permit));
});
}).finally([key_builders = std::move(key_builders)]{});
}
}
@@ -3327,7 +3341,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
_stats.wcu_total[stats::DELETE_ITEM] += wcu_delete_units;
_stats.api_operations.batch_write_item_batch_total += total_items;
_stats.api_operations.batch_write_item_histogram.add(total_items);
co_await do_batch_write(_proxy, _ssg, std::move(mutation_builders), client_state, trace_state, std::move(permit), _stats);
co_await do_batch_write(std::move(mutation_builders), client_state, trace_state, std::move(permit));
// FIXME: Issue #5650: If we failed writing some of the updates,
// need to return a list of these failed updates in UnprocessedItems
// rather than fail the whole write (issue #5650).
@@ -3372,7 +3386,7 @@ static bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>
}
rjson::value newv = rjson::empty_object();
for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
std::string attr = it->name.GetString();
std::string attr = rjson::to_string(it->name);
auto x = members.find(attr);
if (x != members.end()) {
if (x->second) {
@@ -3592,7 +3606,7 @@ static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& re
const rjson::value& attributes_to_get = req["AttributesToGet"];
attrs_to_get ret;
for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) {
attribute_path_map_add("AttributesToGet", ret, it->GetString());
attribute_path_map_add("AttributesToGet", ret, rjson::to_string(*it));
validate_attr_name_length("AttributesToGet", it->GetStringLength(), false);
}
if (ret.empty()) {
@@ -4258,12 +4272,12 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
attribute_collector& modified_attrs, bool& any_updates, bool& any_deletes) const {
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
// Note that it.key() is the name of the column, *it is the operation
bytes column_name = to_bytes(it->name.GetString());
bytes column_name = to_bytes(rjson::to_string_view(it->name));
const column_definition* cdef = _schema->get_column_definition(column_name);
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(format("UpdateItem cannot update key column {}", it->name.GetString()));
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
}
std::string action = (it->value)["Action"].GetString();
std::string action = rjson::to_string((it->value)["Action"]);
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -5460,7 +5474,7 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
std::vector<query::clustering_range> ck_bounds;
for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) {
std::string key = it->name.GetString();
sstring key = rjson::to_sstring(it->name);
const rjson::value& condition = it->value;
const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator");
@@ -5468,13 +5482,13 @@ calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
const column_definition& pk_cdef = schema->partition_key_columns().front();
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;
if (sstring(key) == pk_cdef.name_as_text()) {
if (key == pk_cdef.name_as_text()) {
if (!partition_ranges.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
partition_ranges.push_back(calculate_pk_bound(schema, pk_cdef, comp_definition, attr_list));
}
if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
if (ck_cdef && key == ck_cdef->name_as_text()) {
if (!ck_bounds.empty()) {
throw api_error::validation("Currently only a single restriction per key is allowed");
}
@@ -5875,7 +5889,7 @@ future<executor::request_return_type> executor::list_tables(client_state& client
rjson::value* exclusive_start_json = rjson::find(request, "ExclusiveStartTableName");
rjson::value* limit_json = rjson::find(request, "Limit");
std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : "";
std::string exclusive_start = exclusive_start_json ? rjson::to_string(*exclusive_start_json) : "";
int limit = limit_json ? limit_json->GetInt() : 100;
if (limit < 1 || limit > 100) {
co_return api_error::validation("Limit must be greater than 0 and no greater than 100");

View File

@@ -40,6 +40,7 @@ namespace cql3::selection {
namespace service {
class storage_proxy;
class cas_shard;
}
namespace cdc {
@@ -57,6 +58,7 @@ class schema_builder;
namespace alternator {
class rmw_operation;
class put_or_delete_item;
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
@@ -219,6 +221,16 @@ private:
static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
future<> do_batch_write(
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit);
future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk,
const std::vector<put_or_delete_item>& mutation_builders, service::client_state& client_state,
tracing::trace_state_ptr trace_state, service_permit permit);
public:
static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);

View File

@@ -496,7 +496,7 @@ const std::pair<std::string, const rjson::value*> unwrap_set(const rjson::value&
return {"", nullptr};
}
auto it = v.MemberBegin();
const std::string it_key = it->name.GetString();
const std::string it_key = rjson::to_string(it->name);
if (it_key != "SS" && it_key != "BS" && it_key != "NS") {
return {std::move(it_key), nullptr};
}

View File

@@ -93,7 +93,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
if (v->GetStringLength() < 1 || v->GetStringLength() > 255) {
co_return api_error::validation("The length of AttributeName must be between 1 and 255");
}
sstring attribute_name(v->GetString(), v->GetStringLength());
sstring attribute_name = rjson::to_sstring(*v);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {

View File

@@ -3051,7 +3051,7 @@
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled' mode.",
"required":false,
"allowMultiple":false,
"type":"string",

View File

@@ -9,7 +9,6 @@
#include "auth/allow_all_authenticator.hh"
#include "service/migration_manager.hh"
#include "utils/alien_worker.hh"
#include "utils/class_registrator.hh"
namespace auth {
@@ -23,7 +22,6 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
cache&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}

View File

@@ -14,7 +14,6 @@
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
class query_processor;
@@ -30,7 +29,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&) {
allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&) {
}
virtual future<> start() override {

View File

@@ -35,14 +35,13 @@ static const class_registrator<auth::authenticator
, cql3::query_processor&
, ::service::raft_group0_client&
, ::service::migration_manager&
, auth::cache&
, utils::alien_worker&> cert_auth_reg(CERT_AUTH_NAME);
, auth::cache&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&, utils::alien_worker&)
auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, auth::cache&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();
@@ -77,9 +76,9 @@ auth::certificate_authenticator::certificate_authenticator(cql3::query_processor
throw std::invalid_argument(fmt::format("Invalid source: {}", map.at(cfg_source_attr)));
}
continue;
} catch (std::out_of_range&) {
} catch (const std::out_of_range&) {
// just fallthrough
} catch (boost::regex_error&) {
} catch (const boost::regex_error&) {
std::throw_with_nested(std::invalid_argument(fmt::format("Invalid query expression: {}", map.at(cfg_query_attr))));
}
}

View File

@@ -10,7 +10,6 @@
#pragma once
#include "auth/authenticator.hh"
#include "utils/alien_worker.hh"
#include <boost/regex_fwd.hpp> // IWYU pragma: keep
namespace cql3 {
@@ -34,7 +33,7 @@ class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
~certificate_authenticator();
future<> start() override;

View File

@@ -94,7 +94,7 @@ static future<> create_legacy_metadata_table_if_missing_impl(
try {
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts),
std::move(group0_guard), format("auth: create {} metadata table", table->cf_name()));
} catch (exceptions::already_exists_exception&) {}
} catch (const exceptions::already_exists_exception&) {}
}
}

View File

@@ -256,7 +256,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name, ::service::g
} else {
co_await collect_mutations(_qp, mc, query, {sstring(role_name)});
}
} catch (exceptions::request_execution_exception& e) {
} catch (const exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
}
}
@@ -293,13 +293,13 @@ future<> default_authorizer::revoke_all_legacy(const resource& resource) {
[resource](auto ep) {
try {
std::rethrow_exception(ep);
} catch (exceptions::request_execution_exception& e) {
} catch (const exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
}
});
});
} catch (exceptions::request_execution_exception& e) {
} catch (const exceptions::request_execution_exception& e) {
alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", resource, e);
return make_ready_future();
}

View File

@@ -49,8 +49,7 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
cache&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
@@ -64,14 +63,13 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
password_authenticator::~password_authenticator() {
}
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: _qp(qp)
, _group0_client(g0)
, _migration_manager(mm)
, _cache(cache)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
, _hashing_worker(hashing_worker)
{}
static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
@@ -330,20 +328,18 @@ future<authenticated_user> password_authenticator::authenticate(
}
salted_hash = role->salted_hash;
}
const bool password_match = co_await _hashing_worker.submit<bool>([password = std::move(password), salted_hash] {
return passwords::check(password, *salted_hash);
});
const bool password_match = co_await passwords::check(password, *salted_hash);
if (!password_match) {
throw exceptions::authentication_exception("Username and/or password are incorrect");
}
co_return username;
} catch (std::system_error &) {
} catch (const std::system_error &) {
std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
} catch (exceptions::request_execution_exception& e) {
} catch (const exceptions::request_execution_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.what()));
} catch (exceptions::authentication_exception& e) {
} catch (const exceptions::authentication_exception& e) {
std::throw_with_nested(e);
} catch (exceptions::unavailable_exception& e) {
} catch (const exceptions::unavailable_exception& e) {
std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
} catch (...) {
std::throw_with_nested(exceptions::authentication_exception("authentication failed"));

View File

@@ -18,7 +18,6 @@
#include "auth/passwords.hh"
#include "auth/cache.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
namespace db {
class config;
@@ -49,13 +48,12 @@ class password_authenticator : public authenticator {
shared_promise<> _superuser_created_promise;
// We used to also support bcrypt, SHA-256, and MD5 (ref. scylladb#24524).
constexpr static auth::passwords::scheme _scheme = passwords::scheme::sha_512;
utils::alien_worker& _hashing_worker;
public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(const db::config&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&);
password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
~password_authenticator();

View File

@@ -7,6 +7,8 @@
*/
#include "auth/passwords.hh"
#include "utils/crypt_sha512.hh"
#include <seastar/core/coroutine.hh>
#include <cerrno>
@@ -21,27 +23,48 @@ static thread_local crypt_data tlcrypt = {};
namespace detail {
void verify_hashing_output(const char * res) {
if (!res || (res[0] == '*')) {
throw std::system_error(errno, std::system_category());
}
}
void verify_scheme(scheme scheme) {
const sstring random_part_of_salt = "aaaabbbbccccdddd";
const sstring salt = sstring(prefix_for_scheme(scheme)) + random_part_of_salt;
const char* e = crypt_r("fisk", salt.c_str(), &tlcrypt);
if (e && (e[0] != '*')) {
return;
try {
verify_hashing_output(e);
} catch (const std::system_error& ex) {
throw no_supported_schemes();
}
throw no_supported_schemes();
}
sstring hash_with_salt(const sstring& pass, const sstring& salt) {
auto res = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
if (!res || (res[0] == '*')) {
throw std::system_error(errno, std::system_category());
}
verify_hashing_output(res);
return res;
}
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt) {
sstring res;
// Only SHA-512 hashes for passphrases shorter than 256 bytes can be computed using
// the __crypt_sha512 method. For other computations, we fall back to the
// crypt_r implementation from `<crypt.h>`, which can stall.
if (salt.starts_with(prefix_for_scheme(scheme::sha_512)) && pass.size() <= 255) {
char buf[128];
const char * output_ptr = co_await __crypt_sha512(pass.c_str(), salt.c_str(), buf);
verify_hashing_output(output_ptr);
res = output_ptr;
} else {
const char * output_ptr = crypt_r(pass.c_str(), salt.c_str(), &tlcrypt);
verify_hashing_output(output_ptr);
res = output_ptr;
}
co_return res;
}
std::string_view prefix_for_scheme(scheme c) noexcept {
switch (c) {
case scheme::bcrypt_y: return "$2y$";
@@ -58,8 +81,9 @@ no_supported_schemes::no_supported_schemes()
: std::runtime_error("No allowed hashing schemes are supported on this system") {
}
bool check(const sstring& pass, const sstring& salted_hash) {
return detail::hash_with_salt(pass, salted_hash) == salted_hash;
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash) {
const auto pwd_hash = co_await detail::hash_with_salt_async(pass, salted_hash);
co_return pwd_hash == salted_hash;
}
} // namespace auth::passwords

View File

@@ -11,6 +11,7 @@
#include <random>
#include <stdexcept>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
@@ -75,10 +76,19 @@ sstring generate_salt(RandomNumberEngine& g, scheme scheme) {
///
/// Hash a password combined with an implementation-specific salt string.
/// Deprecated in favor of `hash_with_salt_async`.
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
sstring hash_with_salt(const sstring& pass, const sstring& salt);
[[deprecated("Use hash_with_salt_async instead")]] sstring hash_with_salt(const sstring& pass, const sstring& salt);
///
/// Async version of `hash_with_salt` that returns a future.
/// If possible, hashing uses `coroutine::maybe_yield` to prevent reactor stalls.
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
seastar::future<sstring> hash_with_salt_async(const sstring& pass, const sstring& salt);
} // namespace detail
@@ -107,6 +117,6 @@ sstring hash(const sstring& pass, RandomNumberEngine& g, scheme scheme) {
///
/// \throws \ref std::system_error when an unexpected implementation-specific error occurs.
///
bool check(const sstring& pass, const sstring& salted_hash);
seastar::future<bool> check(const sstring& pass, const sstring& salted_hash);
} // namespace auth::passwords

View File

@@ -35,10 +35,9 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
cache&,
utils::alien_worker&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
cache&> saslauthd_auth_reg("com.scylladb.auth.SaslauthdAuthenticator");
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&, utils::alien_worker&)
saslauthd_authenticator::saslauthd_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&, cache&)
: _socket_path(qp.db().get_config().saslauthd_socket_path())
{}

View File

@@ -12,7 +12,6 @@
#include "auth/authenticator.hh"
#include "auth/cache.hh"
#include "utils/alien_worker.hh"
namespace cql3 {
class query_processor;
@@ -30,7 +29,7 @@ namespace auth {
class saslauthd_authenticator : public authenticator {
sstring _socket_path; ///< Path to the domain socket on which saslauthd is listening.
public:
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&,utils::alien_worker&);
saslauthd_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&, cache&);
future<> start() override;

View File

@@ -191,8 +191,7 @@ service::service(
::service::migration_manager& mm,
const service_config& sc,
maintenance_socket_enabled used_by_maintenance_socket,
cache& cache,
utils::alien_worker& hashing_worker)
cache& cache)
: service(
std::move(c),
cache,
@@ -200,7 +199,7 @@ service::service(
g0,
mn,
create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache, hashing_worker),
create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm, cache),
create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm, cache),
used_by_maintenance_socket) {
}
@@ -226,7 +225,7 @@ future<> service::create_legacy_keyspace_if_missing(::service::migration_manager
try {
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), seastar::format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
} catch (::service::group0_concurrent_modification&) {
} catch (const ::service::group0_concurrent_modification&) {
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
}
}

View File

@@ -27,7 +27,6 @@
#include "cql3/description.hh"
#include "seastarx.hh"
#include "service/raft/raft_group0_client.hh"
#include "utils/alien_worker.hh"
#include "utils/observable.hh"
#include "utils/serialized_action.hh"
#include "service/maintenance_mode.hh"
@@ -131,8 +130,7 @@ public:
::service::migration_manager&,
const service_config&,
maintenance_socket_enabled,
cache&,
utils::alien_worker&);
cache&);
future<> start(::service::migration_manager&, db::system_keyspace&);

View File

@@ -192,7 +192,7 @@ future<> standard_role_manager::legacy_create_default_role_if_missing() {
{_superuser},
cql3::query_processor::cache_internal::no).discard_result();
log.info("Created default superuser role '{}'.", _superuser);
} catch(const exceptions::unavailable_exception& e) {
} catch (const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were not ready; will retry");
throw e;
}

View File

@@ -38,8 +38,8 @@ class transitional_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache, utils::alien_worker& hashing_worker)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache, hashing_worker)) {
transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm, cache& cache)
: transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm, cache)) {
}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
@@ -81,7 +81,7 @@ public:
}).handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (exceptions::authentication_exception&) {
} catch (const exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -126,7 +126,7 @@ public:
virtual bytes evaluate_response(bytes_view client_response) override {
try {
return _sasl->evaluate_response(client_response);
} catch (exceptions::authentication_exception&) {
} catch (const exceptions::authentication_exception&) {
_complete = true;
return {};
}
@@ -141,7 +141,7 @@ public:
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);
} catch (exceptions::authentication_exception&) {
} catch (const exceptions::authentication_exception&) {
// return anon user
return make_ready_future<authenticated_user>(anonymous_user());
}
@@ -241,8 +241,7 @@ static const class_registrator<
cql3::query_processor&,
::service::raft_group0_client&,
::service::migration_manager&,
auth::cache&,
utils::alien_worker&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
auth::cache&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<
auth::authorizer,

View File

@@ -859,6 +859,7 @@ scylla_core = (['message/messaging_service.cc',
'utils/alien_worker.cc',
'utils/array-search.cc',
'utils/base64.cc',
'utils/crypt_sha512.cc',
'utils/logalloc.cc',
'utils/large_bitset.cc',
'utils/buffer_input_stream.cc',
@@ -1479,7 +1480,6 @@ deps = {
pure_boost_tests = set([
'test/boost/anchorless_list_test',
'test/boost/auth_passwords_test',
'test/boost/auth_resource_test',
'test/boost/big_decimal_test',
'test/boost/caching_options_test',

View File

@@ -1744,6 +1744,115 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
&& std::ranges::contains(shards, this_shard_id());
}
static endpoints_to_update get_view_natural_endpoint_vnodes(
locator::host_id me,
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
locator::endpoint_dc_rack my_location,
const locator::network_topology_strategy* network_topology,
replica::cf_stats& cf_stats) {
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector base_endpoints, view_endpoints;
auto& my_datacenter = my_location.dc;
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (!network_topology || node.get().dc() == my_datacenter) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
}
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (!network_topology || view_node.get().dc() == my_datacenter) {
view_endpoints.push_back(view_node);
}
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
return {.natural_endpoint = view_endpoints[idx].get().host_id()};
}
static std::optional<locator::host_id> get_unpaired_view_endpoint(
std::vector<std::reference_wrapper<const locator::node>> base_nodes,
std::vector<std::reference_wrapper<const locator::node>> view_nodes,
replica::cf_stats& cf_stats) {
std::unordered_set<locator::endpoint_dc_rack> base_dc_racks;
for (auto&& base_node : base_nodes) {
if (base_dc_racks.contains(base_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple base table replicas in the same dc/rack({}/{}):",
base_node.get().dc(), base_node.get().rack());
return std::nullopt;
}
base_dc_racks.insert(base_node.get().dc_rack());
}
std::unordered_set<locator::endpoint_dc_rack> paired_view_dc_racks;
std::unordered_map<locator::endpoint_dc_rack, locator::host_id> unpaired_view_dc_rack_replicas;
for (auto&& view_node : view_nodes) {
if (paired_view_dc_racks.contains(view_node.get().dc_rack()) || unpaired_view_dc_rack_replicas.contains(view_node.get().dc_rack())) {
// We can't do rack-aware pairing if there are multiple replicas in the same rack.
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Can't perform base-view pairing in this topology. There are multiple view table replicas in the same dc/rack({}/{}):",
view_node.get().dc(), view_node.get().rack());
return std::nullopt;
}
// Track unpaired replicas in both sets
if (base_dc_racks.contains(view_node.get().dc_rack())) {
paired_view_dc_racks.insert(view_node.get().dc_rack());
} else {
unpaired_view_dc_rack_replicas.insert({view_node.get().dc_rack(), view_node.get().host_id()});
}
}
if (unpaired_view_dc_rack_replicas.size() > 0) {
// There are view replicas that can't be paired with any base replica
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
auto extra_replica = unpaired_view_dc_rack_replicas.begin()->second;
unpaired_view_dc_rack_replicas.erase(unpaired_view_dc_rack_replicas.begin());
if (unpaired_view_dc_rack_replicas.size() > 0) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
unpaired_view_dc_rack_replicas | std::views::values);
}
return extra_replica;
}
return std::nullopt;
}
// Calculate the node ("natural endpoint") to which this node should send
// a view update.
//
@@ -1756,29 +1865,19 @@ bool should_generate_view_updates_on_this_shard(const schema_ptr& base, const lo
// of this function is to find, assuming that this node is one of the base
// replicas for a given partition, the paired view replica.
//
// In the past, we used an optimization called "self-pairing" that if a single
// node was both a base replica and a view replica for a write, the pairing is
// modified so that this node would send the update to itself. This self-
// pairing optimization could cause the pairing to change after view ranges
// are moved between nodes, so currently we only use it if
// use_legacy_self_pairing is set to true. When using tablets - where range
// movements are common - it is strongly recommended to set it to false.
// When using vnodes, we have an optimization called "self-pairing" - if a single
// node is both a base replica and a view replica for a write, the pairing is
// modified so that this node sends the update to itself and this node is removed
// from the lists of nodes paired by index. This self-pairing optimization can
// cause the pairing to change after view ranges are moved between nodes.
//
// If the keyspace's replication strategy is a NetworkTopologyStrategy,
// we pair only nodes in the same datacenter.
//
// When use_legacy_self_pairing is enabled, if one of the base replicas
// also happens to be a view replica, it is paired with itself
// (with the other nodes paired by order in the list
// after taking this node out).
//
// If the table uses tablets and the replication strategy is NetworkTopologyStrategy
// and the replication factor in the node's datacenter is a multiple of the number
// of racks in the datacenter, then pairing is rack-aware. In this case,
// all racks have the same number of replicas, and those are never migrated
// outside their racks. Therefore, the base replicas are naturally paired with the
// view replicas that are in the same rack, based on the ordinal position.
// Note that typically, there is a single replica per rack and pairing is trivial.
// If the table uses tablets, then pairing is rack-aware. In this case, in each
// rack where we have a base replica there is also one replica of each view tablet.
// Therefore, the base replicas are naturally paired with the view replicas that
// are in the same rack.
//
// If the assumption that the given base token belongs to this replica
// does not hold, we return an empty optional.
@@ -1806,19 +1905,12 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_legacy_self_pairing,
bool use_tablets_rack_aware_view_pairing,
bool use_tablets,
replica::cf_stats& cf_stats) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
auto& my_location = topology.get_location(me);
auto& my_datacenter = my_location.dc;
auto* network_topology = dynamic_cast<const locator::network_topology_strategy*>(&replication_strategy);
auto rack_aware_pairing = use_tablets_rack_aware_view_pairing && network_topology;
bool simple_rack_aware_pairing = false;
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
node_vector orig_base_endpoints, orig_view_endpoints;
node_vector base_endpoints, view_endpoints;
auto resolve = [&] (const locator::topology& topology, const locator::host_id& ep, bool is_view) -> const locator::node& {
if (auto* np = topology.find_node(ep)) {
@@ -1829,6 +1921,7 @@ endpoints_to_update get_view_natural_endpoint(
// We need to use get_replicas() for pairing to be stable in case base or view tablet
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
using node_vector = std::vector<std::reference_wrapper<const locator::node>>;
auto base_nodes = base_erm->get_replicas(base_token) | std::views::transform([&] (const locator::host_id& ep) -> const locator::node& {
return resolve(topology, ep, false);
}) | std::ranges::to<node_vector>();
@@ -1852,231 +1945,43 @@ endpoints_to_update get_view_natural_endpoint(
// note that the recursive call will not recurse again because leaving_base is in base_nodes.
auto leaving_base = it->get().host_id();
return get_view_natural_endpoint(leaving_base, base_erm, view_erm, replication_strategy, base_token,
view_token, use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
view_token, use_tablets, cf_stats);
}
}
}
std::function<bool(const locator::node&)> is_candidate;
if (network_topology) {
is_candidate = [&] (const locator::node& node) { return node.dc() == my_datacenter; };
} else {
is_candidate = [&] (const locator::node&) { return true; };
}
auto process_candidate = [&] (node_vector& nodes, std::reference_wrapper<const locator::node> node) {
if (is_candidate(node)) {
nodes.emplace_back(node);
}
};
for (auto&& base_node : base_nodes) {
process_candidate(base_endpoints, base_node);
if (!use_tablets) {
return get_view_natural_endpoint_vnodes(
me,
base_nodes,
view_nodes,
my_location,
network_topology,
cf_stats);
}
if (use_legacy_self_pairing) {
for (auto&& view_node : view_nodes) {
auto it = std::ranges::find(base_endpoints, view_node.get().host_id(), std::mem_fn(&locator::node::host_id));
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
// We don't return an extra endpoint, as it's only needed when
// using tablets (so !use_legacy_self_pairing)
if (view_node.get().host_id() == me && it != base_endpoints.end()) {
return {.natural_endpoint = me};
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
// otherwise.
if (it != base_endpoints.end()) {
base_endpoints.erase(it);
} else if (is_candidate(view_node)) {
view_endpoints.push_back(view_node);
}
}
} else {
for (auto&& view_node : view_nodes) {
process_candidate(view_endpoints, view_node);
std::optional<locator::host_id> paired_replica;
for (auto&& view_node : view_nodes) {
if (view_node.get().dc_rack() == my_location) {
paired_replica = view_node.get().host_id();
break;
}
}
// Try optimizing for simple rack-aware pairing
// If the numbers of base and view replica differ, that means an RF change is taking place
// and we can't use simple rack-aware pairing.
if (rack_aware_pairing && base_endpoints.size() == view_endpoints.size()) {
auto dc_rf = network_topology->get_replication_factor(my_datacenter);
const auto& racks = topology.get_datacenter_rack_nodes().at(my_datacenter);
// Simple rack-aware pairing is possible when the datacenter replication factor
// is a multiple of the number of racks in the datacenter.
if (dc_rf % racks.size() == 0) {
simple_rack_aware_pairing = true;
size_t rack_rf = dc_rf / racks.size();
// If any rack doesn't have enough nodes to satisfy the per-rack rf
// simple rack-aware pairing is disabled.
for (const auto& [rack, nodes] : racks) {
if (nodes.size() < rack_rf) {
simple_rack_aware_pairing = false;
break;
}
}
}
if (dc_rf != base_endpoints.size()) {
// If the datacenter replication factor is not equal to the number of base replicas,
// we're in progress of a RF change and we can't use simple rack-aware pairing.
simple_rack_aware_pairing = false;
}
if (simple_rack_aware_pairing) {
std::erase_if(base_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
std::erase_if(view_endpoints, [&] (const locator::node& node) { return node.dc_rack() != my_location; });
}
if (paired_replica && base_nodes.size() == view_nodes.size()) {
// We don't need to find any extra replicas, so we can return early
return {.natural_endpoint = paired_replica};
}
orig_base_endpoints = base_endpoints;
orig_view_endpoints = view_endpoints;
// For the complex rack_aware_pairing case, nodes are already filtered by datacenter
// Use best-match, for the minimum number of base and view replicas in each rack,
// and ordinal match for the rest.
std::optional<std::reference_wrapper<const locator::node>> paired_replica;
if (rack_aware_pairing && !simple_rack_aware_pairing) {
struct indexed_replica {
size_t idx;
std::reference_wrapper<const locator::node> node;
};
std::unordered_map<sstring, std::vector<indexed_replica>> base_racks, view_racks;
// First, index all replicas by rack
auto index_replica_set = [] (std::unordered_map<sstring, std::vector<indexed_replica>>& racks, const node_vector& replicas) {
size_t idx = 0;
for (const auto& r: replicas) {
racks[r.get().rack()].emplace_back(idx++, r);
}
};
index_replica_set(base_racks, base_endpoints);
index_replica_set(view_racks, view_endpoints);
// Try optimistically pairing `me` first
const auto& my_base_replicas = base_racks[my_location.rack];
auto base_it = std::ranges::find(my_base_replicas, me, [] (const indexed_replica& ir) { return ir.node.get().host_id(); });
if (base_it == my_base_replicas.end()) {
return {};
}
const auto& my_view_replicas = view_racks[my_location.rack];
size_t idx = base_it - my_base_replicas.begin();
if (idx < my_view_replicas.size()) {
if (orig_view_endpoints.size() <= orig_base_endpoints.size()) {
return {.natural_endpoint = my_view_replicas[idx].node.get().host_id()};
} else {
// If the number of view replicas is larger than the number of base replicas,
// we need to find the unpaired view replica, so we can't return yet.
paired_replica = my_view_replicas[idx].node;
}
}
// Collect all unpaired base and view replicas,
// where the number of replicas in the base rack is different than the respective view rack
std::vector<indexed_replica> unpaired_base_replicas, unpaired_view_replicas;
for (const auto& [rack, base_replicas] : base_racks) {
const auto& view_replicas = view_racks[rack];
for (auto i = view_replicas.size(); i < base_replicas.size(); ++i) {
unpaired_base_replicas.emplace_back(base_replicas[i]);
}
}
for (const auto& [rack, view_replicas] : view_racks) {
const auto& base_replicas = base_racks[rack];
for (auto i = base_replicas.size(); i < view_replicas.size(); ++i) {
unpaired_view_replicas.emplace_back(view_replicas[i]);
}
}
// Sort by the original ordinality, and copy the sorted results
// back into {base,view}_endpoints, for backward compatible processing below.
std::ranges::sort(unpaired_base_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
base_endpoints.clear();
std::ranges::transform(unpaired_base_replicas, std::back_inserter(base_endpoints), std::mem_fn(&indexed_replica::node));
std::ranges::sort(unpaired_view_replicas, std::less(), std::mem_fn(&indexed_replica::idx));
view_endpoints.clear();
std::ranges::transform(unpaired_view_replicas, std::back_inserter(view_endpoints), std::mem_fn(&indexed_replica::node));
}
auto base_it = std::ranges::find(base_endpoints, me, std::mem_fn(&locator::node::host_id));
if (!paired_replica && base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost.
++cf_stats.total_view_updates_on_wrong_node;
vlogger.warn("Could not find {} in base_endpoints={}", me,
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
}
size_t idx = base_it - base_endpoints.begin();
std::optional<std::reference_wrapper<const locator::node>> no_pairing_replica;
if (!paired_replica && idx >= view_endpoints.size()) {
// There are fewer view replicas than base replicas
// FIXME: This might still happen when reducing replication factor with tablets,
// see https://github.com/scylladb/scylladb/issues/21492
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Could not pair {}: rack_aware={} base_endpoints={} view_endpoints={}", me,
rack_aware_pairing ? (simple_rack_aware_pairing ? "simple" : "complex") : "none",
orig_base_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)),
orig_view_endpoints | std::views::transform(std::mem_fn(&locator::node::host_id)));
return {};
} else if (base_endpoints.size() < view_endpoints.size()) {
// There are fewer base replicas than view replicas.
// This can happen as a result of an RF change when the view replica finishes streaming
// before the base replica.
// Because of this, a view replica might not get paired with any base replica, so we need
// to send an additional update to it.
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_endpoints.back();
if (base_endpoints.size() < view_endpoints.size() - 1) {
// We only expect one extra replica to appear due to an RF change. If there's more, that's an error,
// but we'll still perform updates to the paired and last replicas to minimize degradation.
vlogger.warn("There are too many view endpoints for base-view pairing. View updates may get lost on view_endpoints={}",
std::span(view_endpoints.begin() + base_endpoints.size(), view_endpoints.end() - 1) | std::views::transform(std::mem_fn(&locator::node::host_id)));
}
}
if (!paired_replica) {
paired_replica = view_endpoints[idx];
// We couldn't find any view replica in our rack
++cf_stats.total_view_updates_failed_pairing;
vlogger.warn("Could not find a view replica in the same rack as base replica {} for base_endpoints={} view_endpoints={}",
me,
base_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)),
view_nodes | std::views::transform(std::mem_fn(&locator::node::host_id)));
}
if (!no_pairing_replica && base_nodes.size() < view_nodes.size()) {
// This can happen when the view replica with no pairing is in another DC.
// We need to send an update to it if there are no base replicas in that DC yet,
// as it won't receive updates otherwise.
std::unordered_set<sstring> dcs_with_base_replicas;
for (const auto& base_node : base_nodes) {
dcs_with_base_replicas.insert(base_node.get().dc());
}
for (const auto& view_node : view_nodes) {
if (!dcs_with_base_replicas.contains(view_node.get().dc())) {
++cf_stats.total_view_updates_due_to_replica_count_mismatch;
no_pairing_replica = view_node;
break;
}
}
}
// https://github.com/scylladb/scylladb/issues/19439
// With tablets, a node being replaced might transition to "left" state
// but still be kept as a replica.
// As of writing this hints are not prepared to handle nodes that are left
// but are still replicas. Therefore, there is no other sensible option
// right now but to give up attempt to send the update or write a hint
// to the paired, permanently down replica.
// We use the same workaround for the extra replica.
auto return_host_id_if_not_left = [] (const auto& replica) -> std::optional<locator::host_id> {
if (!replica) {
return std::nullopt;
}
const auto& node = replica->get();
if (!node.left()) {
return node.host_id();
} else {
return std::nullopt;
}
};
return {.natural_endpoint = return_host_id_if_not_left(paired_replica),
.endpoint_with_no_pairing = return_host_id_if_not_left(no_pairing_replica)};
std::optional<locator::host_id> no_pairing_replica = get_unpaired_view_endpoint(base_nodes, view_nodes, cf_stats);
return {.natural_endpoint = paired_replica,
.endpoint_with_no_pairing = no_pairing_replica};
}
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
@@ -2136,12 +2041,6 @@ future<> view_update_generator::mutate_MV(
{
auto& ks = _db.find_keyspace(base->ks_name());
auto& replication = ks.get_replication_strategy();
// We set legacy self-pairing for old vnode-based tables (for backward
// compatibility), and unset it for tablets - where range movements
// are more frequent and backward compatibility is less important.
// TODO: Maybe allow users to set use_legacy_self_pairing explicitly
// on a view, like we have the synchronous_updates_flag.
bool use_legacy_self_pairing = !ks.uses_tablets();
std::unordered_map<table_id, locator::effective_replication_map_ptr> erms;
auto get_erm = [&] (table_id id) {
auto it = erms.find(id);
@@ -2154,10 +2053,6 @@ future<> view_update_generator::mutate_MV(
for (const auto& mut : view_updates) {
(void)get_erm(mut.s->id());
}
// Enable rack-aware view updates pairing for tablets
// when the cluster feature is enabled so that all replicas agree
// on the pairing algorithm.
bool use_tablets_rack_aware_view_pairing = _db.features().tablet_rack_aware_view_pairing && ks.uses_tablets();
auto me = base_ermp->get_topology().my_host_id();
static constexpr size_t max_concurrent_updates = 128;
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
@@ -2165,7 +2060,7 @@ future<> view_update_generator::mutate_MV(
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto view_ermp = erms.at(mut.s->id());
auto [target_endpoint, no_pairing_endpoint] = get_view_natural_endpoint(me, base_ermp, view_ermp, replication, base_token, view_token,
use_legacy_self_pairing, use_tablets_rack_aware_view_pairing, cf_stats);
ks.uses_tablets(), cf_stats);
auto remote_endpoints = view_ermp->get_pending_replicas(view_token);
auto memory_units = seastar::make_lw_shared<db::timeout_semaphore_units>(pending_view_update_memory_units.split(memory_usage_of(mut)));
if (no_pairing_endpoint) {

View File

@@ -305,8 +305,7 @@ endpoints_to_update get_view_natural_endpoint(
const locator::abstract_replication_strategy& replication_strategy,
const dht::token& base_token,
const dht::token& view_token,
bool use_legacy_self_pairing,
bool use_tablets_basic_rack_aware_view_pairing,
bool use_tablets,
replica::cf_stats& cf_stats);
/// Verify that the provided keyspace is eligible for storing materialized views.

View File

@@ -1 +1 @@
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"
SCYLLA_NODE_EXPORTER_ARGS="--collector.interrupts --collector.ethtool.metrics-include='(bw_in_allowance_exceeded|bw_out_allowance_exceeded|conntrack_allowance_exceeded|conntrack_allowance_available|linklocal_allowance_exceeded)' --collector.ethtool --no-collector.hwmon --no-collector.bcache --no-collector.btrfs --no-collector.fibrechannel --no-collector.infiniband --no-collector.ipvs --no-collector.nfs --no-collector.nfsd --no-collector.powersupplyclass --no-collector.rapl --no-collector.tapestats --no-collector.thermal_zone --no-collector.udp_queues --no-collector.zfs"

View File

@@ -41,6 +41,8 @@ class MetricsProcessor:
# Get metrics from the file
try:
metrics_file = metrics.get_metrics_from_file(relative_path, "scylla_", metrics_info, strict=strict)
except SystemExit:
pass
finally:
os.chdir(old_cwd)
if metrics_file:

View File

@@ -28,7 +28,8 @@ Incremental Repair is only supported for tables that use the tablets architectur
Incremental Repair Modes
------------------------
While incremental repair is the default and recommended mode, you can control its behavior for a given repair operation using the ``incremental_mode`` parameter. This is useful for situations where you might need to force a full data validation.
Incremental is currently disabled by default. You can control its behavior for a given repair operation using the ``incremental_mode`` parameter.
This is useful for enabling incremental repair, or in situations where you might need to force a full data validation.
The available modes are:

View File

@@ -53,13 +53,13 @@ ScyllaDB nodetool cluster repair command supports the following options:
nodetool cluster repair --tablet-tokens 1,10474535988
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to 'disabled'.
For example:
::
nodetool cluster repair --incremental-mode regular
nodetool cluster repair --incremental-mode disabled
- ``keyspace`` executes a repair on a specific keyspace. The default is all keyspaces.

8
docs/poetry.lock generated
View File

@@ -1018,14 +1018,14 @@ sphinx-markdown-tables = "0.0.17"
[[package]]
name = "sphinx-scylladb-theme"
version = "1.8.9"
version = "1.8.10"
description = "A Sphinx Theme for ScyllaDB documentation projects"
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
{file = "sphinx_scylladb_theme-1.8.9-py3-none-any.whl", hash = "sha256:f8649a7753a29494fd2b417d1cb855035dddb9ebd498ea033fd73f5f9338271e"},
{file = "sphinx_scylladb_theme-1.8.9.tar.gz", hash = "sha256:ab7cda4c10a0d067c5c3a45f7b1f68cb8ebefe135a0be0738bfa282a344769b6"},
{file = "sphinx_scylladb_theme-1.8.10-py3-none-any.whl", hash = "sha256:8b930f33bec7308ccaa92698ebb5ad85059bcbf93a463f92917aeaf473fce632"},
{file = "sphinx_scylladb_theme-1.8.10.tar.gz", hash = "sha256:8a78a9b692d9a946be2c4a64aa472fd82204cc8ea0b1ee7f60de6db35b356326"},
]
[package.dependencies]
@@ -1603,4 +1603,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = "^3.10"
content-hash = "74912627a3f424290ed7889451c0bdb1a862ab85b1d07c85f4f3b8c34f32a020"
content-hash = "0ae673106f45d3465cbdabbf511e165ca44feadd34d7753f2e68093afaa95c79"

View File

@@ -9,7 +9,7 @@ package-mode = false
python = "^3.10"
pygments = "^2.18.0"
redirects_cli ="^0.1.3"
sphinx-scylladb-theme = "^1.8.9"
sphinx-scylladb-theme = "^1.8.10"
sphinx-sitemap = "^2.6.0"
sphinx-autobuild = "^2024.4.19"
Sphinx = "^7.3.7"

View File

@@ -38,6 +38,7 @@ debian_base_packages=(
python3-aiohttp
python3-pyparsing
python3-colorama
python3-dev
python3-tabulate
python3-pytest
python3-pytest-asyncio
@@ -65,6 +66,7 @@ debian_base_packages=(
git-lfs
e2fsprogs
fuse3
libev-dev # for python driver
)
fedora_packages=(
@@ -90,6 +92,7 @@ fedora_packages=(
patchelf
python3
python3-aiohttp
python3-devel
python3-pip
python3-file-magic
python3-colorama
@@ -154,6 +157,8 @@ fedora_packages=(
https://github.com/scylladb/cassandra-stress/releases/download/v3.18.1/cassandra-stress-java21-3.18.1-1.noarch.rpm
elfutils
jq
libev-devel # for python driver
)
fedora_python3_packages=(

View File

@@ -15,3 +15,22 @@ with the Apache License (version 2) and ScyllaDB-Source-Available-1.0.
They contain the following tag:
SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
### `musl libc` files
`licenses/musl-license.txt` is obtained from:
https://git.musl-libc.org/cgit/musl/tree/COPYRIGHT
`utils/crypt_sha512.cc` is obtained from:
https://git.musl-libc.org/cgit/musl/tree/src/crypt/crypt_sha512.c
Both files are obtained from git.musl-libc.org.
Import commit:
commit 1b76ff0767d01df72f692806ee5adee13c67ef88
Author: Alex Rønne Petersen <alex@alexrp.com>
Date: Sun Oct 12 05:35:19 2025 +0200
s390x: shuffle register usage in __tls_get_offset to avoid r0 as address
musl as a whole is licensed under the standard MIT license included in
`licenses/musl-license.txt`.

193
licenses/musl-license.txt Normal file
View File

@@ -0,0 +1,193 @@
musl as a whole is licensed under the following standard MIT license:
----------------------------------------------------------------------
Copyright © 2005-2020 Rich Felker, et al.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
----------------------------------------------------------------------
Authors/contributors include:
A. Wilcox
Ada Worcester
Alex Dowad
Alex Suykov
Alexander Monakov
Andre McCurdy
Andrew Kelley
Anthony G. Basile
Aric Belsito
Arvid Picciani
Bartosz Brachaczek
Benjamin Peterson
Bobby Bingham
Boris Brezillon
Brent Cook
Chris Spiegel
Clément Vasseur
Daniel Micay
Daniel Sabogal
Daurnimator
David Carlier
David Edelsohn
Denys Vlasenko
Dmitry Ivanov
Dmitry V. Levin
Drew DeVault
Emil Renner Berthing
Fangrui Song
Felix Fietkau
Felix Janda
Gianluca Anzolin
Hauke Mehrtens
He X
Hiltjo Posthuma
Isaac Dunham
Jaydeep Patil
Jens Gustedt
Jeremy Huntwork
Jo-Philipp Wich
Joakim Sindholt
John Spencer
Julien Ramseier
Justin Cormack
Kaarle Ritvanen
Khem Raj
Kylie McClain
Leah Neukirchen
Luca Barbato
Luka Perkov
Lynn Ochs
M Farkas-Dyck (Strake)
Mahesh Bodapati
Markus Wichmann
Masanori Ogino
Michael Clark
Michael Forney
Mikhail Kremnyov
Natanael Copa
Nicholas J. Kain
orc
Pascal Cuoq
Patrick Oppenlander
Petr Hosek
Petr Skocik
Pierre Carrier
Reini Urban
Rich Felker
Richard Pennington
Ryan Fairfax
Samuel Holland
Segev Finer
Shiz
sin
Solar Designer
Stefan Kristiansson
Stefan O'Rear
Szabolcs Nagy
Timo Teräs
Trutz Behn
Will Dietz
William Haddon
William Pitcock
Portions of this software are derived from third-party works licensed
under terms compatible with the above MIT license:
The TRE regular expression implementation (src/regex/reg* and
src/regex/tre*) is Copyright © 2001-2008 Ville Laurikari and licensed
under a 2-clause BSD license (license text in the source files). The
included version has been heavily modified by Rich Felker in 2012, in
the interests of size, simplicity, and namespace cleanliness.
Much of the math library code (src/math/* and src/complex/*) is
Copyright © 1993,2004 Sun Microsystems or
Copyright © 2003-2011 David Schultz or
Copyright © 2003-2009 Steven G. Kargl or
Copyright © 2003-2009 Bruce D. Evans or
Copyright © 2008 Stephen L. Moshier or
Copyright © 2017-2018 Arm Limited
and labelled as such in comments in the individual source files. All
have been licensed under extremely permissive terms.
The ARM memcpy code (src/string/arm/memcpy.S) is Copyright © 2008
The Android Open Source Project and is licensed under a two-clause BSD
license. It was taken from Bionic libc, used on Android.
The AArch64 memcpy and memset code (src/string/aarch64/*) are
Copyright © 1999-2019, Arm Limited.
The implementation of DES for crypt (src/crypt/crypt_des.c) is
Copyright © 1994 David Burren. It is licensed under a BSD license.
The implementation of blowfish crypt (src/crypt/crypt_blowfish.c) was
originally written by Solar Designer and placed into the public
domain. The code also comes with a fallback permissive license for use
in jurisdictions that may not recognize the public domain.
The smoothsort implementation (src/stdlib/qsort.c) is Copyright © 2011
Lynn Ochs and is licensed under an MIT-style license.
The x86_64 port was written by Nicholas J. Kain and is licensed under
the standard MIT terms.
The mips and microblaze ports were originally written by Richard
Pennington for use in the ellcc project. The original code was adapted
by Rich Felker for build system and code conventions during upstream
integration. It is licensed under the standard MIT terms.
The mips64 port was contributed by Imagination Technologies and is
licensed under the standard MIT terms.
The powerpc port was also originally written by Richard Pennington,
and later supplemented and integrated by John Spencer. It is licensed
under the standard MIT terms.
All other files which have no copyright comments are original works
produced specifically for use as part of this library, written either
by Rich Felker, the main author of the library, or by one or more
contibutors listed above. Details on authorship of individual files
can be found in the git version control history of the project. The
omission of copyright and license comments in each file is in the
interest of source tree size.
In addition, permission is hereby granted for all public header files
(include/* and arch/*/bits/*) and crt files intended to be linked into
applications (crt/*, ldso/dlstart.c, and arch/*/crt_arch.h) to omit
the copyright notice and permission notice otherwise required by the
license, and to use these files without any requirement of
attribution. These files include substantial contributions from:
Bobby Bingham
John Spencer
Nicholas J. Kain
Rich Felker
Richard Pennington
Stefan Kristiansson
Szabolcs Nagy
all of whom have explicitly granted such permission.
This file previously contained text expressing a belief that most of
the files covered by the above exception were sufficiently trivial not
to be subject to copyright, resulting in confusion over whether it
negated the permissions granted in the license. In the spirit of
permissive licensing, and of not having licensing issues being an
obstacle to adoption, that text has been removed.

View File

@@ -200,7 +200,10 @@ enum class tablet_repair_incremental_mode : uint8_t {
disabled,
};
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
// FIXME: Incremental repair is disabled by default due to
// https://github.com/scylladb/scylladb/issues/26041 and
// https://github.com/scylladb/scylladb/issues/27414
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::disabled};
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);

View File

@@ -748,8 +748,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// inherit Seastar's CPU affinity masks. We want this thread to be free
// to migrate between CPUs; we think that's what makes the most sense.
auto rpc_dict_training_worker = utils::alien_worker(startlog, 19, "rpc-dict");
// niceness=10 is ~10% of normal process time
auto hashing_worker = utils::alien_worker(startlog, 10, "pwd-hash");
return app.run(ac, av, [&] () -> future<int> {
@@ -779,8 +777,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
return seastar::async([&app, cfg, ext, &disk_space_monitor_shard0, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service, &gossiper, &snitch,
&token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
&repair, &sst_loader, &auth_cache, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker,
&hashing_worker, &vector_store_client] {
&repair, &sst_loader, &auth_cache, &ss, &lifecycle_notifier, &stream_manager, &task_manager, &rpc_dict_training_worker, &vector_store_client] {
try {
if (opts.contains("relabel-config-file") && !opts["relabel-config-file"].as<sstring>().empty()) {
// calling update_relabel_config_from_file can cause an exception that would stop startup
@@ -2060,7 +2057,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache), std::ref(hashing_worker)).get();
maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes, std::ref(auth_cache)).get();
cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group);
@@ -2336,7 +2333,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auth_config.authenticator_java_name = qualified_authenticator_name;
auth_config.role_manager_java_name = qualified_role_manager_name;
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache), std::ref(hashing_worker)).get();
auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no, std::ref(auth_cache)).get();
std::any stop_auth_service;
// Has to be called after node joined the cluster (join_cluster())

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:80a47fe93866989aaf7e949168fcd308e95841e78c976a61f9eac20bfdd34d96
size 6448960
oid sha256:3cbe2dd05945f8fb76ebce2ea70864063d2b282c4d5080af1f290ead43321ab3
size 6444732

View File

@@ -297,17 +297,17 @@ public:
const dht::token_range& token_range() const noexcept;
size_t memtable_count() const noexcept;
size_t memtable_count() const;
const compaction_group_ptr& main_compaction_group() const noexcept;
const std::vector<compaction_group_ptr>& split_ready_compaction_groups() const;
compaction_group_ptr& select_compaction_group(locator::tablet_range_side) noexcept;
uint64_t live_disk_space_used() const noexcept;
uint64_t live_disk_space_used() const;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept;
utils::small_vector<compaction_group_ptr, 3> compaction_groups() noexcept;
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const noexcept;
void for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const;
utils::small_vector<compaction_group_ptr, 3> compaction_groups();
utils::small_vector<const_compaction_group_ptr, 3> compaction_groups() const;
utils::small_vector<compaction_group_ptr, 3> split_unready_groups() const;
bool split_unready_groups_are_empty() const;
@@ -430,7 +430,7 @@ public:
virtual storage_group& storage_group_for_token(dht::token) const = 0;
virtual utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept = 0;
virtual locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const = 0;
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;

View File

@@ -1133,7 +1133,7 @@ public:
// The tablet filter is used to not double account migrating tablets, so it's important that
// only one of pending or leaving replica is accounted based on current migration stage.
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const;
const db::view::stats& get_view_stats() const {
return _view_stats;

View File

@@ -234,18 +234,12 @@ distributed_loader::get_sstables_from_upload_dir(sharded<replica::database>& db,
}
future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, prefix, sstables=std::move(sstables), &get_abort_src, &db] (auto& global_table, auto& directory) {
distributed_loader::get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> get_abort_src) {
return get_sstables_from(db, ks, cf, cfg, [bucket, endpoint, type, prefix, sstables=std::move(sstables), &get_abort_src] (auto& global_table, auto& directory) {
return directory.start(global_table.as_sharded_parameter(),
sharded_parameter([bucket, endpoint, prefix, &get_abort_src, &db] {
auto eps = db.local().get_config().object_storage_endpoints()
| std::views::filter([&endpoint](auto& ep) { return ep.key() == endpoint; })
;
if (eps.empty()) {
throw std::invalid_argument(fmt::format("Undefined endpoint {}", endpoint));
}
sharded_parameter([bucket, endpoint, type, prefix, &get_abort_src] {
seastar::abort_source* as = get_abort_src ? get_abort_src() : nullptr;
auto opts = data_dictionary::make_object_storage_options(endpoint, eps.front().type(), bucket, prefix, as);
auto opts = data_dictionary::make_object_storage_options(endpoint, type, bucket, prefix, as);
return make_lw_shared<const data_dictionary::storage_options>(std::move(opts));
}),
sstables,

View File

@@ -92,7 +92,7 @@ public:
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_upload_dir(sharded<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
get_sstables_from_object_store(sharded<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring type, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
static future<> process_upload_dir(sharded<replica::database>& db, sharded<db::view::view_builder>& vb, sharded<db::view::view_building_worker>& vbw, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape);
};

View File

@@ -708,7 +708,7 @@ public:
return *_single_sg;
}
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const noexcept override {
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const override {
return locator::combined_load_stats{
.table_ls = locator::table_load_stats{
.size_in_bytes = _single_sg->live_disk_space_used(),
@@ -874,7 +874,7 @@ public:
return storage_group_for_id(storage_group_of(token).first);
}
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept override;
locator::combined_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const override;
bool all_storage_groups_split() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
@@ -922,7 +922,7 @@ compaction_group_ptr& storage_group::select_compaction_group(locator::tablet_ran
return _main_cg;
}
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const noexcept {
void storage_group::for_each_compaction_group(std::function<void(const compaction_group_ptr&)> action) const {
action(_main_cg);
for (auto& cg : _merging_groups) {
action(cg);
@@ -932,7 +932,7 @@ void storage_group::for_each_compaction_group(std::function<void(const compactio
}
}
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() noexcept {
utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups() {
utils::small_vector<compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
@@ -940,7 +940,7 @@ utils::small_vector<compaction_group_ptr, 3> storage_group::compaction_groups()
return cgs;
}
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const noexcept {
utils::small_vector<const_compaction_group_ptr, 3> storage_group::compaction_groups() const {
utils::small_vector<const_compaction_group_ptr, 3> cgs;
for_each_compaction_group([&cgs] (const compaction_group_ptr& cg) {
cgs.push_back(cg);
@@ -1890,7 +1890,7 @@ sstables::file_size_stats compaction_group::live_disk_space_used_full_stats() co
return _main_sstables->get_file_size_stats() + _maintenance_sstables->get_file_size_stats();
}
uint64_t storage_group::live_disk_space_used() const noexcept {
uint64_t storage_group::live_disk_space_used() const {
auto cgs = const_cast<storage_group&>(*this).compaction_groups();
return std::ranges::fold_left(cgs | std::views::transform(std::mem_fn(&compaction_group::live_disk_space_used)), uint64_t(0), std::plus{});
}
@@ -2813,7 +2813,7 @@ void table::on_flush_timer() {
});
}
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
locator::table_load_stats table_stats;
table_stats.split_ready_seq_number = _split_ready_seq_number;
@@ -2836,7 +2836,7 @@ locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std:
};
}
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
locator::combined_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const {
return _sg_manager->table_load_stats(std::move(tablet_filter));
}
@@ -3453,7 +3453,7 @@ size_t compaction_group::memtable_count() const noexcept {
return _memtables->size();
}
size_t storage_group::memtable_count() const noexcept {
size_t storage_group::memtable_count() const {
return std::ranges::fold_left(compaction_groups() | std::views::transform(std::mem_fn(&compaction_group::memtable_count)), size_t(0), std::plus{});
}

View File

@@ -22,7 +22,7 @@ format_match = re.compile(r'\s*(?:seastar::)?format\(\s*"([^"]+)"\s*,\s*(.*)\s*'
def handle_error(message, strict=True, verbose_mode=False):
if strict:
print(f"[ERROR] {message}")
exit(-1)
exit(1)
elif verbose_mode:
print(f"[WARNING] {message}")
@@ -180,12 +180,11 @@ def get_metrics_from_file(file_name, prefix, metrics_information, verb=None, str
groups = {}
if clean_name in metrics_information:
if (isinstance(metrics_information[clean_name], str) and metrics_information[clean_name] == "skip") or "skip" in metrics_information[clean_name]:
exit(0)
return {}
param_mapping = metrics_information[clean_name]["params"] if clean_name in metrics_information and "params" in metrics_information[clean_name] else {}
groups = metrics_information[clean_name]["groups"] if clean_name in metrics_information and "groups" in metrics_information[clean_name] else {}
metrics = {}
multi_line = False
names = undefined
typ = undefined
line_number = 0;

View File

@@ -1,6 +1,6 @@
"cdc/log.cc":
params:
cdc_group_name: cdc
cdc_group_name: "cdc"
part_name;suffix: [["static_row", "total"],["clustering_row", "total"], ["map", "total"], ["set", "total"], ["list", "total"], ["udt", "total"], ["range_tombstone", "total"],["partition_delete", "total"],["row_delete", "total"], ["static_row", "failed"],["clustering_row", "failed"], ["map", "failed"], ["set", "failed"], ["list", "failed"], ["udt", "failed"], ["range_tombstone", "failed"],["partition_delete", "failed"],["row_delete", "failed"]]
kind: ["total", "failed"]
"db/commitlog/commitlog.cc":
@@ -9,7 +9,7 @@
"cfg.max_active_flushes": "cfg.max_active_flushes"
"cql3/query_processor.cc":
groups:
"80": query_processor
"80": "query_processor"
"replica/dirty_memory_manager.cc":
params:
namestr: ["regular", "system"]
@@ -19,10 +19,11 @@
"replica/database.cc":
params:
"_dirty_memory_manager.throttle_threshold()": "throttle threshold"
"seastar/apps/metrics_tester/metrics_tester.cc": skip
"seastar/tests/unit/metrics_test.cc": skip
"seastar/tests/unit/metrics_tester.cc": skip
"seastar/tests/unit/prometheus_http_test.cc": skip
"seastar/apps/metrics_tester/metrics_tester.cc": "skip"
"seastar/tests/unit/metrics_test.cc": "skip"
"seastar/tests/unit/metrics_tester.cc": "skip"
"seastar/tests/unit/prometheus_http_test.cc": "skip"
"seastar/tests/unit/prometheus_text_test.cc": "skip"
"service/storage_proxy.cc":
params:
COORDINATOR_STATS_CATEGORY: "storage_proxy_coordinator"
@@ -32,25 +33,25 @@
_short_description_prefix: ["total_write_attempts", "write_errors", "background_replica_writes_failed", "read_repair_write_attempts"]
_long_description_prefix: ["total number of write requests", "number of write requests that failed", "background_replica_writes_failed", "number of write operations in a read repair context"]
_category: "storage_proxy_coordinator"
"thrift/server.cc": skip
"thrift/server.cc": "skip"
"tracing/tracing.cc":
params:
"max_pending_trace_records + write_event_records_threshold": "max_pending_trace_records + write_event_records_threshold"
"transport/server.cc":
groups:
"200": transport
"200": "transport"
params:
"_config.max_request_size": "max_request_size"
"seastar/src/net/dpdk.cc": skip
"seastar/src/net/dpdk.cc": "skip"
"db/hints/manager.cc":
params:
"group_name": ["hints_for_views_manager", "hints_manager"]
"seastar/src/core/execution_stage.cc":
groups:
"100": execution_stages
"100": "execution_stages"
"seastar/src/core/fair_queue.cc":
groups:
"300": io_queue
"300": "io_queue"
"seastar/src/net/net.cc":
params:
_stats_plugin_name: ["stats_plugin_name"]

View File

@@ -38,8 +38,9 @@ for required in jq curl; do
fi
done
FORCE=0
ALLOW_SUBMODULE=0
ALLOW_UNSTABLE=0
ALLOW_ANY_BRANCH=0
function print_usage {
cat << EOF
@@ -60,12 +61,18 @@ Options:
-h
Print this help message and exit.
--force
Do not check current branch to be next*
Do not check jenkins job status
--allow-submodule
Allow a PR to update a submudule
--allow-unstable
Do not check jenkins job status
--allow-any-branch
Merge PR even if target branch is not next
--force
Sets all above --allow-* options
EOF
}
@@ -73,13 +80,23 @@ while [[ $# -gt 0 ]]
do
case $1 in
"--force"|"-f")
FORCE=1
ALLOW_UNSTABLE=1
ALLOW_SUBMODULE=1
ALLOW_ANY_BRANCH=1
shift 1
;;
--allow-submodule)
ALLOW_SUBMODULE=1
shift
;;
--allow-unstable)
ALLOW_UNSTABLE=1
shift
;;
--allow-any-branch)
ALLOW_ANY_BRANCH=1
shift
;;
+([0-9]))
PR_NUM=$1
shift 1
@@ -147,7 +164,7 @@ check_jenkins_job_status() {
fi
}
if [[ $FORCE -eq 0 ]]; then
if [[ $ALLOW_UNSTABLE -eq 0 ]]; then
check_jenkins_job_status
fi
@@ -179,17 +196,19 @@ echo -n "Fetching full name of author $PR_LOGIN... "
USER_NAME=$(curl -s "https://api.github.com/users/$PR_LOGIN" | jq -r .name)
echo "$USER_NAME"
BASE_BRANCH=$(jq -r .base.ref <<< $PR_DATA)
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
TARGET_BASE="unknown"
if [[ ${BASE_BRANCH} == master ]]; then
TARGET_BASE="next"
elif [[ ${BASE_BRANCH} == branch-* ]]; then
TARGET_BASE=${BASE_BRANCH//branch/next}
fi
if [[ "${CURRENT_BRANCH}" != "${TARGET_BASE}" ]]; then
echo "Merging into wrong next, want ${TARGET_BASE}, have ${CURRENT_BRANCH}"
exit 1
if [[ $ALLOW_ANY_BRANCH -eq 0 ]]; then
BASE_BRANCH=$(jq -r .base.ref <<< $PR_DATA)
CURRENT_BRANCH=$(git rev-parse --abbrev-ref HEAD)
TARGET_BASE="unknown"
if [[ ${BASE_BRANCH} == master ]]; then
TARGET_BASE="next"
elif [[ ${BASE_BRANCH} == branch-* ]]; then
TARGET_BASE=${BASE_BRANCH//branch/next}
fi
if [[ "${CURRENT_BRANCH}" != "${TARGET_BASE}" ]]; then
echo "Merging into wrong next, want ${TARGET_BASE}, have ${CURRENT_BRANCH}. Use --allow-any-branch or --force to skip this check"
exit 1
fi
fi
git fetch "$REMOTE" pull/$PR_NUM/head

View File

@@ -2485,11 +2485,6 @@ void sstable::validate_originating_host_id() const {
}
return;
}
if (*originating_host_id != local_host_id) {
// FIXME refrain from throwing an exception because of #10148
sstlog.warn("Host id {} does not match local host id {} while validating SSTable: {}. Load foreign SSTables via the upload dir instead.", *originating_host_id, local_host_id, get_filename());
}
}
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,

View File

@@ -135,13 +135,17 @@ future<> storage_manager::update_config(const db::config& cfg) {
co_return;
}
shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client(sstring endpoint) {
auto storage_manager::get_endpoint(const sstring& endpoint) -> object_storage_endpoint& {
auto found = _object_storage_endpoints.find(endpoint);
if (found == _object_storage_endpoints.end()) {
smlogger.error("unable to find {} in configured object-storage endpoints", endpoint);
throw std::invalid_argument(format("endpoint {} not found", endpoint));
}
auto& ep = found->second;
return found->second;
}
shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client(sstring endpoint) {
auto& ep = get_endpoint(endpoint);
if (ep.client == nullptr) {
ep.client = make_object_storage_client(ep.cfg, _object_storage_clients_memory, [&ct = container()] (std::string ep) {
return ct.local().get_endpoint_client(ep);
@@ -150,6 +154,10 @@ shared_ptr<sstables::object_storage_client> storage_manager::get_endpoint_client
return ep.client;
}
sstring storage_manager::get_endpoint_type(sstring endpoint) {
return get_endpoint(endpoint).cfg.type();
}
bool storage_manager::is_known_endpoint(sstring endpoint) const {
return _object_storage_endpoints.contains(endpoint);
}

View File

@@ -70,6 +70,7 @@ class storage_manager : public peering_sharded_service<storage_manager> {
seastar::metrics::metric_groups metrics;
future<> update_config(const db::config&);
object_storage_endpoint& get_endpoint(const sstring& ep);
public:
struct config {
@@ -80,6 +81,7 @@ public:
storage_manager(const db::config&, config cfg);
shared_ptr<object_storage_client> get_endpoint_client(sstring endpoint);
bool is_known_endpoint(sstring endpoint) const;
sstring get_endpoint_type(sstring endpoint);
future<> stop();
std::vector<sstring> endpoints(sstring type = "") const noexcept;
};

View File

@@ -205,6 +205,13 @@ private:
}
bool tablet_in_scope(locator::tablet_id) const;
friend future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
// Pay attention, while working with tablet ranges, the `erm` must be held alive as long as we retrieve (and use here) tablet ranges from
// the tablet map. This is already done when using `tablet_sstable_streamer` class but tread carefully if you plan to use this method somewhere else.
static future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);
};
host_id_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token) const {
@@ -343,55 +350,52 @@ public:
}
};
future<std::vector<tablet_sstable_collection>> tablet_sstable_streamer::get_sstables_for_tablets(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
auto tablets_sstables =
tablets_ranges | std::views::transform([](auto range) { return tablet_sstable_collection{.tablet_range = range}; }) | std::ranges::to<std::vector>();
if (sstables.empty() || tablets_sstables.empty()) {
co_return std::move(tablets_sstables);
}
// sstables are sorted by first key in reverse order.
auto reversed_sstables = sstables | std::views::reverse;
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : tablets_sstables) {
for (const auto& sst : reversed_sstables) {
auto sst_first = sst->get_first_decorated_key().token();
auto sst_last = sst->get_last_decorated_key().token();
// SSTable entirely after tablet -> no further SSTables (larger keys) can overlap
if (tablet_range.after(sst_first, dht::token_comparator{})) {
break;
}
// SSTable entirely before tablet -> skip and continue scanning later (larger keys)
if (tablet_range.before(sst_last, dht::token_comparator{})) {
continue;
}
if (tablet_range.contains(dht::token_range{sst_first, sst_last}, dht::token_comparator{})) {
sstables_fully_contained.push_back(sst);
} else {
sstables_partially_contained.push_back(sst);
}
co_await coroutine::maybe_yield();
}
}
co_return std::move(tablets_sstables);
}
future<> tablet_sstable_streamer::stream(shared_ptr<stream_progress> progress) {
if (progress) {
progress->start(_tablet_map.tablet_count());
}
// sstables are sorted by first key in reverse order.
auto sstable_it = _sstables.rbegin();
for (auto tablet_id : _tablet_map.tablet_ids() | std::views::filter([this] (auto tid) { return tablet_in_scope(tid); })) {
auto tablet_range = _tablet_map.get_token_range(tablet_id);
auto sstable_token_range = [] (const sstables::shared_sstable& sst) {
return dht::token_range(sst->get_first_decorated_key().token(),
sst->get_last_decorated_key().token());
};
std::vector<sstables::shared_sstable> sstables_fully_contained;
std::vector<sstables::shared_sstable> sstables_partially_contained;
// sstable is exhausted if its last key is before the current tablet range
auto exhausted = [&tablet_range] (const sstables::shared_sstable& sst) {
return tablet_range.before(sst->get_last_decorated_key().token(), dht::token_comparator{});
};
while (sstable_it != _sstables.rend() && exhausted(*sstable_it)) {
sstable_it++;
}
for (auto sst_it = sstable_it; sst_it != _sstables.rend(); sst_it++) {
auto sst_token_range = sstable_token_range(*sst_it);
// sstables are sorted by first key, so should skip this SSTable since it
// doesn't overlap with the current tablet range.
if (!tablet_range.overlaps(sst_token_range, dht::token_comparator{})) {
// If the start of the next SSTable's token range lies beyond the current tablet's token
// range, we can safely conclude that no more relevant SSTables remain for this tablet.
if (tablet_range.after(sst_token_range.start()->value(), dht::token_comparator{})) {
break;
}
continue;
}
if (tablet_range.contains(sst_token_range, dht::token_comparator{})) {
sstables_fully_contained.push_back(*sst_it);
} else {
sstables_partially_contained.push_back(*sst_it);
}
co_await coroutine::maybe_yield();
}
auto classified_sstables = co_await get_sstables_for_tablets(
_sstables, _tablet_map.tablet_ids() | std::views::filter([this](auto tid) { return tablet_in_scope(tid); }) | std::views::transform([this](auto tid) {
return _tablet_map.get_token_range(tid);
}) | std::ranges::to<std::vector>());
for (auto& [tablet_range, sstables_fully_contained, sstables_partially_contained] : classified_sstables) {
auto per_tablet_progress = make_shared<per_tablet_stream_progress>(
progress,
sstables_fully_contained.size() + sstables_partially_contained.size());
@@ -751,8 +755,9 @@ future<> sstables_loader::download_task_impl::run() {
};
llog.debug("Loading sstables from {}({}/{})", _endpoint, _bucket, _prefix);
auto ep_type = _loader.local()._storage_manager.get_endpoint_type(_endpoint);
std::vector<seastar::abort_source> shard_aborts(smp::count);
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, _bucket, _prefix, cfg, [&] {
auto [ table_id, sstables_on_shards ] = co_await replica::distributed_loader::get_sstables_from_object_store(_loader.local()._db, _ks, _cf, _sstables, _endpoint, ep_type, _bucket, _prefix, cfg, [&] {
return &shard_aborts[this_shard_id()];
});
llog.debug("Streaming sstables from {}({}/{})", _endpoint, _bucket, _prefix);
@@ -832,3 +837,7 @@ future<tasks::task_id> sstables_loader::download_new_sstables(sstring ks_name, s
std::move(prefix), std::move(sstables), scope, primary_replica_only(primary_replica));
co_return task->id();
}
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges) {
return tablet_sstable_streamer::get_sstables_for_tablets(sstables, std::move(tablets_ranges));
}

View File

@@ -10,6 +10,8 @@
#include <vector>
#include <seastar/core/sharded.hh>
#include "dht/i_partitioner_fwd.hh"
#include "dht/token.hh"
#include "schema/schema_fwd.hh"
#include "sstables/shared_sstable.hh"
#include "tasks/task_manager.hh"
@@ -152,3 +154,18 @@ struct fmt::formatter<sstables_loader::stream_scope> : fmt::formatter<string_vie
}
}
};
struct tablet_sstable_collection {
dht::token_range tablet_range;
std::vector<sstables::shared_sstable> sstables_fully_contained;
std::vector<sstables::shared_sstable> sstables_partially_contained;
};
// This function is intended for test purposes only.
// It assigns the given sstables to the given tablet ranges based on token containment.
// It returns a vector of tablet_sstable_collection, each containing the tablet range
// and the sstables that are fully or partially contained within that range.
// The prerequisite is the tablet ranges are sorted by the range in ascending order and non-overlapping.
// Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order.
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);

View File

@@ -12,7 +12,7 @@ add_scylla_test(alternator_unit_test
add_scylla_test(anchorless_list_test
KIND BOOST)
add_scylla_test(auth_passwords_test
KIND BOOST
KIND SEASTAR
LIBRARIES auth)
add_scylla_test(auth_resource_test
KIND BOOST)
@@ -370,6 +370,7 @@ add_scylla_test(combined_tests
sstable_compression_config_test.cc
sstable_directory_test.cc
sstable_set_test.cc
sstable_tablet_streaming.cc
statement_restrictions_test.cc
storage_proxy_test.cc
tablets_test.cc

View File

@@ -6,7 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#define BOOST_TEST_MODULE core
#include <seastar/testing/test_case.hh>
#include <array>
#include <random>
@@ -16,15 +16,21 @@
#include <boost/test/unit_test.hpp>
#include <seastar/core/sstring.hh>
#include <seastar/core/coroutine.hh>
#include "seastarx.hh"
extern "C" {
#include <crypt.h>
#include <unistd.h>
}
static auto rng_for_salt = std::default_random_engine(std::random_device{}());
//
// The same password hashed multiple times will result in different strings because the salt will be different.
//
BOOST_AUTO_TEST_CASE(passwords_are_salted) {
SEASTAR_TEST_CASE(passwords_are_salted) {
const char* const cleartext = "my_excellent_password";
std::unordered_set<sstring> observed_passwords{};
@@ -33,12 +39,13 @@ BOOST_AUTO_TEST_CASE(passwords_are_salted) {
BOOST_REQUIRE(!observed_passwords.contains(e));
observed_passwords.insert(e);
}
co_return;
}
//
// A hashed password will authenticate against the same password in cleartext.
//
BOOST_AUTO_TEST_CASE(correct_passwords_authenticate) {
SEASTAR_TEST_CASE(correct_passwords_authenticate) {
// Common passwords.
std::array<const char*, 3> passwords{
"12345",
@@ -47,14 +54,85 @@ BOOST_AUTO_TEST_CASE(correct_passwords_authenticate) {
};
for (const char* p : passwords) {
BOOST_REQUIRE(auth::passwords::check(p, auth::passwords::hash(p, rng_for_salt, auth::passwords::scheme::sha_512)));
BOOST_REQUIRE(co_await auth::passwords::check(p, auth::passwords::hash(p, rng_for_salt, auth::passwords::scheme::sha_512)));
}
}
std::string long_password(uint32_t len) {
std::string out;
auto pattern = "0123456789";
for (uint32_t i = 0; i < len; ++i) {
out.push_back(pattern[i % strlen(pattern)]);
}
return out;
}
SEASTAR_TEST_CASE(same_hashes_as_crypt_h) {
std::string long_pwd_254 = long_password(254);
std::string long_pwd_255 = long_password(255);
std::string long_pwd_511 = long_password(511);
std::array<const char*, 8> passwords{
"12345",
"1_am_the_greatest!",
"password1",
// Some special characters
"!@#$%^&*()_+-=[]{}|\n;:'\",.<>/?",
// UTF-8 characters
"こんにちは、世界!",
// Passwords close to __crypt_sha512 length limit
long_pwd_254.c_str(),
long_pwd_255.c_str(),
// Password of maximal accepted length
long_pwd_511.c_str(),
};
auto salt = "$6$aaaabbbbccccdddd";
for (const char* p : passwords) {
auto res = co_await auth::passwords::detail::hash_with_salt_async(p, salt);
BOOST_REQUIRE(res == auth::passwords::detail::hash_with_salt(p, salt));
}
}
SEASTAR_TEST_CASE(too_long_password) {
auto p1 = long_password(71);
auto p2 = long_password(72);
auto p3 = long_password(73);
auto too_long_password = long_password(512);
auto salt_bcrypt = "$2a$05$mAyzaIeJu41dWUkxEbn8hO";
auto h1_bcrypt = co_await auth::passwords::detail::hash_with_salt_async(p1, salt_bcrypt);
auto h2_bcrypt = co_await auth::passwords::detail::hash_with_salt_async(p2, salt_bcrypt);
auto h3_bcrypt = co_await auth::passwords::detail::hash_with_salt_async(p3, salt_bcrypt);
BOOST_REQUIRE(h1_bcrypt != h2_bcrypt);
// The check below documents the behavior of the current bcrypt
// implementation that compares only the first 72 bytes of the password.
// Although we don't typically use bcrypt for password hashing, it is
// possible to insert such a hash using`CREATE ROLE ... WITH HASHED PASSWORD ...`.
// Refs: scylladb/scylladb#26842
BOOST_REQUIRE(h2_bcrypt == h3_bcrypt);
// The current implementation of bcrypt password hasing fails with passwords of length 512 and above
BOOST_CHECK_THROW(co_await auth::passwords::detail::hash_with_salt_async(too_long_password, salt_bcrypt), std::system_error);
auto salt_sha512 = "$6$aaaabbbbccccdddd";
auto h1_sha512 = co_await auth::passwords::detail::hash_with_salt_async(p1, salt_sha512);
auto h2_sha512 = co_await auth::passwords::detail::hash_with_salt_async(p2, salt_sha512);
auto h3_sha512 = co_await auth::passwords::detail::hash_with_salt_async(p3, salt_sha512);
BOOST_REQUIRE(h1_sha512 != h2_sha512);
BOOST_REQUIRE(h2_sha512 != h3_sha512);
// The current implementation of SHA-512 password hasing fails with passwords of length 512 and above
BOOST_CHECK_THROW(co_await auth::passwords::detail::hash_with_salt_async(too_long_password, salt_sha512), std::system_error);
}
//
// A hashed password that does not match the password in cleartext does not authenticate.
//
BOOST_AUTO_TEST_CASE(incorrect_passwords_do_not_authenticate) {
SEASTAR_TEST_CASE(incorrect_passwords_do_not_authenticate) {
const sstring hashed_password = auth::passwords::hash("actual_password", rng_for_salt,auth::passwords::scheme::sha_512);
BOOST_REQUIRE(!auth::passwords::check("password_guess", hashed_password));
BOOST_REQUIRE(!co_await auth::passwords::check("password_guess", hashed_password));
}

View File

@@ -1450,8 +1450,7 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
std::map<sstring, replication_strategy_config_option> options;
for (const auto& dc : option_dcs) {
auto num_racks = node_count_per_rack.at(dc).size();
auto max_rf_factor = std::ranges::min(std::ranges::views::transform(node_count_per_rack.at(dc), [] (auto& x) { return x.second; }));
auto rf = num_racks * tests::random::get_int(1UL, max_rf_factor);
auto rf = num_racks;
options.emplace(dc, fmt::to_string(rf));
}
return options;
@@ -1487,8 +1486,7 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
// Test tablets rack-aware base-view pairing
auto base_token = dht::token::get_random_token();
auto view_token = dht::token::get_random_token();
bool use_legacy_self_pairing = false;
bool use_tablets_basic_rack_aware_view_pairing = true;
bool use_tablets = true;
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
replica::cf_stats cf_stats;
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
@@ -1502,8 +1500,7 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
*ars_ptr,
base_token,
view_token,
use_legacy_self_pairing,
use_tablets_basic_rack_aware_view_pairing,
use_tablets,
cf_stats).natural_endpoint;
// view pair must be found
@@ -1525,181 +1522,6 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) {
}
}
// Called in a seastar thread
void test_complex_rack_aware_view_pairing_test(bool more_or_less) {
auto my_address = gms::inet_address("localhost");
// Create the RackInferringSnitch
snitch_config cfg;
cfg.listen_address = my_address;
cfg.broadcast_address = my_address;
cfg.name = "RackInferringSnitch";
sharded<snitch_ptr> snitch;
snitch.start(cfg).get();
auto stop_snitch = defer([&snitch] { snitch.stop().get(); });
snitch.invoke_on_all(&snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() };
std::map<sstring, size_t> node_count_per_dc;
std::map<sstring, std::map<sstring, size_t>> node_count_per_rack;
std::vector<ring_point> ring_points;
auto& random_engine = seastar::testing::local_random_engine;
unsigned shard_count = 2;
size_t num_dcs = 1 + tests::random::get_int(3);
// Generate a random cluster
double point = 1;
for (size_t dc = 0; dc < num_dcs; ++dc) {
sstring dc_name = fmt::format("{}", 100 + dc);
size_t num_racks = 2 + tests::random::get_int(4);
for (size_t rack = 0; rack < num_racks; ++rack) {
sstring rack_name = fmt::format("{}", 10 + rack);
size_t rack_nodes = 1 + tests::random::get_int(2);
for (size_t i = 1; i <= rack_nodes; ++i) {
ring_points.emplace_back(point, inet_address(format("192.{}.{}.{}", dc_name, rack_name, i)));
node_count_per_dc[dc_name]++;
node_count_per_rack[dc_name][rack_name]++;
point++;
}
}
}
testlog.debug("node_count_per_rack={}", node_count_per_rack);
// Initialize the token_metadata
locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg);
auto stop_stm = deferred_stop(stm);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert(token{tests::d2t(ring_point / ring_points.size())});
topo.add_node(id, make_endpoint_dc_rack(endpoint), locator::node::state::normal, shard_count);
co_await tm.update_normal_tokens(std::move(tokens), id);
}
}).get();
auto base_schema = schema_builder("ks", "base")
.with_column("k", utf8_type, column_kind::partition_key)
.with_column("v", utf8_type)
.build();
auto view_schema = schema_builder("ks", "view")
.with_column("v", utf8_type, column_kind::partition_key)
.with_column("k", utf8_type)
.build();
auto tmptr = stm.get();
// Create the replication strategy
auto make_random_options = [&] () {
auto option_dcs = node_count_per_dc | std::views::keys | std::ranges::to<std::vector>();
std::shuffle(option_dcs.begin(), option_dcs.end(), random_engine);
std::map<sstring, replication_strategy_config_option> options;
for (const auto& dc : option_dcs) {
auto num_racks = node_count_per_rack.at(dc).size();
auto rf = more_or_less ?
tests::random::get_int(num_racks, node_count_per_dc[dc]) :
tests::random::get_int(1UL, num_racks);
options.emplace(dc, fmt::to_string(rf));
}
return options;
};
auto options = make_random_options();
size_t tablet_count = 1 + tests::random::get_int(99);
testlog.debug("tablet_count={} rf_options={}", tablet_count, options);
locator::replication_strategy_params params(options, tablet_count, std::nullopt);
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", params, tmptr->get_topology());
auto tab_awr_ptr = ars_ptr->maybe_as_tablet_aware();
BOOST_REQUIRE(tab_awr_ptr);
auto base_tmap = tab_awr_ptr->allocate_tablets_for_new_table(base_schema, tmptr, 1).get();
auto base_table_id = base_schema->id();
testlog.debug("base_table_id={}", base_table_id);
auto view_table_id = view_schema->id();
auto view_tmap = tab_awr_ptr->allocate_tablets_for_new_table(view_schema, tmptr, 1).get();
testlog.debug("view_table_id={}", view_table_id);
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
tm.tablets().set_tablet_map(base_table_id, co_await base_tmap.clone_gently());
tm.tablets().set_tablet_map(view_table_id, co_await view_tmap.clone_gently());
}).get();
tmptr = stm.get();
auto base_erm = tab_awr_ptr->make_replication_map(base_table_id, tmptr);
auto view_erm = tab_awr_ptr->make_replication_map(view_table_id, tmptr);
auto& topology = tmptr->get_topology();
testlog.debug("topology: {}", topology.get_datacenter_racks());
// Test tablets rack-aware base-view pairing
auto base_token = dht::token::get_random_token();
auto view_token = dht::token::get_random_token();
bool use_legacy_self_pairing = false;
bool use_tablets_basic_rack_aware_view_pairing = true;
const auto& base_replicas = base_tmap.get_tablet_info(base_tmap.get_tablet_id(base_token)).replicas;
replica::cf_stats cf_stats;
std::unordered_map<locator::host_id, locator::host_id> base_to_view_pairing;
std::unordered_map<locator::host_id, locator::host_id> view_to_base_pairing;
std::unordered_map<sstring, size_t> same_rack_pairs;
std::unordered_map<sstring, size_t> cross_rack_pairs;
for (const auto& base_replica : base_replicas) {
auto& base_host = base_replica.host;
auto view_ep_opt = db::view::get_view_natural_endpoint(
base_host,
base_erm,
view_erm,
*ars_ptr,
base_token,
view_token,
use_legacy_self_pairing,
use_tablets_basic_rack_aware_view_pairing,
cf_stats).natural_endpoint;
// view pair must be found
if (!view_ep_opt) {
BOOST_FAIL(format("Could not pair base_host={} base_token={} view_token={}", base_host, base_token, view_token));
}
BOOST_REQUIRE(view_ep_opt);
auto& view_ep = *view_ep_opt;
// Assert pairing uniqueness
auto [base_it, inserted_base_pair] = base_to_view_pairing.emplace(base_host, view_ep);
BOOST_REQUIRE(inserted_base_pair);
auto [view_it, inserted_view_pair] = view_to_base_pairing.emplace(view_ep, base_host);
BOOST_REQUIRE(inserted_view_pair);
auto& base_location = topology.find_node(base_host)->dc_rack();
auto& view_location = topology.find_node(view_ep)->dc_rack();
// Assert dc- and rack- aware pairing
BOOST_REQUIRE_EQUAL(base_location.dc, view_location.dc);
if (base_location.rack == view_location.rack) {
same_rack_pairs[base_location.dc]++;
} else {
cross_rack_pairs[base_location.dc]++;
}
}
for (const auto& [dc, rf_opt] : options) {
auto rf = locator::get_replication_factor(rf_opt);
BOOST_REQUIRE_EQUAL(same_rack_pairs[dc] + cross_rack_pairs[dc], rf);
}
}
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_lt_racks) {
test_complex_rack_aware_view_pairing_test(false);
}
SEASTAR_THREAD_TEST_CASE(tablets_complex_rack_aware_view_pairing_test_rf_gt_racks) {
test_complex_rack_aware_view_pairing_test(true);
}
SEASTAR_THREAD_TEST_CASE(test_rack_diff) {
BOOST_REQUIRE(diff_racks({}, {}).empty());

View File

@@ -0,0 +1,367 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "dht/token.hh"
#include "sstable_test.hh"
#include "sstables_loader.hh"
#include "test/lib/sstable_test_env.hh"
BOOST_AUTO_TEST_SUITE(sstable_tablet_streaming_test)
using namespace sstables;
std::vector<shared_sstable> make_sstables_with_ranges(test_env& env, const std::vector<std::pair<int64_t, int64_t>>& ranges) {
std::vector<shared_sstable> ssts;
for (const auto& [first, last] : ranges) {
auto sst = env.make_sstable(uncompressed_schema(), uncompressed_dir());
test(sst).set_first_and_last_keys(dht::decorated_key(dht::token{first}, partition_key(std::vector<bytes>{"1"})),
dht::decorated_key(dht::token{last}, partition_key(std::vector<bytes>{"1"})));
ssts.push_back(std::move(sst));
}
// By sorting SSTables by their primary key, we enable runs to be
// streamed incrementally. Overlapping fragments can be deduplicated,
// reducing the amount of data sent over the wire. Elements are
// popped from the back of the vector, so we sort in descending
// order to begin with the smaller tokens.
// See sstable_streamer constructor for more details.
std::ranges::sort(ssts, [](const shared_sstable& x, const shared_sstable& y) { return x->compare_by_first_key(*y) > 0; });
return ssts;
}
std::vector<dht::token_range> get_tablet_sstable_collection(auto&&... tablet_ranges) {
// tablet ranges are left-non-inclusive, see `tablet_map::get_token_range` for details
std::vector<dht::token_range> collections{dht::token_range::make({tablet_ranges.start()->value(), false}, {tablet_ranges.end()->value(), true})...};
std::sort(collections.begin(), collections.end(), [](auto const& a, auto const& b) { return a.start()->value() < b.start()->value(); });
return collections;
}
#define REQUIRE_WITH_CONTEXT(sstables, expected_size) \
BOOST_TEST_CONTEXT("Testing with ranges: " << [&] { \
std::stringstream ss; \
for (const auto& sst : (sstables)) { \
ss << dht::token_range(sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token()) << ", "; \
} \
return ss.str(); \
}()) \
BOOST_REQUIRE_EQUAL(sstables.size(), expected_size)
SEASTAR_TEST_CASE(test_streaming_ranges_distribution) {
return test_env::do_with_async([](test_env& env) {
// 1) Exact boundary equality: SSTable == tablet
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{5, 10},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 2) Single-point overlaps at start/end
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{4, 5}, // touches start, non-inclusive, skip
{10, 11}, // touches end
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 3) Tablet fully inside a large SSTable
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 20},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 4) Multiple SSTables fully contained in tablet
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{6, 7},
{7, 8},
{8, 9},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 3);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 5) Two overlapping but not fully contained SSTables
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 6}, // overlaps at left
{9, 15}, // overlaps at right
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 6) Unsorted input (helper sorts) + mixed overlaps
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{50}, dht::token{100}});
// Intentionally unsorted by first token
auto ssts = make_sstables_with_ranges(env,
{
{120, 130},
{0, 10},
{60, 70}, // fully contained
{40, 55}, // partial
{95, 105}, // partial
{80, 90}, // fully contained
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 2);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 7) Empty SSTable list
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
std::vector<shared_sstable> ssts;
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 8) Tablet outside all SSTables
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{100}, dht::token{200}});
auto ssts = make_sstables_with_ranges(env,
{
{1, 2},
{3, 4},
{10, 20},
{300, 400},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
// 9) Boundary adjacency with multiple fragments
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{100}, dht::token{200}});
auto ssts = make_sstables_with_ranges(env,
{
{50, 100}, // touches start -> non-inclusive, skip
{100, 120}, // starts at start -> partially contained
{180, 200}, // ends at end -> fully contained
{200, 220}, // touches end -> partial
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 1);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
// 10) Large SSTable set where early break should occur
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{1000}, dht::token{2000}});
auto ssts = make_sstables_with_ranges(env,
{
{100, 200},
{300, 400},
{900, 950},
{1001, 1100}, // fully contained
{1500, 1600}, // fully contained
{2101, 2200}, // entirely after -> should trigger early break in ascending scan
{1999, 2100}, // overlap, partially contained
{3000, 3100},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 2);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
// 10) https://github.com/scylladb/scylladb/pull/26980 example, tested
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{4}, dht::token{5}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 5},
{0, 3},
{2, 5},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
// None fully contained; three partial overlaps
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 2);
}
});
}
SEASTAR_TEST_CASE(test_streaming_ranges_distribution_in_tablets) {
return test_env::do_with_async([](test_env& env) {
{
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}}, dht::token_range{dht::token{11}, dht::token{15}});
auto ssts = make_sstables_with_ranges(env,
{
{5, 10},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 0);
}
{
// Multiple tablets with a hole between [10,11]
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{4}},
dht::token_range{dht::token{5}, dht::token{9}},
dht::token_range{dht::token{12}, dht::token{15}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 4}, // T.start==S.start, but non-inclusive -> partial
{5, 9}, // same as above
{6, 8}, // fully in second tablet
{10, 11}, // falls in the hole, should be rejected
{8, 13}, // overlaps second and third tablets (partial in both)
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 2);
REQUIRE_WITH_CONTEXT(res[2].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[2].sstables_partially_contained, 1);
}
{
// SSTables outside any tablet range
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{20}, dht::token{25}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 5}, // before
{30, 35}, // after
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
}
{
// Edge case: SSTable touching tablet boundary
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{5}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{4, 5}, // touches start, non-inclusive, skip
{10, 11}, // touches end
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
}
{
// No tablets, but some SSTables
auto collection = get_tablet_sstable_collection();
auto ssts = make_sstables_with_ranges(env,
{
{0, 5},
{10, 15},
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
BOOST_REQUIRE_EQUAL(res.size(), 0); // no tablets → nothing to classify
}
{
// No SSTables, but some tablets
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{5}}, dht::token_range{dht::token{10}, dht::token{15}});
std::vector<shared_sstable> ssts; // empty
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 0);
}
{
// No tablets and no SSTables
auto collection = get_tablet_sstable_collection();
std::vector<shared_sstable> ssts; // empty
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
BOOST_REQUIRE_EQUAL(res.size(), 0);
}
{
// SSTable spanning two tablets
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{4}}, dht::token_range{dht::token{5}, dht::token{9}});
auto ssts = make_sstables_with_ranges(env,
{
{2, 7}, // spans both tablets
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
// Tablet [0,4] sees partial overlap
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
// Tablet [5,9] sees partial overlap
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
}
{
// SSTable spanning three tablets with a hole in between
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{3}},
dht::token_range{dht::token{4}, dht::token{6}},
dht::token_range{dht::token{8}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{2, 9}, // spans across tablets 1,2,3 and hole [7]
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[2].sstables_partially_contained, 1);
}
{
// SSTable fully covering one tablet and partially overlapping another
auto collection = get_tablet_sstable_collection(dht::token_range{dht::token{0}, dht::token{5}}, dht::token_range{dht::token{6}, dht::token{10}});
auto ssts = make_sstables_with_ranges(env,
{
{0, 7}, // fully covers first tablet, partial in second
});
auto res = get_sstables_for_tablets_for_tests(ssts, std::move(collection)).get();
REQUIRE_WITH_CONTEXT(res[0].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[0].sstables_partially_contained, 1);
REQUIRE_WITH_CONTEXT(res[1].sstables_fully_contained, 0);
REQUIRE_WITH_CONTEXT(res[1].sstables_partially_contained, 1);
}
});
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -395,7 +395,7 @@ SEASTAR_TEST_CASE(test_builder_with_concurrent_drop) {
assert_that(msg).is_rows().is_empty();
msg = e.execute_cql("select * from system_distributed.view_build_status").get();
assert_that(msg).is_rows().is_empty();
});
}, 30);
});
}

View File

@@ -25,7 +25,7 @@ from typing import Any, Optional, override
import pytest
import requests
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
from cassandra.cluster import NoHostAvailable, Session
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
from cassandra.query import SimpleStatement, named_tuple_factory
from ccmlib.scylla_node import ScyllaNode, NodeError
@@ -1135,6 +1135,14 @@ class TestCQLAudit(AuditTester):
session.execute("DROP TABLE test1")
def _get_attempt_count(self, session: Session, *, execution_profile=EXEC_PROFILE_DEFAULT, consistency_level: ConsistencyLevel = ConsistencyLevel.ONE) -> int:
# dtest env is using FlakyRetryPolicy which has `max_retries` attribute
cl_profile = session.execution_profile_clone_update(execution_profile, consistency_level=consistency_level)
policy = cl_profile.retry_policy
retries = getattr(policy, "max_retries", None)
assert retries is not None
return 1 + retries
def _test_insert_failure_doesnt_report_success_assign_nodes(self, session: Session = None):
all_nodes: set[ScyllaNode] = set(self.cluster.nodelist())
assert len(all_nodes) == 7
@@ -1154,6 +1162,7 @@ class TestCQLAudit(AuditTester):
for i in range(256):
stmt = SimpleStatement(f"INSERT INTO ks.test1 (k, v1) VALUES ({i}, 1337)", consistency_level=ConsistencyLevel.THREE)
session.execute(stmt)
attempt_count = self._get_attempt_count(session, consistency_level=ConsistencyLevel.THREE)
token = rows_to_list(session.execute(f"SELECT token(k) FROM ks.test1 WHERE k = {i}"))[0][0]
@@ -1168,9 +1177,9 @@ class TestCQLAudit(AuditTester):
audit_partition_nodes = [address_to_node[address] for address in audit_nodes]
insert_node = address_to_node[insert_node.pop()]
kill_node = address_to_node[partitions.pop()]
return audit_partition_nodes, insert_node, kill_node, stmt.query_string
return audit_partition_nodes, insert_node, kill_node, stmt.query_string, attempt_count
return [], [], None, None
return [], [], None, None, None
@pytest.mark.exclude_errors("audit - Unexpected exception when writing log with: node_ip")
def test_insert_failure_doesnt_report_success(self):
@@ -1192,7 +1201,7 @@ class TestCQLAudit(AuditTester):
with self.assert_exactly_n_audit_entries_were_added(session, 1):
conn.execute(stmt)
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
audit_paritition_nodes, insert_node, node_to_stop, query_to_fail, query_fail_count = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
# TODO: remove the loop when scylladb#24473 is fixed
# We call get_host_id only to cache host_id
@@ -1231,8 +1240,8 @@ class TestCQLAudit(AuditTester):
# If any audit mode is not done yet, continue polling.
all_modes_done = True
for mode, rows in rows_dict.items():
rows_with_error = list(filter(lambda r: r.error, rows))
if len(rows_with_error) == 6:
rows_with_error = [row for row in rows if row.error and row.operation == query_to_fail]
if len(rows_with_error) == query_fail_count:
logger.info(f"audit mode {mode} log updated after {i} iterations ({i / 10}s)")
assert rows_with_error[0].error is True
assert rows_with_error[0].consistency == "THREE"

View File

@@ -16,16 +16,26 @@ from test.cluster.util import get_topology_coordinator, new_test_keyspace, recon
logger = logging.getLogger(__name__)
# This test makes sure that view building is done mainly in the streaming scheduling group
# and not the gossip scheduling group. We do that by measuring the time each group was
# busy during the view building process and confirming that the gossip group was busy
# much less than the streaming group.
# Reproduces https://github.com/scylladb/scylladb/issues/21232
# This test makes sure that view building is done mainly in the streaming
# scheduling group. We check that by grepping all relevant logs in TRACE mode
# and verifying that they come from the streaming scheduling group.
#
# For more context, see: https://github.com/scylladb/scylladb/issues/21232.
# This test reproduces the issue in non-tablet mode.
@pytest.mark.asyncio
@skip_mode('debug', 'the test needs to do some work which takes too much time in debug mode')
async def test_view_building_scheduling_group(manager: ManagerClient):
server = await manager.server_add()
# Note: The view building coordinator works in the gossiping scheduling group,
# and we intentionally omit it here.
# Note: We include "view" for keyspaces that don't use the view building coordinator
# and will follow the legacy path instead.
loggers = ["view_building_worker", "view_consumer", "view_update_generator", "view"]
# Flatten the list of lists.
cmdline = sum([["--logger-log-level", f"{logger}=trace"] for logger in loggers], [])
server = await manager.server_add(cmdline=cmdline)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (p int, c int, PRIMARY KEY (p, c))")
@@ -35,21 +45,30 @@ async def test_view_building_scheduling_group(manager: ManagerClient):
batch = "BEGIN UNLOGGED BATCH\n" + "\n".join(inserts) + "\nAPPLY BATCH\n"
await manager.cql.run_async(batch)
metrics_before = await manager.metrics.query(server.ip_addr)
ms_gossip_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
ms_streaming_before = metrics_before.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv AS SELECT p, c FROM {ks}.tab WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")
await wait_for_view(cql, 'mv', 1)
metrics_after = await manager.metrics.query(server.ip_addr)
ms_gossip_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'gossip'})
ms_streaming_after = metrics_after.get('scylla_scheduler_runtime_ms', {'group': 'streaming'})
ms_streaming = ms_streaming_after - ms_streaming_before
ms_statement = ms_gossip_after - ms_gossip_before
ratio = ms_statement / ms_streaming
print(f"ms_streaming: {ms_streaming}, ms_statement: {ms_statement}, ratio: {ratio}")
assert ratio < 0.1
logger_alternative = "|".join(loggers)
pattern = rf"\[shard [0-9]+:(.+)\] ({logger_alternative}) - "
results = await log.grep(pattern, from_mark=mark)
# Sanity check. If there are no logs, something's wrong.
assert len(results) > 0
# In case of non-tablet keyspaces, we won't use the view building coordinator.
# Instead, view updates will follow the legacy path. Along the way, we'll observe
# this message, which will be printed using another scheduling group, so let's
# filter it out.
predicate = lambda result: f"Building view {ks}.mv, starting at token" not in result[0]
results = list(filter(predicate, results))
# Take the first parenthesized match for each result, i.e. the scheduling group.
sched_groups = [matches[1] for _, matches in results]
assert all(sched_group == "strm" for sched_group in sched_groups)
# A sanity check test ensures that starting and shutting down Scylla when view building is
# disabled is conducted properly and we don't run into any issues.

View File

@@ -25,12 +25,14 @@ import json
from cassandra.auth import PlainTextAuthProvider
import threading
import random
import re
from test.cluster.util import get_replication
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.pylib.tablets import get_all_tablet_replicas
from test.cluster.conftest import skip_mode
from test.pylib.tablets import get_tablet_replica
logger = logging.getLogger(__name__)
@@ -969,3 +971,118 @@ async def test_alternator_concurrent_rmw_same_partition_different_server(manager
t.join()
finally:
table.delete()
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
"""
Reproducer for issue #27353.
LWT requires that storage_proxy::cas() is invoked on a valid shard — the one
returned by sharder.try_get_shard_for_reads() for a tablets-based table.
The bug: if the current shard is invalid and we jump to the valid shard, that
new shard may become invalid again by the time we attempt to capture the ERM.
This leads to a failure of the CAS path.
The fix: retry the validity check and jump again if the current shard is already
invalid. We should exit the loop once the shard is valid *and* we hold a strong pointer
to the ERM — which prevents further tablet movements until the ERM is released.
This problem is specific to BatchWriteItem; other commands are already handled
correctly.
"""
config = alternator_config.copy()
config['alternator_write_isolation'] = 'always_use_lwt'
cmdline = [
'--logger-log-level', 'alternator-executor=trace',
'--logger-log-level', 'alternator_controller=trace',
'--logger-log-level', 'paxos=trace'
]
server = await manager.server_add(config=config, cmdline=cmdline)
alternator = get_alternator(server.ip_addr)
logger.info("Creating alternator test table")
table = alternator.create_table(TableName=unique_table_name(),
Tags=[{'Key': 'system:initial_tablets', 'Value': '1'}],
BillingMode='PAY_PER_REQUEST',
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'N'}])
table_name = table.name
ks_name = 'alternator_' + table_name
last_token = 7 # Any token works since we have only one tablet
(src_host_id, src_shard) = await get_tablet_replica(manager, server, ks_name, table_name, last_token)
dst_shard = 0 if src_shard == 1 else 1
logger.info("Inject 'intranode_migration_streaming_wait'")
await manager.api.enable_injection(server.ip_addr,
"intranode_migration_streaming_wait",
one_shot=False)
logger.info("Start tablet migration")
intranode_migration_task = asyncio.create_task(
manager.api.move_tablet(server.ip_addr, ks_name, table_name,
src_host_id, src_shard,
src_host_id, dst_shard, last_token))
logger.info("Open server logs")
log = await manager.server_open_log(server.server_id)
logger.info("Wait for intranode_migration_streaming_wait")
await log.wait_for("intranode_migration_streaming: waiting")
logger.info("Inject 'alternator_executor_batch_write_wait'")
await manager.api.enable_injection(server.ip_addr,
"alternator_executor_batch_write_wait",
one_shot=False,
parameters={
'table': table_name,
'keyspace': ks_name,
'shard': dst_shard
})
m = await log.mark()
# Start a background thread, which tries to hit the alternator_executor_batch_write_wait
# injection on the destination shard.
logger.info("Start a batch_write thread")
stop_event = threading.Event()
def run_batch():
alternator = get_alternator(server.ip_addr)
table = alternator.Table(table_name)
while not stop_event.is_set():
with table.batch_writer() as batch:
batch.put_item(Item={'p': 1, 'x': 'hellow world'})
t = ThreadWrapper(target=run_batch)
t.start()
logger.info("Waiting for 'alternator_executor_batch_write_wait: hit'")
await log.wait_for("alternator_executor_batch_write_wait: hit", from_mark=m)
# We have a batch request with "streaming" cas_shard on the destination shard.
# This means we have already made a decision to jump to the src_shard.
# Now we're releasing the tablet migration so that it reaches write_both_read_new and
# and invaldiates this decision.
m = await log.mark()
await manager.api.message_injection(server.ip_addr, "intranode_migration_streaming_wait")
# The next barrier must be for the write_both_read_new, we need a guarantee
# that the src_shard observed it
logger.info("Waiting for the next barrier")
await log.wait_for(re.escape(f"[shard {src_shard}: gms] raft_topology - raft_topology_cmd::barrier_and_drain done"),
from_mark=m)
# Now we have a guarantee that a new barrier succeeded on the src_shard,
# this means the src_shard has already transitioned to write_both_read_new,
# and our batch write will have to jump back to the destination shard.
logger.info("Release the 'alternator_executor_batch_write_wait'")
await manager.api.message_injection(server.ip_addr, "alternator_executor_batch_write_wait")
logger.info("Waiting for migratino task to finish")
await intranode_migration_task
stop_event.set()
t.join()

View File

@@ -220,14 +220,14 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
await insert_keys(cql, ks, 0, 100)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
skipped_bytes = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes = get_incremental_repair_sst_read_bytes(servers[0])
# Nothing to skip. Repair all data.
assert skipped_bytes == 0
assert read_bytes > 0
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
skipped_bytes2 = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes2 = get_incremental_repair_sst_read_bytes(servers[0])
# Skip all. Nothing to repair
@@ -236,7 +236,7 @@ async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient
await insert_keys(cql, ks, 200, 300)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
skipped_bytes3 = get_incremental_repair_sst_skipped_bytes(servers[0])
read_bytes3 = get_incremental_repair_sst_read_bytes(servers[0])
# Both skipped and read bytes should grow
@@ -272,7 +272,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
assert get_sstables_repaired_at(map0, token) == sstables_repaired_at
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
map1 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
logging.info(f'map1={map1}')
# Check sstables_repaired_at is increased by 1
@@ -288,7 +288,7 @@ async def test_tablet_incremental_repair(manager: ManagerClient):
assert len(enable) == 1
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
map2 = await load_tablet_sstables_repaired_at(manager, cql, servers[0], hosts[0], table_id)
logging.info(f'map2={map2}')
# Check sstables_repaired_at is increased by 1
@@ -313,7 +313,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
# Repair should not finish with error
await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
try:
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, timeout=10)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental', timeout=10)
assert False # Check the tablet repair is not supposed to finish
except TimeoutError:
logger.info("Repair timeout as expected")
@@ -329,7 +329,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# 1 add 0 skip 1 mark
for log in logs:
sst_add, sst_skip, sst_mark = await get_sst_status("First", log)
@@ -355,7 +355,7 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
else:
assert False # Wrong ops
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# 1 add 1 skip 1 mark
for log in logs:
@@ -394,7 +394,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -402,7 +402,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
current_key += nr_keys
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert more keys and flush to get 2 more sstables
for _ in range(2):
@@ -436,7 +436,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
# Insert more keys
@@ -445,7 +445,7 @@ async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_spl
current_key += nr_keys
# Second repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 3
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 3
# Insert more keys and flush to get 2 more sstables
for _ in range(2):
@@ -505,7 +505,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
await manager.server_start(servers[1].server_id)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
scylla_path = get_scylla_path(cql)
@@ -521,8 +521,8 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -532,7 +532,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
# Second repair
await inject_error_on(manager, "repair_tablet_no_update_sstables_repair_at", servers)
# some sstable repaired_at = 3, but sstables_repaired_at = 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
await inject_error_off(manager, "repair_tablet_no_update_sstables_repair_at", servers)
scylla_path = get_scylla_path(cql)
@@ -561,8 +561,8 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 2
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 2
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -574,7 +574,7 @@ async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_
last_tokens = [t.last_token for t in replicas]
for t in last_tokens[0::2]:
logging.info(f"Start repair for token={t}");
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t) # sstables_repaired_at 3
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", t, incremental_mode='incremental') # sstables_repaired_at 3
scylla_path = get_scylla_path(cql)
@@ -595,7 +595,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token) # sstables_repaired_at 1
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
# Insert more keys
await insert_keys(cql, ks, current_key, current_key + nr_keys)
@@ -659,13 +659,18 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
assert read1 == 0
assert skip2 == 0
assert read2 > 0
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
await do_repair_and_check('incremental', 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
def check2(skip1, read1, skip2, read2):
assert skip1 == skip2
assert read1 == read2
await do_repair_and_check('disabled', 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
# FIXME: Incremental repair is disabled by default due to
# https://github.com/scylladb/scylladb/issues/26041 and
# https://github.com/scylladb/scylladb/issues/27414
await do_repair_and_check(None, 0, rf'Starting tablet repair by API .* incremental_mode=disabled.*', check2)
def check3(skip1, read1, skip2, read2):
assert skip1 < skip2
assert read1 == read2
@@ -677,14 +682,14 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
await do_repair_and_check('full', 1, rf'Starting tablet repair by API .* incremental_mode=full.*', check4)
@pytest.mark.asyncio
async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
for s in servers:
time1 += get_repair_tablet_time_ms(s)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
for s in servers:
time2 += get_repair_tablet_time_ms(s)
@@ -694,7 +699,7 @@ async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
# Reproducer for https://github.com/scylladb/scylladb/issues/26346
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_finishes_when_tablet_skips_end_repair_stage(manager):
async def test_incremental_repair_finishes_when_tablet_skips_end_repair_stage(manager):
servers = await manager.servers_add(3, auto_rack_dc="dc1")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
@@ -719,7 +724,7 @@ async def test_repair_finishes_when_tablet_skips_end_repair_stage(manager):
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_rejoin_do_tablet_operation(manager):
async def test_incremental_repair_rejoin_do_tablet_operation(manager):
cmdline = ['--logger-log-level', 'raft_topology=debug']
servers = await manager.servers_add(3, auto_rack_dc="dc1", cmdline=cmdline)

View File

@@ -10,6 +10,7 @@ import logging
from test.pylib.rest_client import inject_error_one_shot
from test.cluster.util import new_test_keyspace
from test.pylib.util import gather_safely
logger = logging.getLogger(__name__)
@@ -33,25 +34,12 @@ async def test_broken_bootstrap(manager: ManagerClient):
except Exception:
pass
await manager.server_stop(server_b.server_id)
await manager.server_stop(server_a.server_id)
stop_event = asyncio.Event()
async def worker():
logger.info("Worker started")
while not stop_event.is_set():
for i in range(100):
await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
assert response[0].b == i
await asyncio.sleep(0.1)
logger.info("Worker stopped")
await gather_safely(*(manager.server_stop(srv.server_id) for srv in [server_a, server_b]))
await manager.server_start(server_a.server_id)
await manager.driver_connect()
worker_task = asyncio.create_task(worker())
await asyncio.sleep(20)
stop_event.set()
await worker_task
for i in range(100):
await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
assert response[0].b == i

View File

@@ -4,7 +4,8 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from typing import Any
from cassandra.query import ConsistencyLevel
from cassandra.query import ConsistencyLevel, SimpleStatement
from cassandra.policies import FallthroughRetryPolicy
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
from test.pylib.manager_client import ManagerClient
@@ -1596,7 +1597,7 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
async def truncate_table():
await asyncio.sleep(10)
logger.info("Executing truncate during bootstrap")
await cql.run_async(f"TRUNCATE {ks}.test USING TIMEOUT 1m")
await cql.run_async(SimpleStatement(f"TRUNCATE {ks}.test USING TIMEOUT 4m", retry_policy=FallthroughRetryPolicy()))
truncate_task = asyncio.create_task(truncate_table())
logger.info("Adding fourth node")

View File

@@ -67,11 +67,11 @@ nodetool_cmd.conf = False
# Run the external "nodetool" executable (can be overridden by the NODETOOL
# environment variable). Only call this if the REST API doesn't work.
def run_nodetool(cql, *args):
def run_nodetool(cql, *args, **subprocess_kwargs):
# TODO: We may need to change this function or its callers to add proper
# support for testing on multi-node clusters.
host = cql.cluster.contact_points[0]
subprocess.run([nodetool_cmd(), '-h', host, *args])
return subprocess.run([nodetool_cmd(), '-h', host, *args], **subprocess_kwargs)
def flush(cql, table):
ks, cf = table.split('.')
@@ -157,6 +157,28 @@ def disablebinary(cql):
else:
run_nodetool(cql, "disablebinary")
def getlogginglevel(cql, logger):
if has_rest_api(cql):
resp = requests.get(f'{rest_api_url(cql)}/system/logger/{logger}')
if resp.ok:
return resp.text.strip()
raise RuntimeError(f"failed to fetch logging level for {logger}: {resp.status_code} {resp.text}")
result = run_nodetool(
cql,
"getlogginglevels",
capture_output=True,
text=True,
check=True,
)
for line in result.stdout.splitlines():
stripped = line.strip()
parts = stripped.split()
if len(parts) >= 2 and parts[0] == logger:
return parts[-1]
raise RuntimeError(f"logger {logger} not found in getlogginglevels output")
def setlogginglevel(cql, logger, level):
if has_rest_api(cql):
requests.post(f'{rest_api_url(cql)}/system/logger/{logger}', params={'level': level})

View File

@@ -10,6 +10,7 @@ import re
import requests
import socket
import struct
from test.cqlpy import nodetool
from test.cqlpy.util import cql_session
def get_protocol_error_metrics(host) -> int:
@@ -58,11 +59,50 @@ def try_connect(host, port, creds, protocol_version):
with cql_with_protocol(host, port, creds, protocol_version) as session:
return 1 if session else 0
@pytest.fixture
def debug_exceptions_logging(request, cql):
def _read_level() -> str | None:
try:
level = nodetool.getlogginglevel(cql, "exception")
if level:
level = level.strip().strip('"').lower()
return level
except Exception as exc:
print(f"Failed to read exception logger level: {exc}")
return None
def _set_and_verify(level: str) -> bool:
try:
nodetool.setlogginglevel(cql, "exception", level)
except Exception as exc:
print(f"Failed to set exception logger level to '{level}': {exc}")
return False
observed = _read_level()
if observed == level:
return True
print(f"Exception logger level observed as '{observed}' while expecting '{level}'")
return False
def _restore_logging():
if not enabled and previous_level is None:
return
target_level = previous_level or "info"
_set_and_verify(target_level)
previous_level = _read_level()
enabled = _set_and_verify("debug")
yield
_restore_logging()
# If there is a protocol version mismatch, the server should
# raise a protocol error, which is counted in the metrics.
def test_protocol_version_mismatch(scylla_only, request, host):
run_count = 100
cpp_exception_threshold = 10
def test_protocol_version_mismatch(scylla_only, debug_exceptions_logging, request, host):
run_count = 200
cpp_exception_threshold = 20
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)
@@ -244,8 +284,8 @@ def _protocol_error_impl(
s.close()
def _test_impl(host, flag):
run_count = 100
cpp_exception_threshold = 10
run_count = 200
cpp_exception_threshold = 20
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)
@@ -267,47 +307,47 @@ def no_ssl(request):
yield
# Malformed BATCH with an invalid kind triggers a protocol error.
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, host):
def test_invalid_kind_in_batch_message(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_bad_batch")
# Send OPTIONS during AUTHENTICATE to trigger auth-state error.
def test_unexpected_message_during_auth(scylla_only, no_ssl, host):
def test_unexpected_message_during_auth(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_unexpected_auth")
# STARTUP with an invalid/missing string-map entry should produce a protocol error.
def test_process_startup_invalid_string_map(scylla_only, no_ssl, host):
def test_process_startup_invalid_string_map(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_startup_invalid_string_map")
# STARTUP with unknown COMPRESSION option should produce a protocol error.
def test_unknown_compression_algorithm(scylla_only, no_ssl, host):
def test_unknown_compression_algorithm(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_unknown_compression")
# QUERY long-string truncation: declared length > provided bytes triggers protocol error.
def test_process_query_internal_malformed_query(scylla_only, no_ssl, host):
def test_process_query_internal_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_query_internal_malformed_query")
# QUERY options malformed: PAGE_SIZE flag set but page_size truncated triggers protocol error.
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, host):
def test_process_query_internal_fail_read_options(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_query_internal_fail_read_options")
# PREPARE long-string truncation: declared length > provided bytes triggers protocol error.
def test_process_prepare_malformed_query(scylla_only, no_ssl, host):
def test_process_prepare_malformed_query(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_prepare_malformed_query")
# EXECUTE cache-key malformed: short-bytes length > provided bytes triggers protocol error.
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, host):
def test_process_execute_internal_malformed_cache_key(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_execute_internal_malformed_cache_key")
# REGISTER malformed string list: declared string length > provided bytes triggers protocol error.
def test_process_register_malformed_string_list(scylla_only, no_ssl, host):
def test_process_register_malformed_string_list(scylla_only, no_ssl, debug_exceptions_logging, host):
_test_impl(host, "trigger_process_register_malformed_string_list")
# Test if the protocol exceptions do not decrease after running the test happy path.
# This is to ensure that the protocol exceptions are not cleared or reset
# during the test execution.
def test_no_protocol_exceptions(scylla_only, no_ssl, host):
run_count = 100
cpp_exception_threshold = 10
def test_no_protocol_exceptions(scylla_only, no_ssl, debug_exceptions_logging, host):
run_count = 200
cpp_exception_threshold = 20
cpp_exception_metrics_before = get_cpp_exceptions_metrics(host)
protocol_exception_metrics_before = get_protocol_error_metrics(host)

View File

@@ -1128,11 +1128,8 @@ private:
auth_config.authenticator_java_name = qualified_authenticator_name;
auth_config.role_manager_java_name = qualified_role_manager_name;
_auth_service.start(perm_cache_config, std::ref(_qp), std::ref(group0_client), std::ref(_mnotifier), std::ref(_mm), auth_config, maintenance_socket_enabled::no, std::ref(_auth_cache)).get();
const uint64_t niceness = 19;
auto hashing_worker = utils::alien_worker(startlog, niceness, "pwd-hash");
_auth_service.start(perm_cache_config, std::ref(_qp), std::ref(group0_client), std::ref(_mnotifier), std::ref(_mm), auth_config, maintenance_socket_enabled::no, std::ref(_auth_cache), std::ref(hashing_worker)).get();
_auth_service.invoke_on_all([this] (auth::service& auth) {
return auth.start(_mm.local(), _sys_ks.local());
}).get();

View File

@@ -163,6 +163,11 @@ public:
_sst->_shards.push_back(this_shard_id());
}
void set_first_and_last_keys(const dht::decorated_key& first_key, const dht::decorated_key& last_key) {
_sst->_first = first_key;
_sst->_last = last_key;
}
void rewrite_toc_without_component(component_type component) {
SCYLLA_ASSERT(component != component_type::TOC);
_sst->_recognized_components.erase(component);

View File

@@ -109,7 +109,7 @@ def validate_status_output(res, keyspace, nodes, ownership, resolve, effective_o
assert load_unit is not None
assert load == "{:.2f}".format(int(node.load) / load_multiplier[load_unit])
if token_count_unknown:
tokens == "?"
assert tokens == "?"
else:
assert int(tokens) == len(node.tokens)
if effective_ownership_unknown:

View File

@@ -109,6 +109,7 @@ class ResourceGather(ABC):
except subprocess.TimeoutExpired:
logger.critical(f"Process {args} timed out")
p.kill()
p.communicate()
except KeyboardInterrupt:
p.kill()
raise

View File

@@ -789,7 +789,7 @@ class ScyllaServer:
while time.time() < self.start_time + self.TOPOLOGY_TIMEOUT and not self.stop_event.is_set():
assert self.cmd is not None
if self.cmd.returncode:
if self.cmd.returncode is not None:
self.cmd = None
if expected_error is not None:
with self.log_filename.open("r", encoding="utf-8") as log_file:

View File

@@ -654,7 +654,7 @@ void cluster_repair_operation(scylla_rest_client& client, const bpo::variables_m
for (const auto& table : tables.empty() ? ks_to_cfs[keyspace] : tables) {
repair_params["table"] = table;
try {
sstring task_id = client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"].GetString();
sstring task_id = rjson::to_sstring(client.post("/storage_service/tablets/repair", repair_params).GetObject()["tablet_task_id"]);
log("Starting repair with task_id={} keyspace={} table={}", task_id, keyspace, table);

View File

@@ -1,4 +1,4 @@
FROM docker.io/fedora:42
FROM registry.fedoraproject.org/fedora:43
ARG CLANG_BUILD="SKIP"
ARG CLANG_ARCHIVES

View File

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-42-20251122
docker.io/scylladb/scylla-toolchain:fedora-43-20251208

View File

@@ -65,7 +65,7 @@ SCYLLA_BUILD_DIR_FULLPATH="${SCYLLA_DIR}"/"${SCYLLA_BUILD_DIR}"
SCYLLA_NINJA_FILE_FULLPATH="${SCYLLA_DIR}"/"${SCYLLA_NINJA_FILE}"
# Which LLVM release to build in order to compile Scylla
LLVM_CLANG_TAG=20.1.8
LLVM_CLANG_TAG=21.1.6
CLANG_ARCHIVE=$(cd "${SCYLLA_DIR}" && realpath -m "${CLANG_ARCHIVE}")
@@ -186,7 +186,3 @@ if [[ $? -ne 0 ]]; then
fi
set -e
tar -C / -xpzf "${CLANG_ARCHIVE}"
dnf remove -y clang clang-libs
# above package removal might have removed those symbolic links, which will cause ccache not to work later on. Manually restore them.
ln -sf /usr/bin/ccache /usr/lib64/ccache/clang
ln -sf /usr/bin/ccache /usr/lib64/ccache/clang++

View File

@@ -29,11 +29,8 @@ class counted_data_source_impl : public data_source_impl {
if (_cpu_concurrency.stopped) {
return fun();
}
return futurize_invoke([this] () {
_cpu_concurrency.units.return_all();
}).then([fun = std::move(fun)] () {
return fun();
}).finally([this] () {
_cpu_concurrency.units.return_all();
return fun().finally([this] () {
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
});
};
@@ -60,11 +57,8 @@ class counted_data_sink_impl : public data_sink_impl {
if (_cpu_concurrency.stopped) {
return fun();
}
return futurize_invoke([this] () {
_cpu_concurrency.units.return_all();
}).then([fun = std::move(fun)] () mutable {
return fun();
}).finally([this] () {
_cpu_concurrency.units.return_all();
return fun().finally([this] () {
_cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
});
};

View File

@@ -832,6 +832,12 @@ to_bytes(bytes_view x) {
return bytes(x.begin(), x.size());
}
inline
bytes
to_bytes(std::string_view x) {
return to_bytes(to_bytes_view(x));
}
inline
bytes_opt
to_bytes_opt(bytes_view_opt bv) {

View File

@@ -15,6 +15,7 @@ target_sources(utils
buffer_input_stream.cc
build_id.cc
config_file.cc
crypt_sha512.cc
directories.cc
disk-error-handler.cc
disk_space_monitor.cc

381
utils/crypt_sha512.cc Normal file
View File

@@ -0,0 +1,381 @@
/*
* This file originates from musl libc (git.musl-libc.org).
* Modifications have been made and are licensed under the following terms:
* Copyright (C) 2025-present ScyllaDB
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*
* public domain sha512 crypt implementation
*
* original sha crypt design: http://people.redhat.com/drepper/SHA-crypt.txt
* in this implementation at least 32bit int is assumed,
* key length is limited, the $6$ prefix is mandatory, '\n' and ':' is rejected
* in the salt and rounds= setting must contain a valid iteration count,
* on error "*" is returned.
*/
#include <ctype.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include "crypt_sha512.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
/* public domain sha512 implementation based on fips180-3 */
/* >=2^64 bits messages are not supported (about 2000 peta bytes) */
struct sha512 {
uint64_t len; /* processed message length */
uint64_t h[8]; /* hash state */
uint8_t buf[128]; /* message block buffer */
};
static uint64_t ror(uint64_t n, int k) { return (n >> k) | (n << (64-k)); }
#define Ch(x,y,z) (z ^ (x & (y ^ z)))
#define Maj(x,y,z) ((x & y) | (z & (x | y)))
#define S0(x) (ror(x,28) ^ ror(x,34) ^ ror(x,39))
#define S1(x) (ror(x,14) ^ ror(x,18) ^ ror(x,41))
#define R0(x) (ror(x,1) ^ ror(x,8) ^ (x>>7))
#define R1(x) (ror(x,19) ^ ror(x,61) ^ (x>>6))
static const uint64_t K[80] = {
0x428a2f98d728ae22ULL, 0x7137449123ef65cdULL, 0xb5c0fbcfec4d3b2fULL, 0xe9b5dba58189dbbcULL,
0x3956c25bf348b538ULL, 0x59f111f1b605d019ULL, 0x923f82a4af194f9bULL, 0xab1c5ed5da6d8118ULL,
0xd807aa98a3030242ULL, 0x12835b0145706fbeULL, 0x243185be4ee4b28cULL, 0x550c7dc3d5ffb4e2ULL,
0x72be5d74f27b896fULL, 0x80deb1fe3b1696b1ULL, 0x9bdc06a725c71235ULL, 0xc19bf174cf692694ULL,
0xe49b69c19ef14ad2ULL, 0xefbe4786384f25e3ULL, 0x0fc19dc68b8cd5b5ULL, 0x240ca1cc77ac9c65ULL,
0x2de92c6f592b0275ULL, 0x4a7484aa6ea6e483ULL, 0x5cb0a9dcbd41fbd4ULL, 0x76f988da831153b5ULL,
0x983e5152ee66dfabULL, 0xa831c66d2db43210ULL, 0xb00327c898fb213fULL, 0xbf597fc7beef0ee4ULL,
0xc6e00bf33da88fc2ULL, 0xd5a79147930aa725ULL, 0x06ca6351e003826fULL, 0x142929670a0e6e70ULL,
0x27b70a8546d22ffcULL, 0x2e1b21385c26c926ULL, 0x4d2c6dfc5ac42aedULL, 0x53380d139d95b3dfULL,
0x650a73548baf63deULL, 0x766a0abb3c77b2a8ULL, 0x81c2c92e47edaee6ULL, 0x92722c851482353bULL,
0xa2bfe8a14cf10364ULL, 0xa81a664bbc423001ULL, 0xc24b8b70d0f89791ULL, 0xc76c51a30654be30ULL,
0xd192e819d6ef5218ULL, 0xd69906245565a910ULL, 0xf40e35855771202aULL, 0x106aa07032bbd1b8ULL,
0x19a4c116b8d2d0c8ULL, 0x1e376c085141ab53ULL, 0x2748774cdf8eeb99ULL, 0x34b0bcb5e19b48a8ULL,
0x391c0cb3c5c95a63ULL, 0x4ed8aa4ae3418acbULL, 0x5b9cca4f7763e373ULL, 0x682e6ff3d6b2b8a3ULL,
0x748f82ee5defb2fcULL, 0x78a5636f43172f60ULL, 0x84c87814a1f0ab72ULL, 0x8cc702081a6439ecULL,
0x90befffa23631e28ULL, 0xa4506cebde82bde9ULL, 0xbef9a3f7b2c67915ULL, 0xc67178f2e372532bULL,
0xca273eceea26619cULL, 0xd186b8c721c0c207ULL, 0xeada7dd6cde0eb1eULL, 0xf57d4f7fee6ed178ULL,
0x06f067aa72176fbaULL, 0x0a637dc5a2c898a6ULL, 0x113f9804bef90daeULL, 0x1b710b35131c471bULL,
0x28db77f523047d84ULL, 0x32caab7b40c72493ULL, 0x3c9ebe0a15c9bebcULL, 0x431d67c49c100d4cULL,
0x4cc5d4becb3e42b6ULL, 0x597f299cfc657e2aULL, 0x5fcb6fab3ad6faecULL, 0x6c44198c4a475817ULL
};
static void processblock(struct sha512 *s, const uint8_t *buf)
{
uint64_t W[80], t1, t2, a, b, c, d, e, f, g, h;
int i;
for (i = 0; i < 16; i++) {
W[i] = (uint64_t)buf[8*i]<<56;
W[i] |= (uint64_t)buf[8*i+1]<<48;
W[i] |= (uint64_t)buf[8*i+2]<<40;
W[i] |= (uint64_t)buf[8*i+3]<<32;
W[i] |= (uint64_t)buf[8*i+4]<<24;
W[i] |= (uint64_t)buf[8*i+5]<<16;
W[i] |= (uint64_t)buf[8*i+6]<<8;
W[i] |= buf[8*i+7];
}
for (; i < 80; i++)
W[i] = R1(W[i-2]) + W[i-7] + R0(W[i-15]) + W[i-16];
a = s->h[0];
b = s->h[1];
c = s->h[2];
d = s->h[3];
e = s->h[4];
f = s->h[5];
g = s->h[6];
h = s->h[7];
for (i = 0; i < 80; i++) {
t1 = h + S1(e) + Ch(e,f,g) + K[i] + W[i];
t2 = S0(a) + Maj(a,b,c);
h = g;
g = f;
f = e;
e = d + t1;
d = c;
c = b;
b = a;
a = t1 + t2;
}
s->h[0] += a;
s->h[1] += b;
s->h[2] += c;
s->h[3] += d;
s->h[4] += e;
s->h[5] += f;
s->h[6] += g;
s->h[7] += h;
}
static void pad(struct sha512 *s)
{
unsigned r = s->len % 128;
s->buf[r++] = 0x80;
if (r > 112) {
memset(s->buf + r, 0, 128 - r);
r = 0;
processblock(s, s->buf);
}
memset(s->buf + r, 0, 120 - r);
s->len *= 8;
s->buf[120] = s->len >> 56;
s->buf[121] = s->len >> 48;
s->buf[122] = s->len >> 40;
s->buf[123] = s->len >> 32;
s->buf[124] = s->len >> 24;
s->buf[125] = s->len >> 16;
s->buf[126] = s->len >> 8;
s->buf[127] = s->len;
processblock(s, s->buf);
}
static void sha512_init(struct sha512 *s)
{
s->len = 0;
s->h[0] = 0x6a09e667f3bcc908ULL;
s->h[1] = 0xbb67ae8584caa73bULL;
s->h[2] = 0x3c6ef372fe94f82bULL;
s->h[3] = 0xa54ff53a5f1d36f1ULL;
s->h[4] = 0x510e527fade682d1ULL;
s->h[5] = 0x9b05688c2b3e6c1fULL;
s->h[6] = 0x1f83d9abfb41bd6bULL;
s->h[7] = 0x5be0cd19137e2179ULL;
}
static void sha512_sum(struct sha512 *s, uint8_t *md)
{
int i;
pad(s);
for (i = 0; i < 8; i++) {
md[8*i] = s->h[i] >> 56;
md[8*i+1] = s->h[i] >> 48;
md[8*i+2] = s->h[i] >> 40;
md[8*i+3] = s->h[i] >> 32;
md[8*i+4] = s->h[i] >> 24;
md[8*i+5] = s->h[i] >> 16;
md[8*i+6] = s->h[i] >> 8;
md[8*i+7] = s->h[i];
}
}
static void sha512_update(struct sha512 *s, const void *m, unsigned long len)
{
const uint8_t *p = (const uint8_t *)m;
unsigned r = s->len % 128;
s->len += len;
if (r) {
if (len < 128 - r) {
memcpy(s->buf + r, p, len);
return;
}
memcpy(s->buf + r, p, 128 - r);
len -= 128 - r;
p += 128 - r;
processblock(s, s->buf);
}
for (; len >= 128; len -= 128, p += 128)
processblock(s, p);
memcpy(s->buf, p, len);
}
static const unsigned char b64[] =
"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
static char *to64(char *s, unsigned int u, int n)
{
while (--n >= 0) {
*s++ = b64[u % 64];
u /= 64;
}
return s;
}
/* key limit is not part of the original design, added for DoS protection.
* rounds limit has been lowered (versus the reference/spec), also for DoS
* protection. runtime is O(klen^2 + klen*rounds) */
#define KEY_MAX 256
#define SALT_MAX 16
#define ROUNDS_DEFAULT 5000
#define ROUNDS_MIN 1000
#define ROUNDS_MAX 9999999
/* hash n bytes of the repeated md message digest */
static void hashmd(struct sha512 *s, unsigned int n, const void *md)
{
unsigned int i;
for (i = n; i > 64; i -= 64)
sha512_update(s, md, 64);
sha512_update(s, md, i);
}
static seastar::future<char *> sha512crypt(const char *key, const char *setting, char *output)
{
struct sha512 ctx;
unsigned char md[64], kmd[64], smd[64];
unsigned int i, r, klen, slen;
char rounds[20] = "";
const char *salt;
char *p;
/* reject large keys */
for (i = 0; i <= KEY_MAX && key[i]; i++);
if (i > KEY_MAX)
co_return nullptr;
klen = i;
/* setting: $6$rounds=n$salt$ (rounds=n$ and closing $ are optional) */
if (strncmp(setting, "$6$", 3) != 0)
co_return nullptr;
salt = setting + 3;
r = ROUNDS_DEFAULT;
if (strncmp(salt, "rounds=", sizeof "rounds=" - 1) == 0) {
unsigned long u;
char *end;
/*
* this is a deviation from the reference:
* bad rounds setting is rejected if it is
* - empty
* - unterminated (missing '$')
* - begins with anything but a decimal digit
* the reference implementation treats these bad
* rounds as part of the salt or parse them with
* strtoul semantics which may cause problems
* including non-portable hashes that depend on
* the host's value of ULONG_MAX.
*/
salt += sizeof "rounds=" - 1;
if (!isdigit(*salt))
co_return nullptr;
u = strtoul(salt, &end, 10);
if (*end != '$')
co_return nullptr;
salt = end+1;
if (u < ROUNDS_MIN)
r = ROUNDS_MIN;
else if (u > ROUNDS_MAX)
co_return nullptr;
else
r = u;
/* needed when rounds is zero prefixed or out of bounds */
sprintf(rounds, "rounds=%u$", r);
}
for (i = 0; i < SALT_MAX && salt[i] && salt[i] != '$'; i++)
/* reject characters that interfere with /etc/shadow parsing */
if (salt[i] == '\n' || salt[i] == ':')
co_return nullptr;
slen = i;
/* B = sha(key salt key) */
sha512_init(&ctx);
sha512_update(&ctx, key, klen);
sha512_update(&ctx, salt, slen);
sha512_update(&ctx, key, klen);
sha512_sum(&ctx, md);
/* A = sha(key salt repeat-B alternate-B-key) */
sha512_init(&ctx);
sha512_update(&ctx, key, klen);
sha512_update(&ctx, salt, slen);
hashmd(&ctx, klen, md);
for (i = klen; i > 0; i >>= 1)
if (i & 1)
sha512_update(&ctx, md, sizeof md);
else
sha512_update(&ctx, key, klen);
sha512_sum(&ctx, md);
/* DP = sha(repeat-key), this step takes O(klen^2) time */
sha512_init(&ctx);
for (i = 0; i < klen; i++)
sha512_update(&ctx, key, klen);
sha512_sum(&ctx, kmd);
/* DS = sha(repeat-salt) */
sha512_init(&ctx);
for (i = 0; i < 16 + md[0]; i++)
sha512_update(&ctx, salt, slen);
sha512_sum(&ctx, smd);
/* iterate A = f(A,DP,DS), this step takes O(rounds*klen) time */
for (i = 0; i < r; i++) {
sha512_init(&ctx);
if (i % 2)
hashmd(&ctx, klen, kmd);
else
sha512_update(&ctx, md, sizeof md);
if (i % 3)
sha512_update(&ctx, smd, slen);
if (i % 7)
hashmd(&ctx, klen, kmd);
if (i % 2)
sha512_update(&ctx, md, sizeof md);
else
hashmd(&ctx, klen, kmd);
sha512_sum(&ctx, md);
co_await seastar::coroutine::maybe_yield();
}
/* output is $6$rounds=n$salt$hash */
p = output;
p += sprintf(p, "$6$%s%.*s$", rounds, slen, salt);
#if 1
static const unsigned char perm[][3] = {
{0,21,42},{22,43,1},{44,2,23},{3,24,45},{25,46,4},
{47,5,26},{6,27,48},{28,49,7},{50,8,29},{9,30,51},
{31,52,10},{53,11,32},{12,33,54},{34,55,13},{56,14,35},
{15,36,57},{37,58,16},{59,17,38},{18,39,60},{40,61,19},
{62,20,41} };
for (i=0; i<21; i++) p = to64(p,
(md[perm[i][0]]<<16)|(md[perm[i][1]]<<8)|md[perm[i][2]], 4);
#else
p = to64(p, (md[0]<<16)|(md[21]<<8)|md[42], 4);
p = to64(p, (md[22]<<16)|(md[43]<<8)|md[1], 4);
p = to64(p, (md[44]<<16)|(md[2]<<8)|md[23], 4);
p = to64(p, (md[3]<<16)|(md[24]<<8)|md[45], 4);
p = to64(p, (md[25]<<16)|(md[46]<<8)|md[4], 4);
p = to64(p, (md[47]<<16)|(md[5]<<8)|md[26], 4);
p = to64(p, (md[6]<<16)|(md[27]<<8)|md[48], 4);
p = to64(p, (md[28]<<16)|(md[49]<<8)|md[7], 4);
p = to64(p, (md[50]<<16)|(md[8]<<8)|md[29], 4);
p = to64(p, (md[9]<<16)|(md[30]<<8)|md[51], 4);
p = to64(p, (md[31]<<16)|(md[52]<<8)|md[10], 4);
p = to64(p, (md[53]<<16)|(md[11]<<8)|md[32], 4);
p = to64(p, (md[12]<<16)|(md[33]<<8)|md[54], 4);
p = to64(p, (md[34]<<16)|(md[55]<<8)|md[13], 4);
p = to64(p, (md[56]<<16)|(md[14]<<8)|md[35], 4);
p = to64(p, (md[15]<<16)|(md[36]<<8)|md[57], 4);
p = to64(p, (md[37]<<16)|(md[58]<<8)|md[16], 4);
p = to64(p, (md[59]<<16)|(md[17]<<8)|md[38], 4);
p = to64(p, (md[18]<<16)|(md[39]<<8)|md[60], 4);
p = to64(p, (md[40]<<16)|(md[61]<<8)|md[19], 4);
p = to64(p, (md[62]<<16)|(md[20]<<8)|md[41], 4);
#endif
p = to64(p, md[63], 2);
*p = 0;
co_return output;
}
seastar::future<const char *> __crypt_sha512(const char *key, const char *setting, char *output)
{
static const char testkey[] = "Xy01@#\x01\x02\x80\x7f\xff\r\n\x81\t !";
static const char testsetting[] = "$6$rounds=1234$abc0123456789$";
static const char testhash[] = "$6$rounds=1234$abc0123456789$BCpt8zLrc/RcyuXmCDOE1ALqMXB2MH6n1g891HhFj8.w7LxGv.FTkqq6Vxc/km3Y0jE0j24jY5PIv/oOu6reg1";
char testbuf[128];
char *p, *q;
p = co_await sha512crypt(key, setting, output);
/* self test and stack cleanup */
q = co_await sha512crypt(testkey, testsetting, testbuf);
if (!p || q != testbuf || memcmp(testbuf, testhash, sizeof testhash))
co_return "*";
co_return p;
}

13
utils/crypt_sha512.hh Normal file
View File

@@ -0,0 +1,13 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/future.hh>
seastar::future<const char *> __crypt_sha512(const char *key, const char *setting, char *output);

View File

@@ -204,7 +204,7 @@ public:
public:
template <typename Clock, typename Duration>
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr) {
future<> wait_for_message(std::chrono::time_point<Clock, Duration> timeout, abort_source* as = nullptr, std::source_location loc = std::source_location::current()) {
if (!_shared_data) {
on_internal_error(errinj_logger, "injection_shared_data is not initialized");
}
@@ -234,7 +234,8 @@ public:
throw;
}
catch (const std::exception& e) {
on_internal_error(errinj_logger, "Error injection wait_for_message timeout: " + std::string(e.what()));
on_internal_error(errinj_logger, fmt::format("Error injection [{}] wait_for_message timeout: Called from `{}` @ {}:{}:{:d}: {}",
_shared_data->injection_name, loc.function_name(), loc.file_name(), loc.line(), loc.column(), e.what()));
}
++_read_messages_counter;
}

View File

@@ -174,13 +174,6 @@ future<> print_with_extra_array(const rjson::value& value,
seastar::output_stream<char>& os,
size_t max_nested_level = default_max_nested_level);
// Returns a string_view to the string held in a JSON value (which is
// assumed to hold a string, i.e., v.IsString() == true). This is a view
// to the existing data - no copying is done.
inline std::string_view to_string_view(const rjson::value& v) {
return std::string_view(v.GetString(), v.GetStringLength());
}
// Copies given JSON value - involves allocation
rjson::value copy(const rjson::value& value);
@@ -236,6 +229,27 @@ rjson::value parse_yieldable(chunked_content&&, size_t max_nested_level = defaul
rjson::value from_string(const char* str, size_t size);
rjson::value from_string(std::string_view view);
// Returns a string_view to the string held in a JSON value (which is
// assumed to hold a string, i.e., v.IsString() == true). This is a view
// to the existing data - no copying is done.
inline std::string_view to_string_view(const rjson::value& v) {
return std::string_view(v.GetString(), v.GetStringLength());
}
// Those functions must be called on json string object.
// They make a copy of underlying data so it's safe to destroy
// rjson::value afterwards.
//
// Rapidjson's GetString method alone is not good enough
// for string conversion because it needs to scan the string
// unnecessarily and GetStringLength could be used to avoid that.
inline sstring to_sstring(const rjson::value& str) {
return sstring(str.GetString(), str.GetStringLength());
}
inline std::string to_string(const rjson::value& str) {
return std::string(str.GetString(), str.GetStringLength());
}
// Returns a pointer to JSON member if it exists, nullptr otherwise
rjson::value* find(rjson::value& value, std::string_view name);
const rjson::value* find(const rjson::value& value, std::string_view name);
@@ -377,7 +391,7 @@ rjson::value from_string_map(const std::map<sstring, sstring>& map);
sstring quote_json_string(const sstring& value);
inline bytes base64_decode(const value& v) {
return ::base64_decode(std::string_view(v.GetString(), v.GetStringLength()));
return ::base64_decode(to_string_view(v));
}
// A writer which allows writing json into an std::ostream in a streaming manner.

View File

@@ -10,6 +10,7 @@
#include "utils/http.hh"
#include "utils/s3/client.hh"
#include "utils/s3/default_aws_retry_strategy.hh"
#include "utils/rjson.hh"
#include <rapidjson/document.h>
#include <rapidjson/error/en.h>
#include <seastar/http/client.hh>
@@ -72,7 +73,7 @@ future<> instance_profile_credentials_provider::update_credentials() {
}
s3::aws_credentials instance_profile_credentials_provider::parse_creds(const sstring& creds_response) {
rapidjson::Document document;
rjson::document document;
document.Parse(creds_response.data());
if (document.HasParseError()) {
@@ -81,9 +82,9 @@ s3::aws_credentials instance_profile_credentials_provider::parse_creds(const sst
}
// Retrieve credentials
return {.access_key_id = document["AccessKeyId"].GetString(),
.secret_access_key = document["SecretAccessKey"].GetString(),
.session_token = document["Token"].GetString(),
return {.access_key_id = rjson::to_string(document["AccessKeyId"]),
.secret_access_key = rjson::to_string(document["SecretAccessKey"]),
.session_token = rjson::to_string(document["Token"]),
// Set the expiration to one minute earlier to ensure credentials are renewed slightly before they expire
.expires_at = seastar::lowres_clock::now() + std::chrono::seconds(session_duration - 60)};
}

View File

@@ -152,7 +152,7 @@ seastar::future<bool> client::check_status() {
}
auto resp = co_await std::move(f);
auto json = rjson::parse(std::move(resp.content));
co_return json.IsString() && json.GetString() == std::string_view("SERVING");
co_return json.IsString() && rjson::to_string_view(json) == "SERVING";
}
seastar::future<> client::close() {