Compare commits

42 Commits

Author SHA1 Message Date
Jenkins Promoter
83babc20e3 Update ScyllaDB version to: 2025.4.0-rc2 2025-10-15 15:43:09 +03:00
Ernest Zaslavsky
04b9e98ef8 s3_client: track memory starvation in background filling fiber
Introduce a counter metric to monitor instances where the background
filling fiber is blocked due to insufficient memory in the S3 client.

Closes scylladb/scylladb#26466

(cherry picked from commit 413739824f)

Closes scylladb/scylladb#26555
2025-10-15 12:03:09 +02:00
Michał Chojnowski
de8c2a8196 test/boost/sstable_compressor_factory_test: fix thread-unsafe usage of Boost.Test
It turns out that Boost.Test assertions are thread-unsafe
and can't be used from multiple threads concurrently.
This sometimes causes the test to fail with cryptic log corruption.
Fix that by switching to thread-safe checks.

Fixes scylladb/scylladb#24982

Closes scylladb/scylladb#26472

(cherry picked from commit 7c6e84e2ec)

Closes scylladb/scylladb#26554
2025-10-15 12:08:54 +03:00
Jenkins Promoter
dd2e8a2105 Update pgo profiles - aarch64 2025-10-15 05:03:22 +03:00
Jenkins Promoter
90fd618967 Update pgo profiles - x86_64 2025-10-15 04:31:33 +03:00
Piotr Wieczorek
c191c31682 alternator: Correct RCU undercount in BatchGetItem
The `describe_multi_item` function treated the last reference-captured
argument as the number of used RCU half units. The caller
`batch_get_item`, however, expected this parameter to hold an item size.
This RCU value was then passed to
`rcu_consumed_capacity_counter::get_half_units`, treating the
already-calculated RCU integer as if it were a size in bytes.

This caused a second conversion that undercounted the true RCU. During
conversion, the number of bytes is divided by `RCU_BLOCK_SIZE_LENGTH`
(=4KB), so the double conversion divided the number of bytes by 16 MB.

The fix removes the second conversion in `describe_multi_item` and
changes the API of `describe_multi_item`.

Fixes: https://github.com/scylladb/scylladb/pull/25847

Closes scylladb/scylladb#25842

(cherry picked from commit a55c5e9ec7)

Closes scylladb/scylladb#26539
2025-10-14 11:53:09 +03:00
Dawid Mędrek
a4fd7019e3 replica/database: Fix description of validate_tablet_views_indexes
The current description is not accurate: the function doesn't throw
an exception if there's an invalid materialized view. Instead, it
simply logs the keyspaces that violate the requirement.

Furthermore, the experimental feature `views-with-tablets` is no longer
necessary for considering a materialized view as valid. It was dropped
in scylladb/scylladb@b409e85c20. The
replacement for it is the cluster feature `VIEWS_WITH_TABLETS`.

Fixes scylladb/scylladb#26420

Closes scylladb/scylladb#26421

(cherry picked from commit a9577e4d52)

Closes scylladb/scylladb#26476
2025-10-14 11:52:34 +03:00
Pavel Emelyanov
e18072d4b8 Merge '[Backport 2025.4] service/qos: set long timeout for auth queries on SL cache update' from Scylladb[bot]
Pass an appropriate query state for auth queries called from the service
level cache reload. We use the function qos_query_state to select a
query_state based on caller context; for internal queries, we set a
very long timeout.

The service level cache reload is called from group0 reload. We want it
to have a long timeout instead of the default 5 seconds for auth
queries, because on the one hand we have no strict latency requirements,
and on the other hand a timeout exception is undesired in the
group0 reload logic and can break group0 on the node.
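
Reduced to its essence, the timeout choice looks roughly like this (a sketch with invented names and placeholder durations; the real code selects a full query_state via qos_query_state):

```cpp
#include <chrono>

// Internal callers (e.g. the service level cache reload driven by group0
// reload) get a very long timeout; ordinary auth queries keep the short
// default. Both durations here are placeholders, not the real constants.
enum class caller_context { internal, user };

std::chrono::milliseconds auth_query_timeout(caller_context ctx) {
    using namespace std::chrono;
    return ctx == caller_context::internal
        ? duration_cast<milliseconds>(hours(24)) // effectively "no deadline"
        : milliseconds(5000);                    // the default 5 seconds
}
```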

Fixes https://github.com/scylladb/scylladb/issues/25290

backport possible to improve stability

- (cherry picked from commit a1161c156f)

- (cherry picked from commit 3c3dd4cf9d)

- (cherry picked from commit ad1a5b7e42)

Parent PR: #26180

Closes scylladb/scylladb#26479

* github.com:scylladb/scylladb:
  service/qos: set long timeout for auth queries on SL cache update
  auth: add query_state parameter to query functions
  auth: refactor query_all_directly_granted
2025-10-13 15:26:21 +03:00
Robert Bindar
7353aa5aa5 Make scylla_io_setup detect request size for best write IOPS
We noticed during work on scylladb/seastar#2802 that on the i7i family
(and, as later proven, on the i4i family as well),
the disks incorrectly report their physical sector size
as 512 bytes, while we can achieve much better write IOPS with
4096-byte requests.

This is not the case on the AWS i3en family, where the reported 512-byte
physical sector size is also the size that achieves the best write IOPS.

This patch works around this issue by changing `scylla_io_setup` to parse
the instance type out of `/sys/devices/virtual/dmi/id/product_name`
and run iotune with the correct request size based on the instance type.
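
The detection can be sketched as follows (the real change lives in `scylla_io_setup`; the helper name and the family list here are illustrative, not the authoritative set):

```cpp
#include <string>

// The instance family is the part of the DMI product name before the first
// '.', e.g. "i4i.large" -> "i4i". Families known to prefer 4 KB write
// requests despite reporting 512-byte sectors are special-cased.
unsigned iotune_write_request_size(const std::string& product_name) {
    const std::string family = product_name.substr(0, product_name.find('.'));
    if (family == "i4i" || family == "i7i") {
        return 4096; // disks misreport 512-byte physical sectors
    }
    return 512;      // e.g. i3en, where 512 bytes really is best
}
```

On a live machine the product name would come from `/sys/devices/virtual/dmi/id/product_name`, as the commit message describes.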

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>

Closes scylladb/scylladb#25315

(cherry picked from commit 2c74a6981b)

Closes scylladb/scylladb#26474
2025-10-13 15:25:16 +03:00
Michał Chojnowski
ec0b31b193 docs: fix a parameter name in API calls in sstable-dictionary-compression.rst
The correct argument name is `cf`, not `table`.

Fixes scylladb/scylladb#25275

Closes scylladb/scylladb#26447

(cherry picked from commit 87e3027c81)

Closes scylladb/scylladb#26495
2025-10-12 21:10:25 +03:00
Patryk Jędrzejczak
b5c3e2465f test: test_raft_no_quorum: test_can_restart: deflake the read barrier call
Expecting the group 0 read barrier to succeed with a timeout of 1s, just
after restarting 3 out of 5 voters, turned out to be flaky. In some
unlikely scenarios, such as multiple vote splits, the Raft leader
election could finish after the read barrier times out.

To deflake the test, we increase the timeout of Raft operations back to
300s for read barriers we expect to succeed.

Fixes #26457

Closes scylladb/scylladb#26489

(cherry picked from commit 5f68b9dc6b)

Closes scylladb/scylladb#26522
2025-10-12 21:02:02 +03:00
Asias He
3cae4a21ab repair: Rename incremental mode name
Using the name `regular` for the incremental mode could be confusing, since
`regular` might be interpreted as meaning non-incremental repair. It is
better to use `incremental` directly.

Before:

- regular (standard incremental repair)
- full (full incremental repair)
- disabled (incremental repair disabled)

After:

- incremental (standard incremental repair)
- full (full incremental repair)
- disabled (incremental repair disabled)

Fixes #26503

Closes scylladb/scylladb#26504

(cherry picked from commit 13dd88b010)

Closes scylladb/scylladb#26521
2025-10-12 21:01:05 +03:00
Ernest Zaslavsky
5c6335e029 s3_client: fix when condition to prevent infinite locking
Refine condition variable predicate in filling fiber to avoid
indefinite waiting when `close` is invoked.

Closes scylladb/scylladb#26449

(cherry picked from commit c2bab430d7)

Closes scylladb/scylladb#26497
2025-10-12 16:19:48 +03:00
Avi Kivity
de4975d181 dist: scylla_raid_setup: don't override XFS block size on modern kernels
In 6977064693 ("dist: scylla_raid_setup:
reduce xfs block size to 1k"), we reduced the XFS block size to 1k when
possible. This is because commitlog wants to write the smallest amount
of padding it can, and older Linux could only write a multiple of the
block size. Modern Linux [1] can O_DIRECT overwrite a range smaller than
a filesystem block.

However, this doesn't play well with some SSDs that have 512 byte
logical sector size and 4096 byte physical sector size - it causes them
to issue read-modify-writes.

To improve the situation, if we detect that the kernel is recent enough, we
format the filesystem with its default block size, which should be optimal.
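
The decision can be sketched like this; note that the minimum kernel version below is a placeholder (the commit does not state here which kernel first shipped sub-block O_DIRECT overwrites [1]), and the function name is invented:

```cpp
#include <string>

constexpr int MIN_MAJOR = 6; // placeholder, not taken from the commit
constexpr int MIN_MINOR = 0; // placeholder, not taken from the commit

// Modern kernels: keep mkfs.xfs defaults (typically 4096-byte blocks,
// matching the physical sector size). Older kernels: force 1 KB blocks
// so commitlog padding stays small.
std::string mkfs_block_size_option(int kmajor, int kminor) {
    const bool modern =
        kmajor > MIN_MAJOR || (kmajor == MIN_MAJOR && kminor >= MIN_MINOR);
    return modern ? "" : "-b size=1024";
}
```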

Note that commitlog will still issue sub-4k writes, which can translate
to RMW. There, we believe that the amplification is reduced since
sequential sub-physical-sector writes can be merged, and that the overhead
from commitlog space amplification is worse than the RMW overhead.

Tested on AWS i4i.large. fsqual report:

```
memory DMA alignment:    512
disk DMA alignment:      512
filesystem block size:   4096
context switch per write io (size-changing, append, blocksize 4096, iodepth 1): 0.0003 (GOOD)
context switch per write io (size-changing, append, blocksize 4096, iodepth 3): 0.7961 (BAD)
context switch per write io (size-unchanging, append, blocksize 4096, iodepth 3): 0 (GOOD)
context switch per write io (size-unchanging, append, blocksize 4096, iodepth 7): 0.0001 (GOOD)
context switch per write io (size-unchanging, append, blocksize 512, iodepth 1): 0.125 (BAD)
context switch per write io (size-unchanging, overwrite, blocksize 512, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per write io (size-changing, append, blocksize 4096, iodepth 1): 0 (GOOD)
context switch per write io (size-changing, append, blocksize 4096, iodepth 3): 0.8006 (BAD)
context switch per write io (size-unchanging, append, blocksize 4096, iodepth 3): 0.0001 (GOOD)
context switch per write io (size-unchanging, append, blocksize 4096, iodepth 7): 0 (GOOD)
context switch per write io (size-unchanging, append, blocksize 512, iodepth 1): 0.125 (BAD)
context switch per write io (size-unchanging, overwrite, blocksize 512, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per read io (size-changing, append, blocksize 512, iodepth 30): 0 (GOOD)
```

The sub-block overwrite cases are GOOD.

In comparison, the fsqual report for 1k (similar):

```
memory DMA alignment:    512
disk DMA alignment:      512
filesystem block size:   1024
context switch per write io (size-changing, append, blocksize 1024, iodepth 1): 0.0005 (GOOD)
context switch per write io (size-changing, append, blocksize 1024, iodepth 3): 0.7948 (BAD)
context switch per write io (size-unchanging, append, blocksize 1024, iodepth 3): 0.0015 (GOOD)
context switch per write io (size-unchanging, append, blocksize 1024, iodepth 7): 0.0022 (GOOD)
context switch per write io (size-unchanging, append, blocksize 512, iodepth 1): 0.4999 (BAD)
context switch per write io (size-unchanging, overwrite, blocksize 512, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per write io (size-changing, append, blocksize 1024, iodepth 1): 0 (GOOD)
context switch per write io (size-changing, append, blocksize 1024, iodepth 3): 0.798 (BAD)
context switch per write io (size-unchanging, append, blocksize 1024, iodepth 3): 0.0012 (GOOD)
context switch per write io (size-unchanging, append, blocksize 1024, iodepth 7): 0.0019 (GOOD)
context switch per write io (size-unchanging, append, blocksize 512, iodepth 1): 0.5 (BAD)
context switch per write io (size-unchanging, overwrite, blocksize 512, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per read io (size-changing, append, blocksize 512, iodepth 30): 0 (GOOD)
```

Fixes #25441.

[1] ed1128c2d0

Closes scylladb/scylladb#25445

(cherry picked from commit 5d1846d783)

Closes scylladb/scylladb#26471
2025-10-12 16:17:16 +03:00
Piotr Dulikowski
1f73e18eaf Merge '[Backport 2025.4] db/view: Require rf_rack_valid_keyspaces when creating materialized view' from Scylladb[bot]
Materialized views are currently in the experimental phase and using them
in tablet-based keyspaces requires starting Scylla with an experimental feature,
`views-with-tablets`. Any attempts to create a materialized view or secondary
index when it's not enabled will fail with an appropriate error.

After considerable effort, we're drawing close to bringing views out of the
experimental phase, and the experimental feature will no longer be needed.
However, materialized views in tablet-based keyspaces will still be restricted,
and creating them will only be possible after enabling the configuration option
`rf_rack_valid_keyspaces`. That's what we do in this PR.

In this patch, we adjust existing tests in the tree to work with the new
restriction. That shouldn't have been necessary, because we had seemingly
already adjusted all of them to work with the configuration option, but
some tests hid well. We fix that mistake now.

After that, we introduce the new restriction. What's more, when starting Scylla,
we verify that there is no materialized view that would violate the contract.
If there are some that do, we list them, notify the user, and refuse to start.

High-level implementation strategy:

1. Name the restrictions in form of a function.
2. Adjust existing tests.
3. Restrict materialized views by both the experimental feature
   and the configuration option. Add validation test.
4. Drop the requirement for the experimental feature. Adjust the added test
   and add a new one.
5. Update the user documentation.

Fixes scylladb/scylladb#23030

Backport: 2025.4, as we are aiming to support materialized views for tablets from that version.

- (cherry picked from commit a1254fb6f3)

- (cherry picked from commit d6fcd18540)

- (cherry picked from commit 994f09530f)

- (cherry picked from commit 6322b5996d)

- (cherry picked from commit 71606ffdda)

- (cherry picked from commit 00222070cd)

- (cherry picked from commit 288be6c82d)

- (cherry picked from commit b409e85c20)

Parent PR: #25802

Closes scylladb/scylladb#26416

* github.com:scylladb/scylladb:
  view: Stop requiring experimental feature
  db/view: Verify valid configuration for tablet-based views
  db/view: Require rf_rack_valid_keyspaces when creating view
  test/cluster/random_failures: Skip creating secondary indexes
  test/cluster/mv: Mark test_mv_rf_change as skipped
  test/cluster: Adjust MV tests to RF-rack-validity
  test/boost/schema_loader_test.cc: Explicitly enable rf_rack_valid_keyspaces
  db/view: Name requirement for views with tablets
2025-10-12 08:20:20 +02:00
Michał Jadwiszczak
931f9ca3db db/view/view_building_worker: update state again if some batch was finished during the update
There was a race between loop in `view_building_worker::run_view_building_state_observer()`
and a moment when a batch was finishing its work (`.finally()` callback
in `view_building_worker::batch::start()`).

State observer waits on `_vb_state_machine.event` CV and when it's
awoken, it takes group0 read apply mutex and updates its state. While
updating the state, the observer looks at `batch::state` field and
reacts to it accordingly.
On the other hand, when a batch finishes its work, it sets `state` field
to `batch_state::finished` and does a broadcast on
`_vb_state_machine.event` CV.
So if the batch will execute the callback in `.finally()` while the
observer is updating its state, the observer may miss the event on the
CV and it will never notice that the batch was finished.

This patch fixes this by adding a `some_batch_finished` flag. Even if
the worker won't see an event on the CV, it will notice that the flag
was set and it will do next iteration.
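
An abstract model of the fix (reusing the flag name from the commit message; everything else is invented): the observer re-checks a flag, set together with the state update, before going back to sleep, so a broadcast fired while it was busy can no longer be lost.

```cpp
struct observer_state {
    bool some_batch_finished = false; // set by a batch's .finally() callback

    // Called by the observer loop before waiting on the CV again; returns
    // true when another iteration is needed even though no CV event was
    // observed, closing the lost-wakeup window.
    bool consume_pending() {
        if (some_batch_finished) {
            some_batch_finished = false;
            return true; // run one more iteration instead of sleeping
        }
        return false;
    }
};
```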

Fixes scylladb/scylladb#26204

Closes scylladb/scylladb#26289

(cherry picked from commit 8d0d53016c)

Closes scylladb/scylladb#26500
2025-10-10 09:53:22 +02:00
Piotr Dulikowski
3775e8e49a Merge '[Backport 2025.4] db/view/view_building_worker: move discover_existing_staging_sstables() to the foreground' from Scylladb[bot]
db/view/view_building_worker: move discover_existing_staging_sstables() to the foreground
This patch moves `discover_existing_staging_sstables()` to be executed
from the main startup path, instead of running it on the background fiber.

This method needs to run only once during startup, to collect existing
staging sstables, so there is no need to do it in the background. This
change will improve the debuggability of any further issues related to it
(like https://github.com/scylladb/scylladb/issues/26403).

Fixes https://github.com/scylladb/scylladb/issues/26417

The patch should be backported to 2025.4

- (cherry picked from commit 575dce765e)

- (cherry picked from commit 84e4e34d81)

Parent PR: #26446

Closes scylladb/scylladb#26501

* github.com:scylladb/scylladb:
  db/view/view_building_worker: move discover_existing_staging_sstables() to the foreground
  db/view/view_building_worker: futurize and rename `start_background_fibers()`
2025-10-10 09:52:45 +02:00
Michał Jadwiszczak
f4d9513e0f db/view/view_building_worker: move discover_existing_staging_sstables() to the foreground
This patch moves `discover_existing_staging_sstables()` to be executed
from the main startup path, instead of running it on the background fiber.

This method needs to run only once during startup, to collect existing
staging sstables, so there is no need to do it in the background. This
change will improve the debuggability of any further issues related to it
(like scylladb/scylladb#26403).

Fixes scylladb/scylladb#26417

(cherry picked from commit 84e4e34d81)
2025-10-09 22:39:33 +00:00
Michał Jadwiszczak
5eeb1e3e76 db/view/view_building_worker: futurize and rename start_background_fibers()
The next commit will move `discover_existing_staging_sstables()`
to the foreground, so to prepare for this we futurize the
`start_background_fibers()` method and rename it to better reflect
its purpose.

(cherry picked from commit 575dce765e)
2025-10-09 22:39:32 +00:00
Patryk Jędrzejczak
989aa0b237 raft topology: make the voter handler consider only group 0 members
In the Raft-based recovery procedure, we create a new group 0 and add
live nodes to it one by one. This means that for some time there are
nodes which belong to the topology, but not to the new group 0. The
voter handler running on the recovery leader incorrectly considers these
nodes while choosing voters.

The consequences:
- misleading logs, for example, "making servers {<ID of a non-member>}
  voters", where the non-member won't become a voter anyway,
- increased chance of majority loss during the recovery procedure, for
  example, all 3 nodes that first joined the new group 0 are in the same
  dc and rack, but only one of them becomes a voter because the voter
  handler tries to make non-members in other dcs/racks voters.
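
A minimal sketch of the corrected selection rule (identifiers are illustrative, not the real raft topology API): voter candidates are the intersection of topology nodes and current group 0 members, so nodes not yet re-added during recovery are never considered.

```cpp
#include <algorithm>
#include <iterator>
#include <set>
#include <string>

std::set<std::string> voter_candidates(const std::set<std::string>& topology,
                                       const std::set<std::string>& group0_members) {
    std::set<std::string> out;
    // Only nodes present in BOTH sets may be promoted to voters.
    std::set_intersection(topology.begin(), topology.end(),
                          group0_members.begin(), group0_members.end(),
                          std::inserter(out, out.begin()));
    return out;
}
```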

Fixes #26321

Closes scylladb/scylladb#26327

(cherry picked from commit 67d48a459f)

Closes scylladb/scylladb#26428
2025-10-09 18:17:49 +02:00
Michael Litvak
eba0a2cf72 service/qos: set long timeout for auth queries on SL cache update
Pass an appropriate query state for auth queries called from the service
level cache reload. We use the function qos_query_state to select a
query_state based on caller context; for internal queries, we set a
very long timeout.

The service level cache reload is called from group0 reload. We want it
to have a long timeout instead of the default 5 seconds for auth
queries, because on the one hand we have no strict latency requirements,
and on the other hand a timeout exception is undesired in the
group0 reload logic and can break group0 on the node.

Fixes scylladb/scylladb#25290

(cherry picked from commit ad1a5b7e42)
2025-10-09 12:48:45 +00:00
Michael Litvak
3a9eb9b65f auth: add query_state parameter to query functions
Add a query_state parameter to several auth functions that execute
internal queries. Currently the queries use the
internal_distributed_query_state() query state, and we keep this as the
default, but we also want to be able to pass a query state from the
caller.

In particular, the auth queries currently use a timeout of 5 seconds,
and we will want to set a different timeout when they are executed in
some different context.

(cherry picked from commit 3c3dd4cf9d)
2025-10-09 12:48:45 +00:00
Michael Litvak
f75541b7b3 auth: refactor query_all_directly_granted
Rewrite query_all_directly_granted to use execute_internal instead of
query_internal, in a style that is more consistent with the rest of the
module.

This will also be useful for a later change because execute_internal
accepts an additional parameter of query_state.

(cherry picked from commit a1161c156f)
2025-10-09 12:48:45 +00:00
Michał Chojnowski
879db5855d utils/config_file: fix a missing allowed_values propagation in one of named_value constructors
In one of the constructors of `named_value`, the `allowed_values`
argument isn't used.

(This means that if some config entry uses this constructor,
the values aren't validated on the config layer,
and might give some lower layer a bad surprise).

Fix that.
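
An abstract illustration of the bug class (this is not the real utils/config_file API; the class below is invented): a constructor overload that forgets to store its `allowed_values` argument silently disables validation for every entry built through it.

```cpp
#include <string>
#include <vector>

struct named_value {
    std::string value;
    std::vector<std::string> allowed;

    named_value(std::string v, std::vector<std::string> allowed_values)
        : value(std::move(v))
        , allowed(std::move(allowed_values)) // the buggy overload dropped this
    {}

    // With an empty allowed list, everything passes -- which is exactly what
    // happened for entries built through the broken constructor.
    bool valid() const {
        if (allowed.empty()) {
            return true; // unrestricted entry
        }
        for (const auto& a : allowed) {
            if (a == value) return true;
        }
        return false;
    }
};
```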

Fixes scylladb/scylladb#26371

Closes scylladb/scylladb#26196

(cherry picked from commit 3b338e36c2)

Closes scylladb/scylladb#26425
2025-10-09 13:19:41 +03:00
Michał Chojnowski
22d3ee5670 sstables/trie: actually apply BYPASS CACHE to index reads
BYPASS CACHE is implemented for `bti_index_reader` by
giving it its own private `cached_file` wrappers over
Partitions.db and Rows.db, instead of passing it
the shared `cached_file` owned by the sstable.

But due to an oversight, the private `cached_file`s aren't
constructed on top of the raw Partitions.db and Rows.db
files, but on top of `cached_file_impl` wrappers around
those files, which means that BYPASS CACHE doesn't
actually do its job.

Tests based on `scylla_index_page_cache_*` metrics
and on CQL tracing still see the reads from the private
files as "cache misses", but those misses are served
from the shared cached files anyway, so the tests don't see
the problem. In this commit we extend `test_bti_index.py`
with a check that looks at reactor's `io_queue` metrics
instead, and catches the problem.

Fixes scylladb/scylladb#26372

Closes scylladb/scylladb#26373

(cherry picked from commit dbddba0794)

Closes scylladb/scylladb#26424
2025-10-09 13:17:29 +03:00
Dawid Mędrek
2bdf792f8e view: Stop requiring experimental feature
We modify the requirements for using materialized views in tablet-based
keyspaces. Before, it was necessary to enable the configuration option
`rf_rack_valid_keyspaces`, having the cluster feature `VIEWS_WITH_TABLETS`
enabled, and using the experimental feature `views-with-tablets`.
We drop the last requirement.

We adjust code to that change and provide a new validation test.
We also update the user documentation to reflect the changes.

Fixes scylladb/scylladb#23030

(cherry picked from commit b409e85c20)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
2e2d1f17bb db/view: Verify valid configuration for tablet-based views
Creating a materialized view or a secondary index in a tablet-based
keyspace requires that the user enabled two options:

* experimental feature `views-with-tablets`,
* configuration option `rf_rack_valid_keyspaces`.

Because the latter has only become a necessity recently (in this series),
it's possible that there are already existing materialized views that
violate it.

We add a new check at start-up that iterates over existing views and
makes sure that is not the case; otherwise, Scylla notifies the user
of the problem.

(cherry picked from commit 288be6c82d)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
e9aba62cc5 db/view: Require rf_rack_valid_keyspaces when creating view
We extend the requirements for being able to create materialized views
and secondary indexes in tablet-based keyspaces. It's now necessary to
enable the configuration option `rf_rack_valid_keyspaces`. This is
a stepping stone towards bringing materialized views and secondary
indexes with tablets out of the experimental phase.

We add a validation test to verify the changes.

Refs scylladb/scylladb#23030

(cherry picked from commit 00222070cd)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
a7d0cf6dd0 test/cluster/random_failures: Skip creating secondary indexes
Materialized views are going to require the configuration option
`rf_rack_valid_keyspaces` when being created in tablet-based keyspaces.
Since random-failure tests still haven't been adjusted to work with it,
and because it's not trivial, we skip the cases when we end up creating
or dropping an index.

(cherry picked from commit 71606ffdda)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
6e94c075e3 test/cluster/mv: Mark test_mv_rf_change as skipped
The test will not work with `rf_rack_valid_keyspaces`. Since the option
is going to become a requirement for using views with tablets, the test
will need to be rewritten to take that into consideration. Since that
adjustment doesn't seem trivial, we mark the test as skipped for the
time being.

(cherry picked from commit 6322b5996d)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
f90ca413a0 test/cluster: Adjust MV tests to RF-rack-validity
Some of the new tests covering materialized views explicitly disabled
the configuration option `rf_rack_valid_keyspaces`. It's going to become
a new requirement for views with tablets, so we adjust those tests and
enable the option. There is one exception, the test:

`cluster/mv/test_mv_topology_change.py::test_mv_rf_change`

We handle it separately in the following commit.

(cherry picked from commit 994f09530f)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
5e0f5f4b44 test/boost/schema_loader_test.cc: Explicitly enable rf_rack_valid_keyspaces
The test cases in the file aren't run via an existing interface like
`do_with_cql_env`, but they rely on a more direct approach -- calling
one of the schema loader tools. Because of that, they manage the
`db::config` object on their own and don't enable the configuration
option `rf_rack_valid_keyspaces`.

That hasn't been a problem so far since the test doesn't attempt to
create RF-rack-invalid keyspaces anyway. However, in an upcoming commit,
we're going to further restrict views with tablets and require that the
option is enabled.

To prepare for that, we enable the option in all test cases. It's only
necessary in a small subset of them, but it won't hurt to enforce it
everywhere, so let's do that.

Refs scylladb/scylladb#23958

(cherry picked from commit d6fcd18540)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
5d32fef3ae db/view: Name requirement for views with tablets
We add a named requirement, a function, for materialized views with tablets.
It decides whether we can create views and secondary indexes in a given
keyspace. It's a stepping stone towards modifying the requirements for it.

This way, we keep the code in one place, so it's not possible to forget
to modify it somewhere. It also makes it more organized and concise.

(cherry picked from commit a1254fb6f3)
2025-10-06 13:19:53 +00:00
Botond Dénes
1b5c46a796 Merge '[Backport 2025.4] test: dtest: test_limits.py: migrate from dtest' from Dario Mirovic
Backport motivation:
The scylla-dtest PR [limits_test.py: remove tests already ported to scylladb repo](https://github.com/scylladb/scylla-dtest/pull/6232), which removes the migrated tests, was merged before the branch-2025.4 separation, while the scylladb PR [test: dtest: test_limits.py: migrate from dtest](https://github.com/scylladb/scylladb/pull/26077) was merged after it. This caused the tests to be fully removed from branch-2025.4. This backport PR makes sure the tests are present in the scylladb branch-2025.4.

This PR migrates limits tests from dtest to this repository.

One reason is that there is an ongoing effort to migrate tests from dtest to here.

Debug logs are enabled on `test_max_cells` for the `lsa-timing` logger, to have more information about memory reclaim operation times and memory chunk sizes. This will allow analysis of their value distributions, which can be helpful with debugging if the issue reoccurs.

Also, scylladb keeps SQL files with metrics which, with some modifications, can be used to track metrics over time for some tests. This would show whether there are pauses and spikes, or whether the test performance is more or less consistent over time.

scylla-dtest PR that removes migrated tests:
[limits_test.py: remove tests already ported to scylladb repo #6232](https://github.com/scylladb/scylla-dtest/pull/6232)

Fixes #25097

- (cherry picked from commit 82e9623911)
- (cherry picked from commit 70128fd5c7)
- (cherry picked from commit 554fd5e801)
- (cherry picked from commit b3347bcf84)

Parent PR: #26077

Closes scylladb/scylladb#26359

* github.com:scylladb/scylladb:
  test: dtest: limits_test.py: test_max_cells log level
  test: dtest: limits_test.py: make the tests work
  test: dtest: test_limits.py: remove tests that are not being migrated
  test: dtest: copy unmodified limits_test.py
2025-10-06 15:46:49 +03:00
Jenkins Promoter
f2c5874fa9 Update ScyllaDB version to: 2025.4.0-rc1 2025-10-03 21:26:12 +03:00
Dario Mirovic
664cdd3d99 test: dtest: limits_test.py: test_max_cells log level
Set `lsa-timing` logger log level to `debug`. This will help with
the analysis of the whole spectrum of memory reclaim operation
times and memory sizes.

Refs #25097

(cherry picked from commit b3347bcf84)
2025-10-01 22:40:34 +02:00
Dario Mirovic
4ea6c51fb1 test: dtest: limits_test.py: make the tests work
Remove unused imports and markers.
Remove Apache license header.

Enable the test in suite.yaml for `dev` and `debug` modes.

Refs #25097

(cherry picked from commit 554fd5e801)
2025-10-01 22:40:29 +02:00
Dario Mirovic
eb9babfd4a test: dtest: test_limits.py: remove tests that are not being migrated
Refs #25097

(cherry picked from commit 70128fd5c7)
2025-10-01 22:40:24 +02:00
Dario Mirovic
558f460517 test: dtest: copy unmodified limits_test.py
Copy limits_test.py from scylla-dtest to test/cluster/dtest/limits_test.py.
Add license header.

Disable it for `debug`, `dev`, and `release` mode.

Refs #25097

(cherry picked from commit 82e9623911)
2025-10-01 22:40:16 +02:00
Jenkins Promoter
a9f4024c1b Update pgo profiles - aarch64 2025-10-01 04:42:23 +03:00
Jenkins Promoter
6969918d31 Update pgo profiles - x86_64 2025-10-01 04:20:49 +03:00
Luis Freitas
d69edfcd34 Update ScyllaDB version to: 2025.4.0-rc0 2025-09-30 18:51:59 +03:00
58 changed files with 916 additions and 177 deletions

@@ -78,7 +78,7 @@ fi
 # Default scylla product/version tags
 PRODUCT=scylla
-VERSION=2025.4.0-dev
+VERSION=2025.4.0-rc2
 if test -f version
 then
View File

@@ -3636,16 +3636,16 @@ future<std::vector<rjson::value>> executor::describe_multi_item(schema_ptr schem
shared_ptr<cql3::selection::selection> selection,
foreign_ptr<lw_shared_ptr<query::result>> query_result,
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
uint64_t& rcu_half_units) {
noncopyable_function<void(uint64_t)> item_callback) {
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
auto result_set = builder.build();
std::vector<rjson::value> ret;
for (auto& result_row : result_set->rows()) {
rjson::value item = rjson::empty_object();
rcu_consumed_capacity_counter consumed_capacity;
describe_single_item(*selection, result_row, *attrs_to_get, item, &consumed_capacity._total_bytes);
rcu_half_units += consumed_capacity.get_half_units();
uint64_t item_length_in_bytes = 0;
describe_single_item(*selection, result_row, *attrs_to_get, item, &item_length_in_bytes);
item_callback(item_length_in_bytes);
ret.push_back(std::move(item));
co_await coroutine::maybe_yield();
}
@@ -4584,7 +4584,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
}
};
std::vector<table_requests> requests;
std::vector<std::vector<uint64_t>> responses_sizes;
uint batch_size = 0;
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
table_requests rs(get_table_from_batch_request(_proxy, it));
@@ -4612,11 +4611,10 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
     // If we got here, all "requests" are valid, so let's start the
     // requests for the different partitions all in parallel.
     std::vector<future<std::vector<rjson::value>>> response_futures;
-    responses_sizes.resize(requests.size());
-    size_t responses_sizes_pos = 0;
-    for (const auto& rs : requests) {
-        responses_sizes[responses_sizes_pos].resize(rs.requests.size());
-        size_t pos = 0;
+    std::vector<uint64_t> consumed_rcu_half_units_per_table(requests.size());
+    for (size_t i = 0; i < requests.size(); i++) {
+        const table_requests& rs = requests[i];
+        bool is_quorum = rs.cl == db::consistency_level::LOCAL_QUORUM;
         lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
         per_table_stats->api_operations.batch_get_item_histogram.add(rs.requests.size());
         for (const auto &r : rs.requests) {
@@ -4639,16 +4637,17 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
         auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
                 query::tombstone_limit(_proxy.get_tombstone_limit()));
         command->allow_limit = db::allow_per_partition_rate_limit::yes;
+        const auto item_callback = [is_quorum, &rcus_per_table = consumed_rcu_half_units_per_table[i]](uint64_t size) {
+            rcus_per_table += rcu_consumed_capacity_counter::get_half_units(size, is_quorum);
+        };
         future<std::vector<rjson::value>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
                 service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
-                [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, &response_size = responses_sizes[responses_sizes_pos][pos]] (service::storage_proxy::coordinator_query_result qr) mutable {
+                [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, item_callback = std::move(item_callback)] (service::storage_proxy::coordinator_query_result qr) mutable {
             utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
-            return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), response_size);
+            return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), std::move(item_callback));
         });
-        pos++;
         response_futures.push_back(std::move(f));
     }
-    responses_sizes_pos++;
 }
// Wait for all requests to complete, and then return the response.
@@ -4660,14 +4659,11 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
     rjson::value response = rjson::empty_object();
     rjson::add(response, "Responses", rjson::empty_object());
     rjson::add(response, "UnprocessedKeys", rjson::empty_object());
-    size_t rcu_half_units;
     auto fut_it = response_futures.begin();
-    responses_sizes_pos = 0;
     rjson::value consumed_capacity = rjson::empty_array();
-    for (const auto& rs : requests) {
+    for (size_t i = 0; i < requests.size(); i++) {
+        const table_requests& rs = requests[i];
         std::string table = table_name(*rs.schema);
-        size_t pos = 0;
-        rcu_half_units = 0;
         for (const auto &r : rs.requests) {
             auto& pk = r.first;
             auto& cks = r.second;
@@ -4682,7 +4678,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
                 for (rjson::value& json : results) {
                     rjson::push_back(response["Responses"][table], std::move(json));
                 }
-                rcu_half_units += rcu_consumed_capacity_counter::get_half_units(responses_sizes[responses_sizes_pos][pos], rs.cl == db::consistency_level::LOCAL_QUORUM);
             } catch(...) {
                 eptr = std::current_exception();
                 // This read of potentially several rows in one partition,
@@ -4706,8 +4701,8 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
                     rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second));
                 }
             }
-            pos++;
         }
+        uint64_t rcu_half_units = consumed_rcu_half_units_per_table[i];
         _stats.rcu_half_units_total += rcu_half_units;
         lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
         per_table_stats->rcu_half_units_total += rcu_half_units;
@@ -4717,7 +4712,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
             rjson::add(entry, "CapacityUnits", rcu_half_units*0.5);
             rjson::push_back(consumed_capacity, std::move(entry));
         }
-        responses_sizes_pos++;
     }
     if (should_add_rcu) {

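The arithmetic behind the RCU fix is easy to model. A minimal Python sketch (not ScyllaDB code: `get_half_units` below is a simplified stand-in for `rcu_consumed_capacity_counter::get_half_units`, and the 4 KB `RCU_BLOCK_SIZE_LENGTH` comes from the commit message) shows how the double conversion collapsed 16 MB of reads into a single half unit:

```python
import math

RCU_BLOCK_SIZE_LENGTH = 4 * 1024  # one read capacity unit covers a 4 KB block

def get_half_units(size_in_bytes, is_quorum):
    # Simplified stand-in for rcu_consumed_capacity_counter::get_half_units:
    # eventually-consistent reads cost half an RCU per 4 KB block, so we count
    # half units: one per block, doubled for LOCAL_QUORUM (strongly consistent).
    blocks = max(1, math.ceil(size_in_bytes / RCU_BLOCK_SIZE_LENGTH))
    return blocks * (2 if is_quorum else 1)

item_bytes = 16 * 1024 * 1024  # 16 MB worth of items in one BatchGetItem

correct_half_units = get_half_units(item_bytes, is_quorum=False)
# The bug: the already-converted half-unit count was fed back in as if it
# were a size in bytes, dividing by 4 KB a second time (16 MB -> 1 half unit).
buggy_half_units = get_half_units(correct_half_units, is_quorum=False)
```

With the fix, `describe_multi_item` reports raw item sizes and the caller performs the single conversion, so the second division never happens.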

@@ -228,12 +228,15 @@ public:
             const std::optional<attrs_to_get>&,
             uint64_t* = nullptr);
     // Converts a multi-row selection result to JSON compatible with DynamoDB.
+    // For each row, this method calls item_callback, which takes the size of
+    // the item as the parameter.
     static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
             const query::partition_slice&& slice,
             shared_ptr<cql3::selection::selection> selection,
             foreign_ptr<lw_shared_ptr<query::result>> query_result,
             shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
-            uint64_t& rcu_half_units);
+            noncopyable_function<void(uint64_t)> item_callback = {});
     static void describe_single_item(const cql3::selection::selection&,
             const std::vector<managed_bytes_opt>&,

@@ -2924,7 +2924,7 @@
    },
    {
       "name":"incremental_mode",
-      "description":"Set the incremental repair mode. Can be 'disabled', 'regular', or 'full'. 'regular': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to regular.",
+      "description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.",
      "required":false,
      "allowMultiple":false,
      "type":"string",


@@ -233,9 +233,9 @@ future<role_set> ldap_role_manager::query_granted(std::string_view grantee_name,
 }
 future<role_to_directly_granted_map>
-ldap_role_manager::query_all_directly_granted() {
+ldap_role_manager::query_all_directly_granted(::service::query_state& qs) {
     role_to_directly_granted_map result;
-    auto roles = co_await query_all();
+    auto roles = co_await query_all(qs);
     for (auto& role: roles) {
         auto granted_set = co_await query_granted(role, recursive_role_query::no);
         for (auto& granted: granted_set) {
@@ -247,8 +247,8 @@ ldap_role_manager::query_all_directly_granted() {
     co_return result;
 }
-future<role_set> ldap_role_manager::query_all() {
-    return _std_mgr.query_all();
+future<role_set> ldap_role_manager::query_all(::service::query_state& qs) {
+    return _std_mgr.query_all(qs);
 }
 future<> ldap_role_manager::create_role(std::string_view role_name) {
@@ -311,12 +311,12 @@ future<bool> ldap_role_manager::can_login(std::string_view role_name) {
 }
 future<std::optional<sstring>> ldap_role_manager::get_attribute(
-        std::string_view role_name, std::string_view attribute_name) {
-    return _std_mgr.get_attribute(role_name, attribute_name);
+        std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
+    return _std_mgr.get_attribute(role_name, attribute_name, qs);
 }
-future<role_manager::attribute_vals> ldap_role_manager::query_attribute_for_all(std::string_view attribute_name) {
-    return _std_mgr.query_attribute_for_all(attribute_name);
+future<role_manager::attribute_vals> ldap_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) {
+    return _std_mgr.query_attribute_for_all(attribute_name, qs);
 }
 future<> ldap_role_manager::set_attribute(


@@ -75,9 +75,9 @@ class ldap_role_manager : public role_manager {
     future<role_set> query_granted(std::string_view, recursive_role_query) override;
-    future<role_to_directly_granted_map> query_all_directly_granted() override;
+    future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
-    future<role_set> query_all() override;
+    future<role_set> query_all(::service::query_state&) override;
     future<bool> exists(std::string_view) override;
@@ -85,9 +85,9 @@ class ldap_role_manager : public role_manager {
     future<bool> can_login(std::string_view) override;
-    future<std::optional<sstring>> get_attribute(std::string_view, std::string_view) override;
+    future<std::optional<sstring>> get_attribute(std::string_view, std::string_view, ::service::query_state&) override;
-    future<role_manager::attribute_vals> query_attribute_for_all(std::string_view) override;
+    future<role_manager::attribute_vals> query_attribute_for_all(std::string_view, ::service::query_state&) override;
     future<> set_attribute(std::string_view, std::string_view, std::string_view, ::service::group0_batch& mc) override;


@@ -78,11 +78,11 @@ future<role_set> maintenance_socket_role_manager::query_granted(std::string_view
     return operation_not_supported_exception<role_set>("QUERY GRANTED");
 }
-future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted() {
+future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state&) {
     return operation_not_supported_exception<role_to_directly_granted_map>("QUERY ALL DIRECTLY GRANTED");
 }
-future<role_set> maintenance_socket_role_manager::query_all() {
+future<role_set> maintenance_socket_role_manager::query_all(::service::query_state&) {
     return operation_not_supported_exception<role_set>("QUERY ALL");
 }
@@ -98,11 +98,11 @@ future<bool> maintenance_socket_role_manager::can_login(std::string_view role_na
     return make_ready_future<bool>(true);
 }
-future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
+future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) {
     return operation_not_supported_exception<std::optional<sstring>>("GET ATTRIBUTE");
 }
-future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name) {
+future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) {
     return operation_not_supported_exception<role_manager::attribute_vals>("QUERY ATTRIBUTE");
 }


@@ -53,9 +53,9 @@ public:
     virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
-    virtual future<role_to_directly_granted_map> query_all_directly_granted() override;
+    virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
-    virtual future<role_set> query_all() override;
+    virtual future<role_set> query_all(::service::query_state&) override;
     virtual future<bool> exists(std::string_view role_name) override;
@@ -63,9 +63,9 @@ public:
     virtual future<bool> can_login(std::string_view role_name) override;
-    virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) override;
+    virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
-    virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) override;
+    virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
     virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;


@@ -17,12 +17,17 @@
 #include <seastar/core/format.hh>
 #include <seastar/core/sstring.hh>
 #include "auth/common.hh"
 #include "auth/resource.hh"
 #include "cql3/description.hh"
 #include "seastarx.hh"
 #include "exceptions/exceptions.hh"
 #include "service/raft/raft_group0_client.hh"
+namespace service {
+class query_state;
+};
 namespace auth {
 struct role_config final {
@@ -167,9 +172,9 @@ public:
     /// (role2, role3)
     /// }
     ///
-    virtual future<role_to_directly_granted_map> query_all_directly_granted() = 0;
+    virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state& = internal_distributed_query_state()) = 0;
-    virtual future<role_set> query_all() = 0;
+    virtual future<role_set> query_all(::service::query_state& = internal_distributed_query_state()) = 0;
     virtual future<bool> exists(std::string_view role_name) = 0;
@@ -186,12 +191,12 @@ public:
     ///
     /// \returns the value of the named attribute, if one is set.
     ///
-    virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) = 0;
+    virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& = internal_distributed_query_state()) = 0;
     ///
     /// \returns a mapping of each role's value for the named attribute, if one is set for the role.
     ///
-    virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name) = 0;
+    virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state& = internal_distributed_query_state()) = 0;
     /// Sets `attribute_name` with `attribute_value` for `role_name`.
     /// \returns an exceptional future with nonexistant_role if the role does not exist.


@@ -663,21 +663,30 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
     });
 }
-future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted() {
+future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
     const sstring query = seastar::format("SELECT * FROM {}.{}",
             get_auth_ks_name(_qp),
             meta::role_members_table::name);
+    const auto results = co_await _qp.execute_internal(
+            query,
+            db::consistency_level::ONE,
+            qs,
+            cql3::query_processor::cache_internal::yes);
     role_to_directly_granted_map roles_map;
-    co_await _qp.query_internal(query, [&roles_map] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
-        roles_map.insert({row.get_as<sstring>("member"), row.get_as<sstring>("role")});
-        co_return stop_iteration::no;
-    });
+    std::transform(
+            results->begin(),
+            results->end(),
+            std::inserter(roles_map, roles_map.begin()),
+            [] (const cql3::untyped_result_set_row& row) {
+                return std::make_pair(row.get_as<sstring>("member"), row.get_as<sstring>("role")); }
+    );
     co_return roles_map;
 }
-future<role_set> standard_role_manager::query_all() {
+future<role_set> standard_role_manager::query_all(::service::query_state& qs) {
     const sstring query = seastar::format("SELECT {} FROM {}.{}",
             meta::roles_table::role_col_name,
             get_auth_ks_name(_qp),
@@ -695,7 +704,7 @@ future<role_set> standard_role_manager::query_all() {
     const auto results = co_await _qp.execute_internal(
             query,
             db::consistency_level::QUORUM,
-            internal_distributed_query_state(),
+            qs,
             cql3::query_processor::cache_internal::yes);
     role_set roles;
@@ -727,11 +736,11 @@ future<bool> standard_role_manager::can_login(std::string_view role_name) {
     });
 }
-future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
+future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
     const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
             get_auth_ks_name(_qp),
             meta::role_attributes_table::name);
-    const auto result_set = co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
+    const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
     if (!result_set->empty()) {
         const cql3::untyped_result_set_row &row = result_set->one();
         co_return std::optional<sstring>(row.get_as<sstring>("value"));
@@ -739,11 +748,11 @@ future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_
     co_return std::optional<sstring>{};
 }
-future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name) {
-    return query_all().then([this, attribute_name] (role_set roles) {
-        return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles)] (attribute_vals &role_to_att_val) {
-            return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name] (sstring role) {
-                return get_attribute(role, attribute_name).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
+future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name, ::service::query_state& qs) {
+    return query_all(qs).then([this, attribute_name, &qs] (role_set roles) {
+        return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles), &qs] (attribute_vals &role_to_att_val) {
+            return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name, &qs] (sstring role) {
+                return get_attribute(role, attribute_name, qs).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
                     if (att_val) {
                         role_to_att_val.emplace(std::move(role), std::move(*att_val));
                     }
@@ -788,7 +797,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
 future<std::vector<cql3::description>> standard_role_manager::describe_role_grants() {
     std::vector<cql3::description> result{};
-    const auto grants = co_await query_all_directly_granted();
+    const auto grants = co_await query_all_directly_granted(internal_distributed_query_state());
     result.reserve(grants.size());
     for (const auto& [grantee_role, granted_role] : grants) {


@@ -66,9 +66,9 @@ public:
     virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
-    virtual future<role_to_directly_granted_map> query_all_directly_granted() override;
+    virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
-    virtual future<role_set> query_all() override;
+    virtual future<role_set> query_all(::service::query_state&) override;
     virtual future<bool> exists(std::string_view role_name) override;
@@ -76,9 +76,9 @@ public:
     virtual future<bool> can_login(std::string_view role_name) override;
-    virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) override;
+    virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
-    virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) override;
+    virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
     virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;


@@ -10,6 +10,8 @@
 #include <seastar/core/coroutine.hh>
 #include "create_index_statement.hh"
+#include "db/config.hh"
+#include "db/view/view.hh"
 #include "exceptions/exceptions.hh"
 #include "prepared_statement.hh"
 #include "types/types.hh"
@@ -92,9 +94,13 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
         throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
     }
-    if (!db.features().views_with_tablets && db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
-        throw exceptions::invalid_request_exception(format("Secondary indexes are not supported on base tables with tablets (keyspace '{}')", keyspace()));
+    try {
+        db::view::validate_view_keyspace(db, keyspace());
+    } catch (const std::exception& e) {
+        // The type of the thrown exception is not specified, so we need to wrap it here.
+        throw exceptions::invalid_request_exception(e.what());
     }
     validate_for_local_index(*schema);
     std::vector<::shared_ptr<index_target>> targets;


@@ -113,8 +113,7 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
     if (rs->uses_tablets()) {
         warnings.push_back(
             "Tables in this keyspace will be replicated using Tablets "
-            "and will not support Materialized Views, Secondary Indexes and counters features. "
-            "To use Materialized Views, Secondary Indexes or counters, drop this keyspace and re-create it "
+            "and will not support counters features. To use counters, drop this keyspace and re-create it "
            "without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
         if (ksm->initial_tablets().value()) {
             warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");


@@ -152,9 +152,13 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
     schema_ptr schema = validation::validate_column_family(db, _base_name.get_keyspace(), _base_name.get_column_family());
-    if (!db.features().views_with_tablets && db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
-        throw exceptions::invalid_request_exception(format("Materialized views are not supported on base tables with tablets"));
+    try {
+        db::view::validate_view_keyspace(db, keyspace());
+    } catch (const std::exception& e) {
+        // The type of the thrown exception is not specified, so we need to wrap it here.
+        throw exceptions::invalid_request_exception(e.what());
     }
     if (schema->is_counter()) {
         throw exceptions::invalid_request_exception(format("Materialized views are not supported on counter tables"));
     }


@@ -1756,7 +1756,7 @@ std::map<sstring, db::experimental_features_t::feature> db::experimental_feature
         {"broadcast-tables", feature::BROADCAST_TABLES},
         {"keyspace-storage-options", feature::KEYSPACE_STORAGE_OPTIONS},
         {"tablets", feature::UNUSED},
-        {"views-with-tablets", feature::VIEWS_WITH_TABLETS}
+        {"views-with-tablets", feature::UNUSED}
     };
 }


@@ -136,8 +136,7 @@ struct experimental_features_t {
         UDF,
         ALTERNATOR_STREAMS,
         BROADCAST_TABLES,
-        KEYSPACE_STORAGE_OPTIONS,
-        VIEWS_WITH_TABLETS
+        KEYSPACE_STORAGE_OPTIONS
     };
     static std::map<sstring, feature> map(); // See enum_option.
     static std::vector<enum_option<experimental_features_t>> all();


@@ -26,6 +26,7 @@
 #include <seastar/coroutine/maybe_yield.hh>
 #include <flat_map>
+#include "db/config.hh"
 #include "db/view/base_info.hh"
 #include "db/view/view_build_status.hh"
 #include "db/view/view_consumer.hh"
@@ -3715,5 +3716,22 @@ sstring build_status_to_sstring(build_status status) {
     on_internal_error(vlogger, fmt::format("Unknown view build status: {}", (int)status));
 }
+void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name) {
+    const bool tablet_views_enabled = db.features().views_with_tablets;
+    // Note: if the configuration option `rf_rack_valid_keyspaces` is enabled, we can be
+    // sure that all tablet-based keyspaces are RF-rack-valid. We check that
+    // at start-up and then we don't allow for creating RF-rack-invalid keyspaces.
+    const bool rf_rack_valid_keyspaces = db.get_config().rf_rack_valid_keyspaces();
+    const bool required_config = tablet_views_enabled && rf_rack_valid_keyspaces;
+    const bool uses_tablets = db.find_keyspace(keyspace_name).get_replication_strategy().uses_tablets();
+    if (!required_config && uses_tablets) {
+        throw std::logic_error("Materialized views and secondary indexes are not supported on base tables with tablets. "
+                "To be able to use them, enable the configuration option `rf_rack_valid_keyspaces` and make sure "
+                "that the cluster feature `VIEWS_WITH_TABLETS` is enabled.");
+    }
+}
 } // namespace view
 } // namespace db

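The gate added in `validate_view_keyspace` reduces to one predicate over three booleans. A minimal sketch (plain Python, with the feature, config, and keyspace lookups replaced by boolean parameters) makes the truth table explicit:

```python
def view_keyspace_allowed(tablet_views_enabled, rf_rack_valid_keyspaces, uses_tablets):
    # Vnode-based keyspaces are always eligible for views; tablet-based
    # keyspaces are eligible only when both the VIEWS_WITH_TABLETS cluster
    # feature and the rf_rack_valid_keyspaces config option are enabled.
    required_config = tablet_views_enabled and rf_rack_valid_keyspaces
    return not uses_tablets or required_config
```

In the C++ function, a `False` result corresponds to throwing the exception that `create_view_statement` and `create_index_statement` wrap into an `invalid_request_exception`.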

@@ -309,6 +309,18 @@ endpoints_to_update get_view_natural_endpoint(
         bool use_tablets_basic_rack_aware_view_pairing,
         replica::cf_stats& cf_stats);
+/// Verify that the provided keyspace is eligible for storing materialized views.
+///
+/// Result:
+/// * If the keyspace is eligible, no effect.
+/// * If the keyspace is not eligible, an exception is thrown. Its type is not specified,
+///   and the user of this function cannot make any assumption about it. The carried exception
+///   message will be worded in a way that can be directly passed on to the end user.
+///
+/// Preconditions:
+/// * The provided `keyspace_name` must correspond to an existing keyspace.
+void validate_view_keyspace(const data_dictionary::database&, std::string_view keyspace_name);
 }
 }


@@ -127,8 +127,9 @@ view_building_worker::view_building_worker(replica::database& db, db::system_key
     init_messaging_service();
 }
-void view_building_worker::start_background_fibers() {
+future<> view_building_worker::init() {
     SCYLLA_ASSERT(this_shard_id() == 0);
+    co_await discover_existing_staging_sstables();
     _staging_sstables_registrator = run_staging_sstables_registrator();
     _view_building_state_observer = run_view_building_state_observer();
     _mnotifier.register_listener(this);
@@ -195,8 +196,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
 }
 future<> view_building_worker::run_staging_sstables_registrator() {
-    co_await discover_existing_staging_sstables();
     while (!_as.abort_requested()) {
         try {
             auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
@@ -340,6 +339,7 @@ future<> view_building_worker::run_view_building_state_observer() {
     while (!_as.abort_requested()) {
         bool sleep = false;
+        _state.some_batch_finished = false;
         try {
             vbw_logger.trace("view_building_state_observer() iteration");
             auto read_apply_mutex_holder = co_await _group0_client.hold_read_apply_mutex(_as);
@@ -349,7 +349,12 @@ future<> view_building_worker::run_view_building_state_observer() {
             _as.check();
             read_apply_mutex_holder.return_all();
-            co_await _vb_state_machine.event.wait();
+            // A batch could have finished its work while the worker was
+            // updating the state. In that case we should do another iteration.
+            if (!_state.some_batch_finished) {
+                co_await _vb_state_machine.event.wait();
+            }
         } catch (abort_requested_exception&) {
         } catch (broken_condition_variable&) {
         } catch (...) {
@@ -657,6 +662,7 @@ future<> view_building_worker::local_state::clear_state() {
     finished_tasks.clear();
     aborted_tasks.clear();
     state_updated_cv.broadcast();
+    some_batch_finished = false;
     vbw_logger.debug("View building worker state was cleared.");
 }
@@ -676,6 +682,7 @@ void view_building_worker::batch::start() {
         return do_work();
     }).finally([this] () {
         state = batch_state::finished;
+        _vbw.local()._state.some_batch_finished = true;
         _vbw.local()._vb_state_machine.event.broadcast();
     });
 }
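The `some_batch_finished` flag closes a classic lost-wakeup window: a batch can broadcast the event while the observer is still updating state and not yet waiting, so the observer must re-check the flag before blocking. A sketch of the same pattern with Python threading primitives (a hypothetical `Worker` class, not ScyllaDB code; Seastar's condition variable lives on one shard, so a lock-based analogue stands in for it here):

```python
import threading
import time

class Worker:
    def __init__(self):
        self.cond = threading.Condition()
        self.some_batch_finished = False

    def on_batch_finished(self):
        # Mirrors batch::start()'s finally-block: set the flag, then wake the observer.
        with self.cond:
            self.some_batch_finished = True
            self.cond.notify_all()

    def update_state(self):
        # Placeholder for the (possibly long) state-machine update.
        pass

    def observer_iteration(self):
        with self.cond:
            self.some_batch_finished = False   # like `_state.some_batch_finished = false;`
        self.update_state()                    # a batch may finish right here
        with self.cond:
            # Re-check before blocking; without this, a broadcast sent during
            # update_state() would be lost and we'd sleep a full period.
            if not self.some_batch_finished:
                self.cond.wait(timeout=1.0)
```

If a batch finishes during `update_state()`, the flag is already set and the observer starts another iteration immediately instead of waiting.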


@@ -111,6 +111,7 @@ class view_building_worker : public seastar::peering_sharded_service<view_buildi
         std::unordered_set<utils::UUID> finished_tasks;
         std::unordered_set<utils::UUID> aborted_tasks;
+        bool some_batch_finished = false;
         condition_variable state_updated_cv;
         // Clears completed/aborted tasks and creates batches (without starting them) for started tasks.
@@ -166,7 +167,7 @@ public:
     view_building_worker(replica::database& db, db::system_keyspace& sys_ks, service::migration_notifier& mnotifier,
             service::raft_group0_client& group0_client, view_update_generator& vug, netw::messaging_service& ms,
             view_building_state_machine& vbsm);
-    void start_background_fibers();
+    future<> init();
     future<> register_staging_sstable_tasks(std::vector<sstables::shared_sstable> ssts, table_id table_id);


@@ -131,6 +131,28 @@ def configure_iotune_open_fd_limit(shards_count):
         logging.error(f"Required FDs count: {precalculated_fds_count}, default limit: {fd_limits}!")
         sys.exit(1)
+def force_random_request_size_of_4k():
+    """
+    It is a known bug that on i4i, i7i, i8g, i8ge instances, the disk controller reports the wrong
+    physical sector size as 512 bytes, while the actual physical sector size is 4096 bytes. This function
+    helps us work around that issue until AWS manages to get a fix for it. It returns 4096 if it
+    detects it's running on one of the affected instance types; otherwise it returns None and IOTune
+    will use the physical sector size reported by the disk.
+    """
+    path = "/sys/devices/virtual/dmi/id/product_name"
+    try:
+        with open(path, "r") as f:
+            instance_type = f.read().strip()
+    except FileNotFoundError:
+        logging.warning(f"Couldn't find {path}. Falling back to IOTune using the physical sector size reported by disk.")
+        return
+    prefixes = ["i7i", "i4i", "i8g", "i8ge"]
+    if any(instance_type.startswith(p) for p in prefixes):
+        return 4096
 def run_iotune():
     if "SCYLLA_CONF" in os.environ:
         conf_dir = os.environ["SCYLLA_CONF"]
@@ -173,6 +195,8 @@ def run_iotune():
     configure_iotune_open_fd_limit(cpudata.nr_shards())
+    if (reqsize := force_random_request_size_of_4k()):
+        iotune_args += ["--random-write-io-buffer-size", f"{reqsize}"]
     try:
         subprocess.check_call([bindir() + "/iotune",
                 "--format", "envfile",


@@ -17,6 +17,7 @@ import stat
 import logging
 import pyudev
 import psutil
+import platform
 from pathlib import Path
 from scylla_util import *
 from subprocess import run, SubprocessError
@@ -102,6 +103,21 @@ def is_selinux_enabled():
             return True
     return False
+def is_kernel_version_at_least(major, minor):
+    """Check if the Linux kernel version is at least major.minor"""
+    try:
+        kernel_version = platform.release()
+        # Extract major.minor from a version string like "5.15.0-56-generic"
+        version_parts = kernel_version.split('.')
+        if len(version_parts) >= 2:
+            kernel_major = int(version_parts[0])
+            kernel_minor = int(version_parts[1])
+            return (kernel_major, kernel_minor) >= (major, minor)
+    except (ValueError, IndexError):
+        # If we can't parse the version, assume an older kernel for safety
+        pass
+    return False
 if __name__ == '__main__':
     if os.getuid() > 0:
         print('Requires root permission.')
@@ -231,8 +247,17 @@
     # see https://git.kernel.org/pub/scm/fs/xfs/xfsprogs-dev.git/tree/mkfs/xfs_mkfs.c .
     # and it also cannot be smaller than the sector size.
     block_size = max(1024, sector_size)
     run('udevadm settle', shell=True, check=True)
-    run(f'mkfs.xfs -b size={block_size} {fsdev} -K -m rmapbt=0 -m reflink=0', shell=True, check=True)
+    # On Linux 5.12+, sub-block overwrites are supported well, so keep the default block
+    # size, which will play better with the SSD.
+    if is_kernel_version_at_least(5, 12):
+        block_size_opt = ""
+    else:
+        block_size_opt = f"-b size={block_size}"
+    run(f'mkfs.xfs {block_size_opt} {fsdev} -K -m rmapbt=0 -m reflink=0', shell=True, check=True)
     run('udevadm settle', shell=True, check=True)
     if is_debian_variant():

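The tuple comparison in `is_kernel_version_at_least` is what makes multi-digit minor versions compare correctly: a plain string comparison would rank "5.4" above "5.12". A standalone sketch of the same parsing (a hypothetical helper, with the release string passed in explicitly instead of read from `platform.release()`):

```python
def kernel_at_least(release, major, minor):
    # Parse "5.15.0-56-generic" -> (5, 15) and compare numerically;
    # tuples compare element by element, so (5, 4) < (5, 12) as intended.
    try:
        parts = release.split('.')
        return (int(parts[0]), int(parts[1])) >= (major, minor)
    except (ValueError, IndexError):
        return False  # unparsable release string: assume an older kernel
```

The fallback to `False` matches the script's conservative choice: when in doubt, keep passing an explicit `-b size=` to `mkfs.xfs`.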

@@ -202,12 +202,9 @@ enabled. If you plan to use any of the features listed below, CREATE your keyspa
:ref:`with tablets disabled <tablets-enable-tablets>`.
* Counters
* Materialized Views (MV) ``*``
* Secondary indexes (SI, as it depends on MV) ``*``
``*`` You can enable experimental support for MV and SI using
the ``--experimental-features=views-with-tablets`` configuration option.
See :ref:`Views with tablets <admin-views-with-tablets>` for details.
To enable materialized views and secondary indexes for tablet keyspaces, use
the ``--rf-rack-valid-keyspaces`` configuration option. See :ref:`Views with tablets <admin-views-with-tablets>` for details.
Resharding in keyspaces with tablets enabled has the following limitations:


@@ -341,17 +341,13 @@ credentials and endpoint.
Views with Tablets
------------------
By default, Materialized Views (MV) and Secondary Indexes (SI)
are disabled in keyspaces that use tablets.
Support for MV and SI with tablets is experimental and must be explicitly
enabled in the ``scylla.yaml`` configuration file by specifying
the ``views-with-tablets`` option:
Materialized Views (MV) and Secondary Indexes (SI) are enabled in keyspaces that use tablets
only when :term:`RF-rack-valid keyspaces <RF-rack-valid keyspace>` are enforced. That can be
done in the ``scylla.yaml`` configuration file by specifying the ``rf_rack_valid_keyspaces`` option:
.. code-block:: yaml
experimental_features:
- views-with-tablets
rf_rack_valid_keyspaces: true
Monitoring


@@ -53,7 +53,7 @@ ScyllaDB nodetool cluster repair command supports the following options:
nodetool cluster repair --tablet-tokens 1,10474535988
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'regular', or 'full'. 'regular': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to regular.
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
For example:


@@ -38,14 +38,14 @@ Manual Dictionary Training
You can manually trigger dictionary training using the REST API::
curl -X POST "http://node-address:10000/storage_service/retrain_dict?keyspace=mykeyspace&table=mytable"
curl -X POST "http://node-address:10000/storage_service/retrain_dict?keyspace=mykeyspace&cf=mytable"
Estimating Compression Ratios
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To choose the best compression configuration, you can estimate compression ratios using the REST API::
curl -X GET "http://node-address:10000/storage_service/estimate_compression_ratios?keyspace=mykeyspace&table=mytable"
curl -X GET "http://node-address:10000/storage_service/estimate_compression_ratios?keyspace=mykeyspace&cf=mytable"
This will return a report with estimated compression ratios for various combinations of compression
parameters (algorithm, chunk size, zstd level, dictionary).


@@ -76,7 +76,7 @@ struct repair_row_level_start_response {
namespace locator {
enum class tablet_repair_incremental_mode : uint8_t {
regular,
incremental,
full,
disabled,
};


@@ -99,9 +99,6 @@ std::set<sstring> get_disabled_features_from_db_config(const db::config& cfg, st
if (!cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
disabled.insert("KEYSPACE_STORAGE_OPTIONS"s);
}
if (!cfg.check_experimental(db::experimental_features_t::feature::VIEWS_WITH_TABLETS)) {
disabled.insert("VIEWS_WITH_TABLETS"s);
}
if (cfg.force_gossip_topology_changes()) {
if (cfg.enable_tablets_by_default()) {
throw std::runtime_error("Tablets cannot be enabled with gossip topology changes. Use either --tablets-mode-for-new-keyspaces=enabled|enforced or --force-gossip-topology-changes, but not both.");


@@ -754,7 +754,7 @@ tablet_task_type tablet_task_type_from_string(const sstring& name) {
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<locator::tablet_repair_incremental_mode, sstring> tablet_repair_incremental_mode_to_name = {
{locator::tablet_repair_incremental_mode::disabled, "disabled"},
{locator::tablet_repair_incremental_mode::regular, "regular"},
{locator::tablet_repair_incremental_mode::incremental, "incremental"},
{locator::tablet_repair_incremental_mode::full, "full"},
};


@@ -162,11 +162,11 @@ sstring tablet_task_type_to_string(tablet_task_type);
tablet_task_type tablet_task_type_from_string(const sstring&);
// - regular (regular incremental repair): The incremental repair logic is enabled.
// - incremental (incremental repair): The incremental repair logic is enabled.
// Unrepaired sstables will be included for repair. Repaired sstables will be
// skipped. The incremental repair states will be updated after repair.
// - full (full incremental repair): The incremental repair logic is enabled.
// - full (full repair): The incremental repair logic is enabled.
// Both repaired and unrepaired sstables will be included for repair. The
// incremental repair states will be updated after repair.
@@ -175,12 +175,12 @@ tablet_task_type tablet_task_type_from_string(const sstring&);
// sstables_repaired_at in system.tablets table, will not be updated after
// repair.
enum class tablet_repair_incremental_mode : uint8_t {
regular,
incremental,
full,
disabled,
};
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::regular};
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);


@@ -2208,6 +2208,11 @@ sharded<locator::shared_token_metadata> token_metadata;
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(cfg->rf_rack_valid_keyspaces(), token_metadata.local().get());
// Materialized views and secondary indexes are still restricted and require specific configuration
// options to work. Make sure that if there are existing views or indexes, they don't violate
// the requirements imposed on them.
db.local().validate_tablet_views_indexes();
// Semantic validation of sstable compression parameters from config.
// Adding here (i.e., after `join_cluster`) to ensure that the
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
@@ -2426,7 +2431,7 @@ sharded<locator::shared_token_metadata> token_metadata;
checkpoint(stop_signal, "starting view building worker's background fibers");
with_scheduling_group(maintenance_scheduling_group, [&] {
view_building_worker.local().start_background_fibers();
return view_building_worker.local().init();
}).get();
auto drain_view_building_worker = defer_verbose_shutdown("draining view building worker", [&] {
view_building_worker.invoke_on_all(&db::view::view_building_worker::drain).get();


@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:103bd12a1f0feb60d814da074b81ebafaa13059d1267ee3612c48a8bc96798b6
size 6242980
oid sha256:5e35a15a32060d47846c2a5ab29373639e651ac112cc0785306789b8273c63dc
size 6299456


@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:2cb637e741a2b9badc96f3f175f15db257b9273ea43040289de7d72657b5505a
size 6240824
oid sha256:71a3e8a3a0e68d35c2e14b553a81e1bc55f6adb73a1988e17e4326923020db2c
size 6316028


@@ -12,6 +12,7 @@
#include <fmt/ranges.h>
#include <fmt/std.h>
#include <seastar/core/rwlock.hh>
#include "db/view/view.hh"
#include "locator/network_topology_strategy.hh"
#include "locator/tablets.hh"
#include "locator/token_metadata_fwd.hh"
@@ -86,6 +87,7 @@
#include "tracing/trace_keyspace_helper.hh"
#include <algorithm>
#include <flat_set>
using namespace std::chrono_literals;
using namespace db;
@@ -3483,6 +3485,37 @@ void database::check_rf_rack_validity(const bool enforce_rf_rack_valid_keyspaces
}
}
void database::validate_tablet_views_indexes() const {
dblog.info("Verifying that all existing materialized views are valid");
const data_dictionary::database& db = this->as_data_dictionary();
std::flat_set<std::string_view> invalid_keyspaces;
for (const view_ptr& view : get_views()) {
const auto& ks = view->ks_name();
try {
db::view::validate_view_keyspace(db, ks);
} catch (...) {
invalid_keyspaces.emplace(ks);
}
}
if (invalid_keyspaces.empty()) {
dblog.info("All existing materialized views are valid");
return;
}
// `std::flat_set` guarantees iteration in increasing order.
const std::string ks_list = invalid_keyspaces
| std::views::join_with(std::string_view(", "))
| std::ranges::to<std::string>();
dblog.warn("Some of the existing keyspaces violate the requirements "
"for using materialized views or secondary indexes. Those features require enabling "
"the configuration option `rf_rack_valid_keyspaces` and the cluster feature "
"`VIEWS_WITH_TABLETS`. The keyspaces that violate that condition: {}", ks_list);
}
utils::chunked_vector<uint64_t> compute_random_sorted_ints(uint64_t max_value, uint64_t n_values) {
static thread_local std::minstd_rand rng{std::random_device{}()};
std::uniform_int_distribution<uint64_t> dist(0, max_value);


@@ -2091,6 +2091,20 @@ public:
// * the `locator::topology` instance corresponding to the passed `locator::token_metadata_ptr`
// must contain a complete list of racks and data centers in the cluster.
void check_rf_rack_validity(const bool enforce_rf_rack_valid_keyspaces, const locator::token_metadata_ptr) const;
/// Verify that all existing materialized views are valid.
///
/// We consider a materialized view valid if one of the following
/// conditions is satisfied:
/// * it resides in a vnode-based keyspace,
/// * it resides in a tablet-based keyspace, the cluster feature `VIEWS_WITH_TABLETS`
/// is enabled, and the configuration option `rf_rack_valid_keyspaces` is enabled.
///
/// Result:
/// * Depending on whether there are invalid materialized views, the function will
/// log that either everything's OK, or that there are some keyspaces that violate
/// the requirement.
void validate_tablet_views_indexes() const;
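The validity conditions in the doc comment above can be restated as a tiny predicate. This is a hedged sketch with hypothetical names, not the actual C++ implementation, which checks these properties through the schema and feature-service APIs:

```python
def view_is_valid(uses_tablets, views_with_tablets_feature, rf_rack_valid_keyspaces):
    """A view is valid if it lives in a vnode-based keyspace, or in a
    tablet-based keyspace with both the VIEWS_WITH_TABLETS cluster feature
    and the rf_rack_valid_keyspaces configuration option enabled."""
    if not uses_tablets:
        return True  # vnode-based keyspace: always allowed
    return views_with_tablets_feature and rf_rack_valid_keyspaces

print(view_is_valid(False, False, False))  # True
print(view_is_valid(True, True, False))    # False
```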
private:
// SSTable sampling might require considerable amounts of memory,
// so we want to limit the number of concurrent sampling operations.


@@ -319,7 +319,7 @@ future<> service_level_controller::update_service_levels_cache(qos::query_contex
});
}
future<> service_level_controller::auth_integration::reload_cache() {
future<> service_level_controller::auth_integration::reload_cache(qos::query_context ctx) {
SCYLLA_ASSERT(this_shard_id() == global_controller);
const auto _ = _stop_gate.hold();
@@ -336,11 +336,12 @@ future<> service_level_controller::auth_integration::reload_cache() {
}
auto units = co_await get_units(_sl_controller._global_controller_db->notifications_serializer, 1);
auto& qs = qos_query_state(ctx);
auto& role_manager = _auth_service.underlying_role_manager();
const auto all_roles = co_await role_manager.query_all();
const auto hierarchy = co_await role_manager.query_all_directly_granted();
const auto all_roles = co_await role_manager.query_all(qs);
const auto hierarchy = co_await role_manager.query_all_directly_granted(qs);
// includes only roles with attached service level
const auto attributes = co_await role_manager.query_attribute_for_all("service_level");
const auto attributes = co_await role_manager.query_attribute_for_all("service_level", qs);
std::map<sstring, service_level_options> effective_sl_map;
@@ -403,7 +404,7 @@ future<> service_level_controller::update_cache(update_both_cache_levels update_
}
if (_auth_integration) {
co_await _auth_integration->reload_cache();
co_await _auth_integration->reload_cache(ctx);
}
}


@@ -173,7 +173,7 @@ public:
future<std::vector<cql3::description>> describe_attached_service_levels();
/// Must be executed on shard 0.
future<> reload_cache();
future<> reload_cache(qos::query_context ctx);
void clear_cache();
};


@@ -497,7 +497,15 @@ future<> group0_voter_handler::update_nodes(
};
// Helper for adding a single node to the nodes list
auto add_node = [&nodes, &group0_config, &leader_id](const raft::server_id& id, const replica_state& rs, bool is_alive) {
auto add_node = [this, &nodes, &group0_config, &leader_id](const raft::server_id& id, const replica_state& rs, bool is_alive) {
// Some topology members may not belong to the new group 0 in the Raft-based recovery procedure.
if (!group0_config.contains(id)) {
if (!_gossiper.get_recovery_leader()) {
rvlogger.warn("node {} in state {} is not a part of the group 0 configuration {}, ignoring",
id, rs.state, group0_config);
}
return;
}
const auto is_voter = group0_config.can_vote(id);
const auto is_leader = (id == leader_id);
nodes.emplace(id, group0_voter_calculator::node_descriptor{


@@ -3642,7 +3642,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
auto cached_partitions_file = caching == use_caching::yes
? _cached_partitions_file
: seastar::make_shared<cached_file>(
_partitions_file,
uncached_partitions_file(),
_manager.get_cache_tracker().get_index_cached_file_stats(),
_manager.get_cache_tracker().get_lru(),
_manager.get_cache_tracker().region(),
@@ -3652,7 +3652,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
auto cached_rows_file = caching == use_caching::yes
? _cached_rows_file
: seastar::make_shared<cached_file>(
_rows_file,
uncached_rows_file(),
_manager.get_cache_tracker().get_index_cached_file_stats(),
_manager.get_cache_tracker().get_lru(),
_manager.get_cache_tracker().region(),


@@ -335,6 +335,85 @@ def test_simple_batch_get_items(test_table_sb):
assert response['ConsumedCapacity'][0]['TableName'] == test_table_sb.name
assert 2 == response['ConsumedCapacity'][0]['CapacityUnits']
# This test reproduces a bug where the consumed capacity was divided by 16 MB,
# instead of 4 KB. The general formula for RCU per item is the same as for
# GetItem, namely:
#
# CEIL(ItemSizeInBytes / 4096) * (1 if strong consistency, 0.5 if eventual
# consistency)
#
# The RCU is calculated for each item individually, and the results are summed
# for the total cost of the BatchGetItem. In this case, the larger item is
# rounded up to 68KB, giving 17 RCUs, and the smaller item to 20KB, which
# results in 5 RCUs, making the total consumed capacity for this operation
# 22 RCUs.
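The formula from the comment above can be sketched as a small helper. The function name is hypothetical; the real accounting lives in `rcu_consumed_capacity_counter`:

```python
import math

RCU_BLOCK_SIZE = 4096  # 4 KB read block, per the formula above

def rcu_for_item(item_size_bytes, consistent_read=True):
    # Each item is billed separately: round its size up to whole 4 KB blocks,
    # then halve the cost for eventually consistent reads.
    blocks = math.ceil(item_size_bytes / RCU_BLOCK_SIZE)
    return blocks * (1.0 if consistent_read else 0.5)

# The two items below mirror the test: ~68 KB and ~20 KB after key overhead.
print(rcu_for_item(68 * 1024))   # 17.0
print(rcu_for_item(20 * 1024))   # 5.0
print(rcu_for_item(68 * 1024) + rcu_for_item(20 * 1024))  # 22.0
```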
def test_batch_get_items_large(test_table_sb):
p1 = random_string()
c1 = random_bytes()
test_table_sb.put_item(Item={'p': p1, 'c': c1, 'a': 'a' * 64 * KB})
p2 = random_string()
c2 = random_bytes()
test_table_sb.put_item(Item={'p': p2, 'c': c2, 'a': 'a' * 16 * KB})
response = test_table_sb.meta.client.batch_get_item(RequestItems = {
test_table_sb.name: {'Keys': [{'p': p1, 'c': c1}, {'p': p2, 'c': c2}], 'ConsistentRead': True}}, ReturnConsumedCapacity='TOTAL')
assert 'ConsumedCapacity' in response
assert 'TableName' in response['ConsumedCapacity'][0]
assert response['ConsumedCapacity'][0]['TableName'] == test_table_sb.name
assert 22 == response['ConsumedCapacity'][0]['CapacityUnits']
# Helper function to generate item_count items and batch write them to the
# table. Returns the list of generated items.
def prepare_items(table, item_factory, item_count=10):
items = []
with table.batch_writer() as writer:
for i in range(item_count):
item = item_factory(i)
items.append(item)
writer.put_item(Item=item)
return items
# This test verifies that querying two tables, each containing multiple ~30-byte
# items, reports the RCU correctly. A single item should consume 1 RCU, because
# each item's size is rounded up separately to the 4 KB block (ConsistentRead),
# and RCU should be reported per table. A variant of test_batch_get_items_large.
def test_batch_get_items_many_small(test_table_s, test_table_sb):
# Each item should be about 30 bytes.
items_sb = prepare_items(test_table_sb, lambda i: {'p': f'item_{i}_' + random_string(), 'c': random_bytes()})
items_s = prepare_items(test_table_s, lambda i: {'p': f'item_{i}_' + random_string()})
response = test_table_sb.meta.client.batch_get_item(RequestItems = {
test_table_sb.name: {'Keys': items_sb, 'ConsistentRead': True},
test_table_s.name: {'Keys': items_s, 'ConsistentRead': True},
}, ReturnConsumedCapacity='TOTAL')
assert 'ConsumedCapacity' in response
assert len(response['ConsumedCapacity']) == 2
expected_tables = {test_table_sb.name, test_table_s.name}
for consumption_per_table in response['ConsumedCapacity']:
assert 'TableName' in consumption_per_table
assert consumption_per_table['CapacityUnits'] == 10, f"Table {consumption_per_table['TableName']} reported {consumption_per_table['CapacityUnits']} RCUs, expected 10"
assert consumption_per_table['TableName'] in expected_tables
expected_tables.remove(consumption_per_table['TableName'])
assert not expected_tables
# This test verifies that querying a single partition reports the RCU correctly.
# This test is similar to test_batch_get_items_many_small.
def test_batch_get_items_many_small_single_partition(test_table_sb):
# Each item should be about 20 bytes.
pk = random_string()
items_sb = prepare_items(test_table_sb, lambda _: {'p': pk, 'c': random_bytes()})
response = test_table_sb.meta.client.batch_get_item(RequestItems = {
test_table_sb.name: {'Keys': items_sb, 'ConsistentRead': True},
}, ReturnConsumedCapacity='TOTAL')
assert 'ConsumedCapacity' in response
assert 'TableName' in response['ConsumedCapacity'][0]
assert response['ConsumedCapacity'][0]['TableName'] == test_table_sb.name
assert 10 == response['ConsumedCapacity'][0]['CapacityUnits']
# Validate that when getting a batch of requests from multiple tables, we get
# an RCU count for each of the tables. We also validate that eventual
# consistency returns half the units.


@@ -25,17 +25,23 @@
SEASTAR_THREAD_TEST_CASE(test_empty) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
BOOST_REQUIRE_THROW(tools::load_schemas(dbcfg, "").get(), std::exception);
BOOST_REQUIRE_THROW(tools::load_schemas(dbcfg, ";").get(), std::exception);
}
SEASTAR_THREAD_TEST_CASE(test_keyspace_only) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};").get().size(), 0);
}
SEASTAR_THREAD_TEST_CASE(test_single_table) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE TABLE ks.cf (pk int PRIMARY KEY, v int)").get().size(), 1);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE TABLE ks.cf (pk int PRIMARY KEY, v map<int, int>)").get().size(), 1);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}; CREATE TABLE ks.cf (pk int PRIMARY KEY, v int);").get().size(), 1);
@@ -43,6 +49,8 @@ SEASTAR_THREAD_TEST_CASE(test_single_table) {
SEASTAR_THREAD_TEST_CASE(test_keyspace_replication_strategy) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}; CREATE TABLE ks.cf (pk int PRIMARY KEY, v int);").get().size(), 1);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}; CREATE TABLE ks.cf (pk int PRIMARY KEY, v int);").get().size(), 1);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'mydc1': 1, 'mydc2': 4}; CREATE TABLE ks.cf (pk int PRIMARY KEY, v int);").get().size(), 1);
@@ -50,6 +58,8 @@ SEASTAR_THREAD_TEST_CASE(test_keyspace_replication_strategy) {
SEASTAR_THREAD_TEST_CASE(test_multiple_tables) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE TABLE ks.cf1 (pk int PRIMARY KEY, v int); CREATE TABLE ks.cf2 (pk int PRIMARY KEY, v int)").get().size(), 2);
BOOST_REQUIRE_EQUAL(tools::load_schemas(dbcfg, "CREATE TABLE ks.cf1 (pk int PRIMARY KEY, v int); CREATE TABLE ks.cf2 (pk int PRIMARY KEY, v int);").get().size(), 2);
BOOST_REQUIRE_EQUAL(tools::load_schemas(
@@ -70,6 +80,8 @@ SEASTAR_THREAD_TEST_CASE(test_multiple_tables) {
SEASTAR_THREAD_TEST_CASE(test_udts) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
BOOST_REQUIRE_EQUAL(tools::load_schemas(
dbcfg,
"CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}; "
@@ -107,6 +119,8 @@ SEASTAR_THREAD_TEST_CASE(test_udts) {
SEASTAR_THREAD_TEST_CASE(test_dropped_columns) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
BOOST_REQUIRE_EQUAL(tools::load_schemas(
dbcfg,
"CREATE TABLE ks.cf (pk int PRIMARY KEY, v1 int); "
@@ -177,6 +191,7 @@ void check_views(std::vector<schema_ptr> schemas, std::vector<view_type> views_t
SEASTAR_THREAD_TEST_CASE(test_materialized_view) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
check_views(
tools::load_schemas(
@@ -219,6 +234,7 @@ SEASTAR_THREAD_TEST_CASE(test_materialized_view) {
SEASTAR_THREAD_TEST_CASE(test_index) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
check_views(
tools::load_schemas(
@@ -269,6 +285,7 @@ SEASTAR_THREAD_TEST_CASE(test_index) {
SEASTAR_THREAD_TEST_CASE(test_mv_index) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
check_views(
tools::load_schemas(
@@ -308,6 +325,7 @@ void check_schema_columns(const schema& a, const schema& b, bool check_key_colum
void check_sstable_schema(sstables::test_env& env, std::filesystem::path sst_path, const utils::chunked_vector<mutation>& mutations, bool has_scylla_metadata) {
db::config dbcfg;
dbcfg.rf_rack_valid_keyspaces(true);
auto schema = tools::load_schema_from_sstable(dbcfg, sst_path).get();


@@ -13,6 +13,7 @@
#include "sstables/sstable_compressor_factory.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/test_utils.hh"
BOOST_AUTO_TEST_SUITE(sstable_compressor_factory_test)
@@ -27,7 +28,7 @@ void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);
// Create a compressor factory.
SCYLLA_ASSERT(shard_to_numa_mapping.size() == smp::count);
tests::require(shard_to_numa_mapping.size() == smp::count);
auto config = default_sstable_compressor_factory::config{
.numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
};
@@ -68,8 +69,8 @@ void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
// Check that the dictionary used by this shard lies on the same NUMA node.
// This is important to avoid cross-node memory accesses on the hot path.
BOOST_CHECK_EQUAL(our_numa_node, compressor_numa_node);
BOOST_CHECK_EQUAL(our_numa_node, decompressor_numa_node);
tests::require_equal(our_numa_node, compressor_numa_node);
tests::require_equal(our_numa_node, decompressor_numa_node);
compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
decompressor_numa_nodes[this_shard_id()] = compressor_numa_node;
@@ -79,22 +80,22 @@ void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
auto compressed_size = compressor->compress(
reinterpret_cast<const char*>(message.data()), message.size(),
reinterpret_cast<char*>(compressed.data()), compressed.size());
BOOST_REQUIRE_GE(compressed_size, 0);
tests::require_greater_equal(compressed_size, 0);
compressed.resize(compressed_size);
// Validate that the recommended dict was actually used.
BOOST_CHECK(compressed.size() < message.size() / 10);
tests::require_less(compressed.size(), message.size() / 10);
auto decompressed = std::vector<char>(message.size());
auto decompressed_size = decompressor->uncompress(
reinterpret_cast<const char*>(compressed.data()), compressed.size(),
reinterpret_cast<char*>(decompressed.data()), decompressed.size());
BOOST_REQUIRE_GE(decompressed_size, 0);
tests::require_greater_equal(decompressed_size, 0);
decompressed.resize(decompressed_size);
// Validate that the roundtrip through compressor and decompressor
// resulted in the original message.
BOOST_CHECK_EQUAL_COLLECTIONS(message.begin(), message.end(), decompressed.begin(), decompressed.end());
tests::require(std::equal(message.begin(), message.end(), decompressed.begin(), decompressed.end()));
})).get();
}
@@ -102,11 +103,11 @@ void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
// of NUMA nodes.
// This isn't that important, but we don't want to duplicate dictionaries
// within a NUMA node unnecessarily.
BOOST_CHECK_EQUAL(
tests::require_equal(
std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
);
BOOST_CHECK_EQUAL(
tests::require_equal(
std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
);


@@ -0,0 +1,286 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import math
import pytest
from dtest_class import Tester, create_ks
logger = logging.getLogger(__name__)
# These are ideal values according to the Cassandra specification;
# they should pass
LIMIT_64_K = 64 * 1024
LIMIT_32K = 32 * 1024
LIMIT_128K = 128 * 1024
LIMIT_2GB = 2 * 1024 * 1024 * 1024
MAX_KEY_SIZE = LIMIT_64_K
MAX_BLOB_SIZE = 8388608 # theoretical limit LIMIT_2GB
MAX_COLUMNS = LIMIT_128K
MAX_TUPLES = LIMIT_32K
MAX_BATCH_SIZE = 50 * 1024
MAX_CELLS_COLUMNS = LIMIT_32K
MAX_CELLS_BATCH_SIZE = 50
MAX_CELLS = 16777216
# These are values used to validate the test code
# MAX_KEY_SIZE = 1000
# MAX_BLOB_SIZE = 1000
# MAX_COLUMNS = 1000
# MAX_TUPLES = 1000
# MAX_BATCH_SIZE = 1000
# MAX_CELLS_COLUMNS = 100
# MAX_CELLS_BATCH_SIZE = 100
# MAX_CELLS = 1000
@pytest.mark.single_node
class TestLimits(Tester):
def prepare(self):
"""
Sets up node to test against.
"""
cluster = self.cluster
return cluster
def _do_test_max_key_length(self, session, node, size, expect_failure=False):
print("Testing max key length for {}.{}".format(size, " Expected failure..." if expect_failure else ""))
key_name = "k" * size
c = f"CREATE TABLE test1 ({key_name} int PRIMARY KEY)"
if expect_failure:
expected_error = r"Key size too large: \d+ > 65535"
self.ignore_log_patterns += [expected_error]
with pytest.raises(Exception, match=expected_error):
session.execute(c)
return
session.execute(c)
session.execute("insert into ks.test1 (%s) values (1);" % key_name)
session.execute("insert into ks.test1 (%s) values (2);" % key_name)
node.flush()
# Select
res = session.execute(
"""
SELECT * FROM ks.test1
WHERE %s=1
"""
% key_name
)
assert len(res.current_rows) == 1
res = session.execute(
"""
SELECT * FROM ks.test1
WHERE %s=2
"""
% key_name
)
assert len(res.current_rows) == 1
session.execute("""DROP TABLE test1""")
def test_max_key_length(self):
cluster = self.prepare()
cluster.populate(1).start()
node = cluster.nodelist()[0]
session = self.patient_cql_connection(node)
create_ks(session, "ks", 1)
# biggest that will currently work in scylla
# key_name = "k" * 65526
self._do_test_max_key_length(session, node, MAX_KEY_SIZE, expect_failure=True)
self._do_test_max_key_length(session, node, MAX_KEY_SIZE - 9, expect_failure=True)
self._do_test_max_key_length(session, node, MAX_KEY_SIZE - 10)
size = MAX_KEY_SIZE // 2
while size >= 1:
self._do_test_max_key_length(session, node, size)
size >>= 3
def _do_test_blob_size(self, session, node, size):
print("Testing blob size %i" % size)
blob_a = "a" * size
blob_b = "b" * size
session.execute(
"""
CREATE TABLE test1 (
user ascii PRIMARY KEY,
payload blob,
)
"""
)
session.execute("insert into ks.test1 (user, payload) values ('tintin', textAsBlob('%s'));" % blob_a)
session.execute("insert into ks.test1 (user, payload) values ('milou', textAsBlob('%s'));" % blob_b)
node.flush()
# Select
res = session.execute(
"""
SELECT * FROM ks.test1
WHERE user='tintin'
"""
)
assert len(list(res)) == 1
res = session.execute(
"""
SELECT * FROM ks.test1
WHERE user='milou'
"""
)
assert len(list(res)) == 1
session.execute("""DROP TABLE test1""")
def test_max_column_value_size(self):
cluster = self.prepare()
cluster.populate(1).start()
node = cluster.nodelist()[0]
session = self.patient_cql_connection(node)
create_ks(session, "ks", 1)
size = 1
for i in range(int(math.log(MAX_BLOB_SIZE, 2))):
size <<= 1
self._do_test_blob_size(session, node, size - 1)
def _do_test_max_tuples(self, session, node, count):
print("Testing max tuples for %i" % count)
t = ""
v = ""
for i in range(count):
t += "int, "
v += "1, "
t = t[:-2]
v = v[:-2]
c = (
"""
CREATE TABLE stuff (
k int PRIMARY KEY,
v frozen<tuple<%s>>
);
"""
% t
)
session.execute(c)
c = "INSERT INTO stuff (k, v) VALUES(0, (%s));" % v
session.execute(c)
c = "SELECT * FROM STUFF;"
res = session.execute(c)
assert len(res.current_rows) == 1
session.execute("""DROP TABLE stuff""")
def test_max_tuple(self):
cluster = self.prepare()
cluster.populate(1).start()
node = cluster.nodelist()[0]
session = self.patient_cql_connection(node)
create_ks(session, "ks", 1)
count = 1
for i in range(int(math.log(MAX_TUPLES, 2))):
count <<= 1
self._do_test_max_tuples(session, node, count - 1)
def _do_test_max_batch_size(self, session, node, size):
print("Testing max batch size for size=%i" % size)
c = """
CREATE TABLE stuff (
k int PRIMARY KEY,
v text
);
"""
session.execute(c)
c = "BEGIN UNLOGGED BATCH\n"
row_size = 1000
overhead = 100
blob = (row_size - overhead) * "x"
rows = size // row_size
for i in range(rows):
c += "INSERT INTO stuff (k, v) VALUES(%i, '%s')\n" % (i, blob)
c += "APPLY BATCH;\n"
session.execute(c)
c = "SELECT * FROM STUFF;"
res = session.execute(c)
assert len(list(res)) == rows
session.execute("""DROP TABLE STUFF""")
def test_max_batch_size(self):
cluster = self.prepare()
cluster.populate(1).start()
node = cluster.nodelist()[0]
session = self.patient_cql_connection(node)
create_ks(session, "ks", 1)
size = 1
for i in range(int(math.log(MAX_BATCH_SIZE, 2))):
size <<= 1
self._do_test_max_batch_size(session, node, size - 1)
def _do_test_max_cell_count(self, session, cells):
print("Testing max cells count for %i" % cells)
keys = ""
keys_create = ""
columns = MAX_CELLS_COLUMNS
for i in range(columns):
keys += "key" + str(i) + ", "
keys_create += "key" + str(i) + " int, "
values = "1, " * columns
c = """CREATE TABLE test1 (%s blub int PRIMARY KEY,)""" % keys_create
session.execute(c)
batch_size = MAX_CELLS_BATCH_SIZE
rows = cells // columns
c = "BEGIN UNLOGGED BATCH\n"
for i in range(rows):
c += "insert into ks.test1 (%s blub) values (%s %i);\n" % (keys, values, i)
if i == rows - 1 or (i + 1) % batch_size == 0:
c += "APPLY BATCH;\n"
session.execute(c)
c = "BEGIN UNLOGGED BATCH\n"
session.execute("""DROP TABLE test1""")
def test_max_cells(self):
if self.cluster.scylla_mode == "debug":
pytest.skip("client times out in debug mode")
cluster = self.prepare()
cluster.set_configuration_options(values={"query_tombstone_page_limit": 9999999, "batch_size_warn_threshold_in_kb": 1024 * 1024, "batch_size_fail_threshold_in_kb": 1024 * 1024, "commitlog_segment_size_in_mb": 64})
cluster.populate(1).start(jvm_args=["--smp", "1", "--memory", "2G", "--logger-log-level", "lsa-timing=debug"])
node = cluster.nodelist()[0]
session = self.patient_cql_connection(node)
create_ks(session, "ks", 1)
cells = 1
for i in range(int(math.log(MAX_CELLS, 2))):
cells <<= 1
self._do_test_max_cell_count(session, cells - 1)
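The limit tests above share one probing pattern: grow a size exponentially and exercise each value just below a power of two, where off-by-one bugs around size thresholds tend to hide. A minimal standalone sketch of that pattern (the `probe_powers_of_two` helper and the limit value are illustrative, not part of the test suite):

```python
import math

def probe_powers_of_two(limit, check):
    # Calls check(n) for n = 1, 3, 7, ... up to just below `limit`,
    # mirroring the `size <<= 1; check(size - 1)` loops in the tests above.
    size = 1
    for _ in range(int(math.log(limit, 2))):
        size <<= 1
        check(size - 1)

probed = []
probe_powers_of_two(16, probed.append)
# probed == [1, 3, 7, 15]
```

Probing one-below each power of two keeps the number of iterations logarithmic in the limit while still landing on the boundary values most likely to trip size checks.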

View File

@@ -0,0 +1,120 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import pytest
from cassandra.cluster import Session as CassandraSession
from cassandra.protocol import InvalidRequest
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.parametrize("schema_kind", ["view", "index"])
# Views no longer depend on the experimental feature `views-with-tablets`,
# but let's keep these test cases to make sure it's really not needed anymore.
@pytest.mark.parametrize("views_with_tablets", [False, True])
@pytest.mark.parametrize("rf_rack_valid_keyspaces", [False, True])
async def test_mv_and_index_restrictions_in_tablet_keyspaces(manager: ManagerClient, schema_kind: str,
views_with_tablets: bool, rf_rack_valid_keyspaces: bool):
"""
Verify that creating a materialized view or a secondary index in a tablet-based keyspace
is only possible when the configuration option `rf_rack_valid_keyspaces` is enabled.
"""
async def create_mv_or_index(cql: CassandraSession):
if schema_kind == "view":
await cql.run_async("CREATE MATERIALIZED VIEW ks.mv "
"AS SELECT * FROM ks.t "
"WHERE p IS NOT NULL AND v IS NOT NULL "
"PRIMARY KEY (v, p)")
elif schema_kind == "index":
await cql.run_async("CREATE INDEX myindex ON ks.t(v)")
else:
assert False, "Unknown schema kind"
async def try_pass(cql: CassandraSession):
try:
await cql.run_async("CREATE KEYSPACE ks WITH replication = "
"{'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
"AND tablets = {'enabled': true}")
await cql.run_async("CREATE TABLE ks.t (p int PRIMARY KEY, v int)")
await create_mv_or_index(cql)
finally:
await cql.run_async("DROP KEYSPACE IF EXISTS ks")
async def try_fail(cql: CassandraSession):
err = "Materialized views and secondary indexes are not supported on base tables with tablets. " \
"To be able to use them, enable the configuration option `rf_rack_valid_keyspaces` and " \
"make sure that the cluster feature `VIEWS_WITH_TABLETS` is enabled."
with pytest.raises(InvalidRequest, match=err):
await try_pass(cql)
feature = ["views-with-tablets"] if views_with_tablets else []
config = {"experimental_features": feature, "rf_rack_valid_keyspaces": rf_rack_valid_keyspaces}
srv = await manager.server_add(config=config)
# Necessary because we're restarting the node multiple times.
cql, _ = await manager.get_ready_cql([srv])
logger.debug("Obtained CassandraSession object")
# We just want to validate the statements. We don't need to wait.
assert hasattr(cql.cluster, "max_schema_agreement_wait")
cql.cluster.max_schema_agreement_wait = 0
logger.debug("Set max_schema_agreement_wait to 0")
if rf_rack_valid_keyspaces:
await try_pass(cql)
logger.debug("try_pass finished successfully")
else:
await try_fail(cql)
logger.debug("try_fail finished successfully")
@pytest.mark.asyncio
@pytest.mark.parametrize("view_type", ["view", "index"])
async def test_view_startup(manager: ManagerClient, view_type: str):
"""
Verify that starting a node that has materialized views or secondary indexes
in a tablet-based keyspace while the configuration option `rf_rack_valid_keyspaces`
is disabled produces a startup warning.
"""
srv = await manager.server_add(config={"rf_rack_valid_keyspaces": True})
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE ks WITH replication = "
"{'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
"AND tablets = {'enabled': true}")
await cql.run_async("CREATE TABLE ks.t (p int PRIMARY KEY, v int)")
if view_type == "view":
await cql.run_async("CREATE MATERIALIZED VIEW ks.mv "
"AS SELECT * FROM ks.t "
"WHERE p IS NOT NULL AND v IS NOT NULL "
"PRIMARY KEY (v, p)")
elif view_type == "index":
await cql.run_async("CREATE INDEX i ON ks.t(v)")
else:
logger.error(f"Unexpected view type: {view_type}")
assert False
await manager.server_stop(srv.server_id)
await manager.server_update_config(srv.server_id, "rf_rack_valid_keyspaces", False)
log = await manager.server_open_log(srv.server_id)
mark = await log.mark()
start_task = asyncio.create_task(manager.server_start(srv.server_id))
err = "Some of the existing keyspaces violate the requirements for using materialized " \
"views or secondary indexes. Those features require enabling the configuration " \
"option `rf_rack_valid_keyspaces` and the cluster feature `VIEWS_WITH_TABLETS`. " \
"The keyspaces that violate that condition: ks"
await log.wait_for(err, from_mark=mark)
await start_task
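The test above starts the server and watches its log concurrently, because the warning is emitted while startup is still in flight: waiting for the log line only after `server_start` returned could miss it or deadlock. A minimal asyncio sketch of that start-then-wait pattern, using hypothetical stand-ins for the server and the log reader:

```python
import asyncio

async def start_and_watch(start, wait_for_warning):
    # start() boots the node; wait_for_warning() resolves once the expected
    # log line appears. Both run concurrently, as in the test above.
    start_task = asyncio.create_task(start())
    await wait_for_warning()
    await start_task

log_lines = []

async def fake_start():
    # Stand-in for manager.server_start(): emits the warning mid-startup.
    await asyncio.sleep(0)
    log_lines.append("keyspaces violate the requirements")

async def fake_wait():
    # Stand-in for log.wait_for(err, from_mark=mark): polls the log.
    while not log_lines:
        await asyncio.sleep(0)

asyncio.run(start_and_watch(fake_start, fake_wait))
```

Creating the task first and awaiting it last mirrors `asyncio.create_task(manager.server_start(...))` followed by `await log.wait_for(...)` and `await start_task` in the test.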

View File

@@ -256,6 +256,11 @@ async def test_mv_pairing_during_replace(manager: ManagerClient):
@pytest.mark.asyncio
@pytest.mark.parametrize("delayed_replica", ["base", "mv"])
@pytest.mark.parametrize("altered_dc", ["dc1", "dc2"])
# FIXME: The test relies on cross-rack tablet migrations. They're forbidden when the configuration option
# `rf_rack_valid_keyspaces` is enabled. On the other hand, materialized views in tablet-based keyspaces
# require the configuration option to be used.
# Hence, we need to rewrite this test.
@pytest.mark.skip
@skip_mode('release', 'error injections are not supported in release mode')
async def test_mv_rf_change(manager: ManagerClient, delayed_replica: str, altered_dc: str):
servers = []
@@ -331,8 +336,8 @@ async def test_mv_first_replica_in_dc(manager: ManagerClient, delayed_replica: s
# If we run the test with more than 1 shard and the tablet for the view table gets allocated on the same shard as the tablet of the base table,
# we'll perform an intranode migration of one of these tablets to the other shard. This migration can be confused with the migration to the
# new dc in the "first_migration_done()" below. To avoid this, run servers with only 1 shard.
servers.append(await manager.server_add(cmdline=['--smp', '1'], config={'rf_rack_valid_keyspaces': False}, property_file={'dc': f'dc1', 'rack': 'myrack1'}))
servers.append(await manager.server_add(cmdline=['--smp', '1'], config={'rf_rack_valid_keyspaces': False}, property_file={'dc': f'dc2', 'rack': 'myrack1'}))
servers.append(await manager.server_add(cmdline=['--smp', '1'], property_file={'dc': f'dc1', 'rack': 'myrack1'}))
servers.append(await manager.server_add(cmdline=['--smp', '1'], property_file={'dc': f'dc2', 'rack': 'myrack1'}))
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE IF NOT EXISTS ks WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} AND tablets = {'initial': 1}")

View File

@@ -614,8 +614,15 @@ CLUSTER_EVENTS: tuple[ClusterEventType, ...] = (
sleep_for_30_seconds,
add_new_table,
drop_table,
add_index,
drop_index,
# FIXME: We omit creating or dropping indexes because the random_failures
# tests still haven't been adjusted to work with `rf_rack_valid_keyspaces`.
# That option is a requirement for using materialized views
# in tablet-based keyspaces, so let's skip them.
#
# add_index,
# drop_index,
add_new_keyspace,
drop_keyspace,
add_cdc,

View File

@@ -25,6 +25,7 @@ run_first:
skip_in_release:
- test_raft_cluster_features
- test_cluster_features
- dtest/limits_test
skip_in_debug:
- test_shutdown_hang
- test_replace

View File

@@ -14,10 +14,16 @@ from typing import Any
from test.cluster.conftest import skip_mode
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import ScyllaMetrics
# main logger
logger = logging.getLogger(__name__)
async def get_metrics(manager: ManagerClient, servers: list[ServerInfo]) -> list[ScyllaMetrics]:
return await asyncio.gather(*[manager.metrics.query(s.ip_addr) for s in servers])
def get_io_read_ops(metrics: list[ScyllaMetrics]) -> int:
return int(sum([m.get("scylla_io_queue_total_read_ops") for m in metrics]))
async def live_update_config(manager: ManagerClient, servers: list[ServerInfo], key: str, value: Any):
cql, hosts = await manager.get_ready_cql(servers)
await asyncio.gather(*[manager.server_update_config(s.server_id, key, value) for s in servers])
@@ -98,7 +104,9 @@ async def test_bti_index_enable(manager: ManagerClient) -> None:
async def test_bti_usage_during_reads(should_use_bti: bool, use_cache: bool):
select = select_with_cache if use_cache else select_without_cache
metrics_before = await get_metrics(manager, servers)
select_result = cql.execute(select, (chosen_pk, chosen_ck), trace=True)
metrics_after = await get_metrics(manager, servers)
row = select_result.one()
assert row.pk == chosen_pk
assert row.ck == chosen_ck
@@ -113,14 +121,25 @@ async def test_bti_index_enable(manager: ManagerClient) -> None:
seen_partitions = seen_partitions or "Partitions.db" in event.description
seen_rows = seen_rows or "Rows.db" in event.description
seen_index = seen_index or "Index.db" in event.description
if should_use_bti:
assert not seen_index, "Index.db was used despite BTI preference"
assert seen_partitions, "Partitions.db was not used despite BTI preference"
assert seen_rows, "Rows.db was not used despite BTI preference"
else:
assert seen_index, "Index.db was not used despite BIG preference"
assert not seen_partitions, "Partitions.db was used despite BIG preference"
assert not seen_rows, "Rows.db was used despite BIG preference"
if not use_cache:
if should_use_bti:
assert not seen_index, "Index.db was used despite BTI preference"
assert seen_partitions, "Partitions.db was not used despite BTI preference"
assert seen_rows, "Rows.db was not used despite BTI preference"
else:
assert seen_index, "Index.db was not used despite BIG preference"
assert not seen_partitions, "Partitions.db was used despite BIG preference"
assert not seen_rows, "Rows.db was used despite BIG preference"
# Test that BYPASS CACHE does force disk reads.
io_read_ops = get_io_read_ops(metrics_after) - get_io_read_ops(metrics_before)
if should_use_bti:
# At least one read for Partitions.db, Rows.db, Data.db
assert io_read_ops >= 3
else:
# At least one read in Index.db (main index), Index.db (promoted index), Data.db
assert io_read_ops >= 3
logger.info("Step 3: Checking for BTI files (should not exist, because cluster feature is suppressed)")
await test_files_presence(bti_should_exist=False, big_should_exist=True)
@@ -143,7 +162,10 @@ async def test_bti_index_enable(manager: ManagerClient) -> None:
await asyncio.gather(*[manager.api.keyspace_upgrade_sstables(s.ip_addr, ks_name) for s in servers])
logger.info("Step 7: Checking for BTI files (should exist)")
await test_files_presence(bti_should_exist=True, big_should_exist=False)
await test_bti_usage_during_reads(should_use_bti=True, use_cache=False)
await test_bti_usage_during_reads(should_use_bti=True, use_cache=True)
# Test that BYPASS CACHE does its thing.
for _ in range(3):
await test_bti_usage_during_reads(should_use_bti=True, use_cache=False)
await test_bti_usage_during_reads(should_use_bti=True, use_cache=True)
manager.driver_close()
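The BYPASS CACHE check above relies on a before/after metrics delta rather than absolute counter values, so pre-existing I/O from earlier steps does not skew the assertion. A minimal sketch of that delta check, assuming per-node metric maps of the shape `{name: value}` as a simplified stand-in for `ScyllaMetrics`:

```python
def get_io_read_ops(metrics):
    # Sum one counter across all nodes; `metrics` is a list of per-node
    # {metric_name: value} maps (a simplified stand-in for ScyllaMetrics).
    return int(sum(m.get("scylla_io_queue_total_read_ops", 0) for m in metrics))

before = [{"scylla_io_queue_total_read_ops": 10}, {"scylla_io_queue_total_read_ops": 5}]
after = [{"scylla_io_queue_total_read_ops": 13}, {"scylla_io_queue_total_read_ops": 6}]

# At least one read each for Partitions.db, Rows.db and Data.db.
io_read_ops = get_io_read_ops(after) - get_io_read_ops(before)
assert io_read_ops >= 3
```

The `>= 3` lower bound is deliberately loose: it proves the read went to disk without asserting an exact I/O count, which would be fragile across sstable formats and schedulers.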

View File

@@ -655,7 +655,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
assert read1 == 0
assert skip2 == 0
assert read2 > 0
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=regular.*', check1)
await do_repair_and_check(None, 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check1)
def check2(skip1, read1, skip2, read2):
assert skip1 == skip2
@@ -665,7 +665,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
def check3(skip1, read1, skip2, read2):
assert skip1 < skip2
assert read1 == read2
await do_repair_and_check('regular', 1, rf'Starting tablet repair by API .* incremental_mode=regular.*', check3)
await do_repair_and_check('incremental', 1, rf'Starting tablet repair by API .* incremental_mode=incremental.*', check3)
def check4(skip1, read1, skip2, read2):
assert skip1 == skip2

View File

@@ -236,15 +236,20 @@ async def test_can_restart(manager: ManagerClient, raft_op_timeout: int) -> None
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', raft_op_timeout)
for srv in servers))
logger.info(f"Restarting {servers}")
for idx, srv in enumerate(servers):
logger.info(f"Restarting {servers[:2]} with no group 0 quorum")
for idx, srv in enumerate(servers[:2]):
await manager.server_start(srv.server_id)
# Make sure that the first two nodes restart without group 0 quorum.
if idx < 2:
with pytest.raises(Exception, match="raft operation \\[read_barrier\\] timed out, "
"there is no raft quorum, total voters count 5, "
f"alive voters count {idx + 1}"):
await read_barrier(manager.api, srv.ip_addr)
else:
with pytest.raises(Exception, match="raft operation \\[read_barrier\\] timed out, "
"there is no raft quorum, total voters count 5, "
f"alive voters count {idx + 1}"):
await read_barrier(manager.api, srv.ip_addr)
# Increase the timeout back to 300s to ensure the new group 0 leader is elected before the first read barrier below
# times out.
await asyncio.gather(*(manager.server_update_config(srv.server_id, 'group0_raft_op_timeout_in_ms', 300000)
for srv in servers))
logger.info(f"Restarting {servers[2:]} with group 0 quorum")
for srv in servers[2:]:
await manager.server_start(srv.server_id)
await read_barrier(manager.api, srv.ip_addr)

View File

@@ -307,12 +307,23 @@ async def test_alter_base_schema_while_build_in_progress(manager: ManagerClient,
@pytest.mark.asyncio
@skip_mode("release", "error injections are not supported in release mode")
async def test_change_rf_while_build_in_progress(manager: ManagerClient, change: str):
node_count = 4
servers = await manager.servers_add(node_count, config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, cmdline=cmdline_loggers)
if change == "increase":
node_count = 2
rack_layout = ["rack1", "rack2"]
elif change == "decrease":
node_count = 3
rack_layout = ["rack1", "rack1", "rack2"]
else:
assert False
property_file = [{"dc": "dc1", "rack": rack} for rack in rack_layout]
servers = await manager.servers_add(node_count, config={"enable_tablets": "true"}, cmdline=cmdline_loggers,
property_file=property_file)
cql, _ = await manager.get_ready_cql(servers)
await disable_tablet_load_balancing_on_all_servers(manager)
rf = 3
rf = node_count - 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'enabled': true}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
await populate_base_table(cql, ks, "tab")
@@ -326,7 +337,7 @@ async def test_change_rf_while_build_in_progress(manager: ManagerClient, change:
await wait_for_some_view_build_tasks_to_get_stuck(manager, marks)
new_rf = rf + 1 if change == "increase" else rf - 1
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'datacenter1': {new_rf}}}")
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {new_rf}}}")
await unpause_view_building_tasks(manager)
@@ -337,8 +348,18 @@ async def test_change_rf_while_build_in_progress(manager: ManagerClient, change:
@pytest.mark.asyncio
@skip_mode("release", "error injections are not supported in release mode")
async def test_node_operation_during_view_building(manager: ManagerClient, operation: str):
node_count = 4 if operation == "remove" or operation == "decommission" else 3
servers = await manager.servers_add(node_count, config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, cmdline=cmdline_loggers)
if operation == "remove" or operation == "decommission":
node_count = 4
rack_layout = ["rack1", "rack2", "rack3", "rack3"]
else:
node_count = 3
rack_layout = ["rack1", "rack2", "rack3"]
property_file = [{"dc": "dc1", "rack": rack} for rack in rack_layout]
servers = await manager.servers_add(node_count, config={"enable_tablets": "true"},
cmdline=cmdline_loggers,
property_file=property_file)
cql, _ = await manager.get_ready_cql(servers)
await disable_tablet_load_balancing_on_all_servers(manager)
@@ -354,7 +375,8 @@ async def test_node_operation_during_view_building(manager: ManagerClient, opera
await wait_for_some_view_build_tasks_to_get_stuck(manager, marks)
if operation == "add":
await manager.server_add(config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, cmdline=cmdline_loggers)
property_file = servers[-1].property_file()
await manager.server_add(config={"enable_tablets": "true"}, cmdline=cmdline_loggers, property_file=property_file)
node_count = node_count + 1
elif operation == "remove":
await manager.server_stop_gracefully(servers[-1].server_id)
@@ -364,9 +386,11 @@ async def test_node_operation_during_view_building(manager: ManagerClient, opera
await manager.decommission_node(servers[-1].server_id)
node_count = node_count - 1
elif operation == "replace":
property_file = servers[-1].property_file()
await manager.server_stop_gracefully(servers[-1].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[-1].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(replace_cfg, config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, cmdline=cmdline_loggers)
await manager.server_add(replace_cfg, config={"enable_tablets": "true"}, cmdline=cmdline_loggers,
property_file=property_file)
await unpause_view_building_tasks(manager)
await wait_for_view(cql, 'mv_cf_view', node_count)

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "auth/common.hh"
#include "auth/standard_role_manager.hh"
#include "auth/ldap_role_manager.hh"
#include "auth/password_authenticator.hh"
@@ -407,7 +408,7 @@ SEASTAR_TEST_CASE(ldap_delegates_query_all) {
auto m = make_ldap_manager(env);
m->start().get();
create_ldap_roles(env, *m);
const auto roles = m->query_all().get();
const auto roles = m->query_all(auth::internal_distributed_query_state()).get();
BOOST_REQUIRE_EQUAL(1, roles.count("role1"));
BOOST_REQUIRE_EQUAL(1, roles.count("role2"));
BOOST_REQUIRE_EQUAL(1, roles.count("jsmith"));
@@ -442,7 +443,7 @@ SEASTAR_TEST_CASE(ldap_delegates_attributes) {
do_with_mc(env, [&] (service::group0_batch& b) {
m->create("r", auth::role_config{}, b).get();
});
BOOST_REQUIRE(!m->get_attribute("r", "a").get());
BOOST_REQUIRE(!m->get_attribute("r", "a", auth::internal_distributed_query_state()).get());
do_with_mc(env, [&] (service::group0_batch& b) {
m->set_attribute("r", "a", "3", b).get();
});
@@ -451,7 +452,7 @@ SEASTAR_TEST_CASE(ldap_delegates_attributes) {
do_with_mc(env, [&] (service::group0_batch& b) {
m->remove_attribute("r", "a", b).get();
});
BOOST_REQUIRE(!m->get_attribute("r", "a").get());
BOOST_REQUIRE(!m->get_attribute("r", "a", auth::internal_distributed_query_state()).get());
});
}

View File

@@ -1098,6 +1098,11 @@ private:
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
_db.local().check_rf_rack_validity(cfg->rf_rack_valid_keyspaces(), _token_metadata.local().get());
// Materialized views and secondary indexes are still restricted and require specific configuration
// options to work. Make sure that if there are existing views or indexes, they don't violate
// the requirements imposed on them.
_db.local().validate_tablet_views_indexes();
utils::loading_cache_config perm_cache_config;
perm_cache_config.max_size = cfg->permissions_cache_max_entries();
perm_cache_config.expiry = std::chrono::milliseconds(cfg->permissions_validity_in_ms());

View File

@@ -424,7 +424,7 @@ def test_repair_keyspace(nodetool):
]},
["error processing arguments: nodetool cluster repair repairs only tablet keyspaces. To repair vnode keyspaces use nodetool repair."])
@pytest.mark.parametrize("mode", ["disabled", "regular", "full"])
@pytest.mark.parametrize("mode", ["disabled", "incremental", "full"])
def test_repair_incremental_repair(nodetool, mode):
id1 = "ef1b7a61-66c8-494c-bb03-6f65724e6eee"
res = nodetool("cluster", "repair", "--incremental-mode", mode, "ks", "table1", expected_requests=[

View File

@@ -563,9 +563,9 @@ void cluster_repair_operation(scylla_rest_client& client, const bpo::variables_m
if (vm.contains("incremental-mode")) {
auto mode = vm["incremental-mode"].as<sstring>();
const std::unordered_set<sstring> supported_mode{"disabled", "regular", "full"};
const std::unordered_set<sstring> supported_mode{"disabled", "incremental", "full"};
if (!supported_mode.contains(mode)) {
throw std::invalid_argument("nodetool cluster repair --incremental-mode only supports: disabled, regular, full");
throw std::invalid_argument("nodetool cluster repair --incremental-mode only supports: disabled, incremental, full");
}
repair_params["incremental_mode"] = mode;
}
@@ -3729,7 +3729,7 @@ For more information, see: {}"
typed_option<std::vector<sstring>>("in-dc", "Constrain repair to specific datacenter(s)"),
typed_option<std::vector<sstring>>("in-hosts", "Constrain repair to the specific host(s)"),
typed_option<std::vector<sstring>>("tablet-tokens", "Tokens owned by the tablets to repair."),
typed_option<sstring>("incremental-mode", "Specify the incremental repair mode: disabled, regular, full"),
typed_option<sstring>("incremental-mode", "Specify the incremental repair mode: disabled, incremental, full"),
},
{
typed_option<sstring>("keyspace", "The keyspace to repair, if missing all keyspaces are repaired", 1),

View File

@@ -205,7 +205,7 @@ public:
}
named_value(config_file* file, std::string_view name, liveness liveness_, value_status vs, const T& t = T(), std::string_view desc = {},
std::initializer_list<T> allowed_values = {})
: named_value(file, name, {}, liveness_, vs, t, desc) {
: named_value(file, name, {}, liveness_, vs, t, desc, std::move(allowed_values)) {
}
named_value(config_file* file, std::string_view name, std::string_view alias, value_status vs, const T& t = T(), std::string_view desc = {},
std::initializer_list<T> allowed_values = {})

View File

@@ -235,6 +235,11 @@ void client::group_client::register_metrics(std::string class_name, std::string
sm::description("Total time spent writing data to objects"), {ep_label, sg_label}),
sm::make_counter("total_read_prefetch_bytes", [this] { return prefetch_bytes; },
sm::description("Total number of bytes requested from object"), {ep_label, sg_label}),
sm::make_counter("downloads_blocked_on_memory",
[this] { return downloads_blocked_on_memory; },
sm::description("Counts the number of times S3 client downloads were delayed due to insufficient memory availability"),
{ep_label, sg_label})
});
}
@@ -1142,13 +1147,15 @@ class client::chunked_download_source final : public seastar::data_source_impl {
s3l.trace("Fiber starts cycle for object '{}'", _object_name);
while (!_is_finished) {
try {
if (_buffers_size >= _max_buffers_size * _buffers_low_watermark) {
co_await _bg_fiber_cv.when([this] { return _buffers_size < _max_buffers_size * _buffers_low_watermark; });
if (!_is_finished && _buffers_size >= _max_buffers_size * _buffers_low_watermark) {
co_await _bg_fiber_cv.when([this] { return _is_finished || (_buffers_size < _max_buffers_size * _buffers_low_watermark); });
}
if (auto units = try_get_units(_client->_memory, _socket_buff_size); !_buffers.empty() && !units) {
if (auto units = try_get_units(_client->_memory, _socket_buff_size); !_is_finished && !_buffers.empty() && !units) {
auto& gc = _client->find_or_create_client();
++gc.downloads_blocked_on_memory;
co_await _bg_fiber_cv.when([this] {
return _buffers.empty() || try_get_units(_client->_memory, _socket_buff_size);
return _is_finished || _buffers.empty() || try_get_units(_client->_memory, _socket_buff_size);
});
}

View File

@@ -131,6 +131,7 @@ class client : public enable_shared_from_this<client> {
io_stats read_stats;
io_stats write_stats;
uint64_t prefetch_bytes = 0;
uint64_t downloads_blocked_on_memory = 0;
seastar::metrics::metric_groups metrics;
group_client(std::unique_ptr<http::experimental::connection_factory> f, unsigned max_conn, const aws::retry_strategy& retry_strategy);
void register_metrics(std::string class_name, std::string host);