Compare commits


152 Commits

Author SHA1 Message Date
Michał Chojnowski
db0e209a9b sstables/trie: fix an assertion violation in bti_partition_index_writer_impl::write_last_key
_last_key is a multi-fragment buffer.

Some prefix of _last_key (up to _last_key_mismatch) is
unneeded because it's already a part of the trie.
Some suffix of _last_key (after needed_prefix) is unneeded
because _last_key can be differentiated from its neighbors even without it.

The job of write_last_key() is to find the middle fragments
(those covering the range `[_last_key_mismatch, needed_prefix)`),
trim the first and last of the middle fragments appropriately,
and feed them to the trie writer.

But there's an error in the current logic,
in the case where `_last_key_mismatch` falls on a fragment boundary.
To describe it with an example, if the key is fragmented like
`aaa|bbb|ccc`, `_last_key_mismatch == 3`, and `needed_prefix == 7`,
then the intended output to the trie writer is `bbb|c`,
but the actual output is `|bbb|c`. (I.e. the first fragment is empty).

Technically the trie writer could handle empty fragments,
but it has an assertion against them, because they are a questionable thing.

Fix that.

We also extend bti_index_test so that it's able to hit the assert
violation (before the patch). The reason why it wasn't able to do that
before the patch is that the violation requires decorated keys to differ
on the _first_ byte of a partition key column, but the keys generated
by the test only differed on the last byte of the column.
(Because the test was using sequential integers to make the values more
human-readable during debugging). So we modify the key generation
to use random values that can differ on any position.

Fixes scylladb/scylladb#26819

Closes scylladb/scylladb#26839

(cherry picked from commit b82c2aec96)

Closes scylladb/scylladb#26903
2025-11-11 10:40:01 +03:00
Raphael S. Carvalho
4361e41728 sstables_loader: Don't bypass synchronization with busy topology
The patch c543059f86 fixed the synchronization issue between tablet
split and load-and-stream. The synchronization worked only with
raft topology, and therefore was disabled with gossip.
To do the check, storage_service::raft_topology_change_enabled() was
called, but the topology kind is only available/set on shard 0, so the
synchronization was bypassed when load-and-stream ran on
any shard other than 0.

The reason the reproducer didn't catch it is that it was restricted
to a single CPU. It will now run with multiple CPUs and catch the
observed problem.

Fixes #22707

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes scylladb/scylladb#26730

(cherry picked from commit 7f34366b9d)

Closes scylladb/scylladb#26851
2025-11-11 10:39:39 +03:00
Ran Regev
1857721717 nodetool refresh primary-replica-only
Fixes: #26440

1. Added a description for the primary-replica-only option.
2. Fixed the docs text to better reflect the constraints checked in the
   code itself, namely: that primary-replica-only and scope must both be
   applied only if load-and-stream is applied too, and that they are
   mutually exclusive.

Note: when https://github.com/scylladb/scylladb/issues/26584 is
implemented (with #26609) there will be a need to align the docs as
well - namely, primary-replica-only and scope will no longer be
mutually exclusive.

Signed-off-by: Ran Regev <ran.regev@scylladb.com>

Closes scylladb/scylladb#26480

(cherry picked from commit aaf53e9c42)

Closes scylladb/scylladb#26906
2025-11-07 16:49:31 +02:00
Pavel Emelyanov
9d997458d7 Merge 'Alternator: allow warning on auth errors before enabling enforcement' from Nadav Har'El
An Alternator user was recently "bit" when switching `alternator_enforce_authorization` from "false" to "true": after the configuration change, all application requests suddenly failed because, unbeknownst to the user, their application used incorrect secret keys.

This series introduces a solution for users who want to **safely** switch `alternator_enforce_authorization`  from "false" to "true": Before switching from "false" to "true", the user can temporarily switch a new option, `alternator_warn_authorization`, to true. In this "warn" mode, authentication and authorization errors are counted in metrics (`scylla_alternator_authentication_failures` and `scylla_alternator_authorization_failures`) and logged as WARNings, but the user's application continues to work. The user can use these metrics or log messages to learn of errors in their application's setup, fix them, and only do the switch of `alternator_enforce_authorization` when the metrics or log messages show there are no more errors.

The first patch is the implementation of the feature - the new configuration option, the metrics and the log messages; the second patch is a test for the new feature; and the third patch is documentation recommending how to use the warn mode and the associated metrics or log messages to safely switch `alternator_enforce_authorization` from false to true.

Fixes #25308

This is a feature that users need, so it should probably be backported to live branches.

Closes scylladb/scylladb#25457

* github.com:scylladb/scylladb:
  docs/alternator: explain alternator_warn_authorization
  test/alternator: tests for new auth failure metrics and log messages
  alternator: add alternator_warn_authorization config

(cherry picked from commit 59019bc9a9)

Closes scylladb/scylladb#26894
2025-11-07 16:48:50 +02:00
Botond Dénes
8e7652d986 Merge '[Backport 2025.4] storage_proxy: use gates to track write handlers destruction' from Scylladb[bot]
In [#26408](https://github.com/scylladb/scylladb/pull/26408) a `write_handler_destroy_promise` class was introduced to wait for the destruction of `abstract_write_response_handler` instances. We strived to minimize the memory footprint of `abstract_write_response_handler`, and with `write_handler_destroy_promise`-es we required only a single additional int. It turned out that in some cases a lot of write handlers can be scheduled for deletion at the same time; in such cases the vector of promises can become big and cause 'oversized allocation' seastar warnings.

Another concern with `write_handler_destroy_promise`-es [was that they were more complicated than it was worth](https://github.com/scylladb/scylladb/pull/26408#pullrequestreview-3361001103).

In this commit we replace `write_handler_destroy_promise` with simple gates. One or more gates can be attached to an `abstract_write_response_handler` to wait for its destruction. We use `utils::small_vector` to store the attached gates. The limit 2 was chosen because we expect two gates at the same time in most cases. One is `storage_proxy::_write_handlers_gate`, which is used to wait for all handlers in `cancel_all_write_response_handlers`. Another one can be attached by a caller of `cancel_write_handlers`. Nothing stops several `cancel_write_handlers` from being called at the same time, but it should be rare.

The `sizeof(utils::small_vector)` is 40 bytes; this is a `40.0 / 488 * 100 ~ 8%` increase in `sizeof(abstract_write_response_handler)`, which seems acceptable.

Fixes [scylladb/scylladb#26788](https://github.com/scylladb/scylladb/issues/26788)

backport: need to backport to 2025.4 (LWT for tablets release)

- (cherry picked from commit 4578304b76)

- (cherry picked from commit 5bda226ff6)

Parent PR: #26827

Closes scylladb/scylladb#26893

* github.com:scylladb/scylladb:
  storage_proxy: use coroutine::maybe_yield();
  storage_proxy: use gates to track write handlers destruction
2025-11-07 16:48:02 +02:00
Wojciech Mitros
8e9ab7618e mv: don't mark the view as built if the reader produced no partitions
When we build a materialized view we read the entire base table from start to
end to generate all required view updates. If a view is created while another view
is being built on the same base table, this is optimized - we start generating
view updates for the new view from the base table rows that we're currently
reading, and we read the missed initial range again after the previous view
finishes building.
The view building progress is only updated after generating view updates for
some read partitions. However, there are scenarios where we'll generate no
view updates for the entire read range. If this were not handled, we could
end up in an infinite view building loop, as happened in https://github.com/scylladb/scylladb/issues/17293.
To handle this, we mark the view as built if the reader generated no partitions.
However, this is not always the correct conclusion. Another scenario where
the reader won't encounter any partitions is when view building is interrupted,
and then we perform a reshard. In this scenario, we set the reader for all
shards to the last unbuilt token for an existing partition before the reshard.
However, this partition may not exist on a shard after reshard, and if there
are also no partitions with higher tokens, the reader will generate no partitions
even though it hasn't finished view building.
Additionally, we already have a check that prevents infinite view building loops
without taking the partitions generated by the reader into account. At the end
of stream, before looping back to the start, we advance current_key to the end
of the built range and check for built views in that range. This handles the case
where the entire range is empty - the conditions for a built view are:
1. the "next_token" is no greater than "first_token" (the view building process
looped back, so we've built all tokens above "first_token")
2. the "current_token" is no less than "first_token" (after looping back, we've
built all tokens below "first_token")

If the range is empty, we'll pass these conditions on an empty range after advancing
"current_key" to the end because:
1. after looping back, "next_token" will be set to `dht::minimum_token`
2. "current_key" will be set to `dht::ring_position::max()`

In this patch we remove the check for partitions generated by the reader. This fixes
the issue with resharding and it does not resurrect the issue with infinite view building
that the check was introduced for.

Fixes https://github.com/scylladb/scylladb/issues/26523

Closes scylladb/scylladb#26635

(cherry picked from commit 0a22ac3c9e)

Closes scylladb/scylladb#26889
2025-11-07 16:46:46 +02:00
Asias He
1288adb3a6 repair: Add metric for time spent on tablet repair
It is useful to check time spent on tablet repair. It can be used to
compare incremental repair and non-incremental repair. The time does not
include the time waiting for the tablet scheduler to schedule the tablet
repair task.

Fixes #26505

Closes scylladb/scylladb#26502

(cherry picked from commit dbeca7c14d)

Closes scylladb/scylladb#26881
2025-11-07 16:45:32 +02:00
Wojciech Mitros
092be62fca view_building_coordinator: rollback tasks on the leaving tablet replica
When a tablet migration is started, we abort the corresponding view
building tasks (i.e. we change the state of those tasks to "ABORTED").
However, we don't change the host and shard of these tasks until the
migration successfully completes. When for some reason we have to
rollback the migration, that means the migration didn't finish and
the aborted task still has the host and shard of the migration
source. So when we recreate tasks that should no longer be aborted
due to a rolled-back migration, we should look at the aborted tasks
of the source (leaving) replica. But we don't do it and we look at
the aborted tasks of the target replica.
In this patch we adjust the rollback mechanism to recreate tasks
for the migration source instead of destination. We also fix the
test that should have detected this issue - the injection that
the test was using didn't make us rollback, but we simply retried
a stage of the tablet migration. By using one_shot=False and adding
a second injection, we can now guarantee that the migration will
eventually fail and we'll continue to the 'cleanup_target' and
'revert_migration' stages.

Fixes https://github.com/scylladb/scylladb/issues/26691

Closes scylladb/scylladb#26825

(cherry picked from commit 977fa91e3d)

Closes scylladb/scylladb#26879
2025-11-07 16:44:04 +02:00
Botond Dénes
c70656c0db Merge '[Backport 2025.4] db/config: Change default SSTable compressor to LZ4WithDictsCompressor' from Scylladb[bot]
`sstable_compression_user_table_options` allows configuring a node-global SSTable compression algorithm for user tables via scylla.yaml. The current default is LZ4Compressor (inherited from Cassandra).

Make LZ4WithDictsCompressor the new default. Metrics from real datasets in the field have shown significant improvements in compression ratios.

If the dictionary compression feature is not enabled in the cluster (e.g., during an upgrade), fall back to the `LZ4Compressor`. Once the feature is enabled, flip the default back to the dictionary compressor using a listener callback.

Fixes #26610.

- (cherry picked from commit d95ebe7058)

- (cherry picked from commit 96e727d7b9)

- (cherry picked from commit 2fc812a1b9)

- (cherry picked from commit a0bf932caa)

Parent PR: #26697

Closes scylladb/scylladb#26830

* github.com:scylladb/scylladb:
  test/cluster: Add test for default SSTable compressor
  db/config: Change default SSTable compressor to LZ4WithDictsCompressor
  db/config: Deprecate sstable_compression_dictionaries_allow_in_ddl
  boost/cql_query_test: Get expected compressor from config
2025-11-07 16:43:11 +02:00
Botond Dénes
bb142bdc10 Merge '[Backport 2025.4] auth: implement vector store authorization' from Scylladb[bot]
This patch implements the changes required by the Vector Store authorization, as described in https://scylladb.atlassian.net/wiki/spaces/RND/pages/107085899/Vector+Store+Authentication+And+Authorization+To+ScyllaDB, that is:

- adding a new permission VECTOR_SEARCH_INDEXING, grantable only on ALL KEYSPACES
- allowing users with that permission to perform SELECT queries, but only on tables with a vector index
- increasing the number of scheduling groups by one to allow users to create a service level for a vector store user
- adjusting the tests and documentation

These changes are needed because the vector indexes are managed by an external service, Vector Store, which needs to read the tables to create the indexes in its memory. We would like to limit the privileges of that service to a minimum to maintain the principle of least privilege; therefore we add a new permission, one that allows SELECTs conditional on the existence of a vector index on the table.

Fixes: VECTOR-201
Fixes: https://github.com/scylladb/scylladb/issues/26804

Backport reasoning:
Backport to 2025.4 is required, as adding this only in 2026.1 would make upgrading clusters more difficult. As of now, Scylla Cloud requires version 2025.4 to enable vector search, and the permission is set by the orchestrator, so there is no chance that someone will try to add this permission during an upgrade. In 2026.1 it would be more difficult.

- (cherry picked from commit ae86bfadac)

- (cherry picked from commit 3025a35aa6)

- (cherry picked from commit 6a69bd770a)

- (cherry picked from commit e8fb745965)

- (cherry picked from commit 3db2e67478)

Parent PR: #25976

Closes scylladb/scylladb#26805

* github.com:scylladb/scylladb:
  docs: adjust docs for VS auth changes
  test: add tests for VECTOR_SEARCH_INDEXING permission
  cql: allow VECTOR_SEARCH_INDEXING users to select
  auth: add possibilty to check for any permission in set
  auth: add a new permission VECTOR_SEARCH_INDEXING
2025-11-07 16:42:33 +02:00
Piotr Dulikowski
b52055b7a1 Merge '[Backport 2025.4] transport: call update_scheduling_group for non-auth connections' from Andrzej Jackowski
This is backport of fix for https://github.com/scylladb/scylladb/issues/26040 and related test (https://github.com/scylladb/scylladb/pull/26589) to 2025.4.

Before this change, unauthorized connections stayed in the main
scheduling group. This is not ideal; in such a case sl:default
should be used instead, to have behavior consistent with the scenario
where a user is authenticated but has no service level assigned
to them.

This commit adds a call to update_scheduling_group at the end of
connection creation for an unauthenticated user, to make sure the
service level is switched to sl:default.

Fixes: https://github.com/scylladb/scylladb/issues/26040
Fixes: https://github.com/scylladb/scylladb/issues/26581

(cherry picked from commit 278019c328)
(cherry picked from commit 8642629e8e)

No backport, as it's already a backport

Closes scylladb/scylladb#26815

* github.com:scylladb/scylladb:
  test: add test_anonymous_user to test_raft_service_levels
  transport: call update_scheduling_group for non-auth connections
2025-11-07 12:58:43 +01:00
Petr Gusev
5c7a9e89ea storage_proxy: use coroutine::maybe_yield();
This is a small "while at it" refactoring -- better to use
coroutine::maybe_yield with co_await-s.

(cherry picked from commit 5bda226ff6)
2025-11-06 13:04:52 +00:00
Petr Gusev
2e55f9521b storage_proxy: use gates to track write handlers destruction
In #26408 a write_handler_destroy_promise class was introduced to
wait for abstract_write_response_handler instances destruction. We
strived to minimize the memory footprint of
abstract_write_response_handler, and with write_handler_destroy_promise-es
we required only a single additional int. It turned out that in some
cases a lot of write handlers can be scheduled for deletion
at the same time; in such cases the
vector<write_handler_destroy_promise> can become big and cause
'oversized allocation' seastar warnings.

Another concern with write_handler_destroy_promise-es was that they
were more complicated than it was worth.

In this commit we replace write_handler_destroy_promise with simple
gates. One or more gates can be attached to an
abstract_write_response_handler to wait for its destruction. We use
utils::small_vector<gate::holder, 2> to store the attached gates.
The limit 2 was chosen because we expect two gates at the same time
in most cases. One is storage_proxy::_write_handlers_gate,
which is used to wait for all handlers in
cancel_all_write_response_handlers. Another one can be attached by
a caller of cancel_write_handlers. Nothing stops several
cancel_write_handlers from being called at the same time, but it should
be rare.

The sizeof(utils::small_vector<gate::holder, 2>) is 40 bytes; this is a
40.0 / 488 * 100 ~ 8% increase in
sizeof(abstract_write_response_handler), which seems acceptable.

Fixes scylladb/scylladb#26788

(cherry picked from commit 4578304b76)
2025-11-06 13:04:52 +00:00
Nikos Dragazis
64a0efe297 test/cluster: Add test for default SSTable compressor
The previous patch made the default compressor dependent on the
SSTABLE_COMPRESSION_DICTS feature:
* LZ4Compressor if the feature is disabled
* LZ4WithDictsCompressor if the feature is enabled

Add a test to verify that the cluster uses the right default in every
case.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
(cherry picked from commit a0bf932caa)
2025-11-04 15:41:40 +02:00
Nikos Dragazis
3b801f3d80 db/config: Change default SSTable compressor to LZ4WithDictsCompressor
`sstable_compression_user_table_options` allows configuring a
node-global SSTable compression algorithm for user tables via
scylla.yaml. The current default is `LZ4Compressor` (inherited from
Cassandra).

Make `LZ4WithDictsCompressor` the new default. Metrics from real datasets
in the field have shown significant improvements in compression ratios.

If the dictionary compression feature is not enabled in the cluster
(e.g., during an upgrade), fall back to the `LZ4Compressor`. Once the
feature is enabled, flip the default back to the dictionary compressor
using a listener callback.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
(cherry picked from commit 2fc812a1b9)
2025-11-04 15:41:40 +02:00
Nikos Dragazis
bafe2bbbbc db/config: Deprecate sstable_compression_dictionaries_allow_in_ddl
The option is a knob that allows rejecting dictionary-aware compressors
in the validation stage of CREATE/ALTER statements, and in the
validation of `sstable_compression_user_table_options`. It was
introduced in 7d26d3c7cb to allow the admins of Scylla Cloud to
selectively enable it in certain clusters. For more details, check:
https://github.com/scylladb/scylla-enterprise/issues/5435

As of this series, we want to start offering dictionary compression as
the default option in all clusters, i.e., treat it as a generally
available feature. This makes the knob redundant.

Additionally, making dictionary compression the default choice in
`sstable_compression_user_table_options` creates an awkward dependency
with the knob (disabling the knob should cause
`sstable_compression_user_table_options` to fall back to a non-dict
compressor as default). That may not be very clear to the end user.

For these reasons, mark the option as "Deprecated", remove all relevant
tests, and adjust the business logic as if dictionary compression is
always available.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
(cherry picked from commit 96e727d7b9)
2025-11-04 15:40:46 +02:00
Yauheni Khatsianevich
1f5082d1ce tests(lwt): new test for LWT testing during tablet resize
- Workload: N workers perform CAS updates
 UPDATE … SET s{i}=new WHERE pk=? IF (∀j≠i: s{j}>=guard_j) AND s{i}=prev
 at CL=LOCAL_QUORUM / SERIAL=LOCAL_SERIAL. A non-apply without timeout is
 treated as contention.
- Enable balancing and increase min_tablet_count to force a split, then
 flush and lower min_tablet_count to force a merge.
- "Uncertainty" timeouts (write timeout due to uncertainty) are resolved via a
 LOCAL_SERIAL read to determine whether the CAS actually applied.
- Invariants: after the run, for every pk and column s{i}, the stored value
 equals the number of confirmed CAS by worker i (no lost or phantom updates)
 despite ongoing tablet moves.

Closes scylladb/scylladb#26113

refs: scylladb/qa-tasks#1918
Refs #18068
Fixes #24502 (to satisfy backport rules)

(cherry picked from commit 99dc31e71a)

Closes scylladb/scylladb#26790
2025-11-04 12:47:24 +01:00
Michael Litvak
8f7a6fd5eb cdc: use chunked_vector instead of vector for stream ids
use utils::chunked_vector instead of std::vector to store cdc stream
sets for tablets.

a cdc stream set usually represents all streams for a specific table and
timestamp, and has a stream id per each tablet of the table. each stream
id is represented by 16 bytes. thus the vector could require quite large
contiguous allocations for a table that has many tablets. change it to
chunked_vector to avoid large contiguous allocations.

Fixes scylladb/scylladb#26791

Closes scylladb/scylladb#26792

(cherry picked from commit e7dbccd59e)

Closes scylladb/scylladb#26828
2025-11-04 12:41:30 +01:00
Petr Gusev
0656a73c52 paxos_state: get_replica_lock: remove shard check
This check is incorrect: the current shard may be looking at
the old version of tablets map:
* an accept RPC comes to replica shard 0, which is already at write_both_read_new
* the new shard is shard 1, so paxos_state::accept is called on shard 1
* shard 1 is still at "streaming" -> shards_ready_for_reads() returns old
shard 0

Fixes scylladb/scylladb#26801

Closes scylladb/scylladb#26809

(cherry picked from commit 88765f627a)

Closes scylladb/scylladb#26832
2025-11-02 11:14:48 +01:00
Jenkins Promoter
527124d363 Update pgo profiles - aarch64 2025-11-01 04:55:16 +02:00
Jenkins Promoter
0b30c111d6 Update pgo profiles - x86_64 2025-11-01 04:20:45 +02:00
Nikos Dragazis
260c9972b0 boost/cql_query_test: Get expected compressor from config
Since 5b6570be52, the default SSTable compression algorithm for user
tables is no longer hardcoded; it can be configured via the
`sstable_compression_user_table_options.sstable_compression` option in
scylla.yaml.

Modify the `test_table_compression` test to get the expected value from
the configuration.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
(cherry picked from commit d95ebe7058)
2025-10-31 23:50:20 +00:00
Andrzej Jackowski
1c04ce3415 test: add test_anonymous_user to test_raft_service_levels
The primary goal of this test is to reproduce scylladb/scylladb#26040
so the fix (278019c328) can be backported
to older branches.

Scenario: connect via CQL as an anonymous user and verify that the
`sl:default` scheduling group is used. Before the fix for #26040
`main` scheduling group was incorrectly used instead of `sl:default`.

Control connections may legitimately use `sl:driver`, so the test
accepts those occurrences while still asserting that regular anonymous
queries use `sl:default`.

This adds explicit coverage on master. After scylladb#24411 was
implemented, some other tests started to fail when scylladb#26040
was unfixed. However, none of the tests asserted this exact behavior.

Refs: scylladb/scylladb#26040
Refs: scylladb/scylladb#26581

Closes scylladb/scylladb#26589

(cherry picked from commit 8642629e8e)
2025-10-30 20:10:47 +01:00
Andrzej Jackowski
af8df987f5 transport: call update_scheduling_group for non-auth connections
Before this change, unauthorized connections stayed in the `main`
scheduling group. This is not ideal; in such a case `sl:default`
should be used instead, to have behavior consistent with the scenario
where a user is authenticated but has no service level assigned
to them.

This commit adds a call to `update_scheduling_group` at the end of
connection creation for an unauthenticated user, to make sure the
service level is switched to `sl:default`.

Fixes: scylladb/scylladb#26040
(cherry picked from commit 278019c328)
2025-10-30 19:34:18 +01:00
Michał Hudobski
09d59de736 docs: adjust docs for VS auth changes
We adjust the documentation to include the new
VECTOR_SEARCH_INDEXING permission and its usage,
and to reflect the changes in the maximum
number of service levels.

(cherry picked from commit 3db2e67478)
2025-10-30 10:13:16 +00:00
Michał Hudobski
ce04e2cb7d test: add tests for VECTOR_SEARCH_INDEXING permission
This commit adds tests to verify the expected
behavior of the VECTOR_SEARCH_INDEXING permission,
that is, allowing GRANTing this permission only on
ALL KEYSPACES and allowing SELECT queries only on tables
with vector indexes when the user has this permission.

(cherry picked from commit e8fb745965)
2025-10-30 10:13:16 +00:00
Michał Hudobski
3367ffa14f cql: allow VECTOR_SEARCH_INDEXING users to select
This patch allows users with the VECTOR_SEARCH_INDEXING permission
to perform SELECT queries on tables that have a vector index.
This is needed for the Vector Store service, which
reads the vector-indexed tables, but does not require
the full SELECT permission.

(cherry picked from commit 6a69bd770a)
2025-10-30 10:13:16 +00:00
Michał Hudobski
ad1e5718ae auth: add possibility to check for any permission in set
This commit adds a new version of the command_desc struct
that contains a set of permissions instead of a single
permission. When this struct is passed to ensure/check_has_permission,
we check whether the user has any of the included permissions on the resource.

(cherry picked from commit 3025a35aa6)
2025-10-30 10:13:16 +00:00
Michał Hudobski
27bf0fdf60 auth: add a new permission VECTOR_SEARCH_INDEXING
This patch adds a new permission: VECTOR_SEARCH_INDEXING,
that is grantable only for ALL KEYSPACES. It will allow selecting
from tables with vector search indexes. It is meant to be used
by the Vector Store service to allow it to build indexes without
having full SELECT permissions on the tables.

(cherry picked from commit ae86bfadac)
2025-10-30 10:13:16 +00:00
Pavel Emelyanov
9459a58116 Merge '[Backport 2025.4] cdc: improve cdc metadata loading' from Scylladb[bot]
when loading CDC streams metadata for tablets from the tables, read only
new entries from the history table instead of reading all entries. This
improves the CDC metadata reloading, making it more efficient and
predictable.

the CDC metadata is loaded as part of group0 reload whenever the
internal CDC tables are modified. on tablet split / merge, we create a
new CDC timestamp and streams by writing them to the cdc_streams_history
table by group0 operation, and when it's applied we reload the in-memory
CDC streams map by reading from the tables and constructing the updated map.

Previously, on every update, we would read the entire
cdc_streams_history entries for the changed table, constructing all its
streams and creating a new map from scratch.

We improve this now by reading only new entries from cdc_streams_history
and appending them to the existing map. We can do this because we only
append new entries to cdc_streams_history with a higher timestamp than all
previous entries.

This makes this reloading more efficient and predictable, because
previously we would read a number of entries that depends on the number
of tablets splits and merges, which increases over time and is
unbounded, whereas now we read only a single stream set on each update.

Fixes https://github.com/scylladb/scylladb/issues/26732

backport to 2025.4 where cdc with tablets is introduced

- (cherry picked from commit 8743422241)

- (cherry picked from commit 4cc0a80b79)

Parent PR: #26160

Closes scylladb/scylladb#26798

* github.com:scylladb/scylladb:
  test: cdc: extend cdc with tablets tests
  cdc: improve cdc metadata loading
2025-10-30 10:32:27 +03:00
Michael Litvak
59f97d0b71 test: cdc: extend cdc with tablets tests
extend and improve the tests of virtual tables for cdc with tablets.
split the existing virtual tables test to one test that validates the
virtual tables against the internal cdc tables, and triggering some
tablet splits in order to create entries in the cdc_streams_history
table, and add another test with basic validation of the virtual tables
when there are multiple cdc tables.

(cherry picked from commit 4cc0a80b79)
2025-10-30 02:44:47 +00:00
Michael Litvak
0a07c2cb19 cdc: improve cdc metadata loading
when loading CDC streams metadata for tablets from the tables, read only
new entries from the history table instead of reading all entries. This
improves the CDC metadata reloading, making it more efficient and
predictable.

the CDC metadata is loaded as part of group0 reload whenever the
internal CDC tables are modified. on tablet split / merge, we create a
new CDC timestamp and streams by writing them to the cdc_streams_history
table by group0 operation, and when it's applied we reload the in-memory
CDC streams map by reading from the tables and constructing the updated map.

Previously, on every update, we would read the entire
cdc_streams_history entries for the changed table, constructing all its
streams and creating a new map from scratch.

We improve this now by reading only new entries from cdc_streams_history
and appending them to the existing map. We can do this because we only
append new entries to cdc_streams_history with a higher timestamp than all
previous entries.

This makes this reloading more efficient and predictable, because
previously we would read a number of entries that depends on the number
of tablets splits and merges, which increases over time and is
unbounded, whereas now we read only a single stream set on each update.

Fixes scylladb/scylladb#26732

(cherry picked from commit 8743422241)
2025-10-30 02:44:47 +00:00
Pavel Emelyanov
080c55a115 lister: Fix race between readdir and stat
Sometimes file::list_directory() returns entries without the type set. In
that case lister calls file_type() on the entry name to get it. If
the call returns a disengaged type, the code assumes that some error
occurred and resolves into an exception.

That's not correct. The file_type() method returns a disengaged type only
if the file being inspected is missing (i.e. on ENOENT errno). But this
can validly happen if a file is removed between readdir and stat. In
that case it's not "some error happened"; the entry should simply be
skipped. If some error had actually happened, file_type() would resolve
into an exceptional future on its own.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#26595

(cherry picked from commit d9bfbeda9a)

Closes scylladb/scylladb#26767
2025-10-29 11:29:57 +02:00
Anna Stuchlik
93de570e33 doc: add --list-active-releases to Web Installer
Fixes https://github.com/scylladb/scylladb/issues/26688

V2 of https://github.com/scylladb/scylladb/pull/26687

Closes scylladb/scylladb#26689

(cherry picked from commit bd5b966208)

Closes scylladb/scylladb#26765
2025-10-29 11:28:51 +02:00
Patryk Jędrzejczak
680bfa9ab7 test: test_raft_recovery_stuck: reconnect driver after rolling restarts
It turns out that #21477 wasn't sufficient to fix the issue. The driver
may still decide to reconnect the connection after `rolling_restart`
returns. One possible explanation is that the driver sometimes handles
the DOWN notification after all nodes consider each other UP.

Reconnecting the driver after restarting nodes seems to be a reliable
workaround that many tests use. We also use it here.

Fixes #19959

Closes scylladb/scylladb#26638

(cherry picked from commit 5321720853)

Closes scylladb/scylladb#26763
2025-10-29 11:27:49 +02:00
Anna Stuchlik
ed58815199 doc: add OS support for version 2025.4
Fixes https://github.com/scylladb/scylladb/issues/26450

Closes scylladb/scylladb#26616

(cherry picked from commit 6fa342fb18)

Closes scylladb/scylladb#26750
2025-10-29 11:27:08 +02:00
Botond Dénes
9c8812a154 Merge '[Backport 2025.4] LWT: use shards_ready_for_reads for replica locks' from Scylladb[bot]
When a tablet is migrated between shards on the same node, during the write_both_read_new state we begin switching reads to the new shard. Until the corresponding global barrier completes, some requests may still use write_both_read_old erm, while others already use the write_both_read_new erm. To ensure mutual exclusion between these two types of requests, we must acquire locks on both the old and new shards. Once the global barrier completes, no requests remain on the old shard, so we can safely switch to acquiring locks only on the new shard.

The idea came from the similar locking problem in the [counters for tablets PR](https://github.com/scylladb/scylladb/pull/26636#discussion_r2463932395).

Fixes scylladb/scylladb#26727

backport: need to backport to 2025.4

- (cherry picked from commit 5ab2db9613)

- (cherry picked from commit 478f7f545a)

Parent PR: #26719

Closes scylladb/scylladb#26748

* github.com:scylladb/scylladb:
  paxos_state: use shards_ready_for_reads
  paxos_state: inline shards_for_writes into get_replica_lock
2025-10-29 11:26:29 +02:00
Botond Dénes
aac49601c6 Merge '[Backport 2025.4] cdc: garbage collect CDC streams for tablets' from Scylladb[bot]
Introduce helper functions that can be used for garbage collecting old
CDC streams for tablet-based keyspaces.

Add a background fiber to the topology coordinator that runs
periodically and checks for old CDC streams of tablet-based keyspaces
that can be garbage collected.

The garbage collection works by finding the newest CDC timestamp that has been
closed for more than the configured CDC TTL, and removing all information from
the CDC internal tables about CDC timestamps and streams up to this timestamp.

In general it should be safe to remove information about these streams because
they have been closed for more than the TTL, therefore all rows that were
written to these streams with the configured TTL should be dead.
The exception is if the TTL is altered to a smaller value; then we may remove
information about streams that still have live rows that were written with the
longer TTL.

Fixes https://github.com/scylladb/scylladb/issues/26669

- (cherry picked from commit 440caeabcb)

- (cherry picked from commit 6109cb66be)

Parent PR: #26410

Closes scylladb/scylladb#26728

* github.com:scylladb/scylladb:
  cdc: garbage collect CDC streams periodically
  cdc: helpers for garbage collecting old streams for tablets
2025-10-29 11:25:31 +02:00
Asias He
89364d3576 repair: Remove the regular mode name in the tablet repair api
The patch e34deb72f9 (repair: Rename incremental mode name)
missed one place that references the removed regular mode name.

Fixes #26503

Closes scylladb/scylladb#26660

(cherry picked from commit 5f1febf545)

Closes scylladb/scylladb#26684
2025-10-29 11:22:56 +02:00
Anna Stuchlik
68ea778b6b doc: add support for Debian 12
Fixes https://github.com/scylladb/scylladb/issues/26640

Closes scylladb/scylladb#26668

(cherry picked from commit 9c0ff7c46b)

Closes scylladb/scylladb#26681
2025-10-29 11:22:29 +02:00
Botond Dénes
087f739bf9 Merge '[Backport 2025.4] alternator/executor: instantly mark view as built when creating it with base table' from Scylladb[bot]
When a `CreateTable` request creates a GSI/LSI together with the base table,
the base table is empty and we don't need to actually build the view.

In tablet-based keyspaces we can simply not create view building tasks
and mark the view build status as SUCCESS on all nodes. Then, the view
building worker on each node will mark the view as built in
`system.built_views` (`view_building_worker::update_built_views()`).

Vnode-based keyspaces will use the "old" logic of view builder, which
will process the view and mark it as built.

Fixes scylladb/scylladb#26615

This fix should be backported to 2025.4.

- (cherry picked from commit 8fbf122277)

- (cherry picked from commit bdab455cbb)

- (cherry picked from commit 34503f43a1)

Parent PR: #26657

Closes scylladb/scylladb#26670

* github.com:scylladb/scylladb:
  test/alternator/test_tablets: add test for GSI backfill with tablets
  test/alternator/test_tablets: add reproducer for GSI with tablets
  alternator/executor: instantly mark view as built when creating it with base table
2025-10-29 11:21:27 +02:00
Petr Gusev
332b776e87 paxos_state: use shards_ready_for_reads
Acquiring locks on both shards for the entire tablet migration period
is redundant. In most cases, locking only the old shard or only the new
shard is sufficient. Using shards_ready_for_reads reduces the
situations in which we need to lock both shards to:
* intra-node migrations only
* only during the write_both_read_new state
Once the global barrier completes in the write_both_read_new state, no
requests remain on the old shard, so we can safely acquire locks
only on the new shard.
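The lock-shard decision above can be sketched as follows; the stage names, fields, and `shards_to_lock` helper are hypothetical illustrations, not the real paxos_state API:

```python
# Minimal sketch: lock both shards only for an intra-node migration in
# the write_both_read_new stage before the global barrier completes;
# otherwise a single shard suffices.

WRITE_BOTH_READ_NEW = "write_both_read_new"

def shards_to_lock(stage, intra_node, barrier_done, old_shard, new_shard):
    if intra_node and stage == WRITE_BOTH_READ_NEW and not barrier_done:
        # Requests may still run with either the old or the new ERM,
        # so mutual exclusion requires locks on both shards.
        return sorted({old_shard, new_shard})
    if stage == WRITE_BOTH_READ_NEW and barrier_done:
        # After the barrier, no request remains on the old shard.
        return [new_shard]
    # In earlier stages, reads are still served by the old shard.
    return [old_shard]
```

For example, during the narrow window before the barrier, an intra-node migration locks both shards; afterwards only the new one.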

Fixes scylladb/scylladb#26727

(cherry picked from commit 478f7f545a)
2025-10-28 16:59:47 +00:00
Petr Gusev
ff0e7ac853 paxos_state: inline shards_for_writes into get_replica_lock
No need to have two functions since both callers of get_replica_lock()
use shards_for_writes() to compute the shards where the locks
must be acquired.

Also while at it, inline the acquire() lambda in get_replica_lock()
and replace it with a loop over shards. This makes the code
more straightforward.

(cherry picked from commit 5ab2db9613)
2025-10-28 16:59:47 +00:00
Michael Litvak
5319759bdb cdc: garbage collect CDC streams periodically
Add a background fiber to the topology coordinator that runs
periodically and checks for old CDC streams of tablet-based keyspaces
that can be garbage collected.

(cherry picked from commit 6109cb66be)
2025-10-27 19:53:04 +00:00
Michael Litvak
55d9d5e7c2 cdc: helpers for garbage collecting old streams for tablets
Introduce helper functions that can be used for garbage collecting old
CDC streams for tablet-based keyspaces.

- get_new_base_for_gc: finds a new base timestamp given a TTL, such that
  all older timestamps and streams can be removed.
- get_cdc_stream_gc_mutations: given new base timestamp and streams,
  builds mutations that update the internal cdc tables and remove the
  older streams.
- garbage_collect_cdc_streams_for_table: combines the two functions
  above to find a new base and build mutations to update it for a
  specific table
- garbage_collect_cdc_streams: builds gc mutations for all cdc tables
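The core of get_new_base_for_gc and the pruning step can be sketched with hypothetical data shapes (a dict of close-timestamp to streams, rather than the real internal-table mutations):

```python
# Sketch, assuming streams are keyed by the timestamp at which they were
# closed; the real code builds mutations against the CDC internal tables.

def get_new_base_for_gc(close_times, now, ttl):
    """Return the newest timestamp whose streams have been closed for
    longer than ttl, or None if nothing is old enough to collect."""
    expired = [t for t in close_times if now - t > ttl]
    return max(expired, default=None)

def garbage_collect(streams_by_close_time, now, ttl):
    """Drop every stream set closed at or before the new base timestamp."""
    base = get_new_base_for_gc(streams_by_close_time, now, ttl)
    if base is None:
        return streams_by_close_time
    return {t: s for t, s in streams_by_close_time.items() if t > base}

streams = {100: ["a"], 200: ["b"], 300: ["c"]}
kept = garbage_collect(streams, now=350, ttl=100)
```

With `now=350` and `ttl=100`, timestamps 100 and 200 are expired, so the new base is 200 and only the streams closed at 300 remain.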

(cherry picked from commit 440caeabcb)
2025-10-27 19:53:04 +00:00
Jenkins Promoter
7f08d0a6cf Update ScyllaDB version to: 2025.4.0-rc4 2025-10-27 14:57:11 +02:00
Patryk Jędrzejczak
c406e1dd17 Merge '[Backport 2025.4] raft topology: fix group0 tombstone GC in the Raft-based recovery procedure' from Scylladb[bot]
Group0 tombstone GC considers only the current group 0 members
while computing the group 0 tombstone GC time. It's not enough
because in the Raft-based recovery procedure, there can be nodes
that haven't joined the current group 0 yet, but they have belonged
to a different group 0 and thus have a non-empty group 0 state ID.
The current code can cause a data resurrection in group 0 tables.

We fix this issue in this PR and add a regression test.

This issue was uncovered by `test_raft_recovery_entry_loss`, which
became flaky recently. We skipped this test for now. We will unskip
it in a following PR because it's skipped only on master, while we
want to backport this PR.

Fixes #26534

This PR contains an important bugfix, so we should backport it
to all branches with the Raft-based recovery procedure (2025.2
and newer).

- (cherry picked from commit 1d09b9c8d0)

- (cherry picked from commit 6b2e003994)

- (cherry picked from commit c57f097630)

Parent PR: #26612

Closes scylladb/scylladb#26682

* https://github.com/scylladb/scylladb:
  test: test group0 tombstone GC in the Raft-based recovery procedure
  group0_state_id_handler: remove unused group0_server_accessor
  group0_state_id_handler: consider state IDs of all non-ignored topology members
2025-10-27 10:15:49 +01:00
Avi Kivity
e85ab70054 Merge '[Backport 2025.4] tablet_metadata_guard: fix split/merge handling' from Petr Gusev
The guard should stop refreshing the ERM when the number of tablets changes. Tablet splits or merges invalidate the tablet_id field (_tablet), which means the guard can no longer correctly protect ongoing operations from tablet migrations.

The problem is specific to LWT, since tablet_metadata_guard is used mostly for heavy topology operations, which are mutually exclusive with split and merge. The guard was used for LWT as an optimization -- we don't need to block topology operations or migrations of unrelated tablets. In the future, we could use the guard for regular reads/writes as well (via the token_metadata_guard wrapper).

Fixes https://github.com/scylladb/scylladb/issues/26437

backports: need to backport to 2025.4 since the bug is relevant to LWT over tablets.

(cherry picked from commit e1667afa50)

(cherry picked from commit 6f4558ed4b)

(cherry picked from commit 64ba427b85)

(cherry picked from commit ec6fba35aa)

(cherry picked from commit b23f2a2425)

(cherry picked from commit 33e9ea4a0f)

(cherry picked from commit 03d6829783)

Parent PR: https://github.com/scylladb/scylladb/pull/26619

Closes scylladb/scylladb#26700

* github.com:scylladb/scylladb:
  test_tablets_lwt: add test_tablets_merge_waits_for_lwt
  test.py: add universalasync_typed_wrap
  tablet_metadata_guard: fix split/merge handling
  tablet_metadata_guard: add debug logs
  paxos_state: shards_for_writes: improve the error message
  storage_service: barrier_and_drain – change log level to info
  topology_coordinator: fix log message
2025-10-24 21:22:49 +03:00
Petr Gusev
41f8f6b571 test_tablets_lwt: add test_tablets_merge_waits_for_lwt
(cherry picked from commit 03d6829783)
2025-10-24 12:22:20 +02:00
Petr Gusev
31e4bb1bc3 test.py: add universalasync_typed_wrap
The universalasync.wrap function doesn't preserve the
type information, which confuses the VS Code Pylance
plugin and makes code navigation hard.

In this commit we fix the problem by adding a typed
wrapper around universalasync.wrap.

Fixes: scylladb/scylladb#26639
(cherry picked from commit 33e9ea4a0f)
2025-10-24 12:21:21 +02:00
Petr Gusev
be94aab207 tablet_metadata_guard: fix split/merge handling
The guard should stop refreshing the ERM when the number of tablets
changes. Tablet splits or merges invalidate the tablet_id field
(_tablet), which means the guard can no longer correctly protect
ongoing operations from tablet migrations.

Fixes scylladb/scylladb#26437

(cherry picked from commit b23f2a2425)
2025-10-24 12:21:21 +02:00
Petr Gusev
a5be65785c tablet_metadata_guard: add debug logs
(cherry picked from commit ec6fba35aa)
2025-10-24 12:21:21 +02:00
Petr Gusev
5720dd52b8 paxos_state: shards_for_writes: improve the error message
Add the current token and tablet info, remove 'this_shard_id'
since it's always written by the logging infrastructure.

(cherry picked from commit 64ba427b85)
2025-10-24 12:21:21 +02:00
Petr Gusev
aa2021888c storage_service: barrier_and_drain – change log level to info
Debugging global barrier issues is difficult without these logs.
Since barriers do not occur frequently, increasing the log level should not produce excessive output.

(cherry picked from commit 6f4558ed4b)
2025-10-24 12:21:21 +02:00
Petr Gusev
a09c1b355e topology_coordinator: fix log message
(cherry picked from commit e1667afa50)
2025-10-24 12:21:21 +02:00
Pawel Pery
67e0c8e4b0 vector_search: fix flaky dns_refresh_aborted test
The test proceeds like this:
- run a long dns refresh process
- request to resolve the hostname with a short abort_source timer - the
  result should be an empty list, because the request is aborted

The test sometimes finishes the long dns refresh before the abort_source
fires, and the result list is not empty.

There are two issues. First, as.reset() changes the abort_source timeout. The
patch adds a get() method to the abort_source_timeout class, so the
abort_source timeout no longer changes. Second, a sleep may not be reliable.
The patch changes the long sleep inside the dns refresh lambda into
condition_variable handling, to properly signal the end of the dns refresh
process.
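The sleep-to-condition_variable change can be sketched with Python's threading primitives (the real code uses Seastar's condition_variable; class and method names here are illustrative):

```python
import threading

class DnsRefresh:
    """Toy refresh task that signals completion via a condition variable
    instead of relying on a fixed-length sleep in the test."""
    def __init__(self):
        self._cv = threading.Condition()
        self._done = False

    def run(self):
        # ... perform the refresh work here ...
        with self._cv:
            self._done = True
            self._cv.notify_all()  # deterministic end-of-refresh signal

    def wait_done(self, timeout):
        """Block until run() has signalled completion (or timeout)."""
        with self._cv:
            return self._cv.wait_for(lambda: self._done, timeout)

r = DnsRefresh()
t = threading.Thread(target=r.run)
t.start()
finished = r.wait_done(timeout=5.0)
t.join()
```

Unlike a sleep of a guessed duration, the waiter observes the actual end of the refresh, so the test cannot race against it.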

Fixes: #26561
Fixes: VECTOR-268

It needs to be backported to 2025.4

Closes scylladb/scylladb#26566

(cherry picked from commit 10208c83ca)

Closes scylladb/scylladb#26598
2025-10-23 11:24:32 +02:00
Piotr Dulikowski
03d57bae80 Merge '[Backport 2025.4] storage_proxy: wait for write handlers destruction' from Scylladb[bot]
`shared_ptr<abstract_write_response_handler>` instances are captured in the `lmutate` and `rmutate` lambdas of `send_to_live_endpoints()`. As a result, an `abstract_write_response_handler` object may outlive its removal from the `storage_proxy::_response_handlers` map -> `cancel_all_write_response_handlers()` doesn't actually wait for request completion -> `sp::drain_on_shutdown()` doesn't guarantee all requests are drained -> `sp::stop_remote()` completes too early and `paxos_store` is destroyed while LWT local writes might still be in progress. In this PR we introduce a `write_handler_destroy_promise` to wait for such pending instances in `cancel_write_handlers()` and `cancel_all_write_response_handlers()` to prevent the use-after-free.

A better long-term solution might be to replace `shared_ptr` with `unique_ptr` for `abstract_write_response_handler` and use a separate gate to track the `lmutate/rmutate` lambdas. We do not actually need to wait for these lambdas to finish before sending a timeout or error response to the client, as we currently do in `~abstract_write_response_handler`.

Fixes scylladb/scylladb#26355

backport: need to be backported to 2025.4 since #26355 is reproduced on LWT over tablets

- (cherry picked from commit bf2ac7ee8b)

- (cherry picked from commit b269f78fa6)

- (cherry picked from commit bbcf3f6eff)

- (cherry picked from commit 8925f31596)

Parent PR: #26408

Closes scylladb/scylladb#26658

* github.com:scylladb/scylladb:
  test_tablets_lwt: add test_lwt_shutdown
  storage_proxy: wait for write handler destruction
  storage_proxy: coroutinize cancel_write_handlers
  storage_proxy: cancel_write_handlers: don't hold a strong pointer to handler
2025-10-23 10:49:52 +02:00
Patryk Jędrzejczak
76560ca095 test: test group0 tombstone GC in the Raft-based recovery procedure
We add a regression test for the bug fixed in the previous commits.

(cherry picked from commit c57f097630)
2025-10-22 17:13:34 +00:00
Patryk Jędrzejczak
8a11535a12 group0_state_id_handler: remove unused group0_server_accessor
It became unused in the previous commit.

(cherry picked from commit 6b2e003994)
2025-10-22 17:13:34 +00:00
Patryk Jędrzejczak
d727a086c5 group0_state_id_handler: consider state IDs of all non-ignored topology members
It's not enough to consider only the current group 0 members. In the
Raft-based recovery procedure, there can be nodes that haven't joined
the current group 0 yet, but they have belonged to a different group 0
and thus have a non-empty group 0 state ID.

We fix this issue in this commit by considering topology members
instead.

We don't consider ignored nodes as an optimization. When some nodes are
dead, the group 0 state ID handler won't have to wait until all these
nodes leave the cluster. It will only have to wait until all these nodes
are ignored, which happens at the beginning of the first
removenode/replace. As a result, tombstones of group 0 tables will be
purged much sooner.

We don't rename the `group0_members` variable to keep the change
minimal. There seems to be no precise and succinct name for the used set
of nodes anyway.

We use `std::ranges::join_view` in one place because:
- `std::ranges::concat` will become available in C++26,
- `boost::range::join` is not a good option, as there is an ongoing
  effort to minimize external dependencies in Scylla.
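The gist of the fix can be sketched as computing the GC bound over all non-ignored topology members instead of only current group 0 members; the data shapes and the "minimum state ID" reduction below are illustrative assumptions, not the real handler's logic:

```python
# Sketch, assuming the tombstone GC bound must not advance past the
# state ID of any non-ignored topology member -- including nodes that
# haven't joined the current group 0 but carry a state ID from a
# previous group 0 (hypothetical data shapes).

def tombstone_gc_bound(topology_members, ignored, state_ids):
    """Return the minimum state ID over all non-ignored topology
    members, or None if some member has no state ID yet (GC unsafe)."""
    relevant = [n for n in topology_members if n not in ignored]
    ids = [state_ids[n] for n in relevant if state_ids.get(n) is not None]
    if len(ids) < len(relevant):
        return None  # a member hasn't applied anything yet: cannot GC safely
    return min(ids, default=None)
```

Excluding ignored nodes means a dead node only blocks GC until it is ignored at the start of the first removenode/replace, not until it leaves the cluster.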

(cherry picked from commit 1d09b9c8d0)
2025-10-22 17:13:34 +00:00
Andrei Chekun
d1274f01aa test.py: rewrite the wait_for_first_completed
Rewrite wait_for_first_completed to return only the first completed task and
guarantee that all cancelled and finished tasks are awaited (and disappear).
Use wait_for_first_completed to avoid falsely passing tests in the future and
issues like #26148.
Use gather_safely to await tasks, removing the warning that a coroutine was
not awaited.

Closes scylladb/scylladb#26435

(cherry picked from commit 24d17c3ce5)

Closes scylladb/scylladb#26663
2025-10-22 18:12:52 +02:00
Michael Litvak
aa2065fe2e storage_service: improve colocated repair error to show table names
When requesting repair for tablets of a colocated table, the request
fails with an error. Improve the error message to show the table names
instead of table IDs, because the table names are more useful for users.

Fixes scylladb/scylladb#26567

Closes scylladb/scylladb#26568

(cherry picked from commit b808d84d63)

Closes scylladb/scylladb#26624
2025-10-22 15:25:15 +02:00
Asias He
5c7eb2ac61 repair: Fix uuid and nodes_down order in the log
Fixes #26536

Closes scylladb/scylladb#26547

(cherry picked from commit 33bc1669c4)

Closes scylladb/scylladb#26630
2025-10-22 14:25:18 +02:00
Tomasz Grabiec
0621a8aee5 Merge '[Backport 2025.4] Synchronize tablet split and load-and-stream' from Scylladb[bot]
Load-and-stream is broken when running concurrently with the finalization step of tablet split.

Consider this:
1) split starts
2) split finalization executes the barrier and succeeds
3) load-and-stream runs now, starts writing sstable (pre-split)
4) split finalization publishes changes to tablet metadata
5) load-and-stream finishes writing sstable
6) sstable cannot be loaded since it spans two tablets

Two possible fixes (maybe both):

1) load-and-stream awaits for topology to quiesce
2) perform split compaction on sstable that spans both sibling tablets

This patch implements #1. By waiting for topology to quiesce,
we guarantee that load-and-stream only starts when there's no
chance the coordinator is handling some topology operation like
split finalization.

Fixes https://github.com/scylladb/scylladb/issues/26455.

- (cherry picked from commit 3abc66da5a)

- (cherry picked from commit 4654cdc6fd)

Parent PR: #26456

Closes scylladb/scylladb#26651

* github.com:scylladb/scylladb:
  test: Add reproducer for l-a-s and split synchronization issue
  sstables_loader: Synchronize tablet split and load-and-stream
2025-10-22 14:23:04 +02:00
Jenkins Promoter
10db3f7c85 Update ScyllaDB version to: 2025.4.0-rc3 2025-10-22 14:11:52 +03:00
Michał Jadwiszczak
f6dde0aa4b test/alternator/test_tablets: add test for GSI backfill with tablets
The test should pass without the fix for scylladb/scylladb#26615,
because `executor::update_table()` uses
`service::prepare_new_view_announcement()`, which creates view building
tasks for the view.

But it's better to add this test.

(cherry picked from commit 34503f43a1)
2025-10-22 10:51:55 +00:00
Michał Jadwiszczak
207c273b29 test/alternator/test_tablets: add reproducer for GSI with tablets
(cherry picked from commit bdab455cbb)
2025-10-22 10:51:54 +00:00
Michał Jadwiszczak
6df48aacd7 alternator/executor: instantly mark view as built when creating it with base table
When a `CreateTable` request creates a GSI/LSI together with the base table,
the base table is empty and we don't need to actually build the view.

In tablet-based keyspaces we can simply not create view building tasks
and mark the view build status as SUCCESS on all nodes. Then, the view
building worker on each node will mark the view as built in
`system.built_views` (`view_building_worker::update_built_views()`).

Vnode-based keyspaces will use the "old" logic of view builder, which
will process the view and mark it as built.

Fixes scylladb/scylladb#26615

(cherry picked from commit 8fbf122277)
2025-10-22 10:51:54 +00:00
Pavel Emelyanov
45341ca246 Merge '[Backport 2025.4] s3_client: handle failures which require http::request updating' from Scylladb[bot]
Apply two main changes to the s3_client error handling
1. Add a loop to s3_client's `make_request` for the case when the retry strategy will not help because the request itself has to be updated. For example, on authentication token expiration or a stale timestamp in the request header
2. Refine the way we handle exceptions in the `chunked_download_source` background fiber: we now carry the original `exception_ptr` and also wrap EVERY exception in `filler_exception` to prevent the retry strategy from retrying the request altogether

Fixes: https://github.com/scylladb/scylladb/issues/26483

Should be ported back to 2025.3 and 2025.4 to prevent deadlocks and failures in these versions

- (cherry picked from commit 55fb2223b6)

- (cherry picked from commit db1ca8d011)

- (cherry picked from commit 185d5cd0c6)

- (cherry picked from commit 116823a6bc)

- (cherry picked from commit 43acc0d9b9)

- (cherry picked from commit 58a1cff3db)

- (cherry picked from commit 1d34657b14)

- (cherry picked from commit 4497325cd6)

- (cherry picked from commit fdd0d66f6e)

Parent PR: #26527

Closes scylladb/scylladb#26650

* github.com:scylladb/scylladb:
  s3_client: tune logging level
  s3_client: add logging
  s3_client: improve exception handling for chunked downloads
  s3_client: fix indentation
  s3_client: add max for client level retries
  s3_client: remove `s3_retry_strategy`
  s3_client: support high-level request retries
  s3_client: just reformat `make_request`
  s3_client: unify `make_request` implementation
2025-10-22 11:33:53 +03:00
Piotr Dulikowski
1efb2eb174 view_building_worker: access tablet map through erm on sstable discovery
Currently, the data returned by `database::get_tables_metadata()` and
`database::get_token_metadata()` may not be consistent. Specifically,
the tables metadata may contain some tablet-based tables before their
tablet maps appear in the token metadata. This is going to be fixed
after issue scylladb/scylladb#24414 is closed, but for the time being
work around it by accessing the token metadata via
`table`->effective_replication_map() - that token metadata is guaranteed
to have the tablet map of the `table`.

Fixes: scylladb/scylladb#26403

Closes scylladb/scylladb#26588

(cherry picked from commit f76917956c)

Closes scylladb/scylladb#26631
2025-10-22 11:33:22 +03:00
Pavel Emelyanov
320ef84367 Merge '[Backport 2025.4] compaction/twcs: fix use after free issues' from Scylladb[bot]
The `compaction_strategy_state` class holds strategy specific state via
a `std::variant` containing different state types. When a compaction
strategy performs compaction, it retrieves a reference to its state from
the `compaction_strategy_state` object. If the table's compaction
strategy is ALTERed while a compaction is in progress, the
`compaction_strategy_state` object gets replaced, destroying the old
state. This leaves the ongoing compaction holding a dangling reference,
resulting in a use after free.

Fix this by using `seastar::shared_ptr` for the state variant
alternatives(`leveled_compaction_strategy_state_ptr` and
`time_window_compaction_strategy_state_ptr`). The compaction strategies
now hold a copy of the shared_ptr, ensuring the state remains valid for
the duration of the compaction even if the strategy is altered.

The `compaction_strategy_state` itself is still passed by reference and
only the variant alternatives use shared_ptrs. This allows ongoing
compactions to retain ownership of the state independently of the
wrapper's lifetime.

The method `maybe_wait_for_sstable_count_reduction()`, when retrieving
the list of sstables for a possible compaction, holds a reference to the
compaction strategy. If the strategy is updated during execution, it can
cause a use after free issue. To prevent this, hold a copy of the
compaction strategy so it isn’t yanked away during the method’s
execution.

Fixes #25913

Issue probably started after 9d3755f276, so backport to 2025.4

- (cherry picked from commit 1cd43bce0e)

- (cherry picked from commit 35159e5b02)

- (cherry picked from commit 18c071c94b)

Parent PR: #26593

Closes scylladb/scylladb#26625

* github.com:scylladb/scylladb:
  compaction: fix use after free when strategy is altered during compaction
  compaction/twcs: pass compaction_strategy_state to internal methods
  compaction_manager: hold a copy to compaction strategy in maybe_wait_for_sstable_count_reduction
2025-10-22 11:32:28 +03:00
Petr Gusev
01658f9fcb test_tablets_lwt: add test_lwt_shutdown
(cherry picked from commit 8925f31596)
2025-10-22 00:10:59 +00:00
Petr Gusev
e56f14b9c5 storage_proxy: wait for write handler destruction
shared_ptr<abstract_write_response_handler> instances are captured in
the lmutate/rmutate lambdas of send_to_live_endpoints(). As a result,
an abstract_write_response_handler object may outlive its removal from
the _response_handlers map. We use write_handler_destroy_promise to
wait for such pending instances in cancel_write_handlers() and
cancel_all_write_response_handlers() to prevent use-after-free.

A better long-term solution might be to replace shared_ptr with
unique_ptr for abstract_write_response_handler and use a separate gate
to track the lmutate/rmutate lambdas. We do not actually need to wait
for these lambdas to finish before sending a timeout or error response
to the client, as we currently do in ~abstract_write_response_handler.
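The destroy-promise idea can be sketched with a Python refcounting analogue: shutdown waits until every handler object is actually destroyed, not merely removed from the map. The registry class and names are hypothetical; `weakref.finalize` plays the role of the destructor fulfilling the promise:

```python
import threading
import weakref

class WriteHandler:
    """Stand-in for abstract_write_response_handler."""
    def __init__(self, name):
        self.name = name

class HandlerRegistry:
    """Tracks live handlers; cancel_all() waits for real destruction."""
    def __init__(self):
        self._all_destroyed = threading.Event()
        self._live = 0
        self._lock = threading.Lock()
        self.handlers = {}

    def register(self, handler):
        with self._lock:
            self._live += 1
            self._all_destroyed.clear()
        # Runs when the last reference to `handler` is dropped,
        # like the destructor fulfilling write_handler_destroy_promise.
        weakref.finalize(handler, self._on_destroy)
        self.handlers[handler.name] = handler

    def _on_destroy(self):
        with self._lock:
            self._live -= 1
            if self._live == 0:
                self._all_destroyed.set()

    def cancel_all(self, timeout=5.0):
        self.handlers.clear()                     # drop the map's references...
        return self._all_destroyed.wait(timeout)  # ...then wait for destruction

reg = HandlerRegistry()
reg.register(WriteHandler("h1"))
drained = reg.cancel_all()
```

If a background lambda still held a handler reference, `cancel_all` would block until that reference was released, which is exactly the drain guarantee the fix restores.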

Fixes scylladb/scylladb#26355

(cherry picked from commit bbcf3f6eff)
2025-10-22 00:10:59 +00:00
Petr Gusev
5865dad0c9 storage_proxy: coroutinize cancel_write_handlers
The cancel_write_handlers() method was assumed to be called in a thread
context, likely because it was first used from gossiper events, where a
thread context already existed. Later, this method was reused in
abort_view_writes() and abort_batch_writes(), where threads are created
on the fly and appear redundant.

The drain_on_shutdown() method also used a thread, justified by some
"delicate lifetime issues", but it is unclear what that actually means.
It seems that a straightforward co_await should work just fine.

(cherry picked from commit b269f78fa6)
2025-10-22 00:10:59 +00:00
Petr Gusev
388dfbe3ee storage_proxy: cancel_write_handlers: don't hold a strong pointer to handler
A strong pointer was held for the duration of thread::yield(),
preventing abstract_write_response_handler destruction and possibly
delaying the sending of timeout or error responses to the client.

This commit removes the strong pointer. Instead, we compute the
next iterator before calling timeout_cb(), so if the handler is
destroyed inside timeout_cb(), we already have a valid next iterator.
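The compute-next-before-callback pattern can be sketched with a dict analogue of the handler map (names hypothetical); iterating over a snapshot of keys plays the role of holding the precomputed next iterator:

```python
def cancel_matching(handlers, should_cancel, timeout_cb):
    """Walk the handler map and invoke timeout_cb on matching entries.
    The next position is fixed *before* the callback runs (by snapshotting
    the keys), so the walk stays valid even when the callback erases the
    current entry -- no strong reference to the handler is kept."""
    for key in list(handlers):          # snapshot = precomputed "next"
        handler = handlers.get(key)
        if handler is None:
            continue                    # already erased by an earlier callback
        if should_cancel(handler):
            timeout_cb(key)             # may delete handlers[key]

handlers = {1: "a", 2: "b", 3: "c"}
cancelled = []

def cb(key):
    cancelled.append(key)
    del handlers[key]                   # erases the current entry

cancel_matching(handlers, lambda h: h != "b", cb)
```

Erasure inside the callback cannot invalidate the traversal, mirroring how the C++ code advances the iterator before invoking timeout_cb().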

(cherry picked from commit bf2ac7ee8b)
2025-10-22 00:10:59 +00:00
Raphael S. Carvalho
92a603699e test: Add reproducer for l-a-s and split synchronization issue
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 4654cdc6fd)
2025-10-21 12:26:55 +00:00
Raphael S. Carvalho
d998d9d418 sstables_loader: Synchronize tablet split and load-and-stream
Load-and-stream is broken when running concurrently with the
finalization step of tablet split.

Consider this:
1) split starts
2) split finalization executes the barrier and succeeds
3) load-and-stream runs now, starts writing sstable (pre-split)
4) split finalization publishes changes to tablet metadata
5) load-and-stream finishes writing sstable
6) sstable cannot be loaded since it spans two tablets

Two possible fixes (maybe both):

1) load-and-stream awaits for topology to quiesce
2) perform split compaction on sstable that spans both sibling tablets

This patch implements #1. By waiting for topology to quiesce,
we guarantee that load-and-stream only starts when there's no
chance the coordinator is handling some topology operation like
split finalization.

Fixes #26455.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 3abc66da5a)
2025-10-21 12:26:54 +00:00
Ernest Zaslavsky
6f6b3a26c4 s3_client: tune logging level
Change all logging related to errors in `chunked_download_source` background download fiber to `info` to make it visible right away in logs.

(cherry picked from commit fdd0d66f6e)
2025-10-21 12:26:50 +00:00
Ernest Zaslavsky
4eb427976b s3_client: add logging
Add logging for the case when we encounter expired credentials; it shouldn't happen, but just in case.

(cherry picked from commit 4497325cd6)
2025-10-21 12:26:50 +00:00
Ernest Zaslavsky
94d49da8ec s3_client: improve exception handling for chunked downloads
Refactor the wrapping exception used in `chunked_download_source` to
prevent the retry strategy from reattempting failed requests. The new
implementation preserves the original `exception_ptr`, making the root
cause clearer and easier to diagnose.

(cherry picked from commit 1d34657b14)
2025-10-21 12:26:50 +00:00
Ernest Zaslavsky
f9bc211966 s3_client: fix indentation
Reformat `client::make_request` to fix the indentation of `if` block

(cherry picked from commit 58a1cff3db)
2025-10-21 12:26:50 +00:00
Ernest Zaslavsky
4aff338282 s3_client: add max for client level retries
To prevent the client from retrying time skew and authentication errors indefinitely, add `max_attempts` to `client::make_request`.

(cherry picked from commit 43acc0d9b9)
2025-10-21 12:26:50 +00:00
Ernest Zaslavsky
8b7dce8334 s3_client: remove s3_retry_strategy
It never worked as intended, so credentials handling moves to the same place where we handle time skew, since we have to reauthenticate the request.

(cherry picked from commit 116823a6bc)
2025-10-21 12:26:50 +00:00
Ernest Zaslavsky
2afd323838 s3_client: support high-level request retries
Add an option to retry S3 requests at the highest level, including
reinitializing headers and reauthenticating. This addresses cases
where retrying the same request fails, such as when the S3 server
rejects a timestamp older than 15 minutes.
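The highest-level retry loop can be sketched as follows: each attempt rebuilds (and would re-sign) the request instead of resending it as-is, bounded by a maximum attempt count. The error class and callback names are hypothetical, not the s3_client API:

```python
class RetryableAuthError(Exception):
    """Stand-in for errors (expired token, stale request timestamp) that a
    plain resend cannot fix: the request itself must be rebuilt."""

def make_request(build_request, send, max_attempts=3):
    """Retry at the highest level: every attempt rebuilds the request,
    refreshing headers and authentication, rather than resending it."""
    last_error = None
    for _ in range(max_attempts):
        request = build_request()   # fresh timestamp + re-authentication
        try:
            return send(request)
        except RetryableAuthError as e:
            last_error = e          # rebuild on the next iteration
    raise last_error

attempts = []

def build():
    attempts.append("built")
    return {"ts": len(attempts)}    # pretend each build re-signs the request

def flaky_send(req):
    if req["ts"] < 3:
        raise RetryableAuthError("request timestamp too old")
    return "ok"

result = make_request(build, flaky_send)
```

A lower-level retry strategy would have resent the same stale request forever; rebuilding per attempt, with `max_attempts` as the bound, is what makes the skew/expiry cases recoverable.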

(cherry picked from commit 185d5cd0c6)
2025-10-21 12:26:50 +00:00
Ernest Zaslavsky
f2f415a742 s3_client: just reformat make_request
Just reformat previously changed methods to improve readability

(cherry picked from commit db1ca8d011)
2025-10-21 12:26:50 +00:00
Ernest Zaslavsky
5c2d8bd273 s3_client: unify make_request implementation
Refactor `make_request` to use a single core implementation that
handles authentication and issues the HTTP request. All overloads now
delegate to this unified method.

(cherry picked from commit 55fb2223b6)
2025-10-21 12:26:49 +00:00
Lakshmi Narayanan Sreethar
45b9675d28 compaction: fix use after free when strategy is altered during compaction
The `compaction_strategy_state` class holds strategy specific state via
a `std::variant` containing different state types. When a compaction
strategy performs compaction, it retrieves a reference to its state from
the `compaction_strategy_state` object. If the table's compaction
strategy is ALTERed while a compaction is in progress, the
`compaction_strategy_state` object gets replaced, destroying the old
state. This leaves the ongoing compaction holding a dangling reference,
resulting in a use after free.

Fix this by using `seastar::shared_ptr` for the state variant
alternatives (`leveled_compaction_strategy_state_ptr` and
`time_window_compaction_strategy_state_ptr`). The compaction strategies
now hold a copy of the shared_ptr, ensuring the state remains valid for
the duration of the compaction even if the strategy is altered.

The `compaction_strategy_state` itself is still passed by reference and
only the variant alternatives use shared_ptrs. This allows ongoing
compactions to retain ownership of the state independently of the
wrapper's lifetime.
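The ownership pattern above can be sketched in a few lines. The types here (`leveled_state`, `twcs_state`, the `*_ptr` aliases) are simplified stand-ins for the real compaction-strategy state, not the actual ScyllaDB code: the variant holds `shared_ptr`s, so an ongoing compaction can copy the pointer and keep the state alive even after ALTER replaces the wrapper's contents.

```cpp
#include <cassert>
#include <memory>
#include <variant>

// Illustrative state types; the real ones carry per-strategy bookkeeping.
struct leveled_state { int level_count = 0; };
struct twcs_state { int window_count = 0; };

using leveled_state_ptr = std::shared_ptr<leveled_state>;
using twcs_state_ptr = std::shared_ptr<twcs_state>;

struct compaction_strategy_state {
    // shared_ptr alternatives: replacing the variant no longer destroys
    // state that an in-flight compaction still references.
    std::variant<leveled_state_ptr, twcs_state_ptr> state =
        std::make_shared<leveled_state>();
};
```

A compaction copies the `shared_ptr` out of the variant; an ALTER that swaps the variant's alternative then leaves the compaction's copy as the sole owner of the old state instead of a dangling reference.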

Fixes #25913

Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>
(cherry picked from commit 18c071c94b)
2025-10-21 00:59:33 +00:00
Lakshmi Narayanan Sreethar
f1e1c7db4c compaction/twcs: pass compaction_strategy_state to internal methods
During TWCS compaction, multiple methods independently fetch the
compaction_strategy_state using get_state(). This can lead to
inconsistencies if the compaction strategy is ALTERed while the
compaction is in progress.

This patch fixes a part of this issue by passing down the state to the
lower level methods as parameters instead of fetching it repeatedly.

Refs #25913

Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>
(cherry picked from commit 35159e5b02)
2025-10-21 00:59:33 +00:00
Lakshmi Narayanan Sreethar
5e1f32b3d4 compaction_manager: hold a copy to compaction strategy in maybe_wait_for_sstable_count_reduction
The method `maybe_wait_for_sstable_count_reduction()`, when retrieving
the list of sstables for a possible compaction, holds a reference to the
compaction strategy. If the strategy is updated during execution, it can
cause a use after free issue. To prevent this, hold a copy of the
compaction strategy so it isn’t yanked away during the method’s
execution.

Refs #26546
Refs #25913

Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>
(cherry picked from commit 1cd43bce0e)
2025-10-21 00:59:32 +00:00
Botond Dénes
99f2dd02bf Merge '[Backport 2025.4] raft topology: disable schema pulls in the Raft-based recovery procedure' from Scylladb[bot]
Schema pulls should always be disabled when group 0 is used. However,
`migration_manager::disable_schema_pulls()` is never called during
a restart with `recovery_leader` set in the Raft-based recovery
procedure, which causes schema pulls to be re-enabled on all live nodes
(excluding the nodes replacing the dead nodes). Moreover, schema pulls
remain enabled on each node until the node is restarted, which could
be a very long time.

We fix this issue and add a regression test in this PR.

Fixes #26569

This is an important bug fix, so it should be backported to all branches
with the Raft-based recovery procedure (2025.2 and newer branches).

- (cherry picked from commit ec3a35303d)

- (cherry picked from commit da8748e2b1)

- (cherry picked from commit 71de01cd41)

Parent PR: #26572

Closes scylladb/scylladb#26599

* github.com:scylladb/scylladb:
  test: test_raft_recovery_entry_loss: fix the typo in the test case name
  test: verify that schema pulls are disabled in the Raft-based recovery procedure
  raft topology: disable schema pulls in the Raft-based recovery procedure
2025-10-20 10:39:52 +03:00
Botond Dénes
76a6a059c8 Merge '[Backport 2025.4] Fix vector store client flaky test' from Scylladb[bot]
This series of patches improves test vector_store_client_test stability. The primary issue with flaky connections was discovered while working on PR #26308.

Key Changes:
- Fixes premature connection closures in the mock server:
The mock HTTP server was not consuming request payloads, causing it to close connections immediately after a response. Subsequent tests attempting to reuse these closed connections would fail intermittently, leading to flakiness. The server has been updated to handle payloads correctly.

- Removes a retry workaround:
With the underlying connection issue resolved, the retry logic in the vector_store_client_test_ann_request test is no longer needed and has been removed.

- Mocks the DNS resolver in tests:
The vector_store_client_uri_update_to_invalid test has been corrected to mock DNS lookups, preventing it from making real network requests.

- Corrects request timeout handling:
A bug has been fixed where the request timeout was not being reset between consecutive requests.

- Unifies test timeouts:
Timeouts have been standardized across the test suite for consistency.

Fixes: #26468

It is recommended to backport this series to the 2025.4 branch. Since these changes only affect test code and do not alter any production logic, the backport is safe. Addressing this test flakiness will improve the stability of the CI pipeline and prevent it from blocking unrelated patches.

- (cherry picked from commit ac5e9c34b6)

- (cherry picked from commit 2eb752e582)

- (cherry picked from commit d99a4c3bad)

- (cherry picked from commit 0de1fb8706)

- (cherry picked from commit 62deea62a4)

Parent PR: #26374

Closes scylladb/scylladb#26551

* github.com:scylladb/scylladb:
  vector_search: Unify test timeouts
  vector_search: Fix missing timeout reset
  vector_search: Refactor ANN request test
  vector_search: Fix flaky connection in tests
  vector_search: Fix flaky test by mocking DNS queries
2025-10-20 10:35:45 +03:00
Michał Chojnowski
6ff4910d96 test/cluster/test_bti_index.py: avoid a race with CQL tracing
The test uses CQL tracing to check which files were read by a query.
This is flaky if the coordinator and the replica are different shards,
because the Python driver only waits for the coordinator, and not
for replicas, to finish writing their traces.
(So it might happen that the Python driver returns a result
with only coordinator events and no replica events).

Let's just dodge the issue by using --smp=1.

Fixes scylladb/scylladb#26432

Closes scylladb/scylladb#26434

(cherry picked from commit c35b82b860)

Closes scylladb/scylladb#26492
2025-10-20 10:32:58 +03:00
Botond Dénes
d213953d0a Merge '[Backport 2025.4] tools: fix documentation links after change to source-available' from Scylladb[bot]
Some tools commands have links to online documentation in their help output. These links were left behind in the source-available change, they still point to the old opensource docs. Furthermore, the links in the scylla-sstable help output always point to the latest stable release's documentation, instead of the appropriate one for the branch the tool was built from. Fix both of these.

Fixes: scylladb/scylladb#26320

Broken documentation link fix for the tool help output; needs backport to all live source-available versions.

- (cherry picked from commit 5a69838d06)

- (cherry picked from commit 15a4a9936b)

- (cherry picked from commit fe73c90df9)

Parent PR: #26322

Closes scylladb/scylladb#26390

* github.com:scylladb/scylladb:
  tools/scylla-sstable: fix doc links
  release: adjust doc_link() for the post source-available world
  tools/scylla-nodetool: remove trailing " from doc urls
2025-10-20 10:31:05 +03:00
Michał Jadwiszczak
f5e76d0fcb test/cluster/test_view_building_coordinator: skip reproducer instead of xfail
The reproducer for issue scylladb/scylladb#26244 takes some time
and since the test is failing, there is no point in wasting resources on
it.
We can change the xfail mark to skip.

Refs scylladb/scylladb#26244

Closes scylladb/scylladb#26350

(cherry picked from commit d92628e3bd)

Closes scylladb/scylladb#26365
2025-10-20 10:30:34 +03:00
Aleksandra Martyniuk
2819b8b755 test: wait for cql in test_two_tablets_concurrent_repair_and_migration_repair_writer_level
In test_two_tablets_concurrent_repair_and_migration_repair_writer_level
safe_rolling_restart returns ready cql. However, get_all_tablet_replicas
uses the cql reference from the manager, which isn't ready. Wait for cql.

Fixes: #26328

Closes scylladb/scylladb#26349

(cherry picked from commit 0e73ce202e)

Closes scylladb/scylladb#26362
2025-10-20 10:29:56 +03:00
Avi Kivity
245d27347b dht, sstables: replace vector with chunked_vector when computing sstable shards
sstable::compute_shards_for_this_sstable() has a temporary of type
std::vector<dht::token_range> (aka dht::partition_range_vector), which
allocates a contiguous 300k when loading an sstable from disk. This
causes large allocation warnings (it doesn't really stress the allocator
since this typically happens during startup, but best to clear the warning
anyway).

Fix this by changing the container to chunked_vector. It is passed
to dht::ring_position_range_vector_sharder, but since we're the only
user, we can change that class to accept the new type.
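The idea behind the fix can be shown with a minimal stand-in for `seastar::chunked_vector` (this sketch is not the real seastar container): elements live in fixed-size chunks, so appending N elements never requires one contiguous N-element allocation — the largest single allocation is one chunk plus the chunk-pointer table, which avoids the 300k contiguous allocation warning.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Minimal chunked container sketch: storage is split into fixed-size
// chunks, indexed by (i / ChunkElems, i % ChunkElems).
template <typename T, std::size_t ChunkElems = 512>
class chunked_vector_sketch {
    std::vector<std::unique_ptr<T[]>> _chunks;
    std::size_t _size = 0;
public:
    void push_back(T v) {
        if (_size % ChunkElems == 0) {
            // Allocate one chunk at a time instead of reallocating a
            // single large contiguous buffer.
            _chunks.push_back(std::make_unique<T[]>(ChunkElems));
        }
        _chunks[_size / ChunkElems][_size % ChunkElems] = std::move(v);
        ++_size;
    }
    T& operator[](std::size_t i) { return _chunks[i / ChunkElems][i % ChunkElems]; }
    std::size_t size() const { return _size; }
};
```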

Fixes #24198.

Closes scylladb/scylladb#26353

(cherry picked from commit 7230a04799)

Closes scylladb/scylladb#26360
2025-10-20 10:28:59 +03:00
Patryk Jędrzejczak
323a7b8c55 test: test_raft_recovery_entry_loss: fix the typo in the test case name
(cherry picked from commit 71de01cd41)
2025-10-17 10:27:33 +00:00
Patryk Jędrzejczak
cd0bb11eef test: verify that schema pulls are disabled in the Raft-based recovery procedure
We do this at the end of `test_raft_recovery_entry_loss`. It's not worth
adding a separate regression test, as tests of the recovery procedure
are complicated and have a long running time. Also, we choose
`test_raft_recovery_entry_loss` out of all tests of the recovery
procedure because it does some schema changes.

(cherry picked from commit da8748e2b1)
2025-10-17 10:27:32 +00:00
Patryk Jędrzejczak
95d4206585 raft topology: disable schema pulls in the Raft-based recovery procedure
Schema pulls should always be disabled when group 0 is used. However,
`migration_manager::disable_schema_pulls()` is never called during
a restart with `recovery_leader` set in the Raft-based recovery
procedure, which causes schema pulls to be re-enabled on all live nodes
(excluding the nodes replacing the dead nodes). Moreover, schema pulls
remain enabled on each node until the node is restarted, which could
be a very long time.

The old gossip-based recovery procedure doesn't have this problem
because we disable schema pulls after completing the upgrade-to-group0
procedure, which is a part of the old recovery procedure.

Fixes #26569

(cherry picked from commit ec3a35303d)
2025-10-17 10:27:32 +00:00
Botond Dénes
2bc0c9c45b Update seastar submodule
* seastar 37983cd0...60e4b3b9 (1):
  > http: add "Connection: close" header to final server response.

Refs #26298
2025-10-17 10:22:45 +03:00
Artsiom Mishuta
de5a13db28 test.py: reintroducing sudo in resource_gather.py
conditionally reintroducing sudo for resource gathering
when running under docker

related: https://github.com/scylladb/scylladb/pull/26294#issuecomment-3346968097

fixes: https://github.com/scylladb/scylladb/issues/26312

Closes scylladb/scylladb#26401

(cherry picked from commit 99455833bd)

Closes scylladb/scylladb#26473
2025-10-17 09:27:13 +03:00
Pavel Emelyanov
dc3c6c3090 Update seastar submodule (iotune fixes for i7i/i4i)
* seastar c8a3515f9...37983cd04 (2):
  > iotune: fix very long warm up duration on systems with high cpu count
  > Merge '[Backport 2025.4] iotune: Add warmup period to measurements' from Robert Bindar
    iotune: Ignore measurements during warmup period
    iotune: Fix warmup calculation bug and botched rebase

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Fixes #26530

Closes scylladb/scylladb#26583
2025-10-16 20:30:37 +03:00
Jenkins Promoter
83babc20e3 Update ScyllaDB version to: 2025.4.0-rc2 2025-10-15 15:43:09 +03:00
Ernest Zaslavsky
04b9e98ef8 s3_client: track memory starvation in background filling fiber
Introduce a counter metric to monitor instances where the background
filling fiber is blocked due to insufficient memory in the S3 client.

Closes scylladb/scylladb#26466

(cherry picked from commit 413739824f)

Closes scylladb/scylladb#26555
2025-10-15 12:03:09 +02:00
Michał Chojnowski
de8c2a8196 test/boost/sstable_compressor_factory_test: fix thread-unsafe usage of Boost.Test
It turns out that Boost assertions are thread-unsafe,
(and can't be used from multiple threads concurrently).
This causes the test to fail with cryptic log corruptions sometimes.
Fix that by switching to thread-safe checks.

Fixes scylladb/scylladb#24982

Closes scylladb/scylladb#26472

(cherry picked from commit 7c6e84e2ec)

Closes scylladb/scylladb#26554
2025-10-15 12:08:54 +03:00
Jenkins Promoter
dd2e8a2105 Update pgo profiles - aarch64 2025-10-15 05:03:22 +03:00
Jenkins Promoter
90fd618967 Update pgo profiles - x86_64 2025-10-15 04:31:33 +03:00
Karol Nowacki
da8bd30a5b vector_search: Unify test timeouts
The test previously used separate timeouts for requests (5s) and the
overall test case (10s).

This change unifies both timeouts to 10 seconds.

(cherry picked from commit 62deea62a4)
2025-10-14 22:49:42 +00:00
Karol Nowacki
4e9a42f343 vector_search: Fix missing timeout reset
The `vector_store_client_test` could be flaky because the request timeout
was not consistently reset in all code paths. This could lead to a
timeout from a previous operation firing prematurely and failing the
test.

The fix ensures `abort_source_timeout` is reset before each request.
The implementation is also simplified by changing
`abort_source_timeout::reset` to combine the reset and arm operations
into a single invocation.
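A minimal sketch of that design choice (hypothetical type and fields, not the real test helper): when one call both clears the previous deadline and arms the new one, no code path can perform only half of the pair, which is exactly how a stale timeout from an earlier request was able to fire prematurely.

```cpp
#include <cassert>
#include <chrono>
#include <optional>

struct abort_source_timeout_sketch {
    std::optional<std::chrono::steady_clock::time_point> deadline;

    // One invocation both discards any previous deadline and arms the new
    // one, so callers cannot forget the reset step before a new request.
    void reset(std::chrono::milliseconds after) {
        deadline = std::chrono::steady_clock::now() + after;
    }

    bool armed() const { return deadline.has_value(); }
};
```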

(cherry picked from commit 0de1fb8706)
2025-10-14 22:49:42 +00:00
Karol Nowacki
6db7481c7a vector_search: Refactor ANN request test
Refactor the `vector_store_client_test_ann_request` test to use the
`vs_mock_server` class, unifying the structure of the test cases.

This change also removes retry logic that waited for the server to be ready.
This is no longer necessary because the handler now exists for all index names
and consumes the entire request payload, preventing connection closures.

Previously, the server did not handle requests for unconfigured
indexes, which caused the connection to close. This could lead to a
race condition where the client would attempt to reuse a closed
connection.

(cherry picked from commit d99a4c3bad)
2025-10-14 22:49:42 +00:00
Karol Nowacki
62a5d4f932 vector_search: Fix flaky connection in tests
The vector store mock server was not reading the ANN request body,
which could cause it to prematurely close the connection.

This could lead to a race condition where the client attempts to reuse a
closed connection from its pool, resulting in a flaky test.

The fix is to always read the request body in the mock server.

(cherry picked from commit 2eb752e582)
2025-10-14 22:49:42 +00:00
Karol Nowacki
f5319b06ae vector_search: Fix flaky test by mocking DNS queries
The `vector_store_client_uri_update_to_invalid` test was flaky because
it performed real DNS lookups, making it dependent on the network
environment.

This commit replaces the live DNS queries with a mock to make the test
hermetic and prevent intermittent failures.

The `vector_search_metrics_test` test did not call configure{vs};
as a consequence, it performed real DNS queries, which made the test
flaky.

The refreshes counter increment has been moved before the call to the resolver.
In tests, the resolver is mocked, so an increment placed after the resolver call in production code would never be observed.
Without this change, there is no way to test DNS counter increments.

The change also simplifies the test making it more readable.

(cherry picked from commit ac5e9c34b6)
2025-10-14 22:49:42 +00:00
Piotr Wieczorek
c191c31682 alternator: Correct RCU undercount in BatchGetItem
The `describe_multi_item` function treated the last reference-captured
argument as the number of used RCU half units. The caller
`batch_get_item`, however, expected this parameter to hold an item size.
This RCU value was then passed to
`rcu_consumed_capacity_counter::get_half_units`, treating the
already-calculated RCU integer as if it were a size in bytes.

This caused a second conversion that undercounted the true RCU. During
conversion, the number of bytes is divided by `RCU_BLOCK_SIZE_LENGTH`
(=4KB), so the double conversion divided the number of bytes by 16 MB.

The fix removes the second conversion in `describe_multi_item` and
changes the API of `describe_multi_item`.
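The undercount is easiest to see as arithmetic. The sketch below models the conversion described in the commit message (the constant name mirrors `RCU_BLOCK_SIZE_LENGTH`; the function is illustrative, not the real counter code): applying the bytes-to-half-units conversion once is correct, while applying it twice divides by 4KB squared, i.e. 16MB, collapsing almost any realistic item size to a single half unit.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t RCU_BLOCK_SIZE_LENGTH = 4096; // 4KB per RCU block

// ceil(size / 4KB): one half unit per started 4KB block.
uint64_t get_half_units(uint64_t size_bytes) {
    return (size_bytes + RCU_BLOCK_SIZE_LENGTH - 1) / RCU_BLOCK_SIZE_LENGTH;
}
```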

Fixes: https://github.com/scylladb/scylladb/pull/25847

Closes scylladb/scylladb#25842

(cherry picked from commit a55c5e9ec7)

Closes scylladb/scylladb#26539
2025-10-14 11:53:09 +03:00
Dawid Mędrek
a4fd7019e3 replica/database: Fix description of validate_tablet_views_indexes
The current description is not accurate: the function doesn't throw
an exception if there's an invalid materialized view. Instead, it
simply logs the keyspaces that violate the requirement.

Furthermore, the experimental feature `views-with-tablets` is no longer
necessary for considering a materialized view as valid. It was dropped
in scylladb/scylladb@b409e85c20. The
replacement for it is the cluster feature `VIEWS_WITH_TABLETS`.

Fixes scylladb/scylladb#26420

Closes scylladb/scylladb#26421

(cherry picked from commit a9577e4d52)

Closes scylladb/scylladb#26476
2025-10-14 11:52:34 +03:00
Pavel Emelyanov
e18072d4b8 Merge '[Backport 2025.4] service/qos: set long timeout for auth queries on SL cache update' from Scylladb[bot]
pass an appropriate query state for auth queries called from service
level cache reload. we use the function qos_query_state to select a
query_state based on caller context - for internal queries, we set a
very long timeout.

the service level cache reload is called from group0 reload. we want it
to have a long timeout instead of the default 5 seconds for auth
queries, because we don't have a strict latency requirement on the one
hand, and on the other hand a timeout exception is undesired in the
group0 reload logic and can break group0 on the node.

Fixes https://github.com/scylladb/scylladb/issues/25290

backport possible to improve stability

- (cherry picked from commit a1161c156f)

- (cherry picked from commit 3c3dd4cf9d)

- (cherry picked from commit ad1a5b7e42)

Parent PR: #26180

Closes scylladb/scylladb#26479

* github.com:scylladb/scylladb:
  service/qos: set long timeout for auth queries on SL cache update
  auth: add query_state parameter to query functions
  auth: refactor query_all_directly_granted
2025-10-13 15:26:21 +03:00
Robert Bindar
7353aa5aa5 Make scylla_io_setup detect request size for best write IOPS
We noticed during work on scylladb/seastar#2802 that on i7i family
(later proved that it's valid for i4i family as well),
the disks are reporting the physical sector sizes incorrectly
as 512bytes, whilst we proved we can render much better write IOPS with
4096bytes.

This is not the case on AWS i3en family where the reported 512bytes
physical sector size is also the size we can achieve the best write IOPS.

This patch works around this issue by changing `scylla_io_setup` to parse
the instance type out of `/sys/devices/virtual/dmi/id/product_name`
and run iotune with the correct request size based on the instance type.

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>

Closes scylladb/scylladb#25315

(cherry picked from commit 2c74a6981b)

Closes scylladb/scylladb#26474
2025-10-13 15:25:16 +03:00
Michał Chojnowski
ec0b31b193 docs: fix a parameter name in API calls in sstable-dictionary-compression.rst
The correct argument name is `cf`, not `table`.

Fixes scylladb/scylladb#25275

Closes scylladb/scylladb#26447

(cherry picked from commit 87e3027c81)

Closes scylladb/scylladb#26495
2025-10-12 21:10:25 +03:00
Patryk Jędrzejczak
b5c3e2465f test: test_raft_no_quorum: test_can_restart: deflake the read barrier call
Expecting the group 0 read barrier to succeed with a timeout of 1s, just
after restarting 3 out of 5 voters, turned out to be flaky. In some
unlikely scenarios, such as multiple vote splits, the Raft leader
election could finish after the read barrier times out.

To deflake the test, we increase the timeout of Raft operations back to
300s for read barriers we expect to succeed.

Fixes #26457

Closes scylladb/scylladb#26489

(cherry picked from commit 5f68b9dc6b)

Closes scylladb/scylladb#26522
2025-10-12 21:02:02 +03:00
Asias He
3cae4a21ab repair: Rename incremental mode name
Using the name regular as the incremental mode could be confusing, since
regular might be interpreted as the non-incremental repair. It is better
to use incremental directly.

Before:

- regular (standard incremental repair)
- full (full incremental repair)
- disabled (incremental repair disabled)

After:

- incremental (standard incremental repair)
- full (full incremental repair)
- disabled (incremental repair disabled)

Fixes #26503

Closes scylladb/scylladb#26504

(cherry picked from commit 13dd88b010)

Closes scylladb/scylladb#26521
2025-10-12 21:01:05 +03:00
Ernest Zaslavsky
5c6335e029 s3_client: fix when condition to prevent infinite locking
Refine condition variable predicate in filling fiber to avoid
indefinite waiting when `close` is invoked.

Closes scylladb/scylladb#26449

(cherry picked from commit c2bab430d7)

Closes scylladb/scylladb#26497
2025-10-12 16:19:48 +03:00
Avi Kivity
de4975d181 dist: scylla_raid_setup: don't override XFS block size on modern kernels
In 6977064693 ("dist: scylla_raid_setup:
reduce xfs block size to 1k"), we reduced the XFS block size to 1k when
possible. This is because commitlog wants to write the smallest amount
of padding it can, and older Linux could only write a multiple of the
block size. Modern Linux [1] can O_DIRECT overwrite a range smaller than
a filesystem block.

However, this doesn't play well with some SSDs that have 512 byte
logical sector size and 4096 byte physical sector size - it causes them
to issue read-modify-writes.

To improve the situation, if we detect that the kernel is recent enough,
format the filesystem with its default block size, which should be optimal.

Note that commitlog will still issue sub-4k writes, which can translate
to RMW. There, we believe that the amplification is reduced since
sequential sub-physical-sector writes can be merged, and that the overhead
from commitlog space amplification is worse than the RMW overhead.

Tested on AWS i4i.large. fsqual report:

```
memory DMA alignment:    512
disk DMA alignment:      512
filesystem block size:   4096
context switch per write io (size-changing, append, blocksize 4096, iodepth 1): 0.0003 (GOOD)
context switch per write io (size-changing, append, blocksize 4096, iodepth 3): 0.7961 (BAD)
context switch per write io (size-unchanging, append, blocksize 4096, iodepth 3): 0 (GOOD)
context switch per write io (size-unchanging, append, blocksize 4096, iodepth 7): 0.0001 (GOOD)
context switch per write io (size-unchanging, append, blocksize 512, iodepth 1): 0.125 (BAD)
context switch per write io (size-unchanging, overwrite, blocksize 512, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per write io (size-changing, append, blocksize 4096, iodepth 1): 0 (GOOD)
context switch per write io (size-changing, append, blocksize 4096, iodepth 3): 0.8006 (BAD)
context switch per write io (size-unchanging, append, blocksize 4096, iodepth 3): 0.0001 (GOOD)
context switch per write io (size-unchanging, append, blocksize 4096, iodepth 7): 0 (GOOD)
context switch per write io (size-unchanging, append, blocksize 512, iodepth 1): 0.125 (BAD)
context switch per write io (size-unchanging, overwrite, blocksize 512, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per read io (size-changing, append, blocksize 512, iodepth 30): 0 (GOOD)
```

The sub-block overwrite cases are GOOD.

In comparison, the fsqual report for 1k (similar):

```
memory DMA alignment:    512
disk DMA alignment:      512
filesystem block size:   1024
context switch per write io (size-changing, append, blocksize 1024, iodepth 1): 0.0005 (GOOD)
context switch per write io (size-changing, append, blocksize 1024, iodepth 3): 0.7948 (BAD)
context switch per write io (size-unchanging, append, blocksize 1024, iodepth 3): 0.0015 (GOOD)
context switch per write io (size-unchanging, append, blocksize 1024, iodepth 7): 0.0022 (GOOD)
context switch per write io (size-unchanging, append, blocksize 512, iodepth 1): 0.4999 (BAD)
context switch per write io (size-unchanging, overwrite, blocksize 512, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per write io (size-changing, append, blocksize 1024, iodepth 1): 0 (GOOD)
context switch per write io (size-changing, append, blocksize 1024, iodepth 3): 0.798 (BAD)
context switch per write io (size-unchanging, append, blocksize 1024, iodepth 3): 0.0012 (GOOD)
context switch per write io (size-unchanging, append, blocksize 1024, iodepth 7): 0.0019 (GOOD)
context switch per write io (size-unchanging, append, blocksize 512, iodepth 1): 0.5 (BAD)
context switch per write io (size-unchanging, overwrite, blocksize 512, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 1): 0 (GOOD)
context switch per write io (size-unchanging, overwrite, blocksize 512, O_DSYNC, iodepth 3): 0 (GOOD)
context switch per read io (size-changing, append, blocksize 512, iodepth 30): 0 (GOOD)
```

Fixes #25441.

[1] ed1128c2d0

Closes scylladb/scylladb#25445

(cherry picked from commit 5d1846d783)

Closes scylladb/scylladb#26471
2025-10-12 16:17:16 +03:00
Piotr Dulikowski
1f73e18eaf Merge '[Backport 2025.4] db/view: Require rf_rack_valid_keyspaces when creating materialized view' from Scylladb[bot]
Materialized views are currently in the experimental phase and using them
in tablet-based keyspaces requires starting Scylla with an experimental feature,
`views-with-tablets`. Any attempts to create a materialized view or secondary
index when it's not enabled will fail with an appropriate error.

After considerable effort, we're drawing close to bringing views out of the
experimental phase, and the experimental feature will no longer be needed.
However, materialized views in tablet-based keyspaces will still be restricted,
and creating them will only be possible after enabling the configuration option
`rf_rack_valid_keyspaces`. That's what we do in this PR.

In this patch, we adjust existing tests in the tree to work with the new
restriction. That shouldn't have been necessary because we've already seemingly
adjusted all of them to work with the configuration option, but some tests hid
well. We fix that mistake now.

After that, we introduce the new restriction. What's more, when starting Scylla,
we verify that there is no materialized view that would violate the contract.
If there are some that do, we list them, notify the user, and refuse to start.

High-level implementation strategy:

1. Name the restrictions in form of a function.
2. Adjust existing tests.
3. Restrict materialized views by both the experimental feature
   and the configuration option. Add validation test.
4. Drop the requirement for the experimental feature. Adjust the added test
   and add a new one.
5. Update the user documentation.

Fixes scylladb/scylladb#23030

Backport: 2025.4, as we are aiming to support materialized views for tablets from that version.

- (cherry picked from commit a1254fb6f3)

- (cherry picked from commit d6fcd18540)

- (cherry picked from commit 994f09530f)

- (cherry picked from commit 6322b5996d)

- (cherry picked from commit 71606ffdda)

- (cherry picked from commit 00222070cd)

- (cherry picked from commit 288be6c82d)

- (cherry picked from commit b409e85c20)

Parent PR: #25802

Closes scylladb/scylladb#26416

* github.com:scylladb/scylladb:
  view: Stop requiring experimental feature
  db/view: Verify valid configuration for tablet-based views
  db/view: Require rf_rack_valid_keyspaces when creating view
  test/cluster/random_failures: Skip creating secondary indexes
  test/cluster/mv: Mark test_mv_rf_change as skipped
  test/cluster: Adjust MV tests to RF-rack-validity
  test/boost/schema_loader_test.cc: Explicitly enable rf_rack_valid_keyspaces
  db/view: Name requirement for views with tablets
2025-10-12 08:20:20 +02:00
Michał Jadwiszczak
931f9ca3db db/view/view_building_worker: update state again if some batch was finished during the update
There was a race between loop in `view_building_worker::run_view_building_state_observer()`
and a moment when a batch was finishing its work (`.finally()` callback
in `view_building_worker::batch::start()`).

State observer waits on `_vb_state_machine.event` CV and when it's
awoken, it takes group0 read apply mutex and updates its state. While
updating the state, the observer looks at `batch::state` field and
reacts to it accordingly.
On the other hand, when a batch finishes its work, it sets `state` field
to `batch_state::finished` and does a broadcast on
`_vb_state_machine.event` CV.
So if the batch executes the `.finally()` callback while the observer
is updating its state, the observer may miss the event on the CV and
never notice that the batch finished.

This patch fixes this by adding a `some_batch_finished` flag. Even if
the worker doesn't see an event on the CV, it will notice that the flag
was set and run the next iteration.
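The flag-plus-predicate pattern described above can be sketched with the standard library (std::condition_variable standing in for the seastar CV; the struct and function names are illustrative, not the view-building worker's real code): because the waiter re-checks `some_batch_finished` under the lock, a broadcast that fires while the observer is busy updating state is not lost.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

struct observer_state {
    std::mutex m;
    std::condition_variable cv;
    bool some_batch_finished = false;
};

void batch_finished(observer_state& st) {
    {
        std::lock_guard<std::mutex> g(st.m);
        st.some_batch_finished = true; // set the flag *before* notifying
    }
    st.cv.notify_all();
}

bool wait_for_batch(observer_state& st) {
    std::unique_lock<std::mutex> lk(st.m);
    // Waiting on a predicate instead of a bare event: even if the notify
    // happened while we were busy (or before we started waiting), the flag
    // is still set and the wait returns immediately.
    st.cv.wait(lk, [&] { return st.some_batch_finished; });
    st.some_batch_finished = false;
    return true;
}
```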

Fixes scylladb/scylladb#26204

Closes scylladb/scylladb#26289

(cherry picked from commit 8d0d53016c)

Closes scylladb/scylladb#26500
2025-10-10 09:53:22 +02:00
Piotr Dulikowski
3775e8e49a Merge '[Backport 2025.4] db/view/view_building_worker: move discover_existing_staging_sstables() to the foreground' from Scylladb[bot]
db/view/view_building_worker: move discover_existing_staging_sstables() to the foreground
This patch moves `discover_existing_staging_sstables()` to be executed
from main level, instead of running it on the background fiber.

This method needs to be run only once during startup to collect
existing staging sstables, so there is no need to do it in the
background. This change will improve the debuggability of any further issues
related to it (like https://github.com/scylladb/scylladb/issues/26403).

Fixes https://github.com/scylladb/scylladb/issues/26417

The patch should be backported to 2025.4

- (cherry picked from commit 575dce765e)

- (cherry picked from commit 84e4e34d81)

Parent PR: #26446

Closes scylladb/scylladb#26501

* github.com:scylladb/scylladb:
  db/view/view_building_worker: move discover_existing_staging_sstables() to the foreground
  db/view/view_building_worker: futurize and rename `start_background_fibers()`
2025-10-10 09:52:45 +02:00
Michał Jadwiszczak
f4d9513e0f db/view/view_building_worker: move discover_existing_staging_sstables() to the foreground
This patch moves `discover_existing_staging_sstables()` to be executed
from main level, instead of running it on the background fiber.

This method needs to be run only once during startup to collect
existing staging sstables, so there is no need to do it in the
background. This change will improve the debuggability of any further issues
related to it (like scylladb/scylladb#26403).

Fixes scylladb/scylladb#26417

(cherry picked from commit 84e4e34d81)
2025-10-09 22:39:33 +00:00
Michał Jadwiszczak
5eeb1e3e76 db/view/view_building_worker: futurize and rename start_background_fibers()
The next commit will move `discover_existing_staging_sstables()`
to the foreground, so to prepare for this we need to futurize
the `start_background_fibers()` method and change its name to better
reflect its purpose.

(cherry picked from commit 575dce765e)
2025-10-09 22:39:32 +00:00
Patryk Jędrzejczak
989aa0b237 raft topology: make the voter handler consider only group 0 members
In the Raft-based recovery procedure, we create a new group 0 and add
live nodes to it one by one. This means that for some time there are
nodes which belong to the topology, but not to the new group 0. The
voter handler running on the recovery leader incorrectly considers these
nodes while choosing voters.

The consequences:
- misleading logs, for example, "making servers {<ID of a non-member>}
  voters", where the non-member won't become a voter anyway,
- increased chance of majority loss during the recovery procedure, for
  example, all 3 nodes that first joined the new group 0 are in the same
  dc and rack, but only one of them becomes a voter because the voter
  handler tries to make non-members in other dcs/racks voters.
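The fix described above amounts to restricting voter candidates to actual group 0 members. A hypothetical sketch (illustrative names, not the real raft topology API):

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Voter candidates are the intersection of topology members and current
// group 0 members, so nodes that joined the topology but not yet the new
// group 0 are never proposed as voters.
using node_id = std::string;

std::vector<node_id> voter_candidates(const std::set<node_id>& topology_nodes,
                                      const std::set<node_id>& group0_members) {
    std::vector<node_id> out;
    for (const node_id& n : topology_nodes) {
        if (group0_members.count(n)) {  // only real group 0 members qualify
            out.push_back(n);
        }
    }
    return out;
}
```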

Fixes #26321

Closes scylladb/scylladb#26327

(cherry picked from commit 67d48a459f)

Closes scylladb/scylladb#26428
2025-10-09 18:17:49 +02:00
Michael Litvak
eba0a2cf72 service/qos: set long timeout for auth queries on SL cache update
pass an appropriate query state for auth queries called from service
level cache reload. we use the function qos_query_state to select a
query_state based on caller context - for internal queries, we set a
very long timeout.

the service level cache reload is called from group0 reload. we want it
to have a long timeout instead of the default 5 seconds for auth
queries, because on the one hand we don't have a strict latency
requirement, and on the other hand a timeout exception is undesired in
the group0 reload logic and can break group0 on the node.
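The context-based timeout selection can be sketched as follows (names and values are illustrative; the real `qos_query_state` selects a full query state, not just a timeout):

```cpp
#include <cassert>
#include <chrono>

// Choose the query timeout from the caller context: internal callers such
// as the service-level cache reload get an effectively unlimited deadline
// instead of the default 5-second auth query timeout.
enum class query_context { external, internal };

std::chrono::milliseconds auth_query_timeout(query_context ctx) {
    using namespace std::chrono;
    if (ctx == query_context::internal) {
        // A timeout exception inside group0 reload could break group0 on
        // the node, so internal callers get a very long timeout.
        return duration_cast<milliseconds>(hours(24));
    }
    return milliseconds(5000);  // default auth query timeout
}
```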

Fixes scylladb/scylladb#25290

(cherry picked from commit ad1a5b7e42)
2025-10-09 12:48:45 +00:00
Michael Litvak
3a9eb9b65f auth: add query_state parameter to query functions
add a query_state parameter to several auth functions that execute
internal queries. currently the queries use the
internal_distributed_query_state() query state, and we keep this as the
default, but we also want to be able to pass a query state from the
caller.

in particular, the auth queries currently use a timeout of 5 seconds,
and we will want to set a different timeout when executed in some
different context.
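The API shape described above can be sketched with a defaulted parameter (simplified, hypothetical types; not the real auth module signatures), so existing call sites compile unchanged while new callers pass their own state:

```cpp
#include <cassert>
#include <chrono>

struct query_state {
    std::chrono::milliseconds timeout;
};

// The current default used by the auth queries.
inline query_state internal_distributed_query_state() {
    return query_state{std::chrono::milliseconds(5000)};
}

// A helper that previously hard-coded the internal state now accepts it
// as a defaulted parameter.
std::chrono::milliseconds effective_timeout(
        const query_state& qs = internal_distributed_query_state()) {
    return qs.timeout;
}
```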

(cherry picked from commit 3c3dd4cf9d)
2025-10-09 12:48:45 +00:00
Michael Litvak
f75541b7b3 auth: refactor query_all_directly_granted
rewrite query_all_directly_granted to use execute_internal instead of
query_internal in a style that is more consistent with the rest of the
module.

This will also be useful for a later change because execute_internal
accepts an additional parameter of query_state.

(cherry picked from commit a1161c156f)
2025-10-09 12:48:45 +00:00
Michał Chojnowski
879db5855d utils/config_file: fix a missing allowed_values propagation in one of named_value constructors
In one of the constructors of `named_value`, the `allowed_values`
argument isn't used.

(This means that if some config entry uses this constructor,
the values aren't validated on the config layer,
and might give some lower layer a bad surprise).

Fix that.
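A simplified model of this bug class (not the real `utils/config_file` code): if a constructor accepts `allowed_values` but never stores it, validation is silently disabled for every config entry created through it. The fix is simply to propagate the argument, as this sketch does:

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

class named_value {
    std::string _value;
    std::vector<std::string> _allowed;  // empty means "no restriction"
public:
    named_value(std::string initial, std::vector<std::string> allowed_values)
        : _value(std::move(initial))
        , _allowed(std::move(allowed_values))  // the fix: actually keep it
    {}

    void set(const std::string& v) {
        if (!_allowed.empty()
                && std::find(_allowed.begin(), _allowed.end(), v) == _allowed.end()) {
            throw std::invalid_argument("invalid value: " + v);
        }
        _value = v;
    }

    const std::string& get() const { return _value; }
};
```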

Fixes scylladb/scylladb#26371

Closes scylladb/scylladb#26196

(cherry picked from commit 3b338e36c2)

Closes scylladb/scylladb#26425
2025-10-09 13:19:41 +03:00
Michał Chojnowski
22d3ee5670 sstables/trie: actually apply BYPASS CACHE to index reads
BYPASS CACHE is implemented for `bti_index_reader` by
giving it its own private `cached_file` wrappers over
Partitions.db and Rows.db, instead of passing it
the shared `cached_file` owned by the sstable.

But due to an oversight, the private `cached_file`s aren't
constructed on top of the raw Partitions.db and Rows.db
files, but on top of `cached_file_impl` wrappers around
those files. Which means that BYPASS CACHE doesn't
actually do its job.

Tests based on `scylla_index_page_cache_*` metrics
and on CQL tracing still see the reads from the private
files as "cache misses", but those misses are served
from the shared cached files anyway, so the tests don't see
the problem. In this commit we extend `test_bti_index.py`
with a check that looks at reactor's `io_queue` metrics
instead, and catches the problem.
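The layering mistake can be shown with a toy model (hypothetical types, not the real `cached_file`): a cache serves repeated reads from memory and only forwards misses to its underlying source, so wrapping the private (BYPASS CACHE) cache around the *shared* cache instead of the raw file means its "misses" never reach the disk.

```cpp
#include <cassert>

struct file_source {
    int disk_reads = 0;
    virtual void read() { ++disk_reads; }
    virtual ~file_source() = default;
};

struct cached_file : file_source {
    file_source& underlying;
    bool populated = false;
    explicit cached_file(file_source& u) : underlying(u) {}
    void read() override {
        if (!populated) {        // miss: forward to the underlying source
            underlying.read();
            populated = true;
        }                        // hit: served from memory, no I/O
    }
};
```

With a warm shared cache, a private cache built over it performs no real I/O; a private cache built over the raw file does.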

Fixes scylladb/scylladb#26372

Closes scylladb/scylladb#26373

(cherry picked from commit dbddba0794)

Closes scylladb/scylladb#26424
2025-10-09 13:17:29 +03:00
Dawid Mędrek
2bdf792f8e view: Stop requiring experimental feature
We modify the requirements for using materialized views in tablet-based
keyspaces. Before, it was necessary to enable the configuration option
`rf_rack_valid_keyspaces`, have the cluster feature `VIEWS_WITH_TABLETS`
enabled, and use the experimental feature `views-with-tablets`.
We drop the last requirement.

We adjust code to that change and provide a new validation test.
We also update the user documentation to reflect the changes.

Fixes scylladb/scylladb#23030

(cherry picked from commit b409e85c20)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
2e2d1f17bb db/view: Verify valid configuration for tablet-based views
Creating a materialized view or a secondary index in a tablet-based
keyspace requires that the user enabled two options:

* experimental feature `views-with-tablets`,
* configuration option `rf_rack_valid_keyspaces`.

Because the latter has only become a necessity recently (in this series),
it's possible that there are already existing materialized views that
violate it.

We add a new check at start-up that iterates over existing views and
makes sure that is not the case. If any view violates the requirement,
Scylla notifies the user of the problem.

(cherry picked from commit 288be6c82d)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
e9aba62cc5 db/view: Require rf_rack_valid_keyspaces when creating view
We extend the requirements for being able to create materialized views
and secondary indexes in tablet-based keyspaces. It's now necessary to
enable the configuration option `rf_rack_valid_keyspaces`. This is
a stepping stone towards bringing materialized views and secondary
indexes with tablets out of the experimental phase.

We add a validation test to verify the changes.

Refs scylladb/scylladb#23030

(cherry picked from commit 00222070cd)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
a7d0cf6dd0 test/cluster/random_failures: Skip creating secondary indexes
Materialized views are going to require the configuration option
`rf_rack_valid_keyspaces` when being created in tablet-based keyspaces.
Since random-failure tests still haven't been adjusted to work with it,
and because it's not trivial, we skip the cases when we end up creating
or dropping an index.

(cherry picked from commit 71606ffdda)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
6e94c075e3 test/cluster/mv: Mark test_mv_rf_change as skipped
The test will not work with `rf_rack_valid_keyspaces`. Since the option
is going to become a requirement for using views with tablets, the test
will need to be rewritten to take that into consideration. Because that
adjustment doesn't seem trivial, we mark the test as skipped for the
time being.

(cherry picked from commit 6322b5996d)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
f90ca413a0 test/cluster: Adjust MV tests to RF-rack-validity
Some of the new tests covering materialized views explicitly disabled
the configuration option `rf_rack_valid_keyspaces`. It's going to become
a new requirement for views with tablets, so we adjust those tests and
enable the option. There is one exception, the test:

`cluster/mv/test_mv_topology_change.py::test_mv_rf_change`

We handle it separately in the following commit.

(cherry picked from commit 994f09530f)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
5e0f5f4b44 test/boost/schema_loader_test.cc: Explicitly enable rf_rack_valid_keyspaces
The test cases in the file aren't run via an existing interface like
`do_with_cql_env`, but they rely on a more direct approach -- calling
one of the schema loader tools. Because of that, they manage the
`db::config` object on their own and don't enable the configuration
option `rf_rack_valid_keyspaces`.

That hasn't been a problem so far since the test doesn't attempt to
create RF-rack-invalid keyspaces anyway. However, in an upcoming commit,
we're going to further restrict views with tablets and require that the
option is enabled.

To prepare for that, we enable the option in all test cases. It's only
necessary in a small subset of them, but it won't hurt to enforce it
everywhere, so let's do that.

Refs scylladb/scylladb#23958

(cherry picked from commit d6fcd18540)
2025-10-06 13:19:54 +00:00
Dawid Mędrek
5d32fef3ae db/view: Name requirement for views with tablets
We add a named requirement, a function, for materialized views with tablets.
It decides whether we can create views and secondary indexes in a given
keyspace. It's a stepping stone towards modifying the requirements for it.

This way, we keep the code in one place, so it's not possible to forget
to modify it somewhere. It also makes it more organized and concise.
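Such a single named-requirement predicate could look like this (a hypothetical sketch, not the actual `db/view` function): every call site asks this one function, so the rules for views with tablets live in exactly one place.

```cpp
#include <cassert>

bool can_create_view_with_tablets(bool keyspace_uses_tablets,
                                  bool views_with_tablets_feature,
                                  bool rf_rack_valid_keyspaces) {
    if (!keyspace_uses_tablets) {
        return true;  // vnode-based keyspaces carry no extra requirements
    }
    return views_with_tablets_feature && rf_rack_valid_keyspaces;
}
```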

(cherry picked from commit a1254fb6f3)
2025-10-06 13:19:53 +00:00
Botond Dénes
1b5c46a796 Merge '[Backport 2025.4] test: dtest: test_limits.py: migrate from dtest' from Dario Mirovic
Backport motivation:
scylla-dtest PR [limits_test.py: remove tests already ported to scylladb repo](https://github.com/scylladb/scylla-dtest/pull/6232), which removes the migrated tests, got merged before the branch-2025.4 separation, while
scylladb PR [test: dtest: test_limits.py: migrate from dtest](https://github.com/scylladb/scylladb/pull/26077) got merged after it.
This caused the tests to be fully removed from branch-2025.4. This backport PR makes sure the tests are present in the scylladb branch-2025.4.

This PR migrates limits tests from dtest to this repository.

One reason is that there is an ongoing effort to migrate tests from dtest to here.

Debug logs are enabled on `test_max_cells` for `lsa-timing` logger, to have more information about memory reclaim operation times and memory chunk sizes. This will allow analysis of their value distributions, which can be helpful with debugging if the issue reoccurs.

Also, scylladb keeps sql files with metrics which, with some modifications, can be used to track metrics over time for some tests. This would show if there are pauses and spikes or the test performance is more or less consistent over time.

scylla-dtest PR that removes migrated tests:
[limits_test.py: remove tests already ported to scylladb repo #6232](https://github.com/scylladb/scylla-dtest/pull/6232)

Fixes #25097

- (cherry picked from commit 82e9623911)
- (cherry picked from commit 70128fd5c7)
- (cherry picked from commit 554fd5e801)
- (cherry picked from commit b3347bcf84)

Parent PR: #26077

Closes scylladb/scylladb#26359

* github.com:scylladb/scylladb:
  test: dtest: limits_test.py: test_max_cells log level
  test: dtest: limits_test.py: make the tests work
  test: dtest: test_limits.py: remove tests that are not being migrated
  test: dtest: copy unmodified limits_test.py
2025-10-06 15:46:49 +03:00
Jenkins Promoter
f2c5874fa9 Update ScyllaDB version to: 2025.4.0-rc1 2025-10-03 21:26:12 +03:00
Botond Dénes
4049dae0b2 tools/scylla-sstable: fix doc links
The doc links in scylla-sstable help output are static, so they always
point to the documentation of the latest stable release, not to the
documentation of the release the tool binary is from. On top of that,
the links point to old open-source documentation, which is now EOL.
Fix both problems: point the links at the new source-available
documentation pages and make them version-aware.
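Making a link version-aware amounts to splicing the running release's version into the URL instead of hard-coding a "latest stable" link. A hedged sketch (the URL layout below is purely illustrative, not ScyllaDB's real documentation structure):

```cpp
#include <cassert>
#include <string>

// Build a doc link that points at the documentation matching the release
// the binary is from; `version` is expected as "major.minor", e.g. "2025.4".
std::string doc_link(const std::string& version, const std::string& page) {
    return "https://docs.example.com/" + version + "/" + page;
}
```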

(cherry picked from commit fe73c90df9)
2025-10-03 14:29:19 +00:00
Botond Dénes
8b83294c0f release: adjust doc_link() for the post source-available world
There is no more separate enterprise product and the doc urls are
slightly different.

(cherry picked from commit 15a4a9936b)
2025-10-03 14:29:19 +00:00
Botond Dénes
5930726b38 tools/scylla-nodetool: remove trailing " from doc urls
They are an accidental leftover from a previous way of storing command
descriptions.

(cherry picked from commit 5a69838d06)
2025-10-03 14:29:19 +00:00
Dario Mirovic
664cdd3d99 test: dtest: limits_test.py: test_max_cells log level
Set `lsa-timing` logger log level to `debug`. This will help with
the analysis of the whole spectrum of memory reclaim operation
times and memory sizes.

Refs #25097

(cherry picked from commit b3347bcf84)
2025-10-01 22:40:34 +02:00
Dario Mirovic
4ea6c51fb1 test: dtest: limits_test.py: make the tests work
Remove unused imports and markers.
Remove Apache license header.

Enable the test in suite.yaml for `dev` and `debug` modes.

Refs #25097

(cherry picked from commit 554fd5e801)
2025-10-01 22:40:29 +02:00
Dario Mirovic
eb9babfd4a test: dtest: test_limits.py: remove tests that are not being migrated
Refs #25097

(cherry picked from commit 70128fd5c7)
2025-10-01 22:40:24 +02:00
Dario Mirovic
558f460517 test: dtest: copy unmodified limits_test.py
Copy limits_test.py from scylla-dtest to test/cluster/dtest/limits_test.py.
Add license header.

Disable it for `debug`, `dev`, and `release` mode.

Refs #25097

(cherry picked from commit 82e9623911)
2025-10-01 22:40:16 +02:00
Jenkins Promoter
a9f4024c1b Update pgo profiles - aarch64 2025-10-01 04:42:23 +03:00
Jenkins Promoter
6969918d31 Update pgo profiles - x86_64 2025-10-01 04:20:49 +03:00
Luis Freitas
d69edfcd34 Update ScyllaDB version to: 2025.4.0-rc0 2025-09-30 18:51:59 +03:00
167 changed files with 4022 additions and 1066 deletions

.gitmodules

@@ -1,6 +1,6 @@
 [submodule "seastar"]
 	path = seastar
-	url = ../seastar
+	url = ../scylla-seastar
 	ignore = dirty
 [submodule "swagger-ui"]
 	path = swagger-ui


@@ -78,7 +78,7 @@ fi
 # Default scylla product/version tags
 PRODUCT=scylla
-VERSION=2025.4.0-dev
+VERSION=2025.4.0-rc4
 if test -f version
 then


@@ -136,6 +136,7 @@ future<> controller::start_server() {
 [this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
 return server.init(addr, alternator_port, alternator_https_port, creds,
 _config.alternator_enforce_authorization,
+_config.alternator_warn_authorization,
 &_memory_limiter.local().get_semaphore(),
 _config.max_concurrent_requests_per_shard);
 }).handle_exception([this, addr, alternator_port, alternator_https_port] (std::exception_ptr ep) {


@@ -16,6 +16,7 @@
#include "cdc/cdc_options.hh"
#include "auth/service.hh"
#include "db/config.hh"
#include "db/view/view_build_status.hh"
#include "mutation/tombstone.hh"
#include "utils/log.hh"
#include "schema/schema_builder.hh"
@@ -243,7 +244,8 @@ executor::executor(gms::gossiper& gossiper,
_mm(mm),
_sdks(sdks),
_cdc_metadata(cdc_metadata),
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization()),
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization),
_warn_authorization(_proxy.data_dictionary().get_config().alternator_warn_authorization),
_ssg(ssg),
_parsed_expression_cache(std::make_unique<parsed::expression_cache>(
parsed::expression_cache::config{_proxy.data_dictionary().get_config().alternator_max_expression_cache_entries_per_shard},
@@ -879,15 +881,37 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
co_return rjson::print(std::move(response));
}
// This function increments the authorization_failures counter, and may also
// log a warn-level message and/or throw an access_denied exception, depending
// on what enforce_authorization and warn_authorization are set to.
// Note that if enforce_authorization is false, this function will return
// without throwing. So a caller that doesn't want to continue after an
// authorization_error must explicitly return after calling this function.
static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) {
stats.authorization_failures++;
if (enforce_authorization) {
if (warn_authorization) {
elogger.warn("alternator_warn_authorization=true: {}", msg);
}
throw api_error::access_denied(std::move(msg));
} else {
if (warn_authorization) {
elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg);
}
}
}
// Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(
bool enforce_authorization,
bool warn_authorization,
const service::client_state& client_state,
const schema_ptr& schema,
auth::permission permission_to_check) {
if (!enforce_authorization) {
auth::permission permission_to_check,
alternator::stats& stats) {
if (!enforce_authorization && !warn_authorization) {
co_return;
}
// Unfortunately, the fix for issue #23218 did not modify the function
@@ -902,31 +926,33 @@ future<> verify_permission(
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
throw api_error::access_denied(fmt::format(
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
"Write access denied on internal table {}.{} to role {} because it is not a superuser",
schema->ks_name(), schema->cf_name(), username));
co_return;
}
}
auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name());
if (!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
if (!client_state.user() || !client_state.user()->name ||
!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
sstring username = "<anonymous>";
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
// Using exceptions for errors makes this function faster in the
// success path (when the operation is allowed).
throw api_error::access_denied(format(
"{} access on table {}.{} is denied to role {}",
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
"{} access on table {}.{} is denied to role {}, client address {}",
auth::permissions::to_string(permission_to_check),
schema->ks_name(), schema->cf_name(), username));
schema->ks_name(), schema->cf_name(), username, client_state.get_client_address()));
}
}
// Similar to verify_permission() above, but just for CREATE operations.
// Those do not operate on any specific table, so require permissions on
// ALL KEYSPACES instead of any specific table.
future<> verify_create_permission(bool enforce_authorization, const service::client_state& client_state) {
if (!enforce_authorization) {
static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) {
if (!enforce_authorization && !warn_authorization) {
co_return;
}
auto resource = auth::resource(auth::resource_kind::data);
@@ -935,7 +961,7 @@ future<> verify_create_permission(bool enforce_authorization, const service::cli
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
throw api_error::access_denied(format(
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
"CREATE access on ALL KEYSPACES is denied to role {}", username));
}
}
@@ -952,7 +978,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
schema_ptr schema = get_table(_proxy, request);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
size_t retries = mm.get_concurrent_ddl_retries();
for (;;) {
@@ -1290,7 +1316,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
if (tags->Size() < 1) {
co_return api_error::validation("The number of tags must be at least 1") ;
}
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
});
@@ -1311,7 +1337,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
get_stats_from_schema(_proxy, *schema)->api_operations.untag_resource++;
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
});
@@ -1496,8 +1522,25 @@ bytes extract_from_attrs_column_computation::compute_value(const schema&, const
on_internal_error(elogger, "extract_from_attrs_column_computation::compute_value called without row");
}
// Because `CreateTable` request creates GSI/LSI together with the base table (so the base table is empty),
// we can skip view building process and immediately mark the view as built on all nodes.
//
// However, we can do this only for tablet-based views because `view_building_worker` will automatically propagate
// this information to `system.built_views` table (see `view_building_worker::update_built_views()`).
// For vnode-based views, `view_builder` will process the view and mark it as built.
static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out, std::vector<schema_ptr> schemas, api::timestamp_type ts, service::storage_proxy& sp) {
auto token_metadata = sp.get_token_metadata_ptr();
for (auto& schema: schemas) {
if (schema->is_view()) {
for (auto& host_id: token_metadata->get_topology().get_all_host_ids()) {
auto view_status_mut = co_await sp.system_keyspace().make_view_build_status_mutation(ts, {schema->ks_name(), schema->cf_name()}, host_id, db::view::build_status::SUCCESS);
out.push_back(std::move(view_status_mut));
}
}
}
}
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) {
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats) {
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
@@ -1703,7 +1746,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
set_table_creation_time(tags_map, db_clock::now());
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
co_await verify_create_permission(enforce_authorization, client_state);
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
schema_ptr schema = builder.build();
for (auto& view_builder : view_builders) {
@@ -1754,6 +1797,9 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
schemas.push_back(view_builder.build());
}
co_await service::prepare_new_column_families_announcement(schema_mutations, sp, *ksm, schemas, ts);
if (ksm->uses_tablets()) {
co_await mark_view_schemas_as_built(schema_mutations, schemas, ts, sp);
}
// If a role is allowed to create a table, we must give it permissions to
// use (and eventually delete) the specific table it just created (and
@@ -1800,9 +1846,9 @@ future<executor::request_return_type> executor::create_table(client_state& clien
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization)]
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization);
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats);
});
}
@@ -1855,7 +1901,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
verify_billing_mode(request);
}
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request]
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request, &e = this->container()]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
schema_ptr schema;
size_t retries = mm.get_concurrent_ddl_retries();
@@ -2026,7 +2072,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified");
}
co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER);
co_await verify_permission(enforce_authorization, warn_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER, e.local()._stats);
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector<view_ptr>(), group0_guard.write_timestamp());
for (view_ptr view : new_views) {
auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp());
@@ -2789,7 +2835,7 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -2892,7 +2938,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -3163,7 +3209,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
per_table_wcu.emplace_back(std::make_pair(per_table_stats, schema));
}
for (const auto& b : mutation_builders) {
co_await verify_permission(_enforce_authorization, client_state, b.first, auth::permission::MODIFY);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, b.first, auth::permission::MODIFY, _stats);
}
// If alternator_force_read_before_write is true we will first get the previous item size
// and only then do send the mutation.
@@ -3636,16 +3682,16 @@ future<std::vector<rjson::value>> executor::describe_multi_item(schema_ptr schem
shared_ptr<cql3::selection::selection> selection,
foreign_ptr<lw_shared_ptr<query::result>> query_result,
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
uint64_t& rcu_half_units) {
noncopyable_function<void(uint64_t)> item_callback) {
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
auto result_set = builder.build();
std::vector<rjson::value> ret;
for (auto& result_row : result_set->rows()) {
rjson::value item = rjson::empty_object();
rcu_consumed_capacity_counter consumed_capacity;
describe_single_item(*selection, result_row, *attrs_to_get, item, &consumed_capacity._total_bytes);
rcu_half_units += consumed_capacity.get_half_units();
uint64_t item_length_in_bytes = 0;
describe_single_item(*selection, result_row, *attrs_to_get, item, &item_length_in_bytes);
item_callback(item_length_in_bytes);
ret.push_back(std::move(item));
co_await coroutine::maybe_yield();
}
@@ -4365,7 +4411,7 @@ future<executor::request_return_type> executor::update_item(client_state& client
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -4475,7 +4521,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "GetItem");
rcu_consumed_capacity_counter add_capacity(request, cl == db::consistency_level::LOCAL_QUORUM);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
service::storage_proxy::coordinator_query_result qr =
co_await _proxy.query(
schema, std::move(command), std::move(partition_ranges), cl,
@@ -4584,7 +4630,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
}
};
std::vector<table_requests> requests;
std::vector<std::vector<uint64_t>> responses_sizes;
uint batch_size = 0;
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
table_requests rs(get_table_from_batch_request(_proxy, it));
@@ -4604,7 +4649,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
}
for (const table_requests& tr : requests) {
co_await verify_permission(_enforce_authorization, client_state, tr.schema, auth::permission::SELECT);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats);
}
_stats.api_operations.batch_get_item_batch_total += batch_size;
@@ -4612,11 +4657,10 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
// If we got here, all "requests" are valid, so let's start the
// requests for the different partitions all in parallel.
std::vector<future<std::vector<rjson::value>>> response_futures;
responses_sizes.resize(requests.size());
size_t responses_sizes_pos = 0;
for (const auto& rs : requests) {
responses_sizes[responses_sizes_pos].resize(rs.requests.size());
size_t pos = 0;
std::vector<uint64_t> consumed_rcu_half_units_per_table(requests.size());
for (size_t i = 0; i < requests.size(); i++) {
const table_requests& rs = requests[i];
bool is_quorum = rs.cl == db::consistency_level::LOCAL_QUORUM;
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
per_table_stats->api_operations.batch_get_item_histogram.add(rs.requests.size());
for (const auto &r : rs.requests) {
@@ -4639,16 +4683,17 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
query::tombstone_limit(_proxy.get_tombstone_limit()));
command->allow_limit = db::allow_per_partition_rate_limit::yes;
const auto item_callback = [is_quorum, &rcus_per_table = consumed_rcu_half_units_per_table[i]](uint64_t size) {
rcus_per_table += rcu_consumed_capacity_counter::get_half_units(size, is_quorum);
};
future<std::vector<rjson::value>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, &response_size = responses_sizes[responses_sizes_pos][pos]] (service::storage_proxy::coordinator_query_result qr) mutable {
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, item_callback = std::move(item_callback)] (service::storage_proxy::coordinator_query_result qr) mutable {
utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), response_size);
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), std::move(item_callback));
});
pos++;
response_futures.push_back(std::move(f));
}
responses_sizes_pos++;
}
// Wait for all requests to complete, and then return the response.
@@ -4660,14 +4705,11 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
rjson::value response = rjson::empty_object();
rjson::add(response, "Responses", rjson::empty_object());
rjson::add(response, "UnprocessedKeys", rjson::empty_object());
size_t rcu_half_units;
auto fut_it = response_futures.begin();
responses_sizes_pos = 0;
rjson::value consumed_capacity = rjson::empty_array();
for (const auto& rs : requests) {
for (size_t i = 0; i < requests.size(); i++) {
const table_requests& rs = requests[i];
std::string table = table_name(*rs.schema);
size_t pos = 0;
rcu_half_units = 0;
for (const auto &r : rs.requests) {
auto& pk = r.first;
auto& cks = r.second;
@@ -4682,7 +4724,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
for (rjson::value& json : results) {
rjson::push_back(response["Responses"][table], std::move(json));
}
rcu_half_units += rcu_consumed_capacity_counter::get_half_units(responses_sizes[responses_sizes_pos][pos], rs.cl == db::consistency_level::LOCAL_QUORUM);
} catch(...) {
eptr = std::current_exception();
// This read of potentially several rows in one partition,
@@ -4706,8 +4747,8 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second));
}
}
pos++;
}
uint64_t rcu_half_units = consumed_rcu_half_units_per_table[i];
_stats.rcu_half_units_total += rcu_half_units;
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
per_table_stats->rcu_half_units_total += rcu_half_units;
@@ -4717,7 +4758,6 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
rjson::add(entry, "CapacityUnits", rcu_half_units*0.5);
rjson::push_back(consumed_capacity, std::move(entry));
}
responses_sizes_pos++;
}
if (should_add_rcu) {
@@ -5069,10 +5109,11 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
filter filter,
query::partition_slice::option_set custom_opts,
service::client_state& client_state,
cql3::cql_stats& cql_stats,
alternator::stats& stats,
tracing::trace_state_ptr trace_state,
service_permit permit,
bool enforce_authorization) {
bool enforce_authorization,
bool warn_authorization) {
lw_shared_ptr<service::pager::paging_state> old_paging_state = nullptr;
tracing::trace(trace_state, "Performing a database query");
@@ -5099,7 +5140,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
old_paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
}
co_await verify_permission(enforce_authorization, client_state, table_schema, auth::permission::SELECT);
co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats);
auto regular_columns =
table_schema->regular_columns() | std::views::transform(&column_definition::id)
@@ -5134,10 +5175,10 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
if (paging_state) {
rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state));
}
if (has_filter){
cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
if (has_filter) {
stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
// update our "filtered_rows_matched_total" for all the rows matched, despite the filter
cql_stats.filtered_rows_matched_total += size;
stats.cql_stats.filtered_rows_matched_total += size;
}
if (opt_items) {
if (opt_items->size() >= max_items_for_rapidjson_array) {
@@ -5261,7 +5302,7 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan");
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit), _enforce_authorization);
std::move(filter), query::partition_slice::option_set(), client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization);
}
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) {
@@ -5742,7 +5783,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
query::partition_slice::option_set opts;
opts.set_if<query::partition_slice::option::reversed>(!forward);
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit), _enforce_authorization);
std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization);
}
future<executor::request_return_type> executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) {


@@ -139,6 +139,7 @@ class executor : public peering_sharded_service<executor> {
db::system_distributed_keyspace& _sdks;
cdc::metadata& _cdc_metadata;
utils::updateable_value<bool> _enforce_authorization;
utils::updateable_value<bool> _warn_authorization;
// An smp_service_group to be used for limiting the concurrency when
// forwarding Alternator request between shards - if necessary for LWT.
smp_service_group _ssg;
@@ -228,12 +229,15 @@ public:
const std::optional<attrs_to_get>&,
uint64_t* = nullptr);
// Converts a multi-row selection result to JSON compatible with DynamoDB.
// For each row, this method calls item_callback, which takes the size of
// the item, in bytes, as its parameter.
static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
const query::partition_slice&& slice,
shared_ptr<cql3::selection::selection> selection,
foreign_ptr<lw_shared_ptr<query::result>> query_result,
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
uint64_t& rcu_half_units);
noncopyable_function<void(uint64_t)> item_callback = {});
static void describe_single_item(const cql3::selection::selection&,
const std::vector<managed_bytes_opt>&,
@@ -261,7 +265,7 @@ bool is_big(const rjson::value& val, int big_size = 100'000);
// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission);
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);
/**
* Make return type for serializing the object "streamed",


@@ -31,6 +31,7 @@
#include "utils/overloaded_functor.hh"
#include "utils/aws_sigv4.hh"
#include "client_data.hh"
#include "utils/updateable_value.hh"
static logging::logger slogger("alternator-server");
@@ -270,24 +271,57 @@ protected:
}
};
// This function increments the authentication_failures counter, and may also
// log a warn-level message and/or throw an exception, depending on what
// enforce_authorization and warn_authorization are set to.
// The username and client address are only used for logging purposes -
// they are not included in the error message returned to the client, since
// the client knows who it is.
// Note that if enforce_authorization is false, this function will return
// without throwing. So a caller that doesn't want to continue after an
// authentication_error must explicitly return after calling this function.
template<typename Exception>
static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) {
stats.authentication_failures++;
if (enforce_authorization) {
if (warn_authorization) {
slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address);
}
throw std::move(e);
} else {
if (warn_authorization) {
slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address);
}
}
}
future<std::string> server::verify_signature(const request& req, const chunked_content& content) {
if (!_enforce_authorization) {
if (!_enforce_authorization.get() && !_warn_authorization.get()) {
slogger.debug("Skipping authorization");
return make_ready_future<std::string>();
}
auto host_it = req._headers.find("Host");
if (host_it == req._headers.end()) {
throw api_error::invalid_signature("Host header is mandatory for signature verification");
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature("Host header is mandatory for signature verification"),
"", req.get_client_address());
return make_ready_future<std::string>();
}
auto authorization_it = req._headers.find("Authorization");
if (authorization_it == req._headers.end()) {
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::missing_authentication_token("Authorization header is mandatory for signature verification"),
"", req.get_client_address());
return make_ready_future<std::string>();
}
std::string host = host_it->second;
std::string_view authorization_header = authorization_it->second;
auto pos = authorization_header.find_first_of(' ');
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)),
"", req.get_client_address());
return make_ready_future<std::string>();
}
authorization_header.remove_prefix(pos+1);
std::string credential;
@@ -322,7 +356,9 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
std::vector<std::string_view> credential_split = split(credential, '/');
if (credential_split.size() != 5) {
throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential));
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address());
return make_ready_future<std::string>();
}
std::string user(credential_split[0]);
std::string datestamp(credential_split[1]);
@@ -346,7 +382,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
return get_key_from_roles(proxy, as, std::move(username));
};
return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content,
return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
user = std::move(user),
host = std::move(host),
datestamp = std::move(datestamp),
@@ -354,18 +390,32 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
signed_headers_map = std::move(signed_headers_map),
region = std::move(region),
service = std::move(service),
user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) {
user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
key_cache::value_ptr key_ptr(nullptr);
try {
key_ptr = key_ptr_fut.get();
} catch (const api_error& e) {
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
e, user, req.get_client_address());
return std::string();
}
std::string signature;
try {
signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method,
datestamp, signed_headers_str, signed_headers_map, &content, region, service, "");
} catch (const std::exception& e) {
throw api_error::invalid_signature(e.what());
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())),
user, req.get_client_address());
return std::string();
}
if (signature != std::string_view(user_signature)) {
_key_cache.remove(user);
throw api_error::unrecognized_client("The security token included in the request is invalid.");
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::unrecognized_client("wrong signature"),
user, req.get_client_address());
return std::string();
}
return user;
});
@@ -597,9 +647,11 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
}
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
_enforce_authorization = std::move(enforce_authorization);
_warn_authorization = std::move(warn_authorization);
_max_concurrent_requests = std::move(max_concurrent_requests);
if (!port && !https_port) {
return make_exception_future<>(std::runtime_error("Either regular port or TLS port"


@@ -43,6 +43,7 @@ class server : public peering_sharded_service<server> {
key_cache _key_cache;
utils::updateable_value<bool> _enforce_authorization;
utils::updateable_value<bool> _warn_authorization;
utils::small_vector<std::reference_wrapper<seastar::httpd::http_server>, 2> _enabled_servers;
named_gate _pending_requests;
// In some places we will need a CQL updateable_timeout_config object even
@@ -94,7 +95,8 @@ public:
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
utils::updateable_value<bool> enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();
// get_client_data() is called (on each shard separately) when the virtual
// table "system.clients" is read. It is expected to generate a list of


@@ -176,6 +176,16 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses,
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty()
});
// Only register the following metrics for the global metrics, not per-table
if (!has_table) {
metrics.add_group("alternator", {
seastar::metrics::make_counter("authentication_failures", stats.authentication_failures,
seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_counter("authorization_failures", stats.authorization_failures,
seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
});
}
}
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) {


@@ -79,6 +79,17 @@ public:
utils::estimated_histogram batch_get_item_histogram{22}; // a histogram that covers the range 1 - 100
utils::estimated_histogram batch_write_item_histogram{22}; // a histogram that covers the range 1 - 100
} api_operations;
// Count of authentication and authorization failures, counted if either
// alternator_enforce_authorization or alternator_warn_authorization are
// set to true. If both are false, no authentication or authorization
// checks are performed, so failures are not recognized or counted.
// "authentication" failure means the request was not signed with a valid
// user and key combination. "authorization" failure means the request was
// authenticated to a valid user - but this user did not have permissions
// to perform the operation (considering RBAC settings and the user's
// superuser status).
uint64_t authentication_failures = 0;
uint64_t authorization_failures = 0;
// Miscellaneous event counters
uint64_t total_operations = 0;
uint64_t unsupported_operations = 0;


@@ -828,7 +828,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
db::consistency_level cl = db::consistency_level::LOCAL_QUORUM;
partition_key pk = iter.shard.id.to_partition_key(*schema);


@@ -94,7 +94,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
sstring attribute_name(v->GetString(), v->GetStringLength());
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
if (enabled) {
if (tags_map.contains(TTL_TAG_KEY)) {


@@ -2924,7 +2924,7 @@
},
{
"name":"incremental_mode",
"description":"Set the incremental repair mode. Can be 'disabled', 'regular', or 'full'. 'regular': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to regular.",
"description":"Set the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental mode.",
"required":false,
"allowMultiple":false,
"type":"string",


@@ -233,9 +233,9 @@ future<role_set> ldap_role_manager::query_granted(std::string_view grantee_name,
}
future<role_to_directly_granted_map>
ldap_role_manager::query_all_directly_granted() {
ldap_role_manager::query_all_directly_granted(::service::query_state& qs) {
role_to_directly_granted_map result;
auto roles = co_await query_all();
auto roles = co_await query_all(qs);
for (auto& role: roles) {
auto granted_set = co_await query_granted(role, recursive_role_query::no);
for (auto& granted: granted_set) {
@@ -247,8 +247,8 @@ ldap_role_manager::query_all_directly_granted() {
co_return result;
}
future<role_set> ldap_role_manager::query_all() {
return _std_mgr.query_all();
future<role_set> ldap_role_manager::query_all(::service::query_state& qs) {
return _std_mgr.query_all(qs);
}
future<> ldap_role_manager::create_role(std::string_view role_name) {
@@ -311,12 +311,12 @@ future<bool> ldap_role_manager::can_login(std::string_view role_name) {
}
future<std::optional<sstring>> ldap_role_manager::get_attribute(
std::string_view role_name, std::string_view attribute_name) {
return _std_mgr.get_attribute(role_name, attribute_name);
std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
return _std_mgr.get_attribute(role_name, attribute_name, qs);
}
future<role_manager::attribute_vals> ldap_role_manager::query_attribute_for_all(std::string_view attribute_name) {
return _std_mgr.query_attribute_for_all(attribute_name);
future<role_manager::attribute_vals> ldap_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state& qs) {
return _std_mgr.query_attribute_for_all(attribute_name, qs);
}
future<> ldap_role_manager::set_attribute(


@@ -75,9 +75,9 @@ class ldap_role_manager : public role_manager {
future<role_set> query_granted(std::string_view, recursive_role_query) override;
future<role_to_directly_granted_map> query_all_directly_granted() override;
future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
future<role_set> query_all() override;
future<role_set> query_all(::service::query_state&) override;
future<bool> exists(std::string_view) override;
@@ -85,9 +85,9 @@ class ldap_role_manager : public role_manager {
future<bool> can_login(std::string_view) override;
future<std::optional<sstring>> get_attribute(std::string_view, std::string_view) override;
future<std::optional<sstring>> get_attribute(std::string_view, std::string_view, ::service::query_state&) override;
future<role_manager::attribute_vals> query_attribute_for_all(std::string_view) override;
future<role_manager::attribute_vals> query_attribute_for_all(std::string_view, ::service::query_state&) override;
future<> set_attribute(std::string_view, std::string_view, std::string_view, ::service::group0_batch& mc) override;


@@ -78,11 +78,11 @@ future<role_set> maintenance_socket_role_manager::query_granted(std::string_view
return operation_not_supported_exception<role_set>("QUERY GRANTED");
}
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted() {
future<role_to_directly_granted_map> maintenance_socket_role_manager::query_all_directly_granted(::service::query_state&) {
return operation_not_supported_exception<role_to_directly_granted_map>("QUERY ALL DIRECTLY GRANTED");
}
future<role_set> maintenance_socket_role_manager::query_all() {
future<role_set> maintenance_socket_role_manager::query_all(::service::query_state&) {
return operation_not_supported_exception<role_set>("QUERY ALL");
}
@@ -98,11 +98,11 @@ future<bool> maintenance_socket_role_manager::can_login(std::string_view role_na
return make_ready_future<bool>(true);
}
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
future<std::optional<sstring>> maintenance_socket_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) {
return operation_not_supported_exception<std::optional<sstring>>("GET ATTRIBUTE");
}
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name) {
future<role_manager::attribute_vals> maintenance_socket_role_manager::query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) {
return operation_not_supported_exception<role_manager::attribute_vals>("QUERY ATTRIBUTE");
}


@@ -53,9 +53,9 @@ public:
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
virtual future<role_to_directly_granted_map> query_all_directly_granted() override;
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
virtual future<role_set> query_all() override;
virtual future<role_set> query_all(::service::query_state&) override;
virtual future<bool> exists(std::string_view role_name) override;
@@ -63,9 +63,9 @@ public:
virtual future<bool> can_login(std::string_view role_name) override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;


@@ -36,7 +36,8 @@ static const std::unordered_map<sstring, auth::permission> permission_names({
{"MODIFY", auth::permission::MODIFY},
{"AUTHORIZE", auth::permission::AUTHORIZE},
{"DESCRIBE", auth::permission::DESCRIBE},
{"EXECUTE", auth::permission::EXECUTE}});
{"EXECUTE", auth::permission::EXECUTE},
{"VECTOR_SEARCH_INDEXING", auth::permission::VECTOR_SEARCH_INDEXING}});
const sstring& auth::permissions::to_string(permission p) {
for (auto& v : permission_names) {


@@ -33,6 +33,7 @@ enum class permission {
// data access
SELECT, // required for SELECT.
MODIFY, // required for INSERT, UPDATE, DELETE, TRUNCATE.
VECTOR_SEARCH_INDEXING, // required for SELECT from tables with vector indexes if SELECT permission is not granted.
// permission management
AUTHORIZE, // required for GRANT and REVOKE.
@@ -54,7 +55,8 @@ typedef enum_set<
permission::MODIFY,
permission::AUTHORIZE,
permission::DESCRIBE,
permission::EXECUTE>> permission_set;
permission::EXECUTE,
permission::VECTOR_SEARCH_INDEXING>> permission_set;
bool operator<(const permission_set&, const permission_set&);


@@ -41,22 +41,26 @@ static const std::unordered_map<resource_kind, std::size_t> max_parts{
{resource_kind::functions, 2}};
static permission_set applicable_permissions(const data_resource_view& dv) {
if (dv.table()) {
return permission_set::of<
// We only support VECTOR_SEARCH_INDEXING permission for ALL KEYSPACES.
auto set = permission_set::of<
permission::ALTER,
permission::DROP,
permission::SELECT,
permission::MODIFY,
permission::AUTHORIZE>();
if (!dv.table()) {
set.add(permission_set::of<permission::CREATE>());
}
return permission_set::of<
permission::CREATE,
permission::ALTER,
permission::DROP,
permission::SELECT,
permission::MODIFY,
permission::AUTHORIZE>();
if (!dv.table() && !dv.keyspace()) {
set.add(permission_set::of<permission::VECTOR_SEARCH_INDEXING>());
}
return set;
}
static permission_set applicable_permissions(const role_resource_view& rv) {


@@ -17,12 +17,17 @@
#include <seastar/core/format.hh>
#include <seastar/core/sstring.hh>
#include "auth/common.hh"
#include "auth/resource.hh"
#include "cql3/description.hh"
#include "seastarx.hh"
#include "exceptions/exceptions.hh"
#include "service/raft/raft_group0_client.hh"
namespace service {
class query_state;
};
namespace auth {
struct role_config final {
@@ -167,9 +172,9 @@ public:
/// (role2, role3)
/// }
///
virtual future<role_to_directly_granted_map> query_all_directly_granted() = 0;
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state& = internal_distributed_query_state()) = 0;
virtual future<role_set> query_all() = 0;
virtual future<role_set> query_all(::service::query_state& = internal_distributed_query_state()) = 0;
virtual future<bool> exists(std::string_view role_name) = 0;
@@ -186,12 +191,12 @@ public:
///
/// \returns the value of the named attribute, if one is set.
///
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) = 0;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& = internal_distributed_query_state()) = 0;
///
/// \returns a mapping of each role's value for the named attribute, if one is set for the role.
///
virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name) = 0;
virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state& = internal_distributed_query_state()) = 0;
/// Sets `attribute_name` with `attribute_value` for `role_name`.
/// \returns an exceptional future with nonexistant_role if the role does not exist.


@@ -231,6 +231,17 @@ struct command_desc {
} type_ = type::OTHER;
};
/// Similar to command_desc, but used in cases where multiple permissions allow the access to the resource.
struct command_desc_with_permission_set {
permission_set permission;
const ::auth::resource& resource;
enum class type {
ALTER_WITH_OPTS,
ALTER_SYSTEM_WITH_ALLOWED_OPTS,
OTHER
} type_ = type::OTHER;
};
///
/// Protected resources cannot be modified even if the performer has permissions to do so.
///


@@ -663,21 +663,30 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
});
}
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted() {
future<role_to_directly_granted_map> standard_role_manager::query_all_directly_granted(::service::query_state& qs) {
const sstring query = seastar::format("SELECT * FROM {}.{}",
get_auth_ks_name(_qp),
meta::role_members_table::name);
const auto results = co_await _qp.execute_internal(
query,
db::consistency_level::ONE,
qs,
cql3::query_processor::cache_internal::yes);
role_to_directly_granted_map roles_map;
co_await _qp.query_internal(query, [&roles_map] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
roles_map.insert({row.get_as<sstring>("member"), row.get_as<sstring>("role")});
co_return stop_iteration::no;
});
std::transform(
results->begin(),
results->end(),
std::inserter(roles_map, roles_map.begin()),
[] (const cql3::untyped_result_set_row& row) {
return std::make_pair(row.get_as<sstring>("member"), row.get_as<sstring>("role")); }
);
co_return roles_map;
}
future<role_set> standard_role_manager::query_all() {
future<role_set> standard_role_manager::query_all(::service::query_state& qs) {
const sstring query = seastar::format("SELECT {} FROM {}.{}",
meta::roles_table::role_col_name,
get_auth_ks_name(_qp),
@@ -695,7 +704,7 @@ future<role_set> standard_role_manager::query_all() {
const auto results = co_await _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_query_state(),
qs,
cql3::query_processor::cache_internal::yes);
role_set roles;
@@ -727,11 +736,11 @@ future<bool> standard_role_manager::can_login(std::string_view role_name) {
});
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state& qs) {
const sstring query = seastar::format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
get_auth_ks_name(_qp),
meta::role_attributes_table::name);
const auto result_set = co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
const auto result_set = co_await _qp.execute_internal(query, db::consistency_level::ONE, qs, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
if (!result_set->empty()) {
const cql3::untyped_result_set_row &row = result_set->one();
co_return std::optional<sstring>(row.get_as<sstring>("value"));
@@ -739,11 +748,11 @@ future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_
co_return std::optional<sstring>{};
}
future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name) {
return query_all().then([this, attribute_name] (role_set roles) {
return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles)] (attribute_vals &role_to_att_val) {
return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name] (sstring role) {
return get_attribute(role, attribute_name).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name, ::service::query_state& qs) {
return query_all(qs).then([this, attribute_name, &qs] (role_set roles) {
return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles), &qs] (attribute_vals &role_to_att_val) {
return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name, &qs] (sstring role) {
return get_attribute(role, attribute_name, qs).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
if (att_val) {
role_to_att_val.emplace(std::move(role), std::move(*att_val));
}
@@ -788,7 +797,7 @@ future<> standard_role_manager::remove_attribute(std::string_view role_name, std
future<std::vector<cql3::description>> standard_role_manager::describe_role_grants() {
std::vector<cql3::description> result{};
const auto grants = co_await query_all_directly_granted();
const auto grants = co_await query_all_directly_granted(internal_distributed_query_state());
result.reserve(grants.size());
for (const auto& [grantee_role, granted_role] : grants) {


@@ -66,9 +66,9 @@ public:
virtual future<role_set> query_granted(std::string_view grantee_name, recursive_role_query) override;
virtual future<role_to_directly_granted_map> query_all_directly_granted() override;
virtual future<role_to_directly_granted_map> query_all_directly_granted(::service::query_state&) override;
virtual future<role_set> query_all() override;
virtual future<role_set> query_all(::service::query_state&) override;
virtual future<bool> exists(std::string_view role_name) override;
@@ -76,9 +76,9 @@ public:
virtual future<bool> can_login(std::string_view role_name) override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name, ::service::query_state&) override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name, ::service::query_state&) override;
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value, ::service::group0_batch& mc) override;


@@ -1209,6 +1209,23 @@ future<mutation> create_table_streams_mutation(table_id table, db_clock::time_po
co_return std::move(m);
}
future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const utils::chunked_vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
auto s = db::system_keyspace::cdc_streams_state();
mutation m(s, partition_key::from_single_value(*s,
data_value(table.uuid()).serialize_nonnull()
));
m.set_static_cell("timestamp", stream_ts, ts);
for (const auto& sid : stream_ids) {
auto ck = clustering_key::from_singular(*s, dht::token::to_int64(sid.token()));
m.set_cell(ck, "stream_id", data_value(sid.to_bytes()), ts);
co_await coroutine::maybe_yield();
}
co_return std::move(m);
}
utils::chunked_vector<mutation>
make_drop_table_streams_mutations(table_id table, api::timestamp_type ts) {
utils::chunked_vector<mutation> mutations;
@@ -1235,32 +1252,50 @@ future<> generation_service::load_cdc_tablet_streams(std::optional<std::unordere
tables_to_process = _cdc_metadata.get_tables_with_cdc_tablet_streams() | std::ranges::to<std::unordered_set<table_id>>();
}
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) -> future<> {
auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) -> future<> {
if (tables) {
for (auto table : *tables) {
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
return f(table, base_ts, std::move(base_stream_set));
});
}
} else {
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
return f(table, base_ts, std::move(base_stream_set));
});
}
};
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
table_streams new_table_map;
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, std::vector<cdc::stream_id> stream_set) {
auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, utils::chunked_vector<cdc::stream_id> stream_set) {
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(stream_tp.time_since_epoch()).count();
new_table_map[ts] = committed_stream_set {stream_tp, std::move(stream_set)};
};
append_stream(base_ts, std::move(base_stream_set));
// if we already have a loaded streams map, and the base timestamp is unchanged, then read
// the history entries starting from the latest one we have and append it to the existing map.
// we can do it because we only append new rows with higher timestamps to the history table.
std::optional<std::reference_wrapper<const committed_stream_set>> from_streams;
std::optional<db_clock::time_point> from_ts;
const auto& all_streams = _cdc_metadata.get_all_tablet_streams();
if (auto it = all_streams.find(table); it != all_streams.end()) {
const auto& current_map = *it->second;
if (current_map.cbegin()->second.ts == base_ts) {
const auto& latest_entry = current_map.crbegin()->second;
from_streams = std::cref(latest_entry);
from_ts = latest_entry.ts;
}
}
co_await _sys_ks.local().read_cdc_streams_history(table, [&] (table_id tid, db_clock::time_point ts, cdc_stream_diff diff) -> future<> {
const auto& prev_stream_set = std::crbegin(new_table_map)->second.streams;
if (!from_ts) {
append_stream(base_ts, std::move(base_stream_set));
}
co_await _sys_ks.local().read_cdc_streams_history(table, from_ts, [&] (table_id tid, db_clock::time_point ts, cdc_stream_diff diff) -> future<> {
const auto& prev_stream_set = new_table_map.empty() ?
from_streams->get().streams : std::crbegin(new_table_map)->second.streams;
append_stream(ts, co_await cdc::metadata::construct_next_stream_set(
prev_stream_set, std::move(diff.opened_streams), diff.closed_streams));
@@ -1272,7 +1307,11 @@ future<> generation_service::load_cdc_tablet_streams(std::optional<std::unordere
new_table_map_copy[ts] = entry;
co_await coroutine::maybe_yield();
}
svc._cdc_metadata.load_tablet_streams_map(table, std::move(new_table_map_copy));
if (!from_ts) {
svc._cdc_metadata.load_tablet_streams_map(table, std::move(new_table_map_copy));
} else {
svc._cdc_metadata.append_tablet_streams_map(table, std::move(new_table_map_copy));
}
}));
tables_to_process.erase(table);
@@ -1306,7 +1345,7 @@ future<> generation_service::query_cdc_timestamps(table_id table, bool ascending
}
}
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
const auto& all_tables = _cdc_metadata.get_all_tablet_streams();
auto table_it = all_tables.find(table);
if (table_it == all_tables.end()) {
@@ -1363,8 +1402,8 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
co_return;
}
std::vector<cdc::stream_id> new_streams;
new_streams.reserve(new_tablet_map.tablet_count());
utils::chunked_vector<cdc::stream_id> new_streams;
co_await utils::reserve_gently(new_streams, new_tablet_map.tablet_count());
for (auto tid : new_tablet_map.tablet_ids()) {
new_streams.emplace_back(new_tablet_map.get_last_token(tid), 0);
co_await coroutine::maybe_yield();
@@ -1386,4 +1425,113 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
muts.emplace_back(std::move(mut));
}
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
utils::chunked_vector<mutation> muts;
muts.reserve(2);
auto gc_now = gc_clock::now();
auto tombstone_ts = ts - 1;
{
// write the new base stream set to cdc_streams_state
auto s = db::system_keyspace::cdc_streams_state();
mutation m(s, partition_key::from_single_value(*s,
data_value(table.uuid()).serialize_nonnull()
));
m.partition().apply(tombstone(tombstone_ts, gc_now));
m.set_static_cell("timestamp", data_value(base_ts), ts);
for (const auto& sid : base_stream_set) {
co_await coroutine::maybe_yield();
auto ck = clustering_key::from_singular(*s, dht::token::to_int64(sid.token()));
m.set_cell(ck, "stream_id", data_value(sid.to_bytes()), ts);
}
muts.emplace_back(std::move(m));
}
{
// remove all entries from cdc_streams_history up to the new base
auto s = db::system_keyspace::cdc_streams_history();
mutation m(s, partition_key::from_single_value(*s,
data_value(table.uuid()).serialize_nonnull()
));
auto range = query::clustering_range::make_ending_with({
clustering_key_prefix::from_single_value(*s, timestamp_type->decompose(base_ts)), true});
auto bv = bound_view::from_range(range);
m.partition().apply_delete(*s, range_tombstone{bv.first, bv.second, tombstone{ts, gc_now}});
muts.emplace_back(std::move(m));
}
co_return std::move(muts);
}
table_streams::const_iterator get_new_base_for_gc(const table_streams& streams_map, std::chrono::seconds ttl) {
// find the most recent timestamp that is older than ttl_seconds, which will become the new base.
// all streams with older timestamps can be removed because they are closed for more than ttl_seconds
// (they are all replaced by streams with the newer timestamp).
auto ts_upper_bound = db_clock::now() - ttl;
auto it = streams_map.begin();
while (it != streams_map.end()) {
auto next_it = std::next(it);
if (next_it == streams_map.end()) {
break;
}
auto next_tp = next_it->second.ts;
if (next_tp <= ts_upper_bound) {
// the next timestamp is older than ttl_seconds, so the current one is obsolete
it = next_it;
} else {
break;
}
}
return it;
}
future<utils::chunked_vector<mutation>> generation_service::garbage_collect_cdc_streams_for_table(table_id table, std::optional<std::chrono::seconds> ttl, api::timestamp_type ts) {
const auto& table_streams = *_cdc_metadata.get_all_tablet_streams().at(table);
// if TTL is not provided by the caller then use the table's CDC TTL
auto base_schema = cdc::get_base_table(_db, *_db.find_schema(table));
ttl = ttl.or_else([&] -> std::optional<std::chrono::seconds> {
auto ttl_seconds = base_schema->cdc_options().ttl();
if (ttl_seconds > 0) {
return std::chrono::seconds(ttl_seconds);
} else {
// ttl=0 means no ttl
return std::nullopt;
}
});
if (!ttl) {
co_return utils::chunked_vector<mutation>{};
}
auto new_base_it = get_new_base_for_gc(table_streams, *ttl);
if (new_base_it == table_streams.begin() || new_base_it == table_streams.end()) {
// nothing to gc
co_return utils::chunked_vector<mutation>{};
}
for (auto it = table_streams.begin(); it != new_base_it; ++it) {
cdc_log.info("Garbage collecting CDC stream metadata for table {}: removing generation {} because it is older than the CDC TTL of {} seconds",
table, it->second.ts, *ttl);
}
co_return co_await get_cdc_stream_gc_mutations(table, new_base_it->second.ts, new_base_it->second.streams, ts);
}
future<> generation_service::garbage_collect_cdc_streams(utils::chunked_vector<canonical_mutation>& muts, api::timestamp_type ts) {
for (auto table : _cdc_metadata.get_tables_with_cdc_tablet_streams()) {
co_await coroutine::maybe_yield();
auto table_muts = co_await garbage_collect_cdc_streams_for_table(table, std::nullopt, ts);
for (auto&& m : table_muts) {
muts.emplace_back(std::move(m));
}
}
}
} // namespace cdc

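The core of `get_new_base_for_gc()` above is an iterator walk over the timestamp-ordered streams map: keep advancing the candidate base while the *next* entry is already older than `now - ttl`, since every earlier entry is then superseded by a set that has itself outlived the TTL. A minimal sketch of that selection, with `std::map<long, int>` (integer seconds, integer payload) standing in for `table_streams`:

```cpp
#include <cassert>
#include <iterator>
#include <map>

// Seconds since epoch; a plain integer clock, an assumption for the sketch.
using clock_s = long;

// Returns the most recent entry that is older than (now - ttl): it becomes
// the new base, and everything before it can be garbage collected.
std::map<clock_s, int>::const_iterator
new_base_for_gc(const std::map<clock_s, int>& streams, clock_s now, clock_s ttl) {
    const auto ts_upper_bound = now - ttl;
    auto it = streams.begin();
    while (it != streams.end()) {
        auto next_it = std::next(it);
        if (next_it == streams.end() || next_it->first > ts_upper_bound) {
            // The next entry (if any) is still within the TTL, so the
            // current entry must remain the base.
            break;
        }
        // The next entry is itself past the TTL: the current one is obsolete.
        it = next_it;
    }
    return it;
}
```

As in the patch, a result equal to `begin()` means there is nothing to collect.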

@@ -143,12 +143,12 @@ stream_state read_stream_state(int8_t val);
struct committed_stream_set {
db_clock::time_point ts;
std::vector<cdc::stream_id> streams;
utils::chunked_vector<cdc::stream_id> streams;
};
struct cdc_stream_diff {
std::vector<stream_id> closed_streams;
std::vector<stream_id> opened_streams;
utils::chunked_vector<stream_id> closed_streams;
utils::chunked_vector<stream_id> opened_streams;
};
using table_streams = std::map<api::timestamp_type, committed_stream_set>;
@@ -220,8 +220,11 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const locator::tablet_map&, api::timestamp_type);
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const utils::chunked_vector<cdc::stream_id>&, api::timestamp_type);
utils::chunked_vector<mutation> make_drop_table_streams_mutations(table_id, api::timestamp_type ts);
future<mutation> get_switch_streams_mutation(table_id table, db_clock::time_point stream_ts, cdc_stream_diff diff, api::timestamp_type ts);
future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
table_streams::const_iterator get_new_base_for_gc(const table_streams&, std::chrono::seconds ttl);
} // namespace cdc


@@ -149,10 +149,13 @@ public:
future<> load_cdc_tablet_streams(std::optional<std::unordered_set<table_id>> changed_tables);
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
future<> generate_tablet_resize_update(utils::chunked_vector<canonical_mutation>& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts);
future<utils::chunked_vector<mutation>> garbage_collect_cdc_streams_for_table(table_id table, std::optional<std::chrono::seconds> ttl, api::timestamp_type ts);
future<> garbage_collect_cdc_streams(utils::chunked_vector<canonical_mutation>& muts, api::timestamp_type ts);
private:
/* Retrieve the CDC generation which starts at the given timestamp (from a distributed table created for this purpose)
* and start using it for CDC log writes if it's not obsolete.


@@ -54,7 +54,7 @@ cdc::stream_id get_stream(
}
static cdc::stream_id get_stream(
const std::vector<cdc::stream_id>& streams,
const utils::chunked_vector<cdc::stream_id>& streams,
dht::token tok) {
if (streams.empty()) {
on_internal_error(cdc_log, "get_stream: streams empty");
@@ -159,7 +159,7 @@ cdc::stream_id cdc::metadata::get_vnode_stream(api::timestamp_type ts, dht::toke
return ret;
}
const std::vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
const utils::chunked_vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
auto now = api::new_timestamp();
if (ts > now + get_generation_leeway().count()) {
throw exceptions::invalid_request_exception(seastar::format(
@@ -259,10 +259,10 @@ bool cdc::metadata::prepare(db_clock::time_point tp) {
return !it->second;
}
future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
const std::vector<cdc::stream_id>& prev_stream_set,
std::vector<cdc::stream_id> opened,
const std::vector<cdc::stream_id>& closed) {
future<utils::chunked_vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
utils::chunked_vector<cdc::stream_id> opened,
const utils::chunked_vector<cdc::stream_id>& closed) {
if (closed.size() == prev_stream_set.size()) {
// all previous streams are closed, so the next stream set is just the opened streams.
@@ -273,8 +273,8 @@ future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
// streams and removing the closed streams. we assume each stream set is
// sorted by token, and the result is sorted as well.
std::vector<cdc::stream_id> next_stream_set;
next_stream_set.reserve(prev_stream_set.size() + opened.size() - closed.size());
utils::chunked_vector<cdc::stream_id> next_stream_set;
co_await utils::reserve_gently(next_stream_set, prev_stream_set.size() + opened.size() - closed.size());
auto next_prev = prev_stream_set.begin();
auto next_closed = closed.begin();
@@ -306,6 +306,10 @@ void cdc::metadata::load_tablet_streams_map(table_id tid, table_streams new_tabl
_tablet_streams[tid] = make_lw_shared(std::move(new_table_map));
}
void cdc::metadata::append_tablet_streams_map(table_id tid, table_streams new_table_map) {
_tablet_streams[tid]->insert(std::make_move_iterator(new_table_map.begin()), std::make_move_iterator(new_table_map.end()));
}
void cdc::metadata::remove_tablet_streams_map(table_id tid) {
_tablet_streams.erase(tid);
}
@@ -314,8 +318,8 @@ std::vector<table_id> cdc::metadata::get_tables_with_cdc_tablet_streams() const
return _tablet_streams | std::views::keys | std::ranges::to<std::vector<table_id>>();
}
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const std::vector<stream_id>& before, const std::vector<stream_id>& after) {
std::vector<stream_id> closed, opened;
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const utils::chunked_vector<stream_id>& before, const utils::chunked_vector<stream_id>& after) {
utils::chunked_vector<stream_id> closed, opened;
auto before_it = before.begin();
auto after_it = after.begin();

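`construct_next_stream_set()` above relies on each stream set being sorted by token: the next set is `(prev \ closed) ∪ opened`, computed by a linear merge. A self-contained sketch of that set arithmetic, with sorted `std::vector<int>` standing in for `utils::chunked_vector<cdc::stream_id>`:

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

// next = (prev \ closed) ∪ opened, all inputs sorted, output sorted.
std::vector<int> next_stream_set(const std::vector<int>& prev,
                                 const std::vector<int>& opened,
                                 const std::vector<int>& closed) {
    // Drop the closed streams from the previous set.
    std::vector<int> surviving;
    std::set_difference(prev.begin(), prev.end(),
                        closed.begin(), closed.end(),
                        std::back_inserter(surviving));
    // Merge in the newly opened streams, keeping the result sorted.
    std::vector<int> next;
    next.reserve(surviving.size() + opened.size());
    std::merge(surviving.begin(), surviving.end(),
               opened.begin(), opened.end(),
               std::back_inserter(next));
    return next;
}
```

The real coroutine additionally yields periodically (`utils::reserve_gently`, `maybe_yield`) because the sets can be large; this sketch keeps only the merge logic.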

@@ -37,7 +37,9 @@ class metadata final {
using container_t = std::map<api::timestamp_type, std::optional<topology_description>>;
container_t _gens;
using table_streams_ptr = lw_shared_ptr<const table_streams>;
// per-table streams map for tables in tablets-based keyspaces.
// the streams map is shared with the virtual tables reader, hence we can only insert new entries to it, not erase.
using table_streams_ptr = lw_shared_ptr<table_streams>;
using tablet_streams_map = std::unordered_map<table_id, table_streams_ptr>;
tablet_streams_map _tablet_streams;
@@ -47,7 +49,7 @@ class metadata final {
container_t::const_iterator gen_used_at(api::timestamp_type ts) const;
const std::vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
const utils::chunked_vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
public:
/* Is a generation with the given timestamp already known or obsolete? It is obsolete if and only if
@@ -100,6 +102,7 @@ public:
bool prepare(db_clock::time_point ts);
void load_tablet_streams_map(table_id tid, table_streams new_table_map);
void append_tablet_streams_map(table_id tid, table_streams new_table_map);
void remove_tablet_streams_map(table_id tid);
const tablet_streams_map& get_all_tablet_streams() const {
@@ -108,14 +111,14 @@ public:
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
static future<std::vector<stream_id>> construct_next_stream_set(
const std::vector<cdc::stream_id>& prev_stream_set,
std::vector<cdc::stream_id> opened,
const std::vector<cdc::stream_id>& closed);
static future<utils::chunked_vector<stream_id>> construct_next_stream_set(
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
utils::chunked_vector<cdc::stream_id> opened,
const utils::chunked_vector<cdc::stream_id>& closed);
static future<cdc_stream_diff> generate_stream_diff(
const std::vector<stream_id>& before,
const std::vector<stream_id>& after);
const utils::chunked_vector<stream_id>& before,
const utils::chunked_vector<stream_id>& after);
};

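`generate_stream_diff()` declared above is the inverse direction of the same sorted-set arithmetic: given `before` and `after`, the closed streams are those present only in `before` and the opened ones those present only in `after`. A sketch with plain ints standing in for `cdc::stream_id`:

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

struct stream_diff {
    std::vector<int> closed;
    std::vector<int> opened;
};

// Both inputs sorted; closed = before \ after, opened = after \ before.
stream_diff generate_stream_diff(const std::vector<int>& before,
                                 const std::vector<int>& after) {
    stream_diff d;
    std::set_difference(before.begin(), before.end(),
                        after.begin(), after.end(),
                        std::back_inserter(d.closed));
    std::set_difference(after.begin(), after.end(),
                        before.begin(), before.end(),
                        std::back_inserter(d.opened));
    return d;
}
```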

@@ -1506,7 +1506,7 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
co_return;
}
auto num_runs_for_compaction = [&, this] -> future<size_t> {
auto& cs = t.get_compaction_strategy();
auto cs = t.get_compaction_strategy();
auto desc = co_await cs.get_sstables_for_compaction(t, get_strategy_control());
co_return std::ranges::size(desc.sstables
| std::views::transform(std::mem_fn(&sstables::sstable::run_identifier))

View File

@@ -804,9 +804,9 @@ compaction_strategy_state compaction_strategy_state::make(const compaction_strat
case compaction_strategy_type::incremental:
return compaction_strategy_state(default_empty_state{});
case compaction_strategy_type::leveled:
return compaction_strategy_state(leveled_compaction_strategy_state{});
return compaction_strategy_state(seastar::make_shared<leveled_compaction_strategy_state>());
case compaction_strategy_type::time_window:
return compaction_strategy_state(time_window_compaction_strategy_state{});
return compaction_strategy_state(seastar::make_shared<time_window_compaction_strategy_state>());
default:
throw std::runtime_error("strategy not supported");
}


@@ -18,7 +18,7 @@ namespace compaction {
class compaction_strategy_state {
public:
struct default_empty_state {};
using states_variant = std::variant<default_empty_state, leveled_compaction_strategy_state, time_window_compaction_strategy_state>;
using states_variant = std::variant<default_empty_state, leveled_compaction_strategy_state_ptr, time_window_compaction_strategy_state_ptr>;
private:
states_variant _state;
public:

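The hunks above move the leveled and time-window strategy state behind `seastar::shared_ptr` inside the variant, and the call sites switch from `auto& state = get_state(...)` to `auto state = get_state(...)`. The motivation, sketched below with `std::shared_ptr`/`std::variant` rather than the Scylla types: a reference into a variant dangles if the variant is reassigned while the caller is suspended at a `co_await`, whereas a copied pointer co-owns the state and stays valid.

```cpp
#include <cassert>
#include <memory>
#include <variant>

// Simplified stand-ins for the compaction-strategy state (hypothetical names).
struct lcs_state { int counter = 0; };
using state_ptr = std::shared_ptr<lcs_state>;
using states_variant = std::variant<std::monostate, state_ptr>;

int touch_state_safely() {
    states_variant v = std::make_shared<lcs_state>();
    // Copy the pointer instead of holding a reference into the variant.
    state_ptr state = std::get<state_ptr>(v);
    // The variant is reassigned (e.g. the strategy is replaced while a
    // coroutine is suspended); a reference into it would now dangle.
    v = std::monostate{};
    state->counter++; // still valid: the copy co-owns the state
    return state->counter;
}
```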

@@ -14,12 +14,12 @@
namespace compaction {
leveled_compaction_strategy_state& leveled_compaction_strategy::get_state(compaction_group_view& table_s) const {
return table_s.get_compaction_strategy_state().get<leveled_compaction_strategy_state>();
leveled_compaction_strategy_state_ptr leveled_compaction_strategy::get_state(compaction_group_view& table_s) const {
return table_s.get_compaction_strategy_state().get<leveled_compaction_strategy_state_ptr>();
}
future<compaction_descriptor> leveled_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
auto& state = get_state(table_s);
auto state = get_state(table_s);
auto candidates = co_await control.candidates(table_s);
// NOTE: leveled_manifest creation may be slightly expensive, so later on,
// we may want to store it in the strategy itself. However, the sstable
@@ -27,10 +27,10 @@ future<compaction_descriptor> leveled_compaction_strategy::get_sstables_for_comp
// sstable in it may be marked for deletion after compacted.
// Currently, we create a new manifest whenever it's time for compaction.
leveled_manifest manifest = leveled_manifest::create(table_s, candidates, _max_sstable_size_in_mb, _stcs_options);
if (!state.last_compacted_keys) {
generate_last_compacted_keys(state, manifest);
if (!state->last_compacted_keys) {
generate_last_compacted_keys(*state, manifest);
}
auto candidate = manifest.get_compaction_candidates(*state.last_compacted_keys, state.compaction_counter);
auto candidate = manifest.get_compaction_candidates(*state->last_compacted_keys, state->compaction_counter);
if (!candidate.sstables.empty()) {
auto main_set = co_await table_s.main_sstable_set();
@@ -78,12 +78,12 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(comp
}
void leveled_compaction_strategy::notify_completion(compaction_group_view& table_s, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
auto& state = get_state(table_s);
auto state = get_state(table_s);
// All the update here is only relevant for regular compaction's round-robin picking policy, and if
// last_compacted_keys wasn't generated by regular, it means regular is disabled since last restart,
// therefore we can skip the updates here until regular runs for the first time. Once it runs,
// it will be able to generate last_compacted_keys correctly by looking at metadata of files.
if (removed.empty() || added.empty() || !state.last_compacted_keys) {
if (removed.empty() || added.empty() || !state->last_compacted_keys) {
return;
}
auto min_level = std::numeric_limits<uint32_t>::max();
@@ -99,16 +99,16 @@ void leveled_compaction_strategy::notify_completion(compaction_group_view& table
}
target_level = std::max(target_level, int(candidate->get_sstable_level()));
}
state.last_compacted_keys.value().at(min_level) = last->get_last_decorated_key();
state->last_compacted_keys.value().at(min_level) = last->get_last_decorated_key();
for (int i = leveled_manifest::MAX_LEVELS - 1; i > 0; i--) {
state.compaction_counter[i]++;
state->compaction_counter[i]++;
}
state.compaction_counter[target_level] = 0;
state->compaction_counter[target_level] = 0;
if (leveled_manifest::logger.level() == logging::log_level::debug) {
for (auto j = 0U; j < state.compaction_counter.size(); j++) {
leveled_manifest::logger.debug("CompactionCounter: {}: {}", j, state.compaction_counter[j]);
for (auto j = 0U; j < state->compaction_counter.size(); j++) {
leveled_manifest::logger.debug("CompactionCounter: {}: {}", j, state->compaction_counter[j]);
}
}
}


@@ -36,6 +36,8 @@ struct leveled_compaction_strategy_state {
leveled_compaction_strategy_state();
};
using leveled_compaction_strategy_state_ptr = seastar::shared_ptr<leveled_compaction_strategy_state>;
class leveled_compaction_strategy : public compaction_strategy_impl {
static constexpr int32_t DEFAULT_MAX_SSTABLE_SIZE_IN_MB = 160;
static constexpr auto SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
@@ -45,7 +47,7 @@ class leveled_compaction_strategy : public compaction_strategy_impl {
private:
int32_t calculate_max_sstable_size_in_mb(std::optional<sstring> option_value) const;
leveled_compaction_strategy_state& get_state(compaction_group_view& table_s) const;
leveled_compaction_strategy_state_ptr get_state(compaction_group_view& table_s) const;
public:
static unsigned ideal_level_for_input(const std::vector<sstables::shared_sstable>& input, uint64_t max_sstable_size);
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);

View File

@@ -13,6 +13,7 @@
#include "sstables/sstables.hh"
#include "sstables/sstable_set_impl.hh"
#include "compaction_strategy_state.hh"
+#include "utils/error_injection.hh"
#include <ranges>
@@ -22,8 +23,8 @@ extern logging::logger clogger;
using timestamp_type = api::timestamp_type;
-time_window_compaction_strategy_state& time_window_compaction_strategy::get_state(compaction_group_view& table_s) const {
-return table_s.get_compaction_strategy_state().get<time_window_compaction_strategy_state>();
+time_window_compaction_strategy_state_ptr time_window_compaction_strategy::get_state(compaction_group_view& table_s) const {
+return table_s.get_compaction_strategy_state().get<time_window_compaction_strategy_state_ptr>();
}
const std::unordered_map<sstring, std::chrono::seconds> time_window_compaction_strategy_options::valid_window_units = {
@@ -335,7 +336,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<sstables::shared_
future<compaction_descriptor>
time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
-auto& state = get_state(table_s);
+auto state = get_state(table_s);
auto compaction_time = gc_clock::now();
auto candidates = co_await control.candidates(table_s);
@@ -344,7 +345,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
}
auto now = db_clock::now();
-if (now - state.last_expired_check > _options.expired_sstable_check_frequency) {
+if (now - state->last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("[{}] TWCS expired check sufficiently far in the past, checking for fully expired SSTables", fmt::ptr(this));
// Find fully expired SSTables. Those will be included no matter what.
@@ -356,12 +357,14 @@ time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
// Keep checking for fully_expired_sstables until we don't find
// any among the candidates, meaning they are either already compacted
// or registered for compaction.
-state.last_expired_check = now;
+state->last_expired_check = now;
} else {
clogger.debug("[{}] TWCS skipping check for fully expired SSTables", fmt::ptr(this));
}
-auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time);
+co_await utils::get_local_injector().inject("twcs_get_sstables_for_compaction", utils::wait_for_message(30s));
+auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time, *state);
clogger.debug("[{}] Going to compact {} non-expired sstables", fmt::ptr(this), compaction_candidates.size());
co_return compaction_descriptor(std::move(compaction_candidates));
}
@@ -384,8 +387,8 @@ time_window_compaction_strategy::compaction_mode(const time_window_compaction_st
std::vector<sstables::shared_sstable>
time_window_compaction_strategy::get_next_non_expired_sstables(compaction_group_view& table_s, strategy_control& control,
-std::vector<sstables::shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time) {
-auto most_interesting = get_compaction_candidates(table_s, control, non_expiring_sstables);
+std::vector<sstables::shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time, time_window_compaction_strategy_state& state) {
+auto most_interesting = get_compaction_candidates(table_s, control, non_expiring_sstables, state);
if (!most_interesting.empty()) {
return most_interesting;
@@ -410,14 +413,14 @@ time_window_compaction_strategy::get_next_non_expired_sstables(compaction_group_
}
std::vector<sstables::shared_sstable>
-time_window_compaction_strategy::get_compaction_candidates(compaction_group_view& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidate_sstables) {
-auto& state = get_state(table_s);
+time_window_compaction_strategy::get_compaction_candidates(compaction_group_view& table_s, strategy_control& control,
+std::vector<sstables::shared_sstable> candidate_sstables, time_window_compaction_strategy_state& state) {
auto [buckets, max_timestamp] = get_buckets(std::move(candidate_sstables), _options);
// Update the highest window seen, if necessary
state.highest_window_seen = std::max(state.highest_window_seen, max_timestamp);
return newest_bucket(table_s, control, std::move(buckets), table_s.min_compaction_threshold(), table_s.schema()->max_compaction_threshold(),
-state.highest_window_seen);
+state.highest_window_seen, state);
}
timestamp_type
@@ -465,8 +468,7 @@ namespace compaction {
std::vector<sstables::shared_sstable>
time_window_compaction_strategy::newest_bucket(compaction_group_view& table_s, strategy_control& control, std::map<timestamp_type, std::vector<sstables::shared_sstable>> buckets,
-int min_threshold, int max_threshold, timestamp_type now) {
-auto& state = get_state(table_s);
+int min_threshold, int max_threshold, timestamp_type now, time_window_compaction_strategy_state& state) {
clogger.debug("time_window_compaction_strategy::newest_bucket:\n now {}\n{}", now, buckets);
for (auto&& [key, bucket] : buckets | std::views::reverse) {
@@ -517,7 +519,7 @@ time_window_compaction_strategy::trim_to_threshold(std::vector<sstables::shared_
}
future<int64_t> time_window_compaction_strategy::estimated_pending_compactions(compaction_group_view& table_s) const {
-auto& state = get_state(table_s);
+auto state = get_state(table_s);
auto min_threshold = table_s.min_compaction_threshold();
auto max_threshold = table_s.schema()->max_compaction_threshold();
auto main_set = co_await table_s.main_sstable_set();
@@ -526,7 +528,7 @@ future<int64_t> time_window_compaction_strategy::estimated_pending_compactions(c
int64_t n = 0;
for (auto& [bucket_key, bucket] : buckets) {
-switch (compaction_mode(state, bucket, bucket_key, max_timestamp, min_threshold)) {
+switch (compaction_mode(*state, bucket, bucket_key, max_timestamp, min_threshold)) {
case bucket_compaction_mode::size_tiered:
n += size_tiered_compaction_strategy::estimated_pending_compactions(bucket, min_threshold, max_threshold, _stcs_options);
break;

View File

@@ -67,6 +67,8 @@ struct time_window_compaction_strategy_state {
std::unordered_set<api::timestamp_type> recent_active_windows;
};
+using time_window_compaction_strategy_state_ptr = seastar::shared_ptr<time_window_compaction_strategy_state>;
class time_window_compaction_strategy : public compaction_strategy_impl {
time_window_compaction_strategy_options _options;
size_tiered_compaction_strategy_options _stcs_options;
@@ -87,7 +89,7 @@ public:
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
private:
-time_window_compaction_strategy_state& get_state(compaction_group_view& table_s) const;
+time_window_compaction_strategy_state_ptr get_state(compaction_group_view& table_s) const;
static api::timestamp_type
to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
@@ -110,9 +112,11 @@ private:
compaction_mode(const time_window_compaction_strategy_state&, const bucket_t& bucket, api::timestamp_type bucket_key, api::timestamp_type now, size_t min_threshold) const;
std::vector<sstables::shared_sstable>
-get_next_non_expired_sstables(compaction_group_view& table_s, strategy_control& control, std::vector<sstables::shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time);
+get_next_non_expired_sstables(compaction_group_view& table_s, strategy_control& control, std::vector<sstables::shared_sstable> non_expiring_sstables,
+gc_clock::time_point compaction_time, time_window_compaction_strategy_state& state);
-std::vector<sstables::shared_sstable> get_compaction_candidates(compaction_group_view& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidate_sstables);
+std::vector<sstables::shared_sstable> get_compaction_candidates(compaction_group_view& table_s, strategy_control& control,
+std::vector<sstables::shared_sstable> candidate_sstables, time_window_compaction_strategy_state& state);
public:
// Find the lowest timestamp for window of given size
static api::timestamp_type
@@ -126,7 +130,7 @@ public:
std::vector<sstables::shared_sstable>
newest_bucket(compaction_group_view& table_s, strategy_control& control, std::map<api::timestamp_type, std::vector<sstables::shared_sstable>> buckets,
-int min_threshold, int max_threshold, api::timestamp_type now);
+int min_threshold, int max_threshold, api::timestamp_type now, time_window_compaction_strategy_state& state);
static std::vector<sstables::shared_sstable>
trim_to_threshold(std::vector<sstables::shared_sstable> bucket, int max_threshold);

View File

@@ -1078,7 +1078,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/s3/client.cc',
'utils/s3/retryable_http_client.cc',
'utils/s3/retry_strategy.cc',
-'utils/s3/s3_retry_strategy.cc',
'utils/s3/credentials_providers/aws_credentials_provider.cc',
'utils/s3/credentials_providers/environment_aws_credentials_provider.cc',
'utils/s3/credentials_providers/instance_profile_credentials_provider.cc',

View File

@@ -1224,7 +1224,7 @@ listPermissionsStatement returns [std::unique_ptr<list_permissions_statement> st
;
permission returns [auth::permission perm = auth::permission{}]
-: p=(K_CREATE | K_ALTER | K_DROP | K_SELECT | K_MODIFY | K_AUTHORIZE | K_DESCRIBE | K_EXECUTE)
+: p=(K_CREATE | K_ALTER | K_DROP | K_SELECT | K_MODIFY | K_AUTHORIZE | K_DESCRIBE | K_EXECUTE | K_VECTOR_SEARCH_INDEXING)
{ $perm = auth::permissions::from_string($p.text); }
;
@@ -2398,6 +2398,8 @@ K_EXECUTE: E X E C U T E;
K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;
+K_VECTOR_SEARCH_INDEXING: V E C T O R '_' S E A R C H '_' I N D E X I N G;
// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');

View File

@@ -136,9 +136,7 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
throw exceptions::configuration_exception(sstring("Missing sub-option '") + compression_parameters::SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
}
compression_parameters cp(*compression_options);
-cp.validate(
-compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)),
-compression_parameters::dicts_usage_allowed(db.get_config().sstable_compression_dictionaries_allow_in_ddl()));
+cp.validate(compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)));
}
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);

View File

@@ -10,6 +10,8 @@
#include <seastar/core/coroutine.hh>
#include "create_index_statement.hh"
#include "db/config.hh"
#include "db/view/view.hh"
#include "exceptions/exceptions.hh"
#include "prepared_statement.hh"
#include "types/types.hh"
@@ -92,9 +94,13 @@ std::vector<::shared_ptr<index_target>> create_index_statement::validate_while_e
throw exceptions::invalid_request_exception(format("index names shouldn't be more than {:d} characters long (got \"{}\")", schema::NAME_LENGTH, _index_name.c_str()));
}
-if (!db.features().views_with_tablets && db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
-throw exceptions::invalid_request_exception(format("Secondary indexes are not supported on base tables with tablets (keyspace '{}')", keyspace()));
+try {
+db::view::validate_view_keyspace(db, keyspace());
+} catch (const std::exception& e) {
+// The type of the thrown exception is not specified, so we need to wrap it here.
+throw exceptions::invalid_request_exception(e.what());
}
validate_for_local_index(*schema);
std::vector<::shared_ptr<index_target>> targets;

View File

@@ -113,8 +113,7 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
if (rs->uses_tablets()) {
warnings.push_back(
"Tables in this keyspace will be replicated using Tablets "
-"and will not support Materialized Views, Secondary Indexes and counters features. "
-"To use Materialized Views, Secondary Indexes or counters, drop this keyspace and re-create it "
+"and will not support counters features. To use counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
if (ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");

View File

@@ -152,9 +152,13 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
schema_ptr schema = validation::validate_column_family(db, _base_name.get_keyspace(), _base_name.get_column_family());
-if (!db.features().views_with_tablets && db.find_keyspace(keyspace()).get_replication_strategy().uses_tablets()) {
-throw exceptions::invalid_request_exception(format("Materialized views are not supported on base tables with tablets"));
+try {
+db::view::validate_view_keyspace(db, keyspace());
+} catch (const std::exception& e) {
+// The type of the thrown exception is not specified, so we need to wrap it here.
+throw exceptions::invalid_request_exception(e.what());
}
if (schema->is_counter()) {
throw exceptions::invalid_request_exception(format("Materialized views are not supported on counter tables"));
}

View File

@@ -21,6 +21,7 @@
#include "exceptions/exceptions.hh"
#include <seastar/core/future.hh>
#include <seastar/coroutine/exception.hh>
+#include "index/vector_index.hh"
#include "service/broadcast_tables/experimental/lang.hh"
#include "service/qos/qos_common.hh"
#include "vector_search/vector_store_client.hh"
@@ -245,7 +246,8 @@ future<> select_statement::check_access(query_processor& qp, const service::clie
auto& cf_name = s->is_view()
? s->view_info()->base_name()
: (cdc ? cdc->cf_name() : column_family());
-co_await state.has_column_family_access(keyspace(), cf_name, auth::permission::SELECT);
+bool is_vector_indexed = secondary_index::vector_index::has_vector_index(*_schema);
+co_await state.has_column_family_access(keyspace(), cf_name, auth::permission::SELECT, auth::command_desc::type::OTHER, is_vector_indexed);
} catch (const data_dictionary::no_such_column_family& e) {
// Will be validated afterwards.
co_return;

View File

@@ -1318,15 +1318,15 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, sstable_format(this, "sstable_format", liveness::LiveUpdate, value_status::Used, "me", "Default sstable file format", {"md", "me", "ms"})
-, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{},
+, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts},
"Server-global user table compression options. If enabled, all user tables"
"will be compressed using the provided options, unless overridden"
"by compression options in the table schema. The available options are:\n"
-"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor (default), LZ4WithDictsCompressor, SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
+"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
"* compression_level: (Default: 3) Compression level for ZstdCompressor and ZstdWithDictsCompressor. Higher levels provide better compression ratios at the cost of speed. Allowed values are integers between 1 and 22.")
-, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
+, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Deprecated, true,
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"
"This is only enforced when this node validates a new DDL statement; disabling the option won't disable dictionary-based compression "
@@ -1426,7 +1426,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.")
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.")
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.")
-, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
+, alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
+, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.")
, alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.")
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000,
@@ -1756,7 +1757,7 @@ std::map<sstring, db::experimental_features_t::feature> db::experimental_feature
{"broadcast-tables", feature::BROADCAST_TABLES},
{"keyspace-storage-options", feature::KEYSPACE_STORAGE_OPTIONS},
{"tablets", feature::UNUSED},
-{"views-with-tablets", feature::VIEWS_WITH_TABLETS}
+{"views-with-tablets", feature::UNUSED}
};
}

View File

@@ -136,8 +136,7 @@ struct experimental_features_t {
UDF,
ALTERNATOR_STREAMS,
BROADCAST_TABLES,
-KEYSPACE_STORAGE_OPTIONS,
-VIEWS_WITH_TABLETS
+KEYSPACE_STORAGE_OPTIONS
};
static std::map<sstring, feature> map(); // See enum_option.
static std::vector<enum_option<experimental_features_t>> all();
@@ -478,6 +477,7 @@ public:
named_value<uint16_t> alternator_https_port;
named_value<sstring> alternator_address;
named_value<bool> alternator_enforce_authorization;
+named_value<bool> alternator_warn_authorization;
named_value<sstring> alternator_write_isolation;
named_value<uint32_t> alternator_streams_time_window_s;
named_value<uint32_t> alternator_timeout_in_ms;

View File

@@ -2463,14 +2463,14 @@ future<bool> system_keyspace::cdc_is_rewritten() {
}
future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
-noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) {
+noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) {
static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
static const sstring single_table_query = format("SELECT table_id, timestamp, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_STATE);
struct cur_t {
table_id tid;
db_clock::time_point ts;
-std::vector<cdc::stream_id> streams;
+utils::chunked_vector<cdc::stream_id> streams;
};
std::optional<cur_t> cur;
@@ -2487,7 +2487,7 @@ future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
if (cur) {
co_await f(cur->tid, cur->ts, std::move(cur->streams));
}
-cur = { tid, ts, std::vector<cdc::stream_id>() };
+cur = { tid, ts, utils::chunked_vector<cdc::stream_id>() };
}
cur->streams.push_back(std::move(stream_id));
@@ -2499,9 +2499,10 @@ future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
}
}
-future<> system_keyspace::read_cdc_streams_history(table_id table,
+future<> system_keyspace::read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from,
noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f) {
-static const sstring query = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_HISTORY);
+static const sstring query_all = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_HISTORY);
+static const sstring query_from = format("SELECT table_id, timestamp, stream_state, stream_id FROM {}.{} WHERE table_id = ? AND timestamp > ?", NAME, CDC_STREAMS_HISTORY);
struct cur_t {
table_id tid;
@@ -2510,7 +2511,11 @@ future<> system_keyspace::read_cdc_streams_history(table_id table,
};
std::optional<cur_t> cur;
-co_await _qp.query_internal(query, db::consistency_level::ONE, {table.uuid()}, 1000, [&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
+co_await _qp.query_internal(from ? query_from : query_all,
+db::consistency_level::ONE,
+from ? data_value_list{table.uuid(), *from} : data_value_list{table.uuid()},
+1000,
+[&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
auto tid = table_id(row.get_as<utils::UUID>("table_id"));
auto ts = row.get_as<db_clock::time_point>("timestamp");
auto stream_state = cdc::read_stream_state(row.get_as<int8_t>("stream_state"));

View File

@@ -601,8 +601,8 @@ public:
future<bool> cdc_is_rewritten();
future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);
-future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f);
-future<> read_cdc_streams_history(table_id table, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);
+future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f);
+future<> read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);
// Load Raft Group 0 id from scylla.local
future<utils::UUID> get_raft_group0_id();

View File

@@ -26,6 +26,7 @@
#include <seastar/coroutine/maybe_yield.hh>
#include <flat_map>
#include "db/config.hh"
#include "db/view/base_info.hh"
#include "db/view/view_build_status.hh"
#include "db/view/view_consumer.hh"
@@ -3305,15 +3306,6 @@ public:
_step.base->schema()->cf_name(), _step.current_token(), view_names);
}
if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) {
-if (_step.current_key.key().is_empty()) {
-// consumer got end-of-stream without consuming a single partition
-vlogger.debug("Reader didn't produce anything, marking views as built");
-while (!_step.build_status.empty()) {
-_built_views.views.push_back(std::move(_step.build_status.back()));
-_step.build_status.pop_back();
-}
-}
// before going back to the minimum token, advance current_key to the end
// and check for built views in that range.
_step.current_key = { _step.prange.end().value_or(dht::ring_position::max()).value().token(), partition_key::make_empty()};
@@ -3332,6 +3324,7 @@ public:
// Called in the context of a seastar::thread.
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
inject_failure("dont_start_build_step");
+gc_clock::time_point now = gc_clock::now();
auto compaction_state = make_lw_shared<compact_for_query_state>(
*step.reader.schema(),
@@ -3365,6 +3358,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
vlogger.warn("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
}).get();
+utils::get_local_injector().inject("delay_finishing_build_step", utils::wait_for_message(60s)).get();
}
future<> view_builder::mark_as_built(view_ptr view) {
@@ -3715,5 +3709,22 @@ sstring build_status_to_sstring(build_status status) {
on_internal_error(vlogger, fmt::format("Unknown view build status: {}", (int)status));
}
+void validate_view_keyspace(const data_dictionary::database& db, std::string_view keyspace_name) {
+const bool tablet_views_enabled = db.features().views_with_tablets;
+// Note: if the configuration option `rf_rack_valid_keyspaces` is enabled, we can be
+// sure that all tablet-based keyspaces are RF-rack-valid. We check that
+// at start-up and then we don't allow for creating RF-rack-invalid keyspaces.
+const bool rf_rack_valid_keyspaces = db.get_config().rf_rack_valid_keyspaces();
+const bool required_config = tablet_views_enabled && rf_rack_valid_keyspaces;
+const bool uses_tablets = db.find_keyspace(keyspace_name).get_replication_strategy().uses_tablets();
+if (!required_config && uses_tablets) {
+throw std::logic_error("Materialized views and secondary indexes are not supported on base tables with tablets. "
+"To be able to use them, enable the configuration option `rf_rack_valid_keyspaces` and make sure "
+"that the cluster feature `VIEWS_WITH_TABLETS` is enabled.");
+}
+}
} // namespace view
} // namespace db

View File

@@ -309,6 +309,18 @@ endpoints_to_update get_view_natural_endpoint(
bool use_tablets_basic_rack_aware_view_pairing,
replica::cf_stats& cf_stats);
+/// Verify that the provided keyspace is eligible for storing materialized views.
+///
+/// Result:
+/// * If the keyspace is eligible, no effect.
+/// * If the keyspace is not eligible, an exception is thrown. Its type is not specified,
+/// and the user of this function cannot make any assumption about it. The carried exception
+/// message will be worded in a way that can be directly passed on to the end user.
+///
+/// Preconditions:
+/// * The provided `keyspace_name` must correspond to an existing keyspace.
+void validate_view_keyspace(const data_dictionary::database&, std::string_view keyspace_name);
}
}

View File

@@ -127,8 +127,9 @@ view_building_worker::view_building_worker(replica::database& db, db::system_key
init_messaging_service();
}
-void view_building_worker::start_background_fibers() {
+future<> view_building_worker::init() {
SCYLLA_ASSERT(this_shard_id() == 0);
+co_await discover_existing_staging_sstables();
_staging_sstables_registrator = run_staging_sstables_registrator();
_view_building_state_observer = run_view_building_state_observer();
_mnotifier.register_listener(this);
@@ -195,8 +196,6 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
}
future<> view_building_worker::run_staging_sstables_registrator() {
-co_await discover_existing_staging_sstables();
while (!_as.abort_requested()) {
try {
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
@@ -310,7 +309,10 @@ std::unordered_map<table_id, std::vector<view_building_worker::staging_sstable_t
return;
}
-auto& tablet_map = _db.get_token_metadata().tablets().get_tablet_map(table_id);
+// scylladb/scylladb#26403: Make sure to access the tablets map via the effective replication map of the table object.
+// The token metadata object pointed to by the database (`_db.get_token_metadata()`) may not contain
+// the tablets map of the currently processed table yet. After #24414 is fixed, this should not matter anymore.
+auto& tablet_map = table->get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
auto sstables = table->get_sstables();
for (auto sstable: *sstables) {
if (!sstable->requires_view_building()) {
@@ -340,6 +342,7 @@ future<> view_building_worker::run_view_building_state_observer() {
while (!_as.abort_requested()) {
bool sleep = false;
+_state.some_batch_finished = false;
try {
vbw_logger.trace("view_building_state_observer() iteration");
auto read_apply_mutex_holder = co_await _group0_client.hold_read_apply_mutex(_as);
@@ -349,7 +352,12 @@ future<> view_building_worker::run_view_building_state_observer() {
_as.check();
read_apply_mutex_holder.return_all();
-co_await _vb_state_machine.event.wait();
+// A batch could have finished its work while the worker was
+// updating the state. In that case we should do another iteration.
+if (!_state.some_batch_finished) {
+co_await _vb_state_machine.event.wait();
+}
} catch (abort_requested_exception&) {
} catch (broken_condition_variable&) {
} catch (...) {
@@ -657,6 +665,7 @@ future<> view_building_worker::local_state::clear_state() {
finished_tasks.clear();
aborted_tasks.clear();
state_updated_cv.broadcast();
+some_batch_finished = false;
vbw_logger.debug("View building worker state was cleared.");
}
@@ -676,6 +685,7 @@ void view_building_worker::batch::start() {
return do_work();
}).finally([this] () {
state = batch_state::finished;
+_vbw.local()._state.some_batch_finished = true;
_vbw.local()._vb_state_machine.event.broadcast();
});
}

View File

@@ -111,6 +111,7 @@ class view_building_worker : public seastar::peering_sharded_service<view_buildi
std::unordered_set<utils::UUID> finished_tasks;
std::unordered_set<utils::UUID> aborted_tasks;
+bool some_batch_finished = false;
condition_variable state_updated_cv;
// Clears completed/aborted tasks and creates batches (without starting them) for started tasks.
@@ -166,7 +167,7 @@ public:
view_building_worker(replica::database& db, db::system_keyspace& sys_ks, service::migration_notifier& mnotifier,
service::raft_group0_client& group0_client, view_update_generator& vug, netw::messaging_service& ms,
view_building_state_machine& vbsm);
-void start_background_fibers();
+future<> init();
future<> register_staging_sstable_tasks(std::vector<sstables::shared_sstable> ssts, table_id table_id);

View File

@@ -1278,7 +1278,7 @@ public:
static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed));
static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened));
-co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
+co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
co_await emit_stream_set(ts, cdc::stream_state::current, current);
co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams);
co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams);

View File

@@ -204,7 +204,7 @@ ring_position_range_sharder::next(const schema& s) {
return ring_position_range_and_shard{std::move(_range), shard};
}
-ring_position_range_vector_sharder::ring_position_range_vector_sharder(const sharder& sharder, dht::partition_range_vector ranges)
+ring_position_range_vector_sharder::ring_position_range_vector_sharder(const sharder& sharder, utils::chunked_vector<dht::partition_range> ranges)
: _ranges(std::move(ranges))
, _sharder(sharder)
, _current_range(_ranges.begin()) {

View File

@@ -11,6 +11,7 @@
#include "dht/ring_position.hh"
#include "dht/token-sharding.hh"
#include "utils/interval.hh"
+#include "utils/chunked_vector.hh"
#include <vector>
@@ -89,7 +90,7 @@ struct ring_position_range_and_shard_and_element : ring_position_range_and_shard
//
// During migration uses a view on shard routing for reads.
class ring_position_range_vector_sharder {
using vec_type = dht::partition_range_vector;
using vec_type = utils::chunked_vector<dht::partition_range>;
vec_type _ranges;
const sharder& _sharder;
vec_type::iterator _current_range;
@@ -104,7 +105,7 @@ public:
// Initializes the `ring_position_range_vector_sharder` with the ranges to be processed.
// Input ranges should be non-overlapping (although nothing bad will happen if they do
// overlap).
ring_position_range_vector_sharder(const sharder& sharder, dht::partition_range_vector ranges);
ring_position_range_vector_sharder(const sharder& sharder, utils::chunked_vector<dht::partition_range> ranges);
// Fetches the next range-shard mapping. When the input range is exhausted, std::nullopt is
// returned. Within an input range, results are contiguous and non-overlapping (but since input
// ranges usually are discontiguous, overall the results are not contiguous). Together, the results

View File

@@ -131,6 +131,28 @@ def configure_iotune_open_fd_limit(shards_count):
logging.error(f"Required FDs count: {precalculated_fds_count}, default limit: {fd_limits}!")
sys.exit(1)
def force_random_request_size_of_4k():
"""
It is a known bug that on i4i, i7i, i8g, and i8ge instances, the disk controller reports the
physical sector size as 512 bytes, while the actual physical sector size is 4096 bytes. This function
helps us work around that issue until AWS manages to get a fix for it. It returns 4096 if it
detects it's running on one of the affected instance types; otherwise it returns None and IOTune
will use the physical sector size reported by the disk.
"""
path="/sys/devices/virtual/dmi/id/product_name"
try:
with open(path, "r") as f:
instance_type = f.read().strip()
except FileNotFoundError:
logging.warning(f"Couldn't find {path}. Falling back to IOTune using the physical sector size reported by disk.")
return
prefixes = ["i7i", "i4i", "i8g", "i8ge"]
if any(instance_type.startswith(p) for p in prefixes):
return 4096
def run_iotune():
if "SCYLLA_CONF" in os.environ:
conf_dir = os.environ["SCYLLA_CONF"]
@@ -173,6 +195,8 @@ def run_iotune():
configure_iotune_open_fd_limit(cpudata.nr_shards())
if (reqsize := force_random_request_size_of_4k()):
iotune_args += ["--random-write-io-buffer-size", f"{reqsize}"]
try:
subprocess.check_call([bindir() + "/iotune",
"--format", "envfile",


@@ -17,6 +17,7 @@ import stat
import logging
import pyudev
import psutil
import platform
from pathlib import Path
from scylla_util import *
from subprocess import run, SubprocessError
@@ -102,6 +103,21 @@ def is_selinux_enabled():
return True
return False
def is_kernel_version_at_least(major, minor):
"""Check if the Linux kernel version is at least major.minor"""
try:
kernel_version = platform.release()
# Extract major.minor from version string like "5.15.0-56-generic"
version_parts = kernel_version.split('.')
if len(version_parts) >= 2:
kernel_major = int(version_parts[0])
kernel_minor = int(version_parts[1])
return (kernel_major, kernel_minor) >= (major, minor)
except (ValueError, IndexError):
# If we can't parse the version, assume older kernel for safety
pass
return False
if __name__ == '__main__':
if os.getuid() > 0:
print('Requires root permission.')
@@ -231,8 +247,17 @@ if __name__ == '__main__':
# see https://git.kernel.org/pub/scm/fs/xfs/xfsprogs-dev.git/tree/mkfs/xfs_mkfs.c .
# and it also cannot be smaller than the sector size.
block_size = max(1024, sector_size)
run('udevadm settle', shell=True, check=True)
run(f'mkfs.xfs -b size={block_size} {fsdev} -K -m rmapbt=0 -m reflink=0', shell=True, check=True)
# On Linux 5.12+, sub-block overwrites are supported well, so keep the default block
# size, which will play better with the SSD.
if is_kernel_version_at_least(5, 12):
block_size_opt = ""
else:
block_size_opt = f"-b size={block_size}"
run(f'mkfs.xfs {block_size_opt} {fsdev} -K -m rmapbt=0 -m reflink=0', shell=True, check=True)
run('udevadm settle', shell=True, check=True)
if is_debian_variant():

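The `is_kernel_version_at_least` helper above relies on Python's lexicographic tuple comparison after parsing the first two components of `platform.release()`. A minimal standalone sketch of the same parsing logic (the `is_at_least` name is illustrative, not from the tree):

```python
def is_at_least(release: str, major: int, minor: int) -> bool:
    """Return True if a kernel release string like '5.15.0-56-generic'
    is at least major.minor; unparsable strings count as older, for safety."""
    try:
        parts = release.split(".")
        # Tuples compare element by element, so (5, 15) >= (5, 12) and (6, 1) >= (5, 12).
        return (int(parts[0]), int(parts[1])) >= (major, minor)
    except (ValueError, IndexError):
        return False
```

The "assume older on parse failure" fallback matters here: it makes the script pass an explicit `-b size=` to `mkfs.xfs` rather than silently trusting a kernel it could not identify.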

@@ -1,16 +1,25 @@
{
"Linux Distributions": {
"Ubuntu": ["22.04", "24.04"],
"Debian": ["11"],
"Debian": ["11", "12"],
"Rocky / CentOS / RHEL": ["8", "9", "10"],
"Amazon Linux": ["2023"]
},
"ScyllaDB Versions": [
{
"version": "ScyllaDB 2025.4",
"supported_OS": {
"Ubuntu": ["22.04", "24.04"],
"Debian": ["11", "12"],
"Rocky / CentOS / RHEL": ["8", "9", "10"],
"Amazon Linux": ["2023"]
}
},
{
"version": "ScyllaDB 2025.3",
"supported_OS": {
"Ubuntu": ["22.04", "24.04"],
"Debian": ["11"],
"Debian": ["11", "12"],
"Rocky / CentOS / RHEL": ["8", "9", "10"],
"Amazon Linux": ["2023"]
}
@@ -19,7 +28,7 @@
"version": "ScyllaDB 2025.2",
"supported_OS": {
"Ubuntu": ["22.04", "24.04"],
"Debian": ["11"],
"Debian": ["11", "12"],
"Rocky / CentOS / RHEL": ["8", "9"],
"Amazon Linux": ["2023"]
}
@@ -28,7 +37,7 @@
"version": "ScyllaDB 2025.1",
"supported_OS": {
"Ubuntu": ["22.04", "24.04"],
"Debian": ["11"],
"Debian": ["11", "12"],
"Rocky / CentOS / RHEL": ["8", "9"],
"Amazon Linux": ["2023"]
}


@@ -109,6 +109,32 @@ to do what, configure the following in ScyllaDB's configuration:
alternator_enforce_authorization: true
```
Note: switching `alternator_enforce_authorization` from `false` to `true`
before the client application has the proper secret keys and permission
tables set up will cause the application's requests to immediately fail.
Therefore, we recommend beginning by keeping `alternator_enforce_authorization`
set to `false` and setting `alternator_warn_authorization` to `true`.
This setting will continue to allow all requests without failing on
authentication or authorization errors - but will _count_ would-be
authentication and authorization failures in the two metrics:
* `scylla_alternator_authentication_failures`
* `scylla_alternator_authorization_failures`
`alternator_warn_authorization=true` also generates a WARN-level log message
on each authentication or authorization failure. Each of these log messages
includes the string `alternator_enforce_authorization=true`, and information
that can help pinpoint the source of the error - such as the username
involved in the attempt, and the address of the client sending the request.
When you see that both metrics are not increasing (or, alternatively, that no
more log messages appear), you can be sure that the application is properly
set up and can finally set `alternator_enforce_authorization` to `true`.
You can leave `alternator_warn_authorization` set or unset, depending on
whether or not you want to see log messages when requests fail on
authentication/authorization (in any case, the metric counts these failures,
and the client will also get the error).
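To watch the two counters during the migration, one can periodically scrape the node's Prometheus metrics endpoint (port 9180 in typical Scylla deployments; adjust for yours) and compare successive samples. A minimal sketch of the parsing step — `read_counters` is an illustrative helper, not a Scylla API:

```python
def read_counters(metrics_text,
                  names=("scylla_alternator_authentication_failures",
                         "scylla_alternator_authorization_failures")):
    """Sum per-shard values of the given counters in a Prometheus text exposition."""
    totals = {n: 0.0 for n in names}
    for line in metrics_text.splitlines():
        if line.startswith("#"):          # skip HELP/TYPE comment lines
            continue
        for n in names:
            if line.startswith(n):        # matches 'name{labels} value' samples
                totals[n] += float(line.rsplit(None, 1)[-1])
    return totals

sample = """# TYPE scylla_alternator_authentication_failures counter
scylla_alternator_authentication_failures{shard="0"} 3
scylla_alternator_authorization_failures{shard="0"} 0
"""
# In practice the text would come from something like:
#   urllib.request.urlopen("http://node:9180/metrics").read().decode()
```

If the totals stop increasing between scrapes, the application is no longer triggering would-be authentication or authorization failures.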
Alternator implements the same [signature protocol](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html)
as DynamoDB and the rest of AWS. Clients use, as usual, an access key ID and
a secret access key to prove their identity and the authenticity of their


@@ -202,12 +202,9 @@ enabled. If you plan to use any of the features listed below, CREATE your keyspa
:ref:`with tablets disabled <tablets-enable-tablets>`.
* Counters
* Materialized Views (MV) ``*``
* Secondary indexes (SI, as it depends on MV) ``*``
``*`` You can enable experimental support for MV and SI using
the ``--experimental-features=views-with-tablets`` configuration option.
See :ref:`Views with tablets <admin-views-with-tablets>` for details.
To enable materialized views and secondary indexes for tablet keyspaces, use
the `--rf-rack-valid-keyspaces` configuration option. See :ref:`Views with tablets <admin-views-with-tablets>` for details.
Resharding in keyspaces with tablets enabled has the following limitations:


@@ -263,3 +263,16 @@ To list all available CDC streams for a tablets-based keyspace:
...
Query all streams to read the entire CDC log.
Garbage collection of CDC streams metadata
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
For tablets-based keyspaces, Scylla periodically performs garbage collection of old CDC streams metadata in the ``system.cdc_timestamps`` and ``system.cdc_streams`` tables.
This process removes information about streams that are no longer needed, helping to prevent unbounded growth of the metadata tables.
The garbage collection process runs periodically in the background and examines streams that have been closed.
It removes information about a stream if the stream's close timestamp is older than the configured TTL of the CDC table.
Since the stream has been closed for longer than the TTL, this means that all rows in this stream have also exceeded their TTL and expired, unless the table's TTL was altered to a smaller value after some rows have been written.
.. warning::
When altering the TTL of a CDC table to a smaller value, you can lose information about streams that still contain live rows. Make sure to read all the information you need from the ``system.cdc_timestamps`` and ``system.cdc_streams`` tables before performing such alterations.
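The GC eligibility rule described above reduces to a single timestamp comparison: a closed stream can be forgotten once its close time plus the CDC table's TTL is in the past, since by then every row in the stream has expired. A minimal sketch (`stream_expired` is an illustrative name, not Scylla's API):

```python
import datetime as dt

def stream_expired(close_time: dt.datetime, ttl_seconds: int,
                   now: dt.datetime) -> bool:
    """A closed stream's metadata is GC-eligible once its close timestamp
    is older than the CDC table's TTL."""
    return close_time + dt.timedelta(seconds=ttl_seconds) < now
```

Note this is exactly why shrinking the TTL after rows were written is dangerous: the comparison may pass while some rows written under the old, larger TTL are still live.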


@@ -14,6 +14,7 @@ See :doc:`OS Support by Platform and Version </getting-started/os-support/>`.
Install ScyllaDB with Web Installer
---------------------------------------
To install ScyllaDB with Web Installer, run:
.. code:: console
@@ -27,7 +28,13 @@ You can run the command with the ``-h`` or ``--help`` flag to print information
Installing a Non-default Version
---------------------------------------
You can install a version other than the default.
You can install a version other than the default. To get the list of supported
release versions, run:
.. code:: console
curl -sSf get.scylladb.com/server | sudo bash -s -- --list-active-releases
Versions 2025.1 and Later
==============================


@@ -341,17 +341,13 @@ credentials and endpoint.
Views with Tablets
------------------
By default, Materialized Views (MV) and Secondary Indexes (SI)
are disabled in keyspaces that use tablets.
Support for MV and SI with tablets is experimental and must be explicitly
enabled in the ``scylla.yaml`` configuration file by specifying
the ``views-with-tablets`` option:
Materialized Views (MV) and Secondary Indexes (SI) are enabled in keyspaces that use tablets
only when :term:`RF-rack-valid keyspaces <RF-rack-valid keyspace>` are enforced. That can be
done in the ``scylla.yaml`` configuration file by specifying
.. code-block:: yaml
experimental_features:
- views-with-tablets
rf_rack_valid_keyspaces: true
Monitoring


@@ -53,7 +53,7 @@ ScyllaDB nodetool cluster repair command supports the following options:
nodetool cluster repair --tablet-tokens 1,10474535988
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'regular', or 'full'. 'regular': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to regular.
- ``--incremental-mode`` specifies the incremental repair mode. Can be 'disabled', 'incremental', or 'full'. 'incremental': The incremental repair logic is enabled. Unrepaired sstables will be included for repair. Repaired sstables will be skipped. The incremental repair states will be updated after repair. 'full': The incremental repair logic is enabled. Both repaired and unrepaired sstables will be included for repair. The incremental repair states will be updated after repair. 'disabled': The incremental repair logic is disabled completely. The incremental repair states, e.g., repaired_at in sstables and sstables_repaired_at in the system.tablets table, will not be updated after repair. When the option is not provided, it defaults to incremental.
For example:


@@ -29,16 +29,27 @@ Load and Stream
.. code::
nodetool refresh <my_keyspace> <my_table> [--load-and-stream | -las] [--scope <scope>]
nodetool refresh <my_keyspace> <my_table> [(--load-and-stream | -las) [[(--primary-replica-only | -pro)] | [--scope <scope>]]]
The Load and Stream feature extends nodetool refresh.
The ``--load-and-stream`` option loads arbitrary sstables into the cluster by reading the sstable data and streaming each partition to the replica(s) that owns it. In addition, the ``--scope`` and ``--primary-replica-only`` options are applied to filter the set of target replicas for each partition. For example, say the old cluster has 6 nodes and the new cluster has 3 nodes. One can copy the sstables from the old cluster to any of the new nodes and trigger refresh with load and stream.
The Load and Stream feature extends nodetool refresh. The new ``-las`` option loads arbitrary sstables that do not belong to a node into the cluster. It loads the sstables from the disk and calculates the data's owning nodes, and streams automatically.
For example, say the old cluster has 6 nodes and the new cluster has 3 nodes. We can copy the sstables from the old cluster to any of the new nodes and trigger the load and stream process.
Load and Stream makes restores and migrations much easier:
* You can place sstables from any node on any node
* No need to run nodetool cleanup to remove unused data
With the --primary-replica-only (or -pro) option, only the primary replica of each partition in an sstable will be used as the target.
--primary-replica-only must be applied together with --load-and-stream.
--primary-replica-only cannot be used with --scope, they are mutually exclusive.
--primary-replica-only requires repair to be run after the load and stream operation is completed.
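The option constraints listed above can be expressed as a small validator; this is a sketch mirroring the stated rules (the function name and signature are hypothetical, not nodetool's actual option parser):

```python
def validate_refresh_opts(load_and_stream=False,
                          primary_replica_only=False,
                          scope=None):
    """Reject invalid nodetool refresh option combinations per the rules above."""
    if primary_replica_only and not load_and_stream:
        raise ValueError("--primary-replica-only requires --load-and-stream")
    if primary_replica_only and scope is not None:
        raise ValueError("--primary-replica-only and --scope are mutually exclusive")
```

For example, `refresh ks t -las -pro` is accepted, while `-pro` alone or `-pro --scope dc` is rejected.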
Scope
-----


@@ -38,14 +38,14 @@ Manual Dictionary Training
You can manually trigger dictionary training using the REST API::
curl -X POST "http://node-address:10000/storage_service/retrain_dict?keyspace=mykeyspace&table=mytable"
curl -X POST "http://node-address:10000/storage_service/retrain_dict?keyspace=mykeyspace&cf=mytable"
Estimating Compression Ratios
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To choose the best compression configuration, you can estimate compression ratios using the REST API::
curl -X GET "http://node-address:10000/storage_service/estimate_compression_ratios?keyspace=mykeyspace&table=mytable"
curl -X GET "http://node-address:10000/storage_service/estimate_compression_ratios?keyspace=mykeyspace&cf=mytable"
This will return a report with estimated compression ratios for various combinations of compression
parameters (algorithm, chunk size, zstd level, dictionary).
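Both corrected endpoints above take the table name under the ``cf`` query parameter rather than ``table``. A sketch that builds such request URLs (helper name is illustrative; 10000 is the REST API port shown in the examples):

```python
from urllib.parse import urlencode

def dict_api_url(host, endpoint, keyspace, table, port=10000):
    """Build a storage_service dictionary-compression URL; note the 'cf' parameter."""
    qs = urlencode({"keyspace": keyspace, "cf": table})
    return f"http://{host}:{port}/storage_service/{endpoint}?{qs}"
```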


@@ -387,6 +387,7 @@ The full set of available permissions is:
- ``MODIFY``
- ``AUTHORIZE``
- ``DESCRIBE``
- ``VECTOR_SEARCH_INDEXING``
..
- ``EXECUTE``
@@ -458,6 +459,9 @@ permissions can be granted on which types of resources, and which statements are
* - ``DESCRIBE``
- ``ALL ROLES``
- ``LIST ROLES`` on all roles or only roles granted to another specified role
* - ``VECTOR_SEARCH_INDEXING``
- ``ALL KEYSPACES``
- ``SELECT`` on all tables with vector search indexes
.. _grant-permission-statement:


@@ -204,6 +204,10 @@ public:
return bool(_mask & mask_for(e));
}
bool intersects(const enum_set& other) const {
return bool(_mask & other._mask);
}
template<enum_type e>
void remove() {
_mask &= ~mask_for<e>();


@@ -76,7 +76,7 @@ struct repair_row_level_start_response {
namespace locator {
enum class tablet_repair_incremental_mode : uint8_t {
regular,
incremental,
full,
disabled,
};


@@ -99,9 +99,6 @@ std::set<sstring> get_disabled_features_from_db_config(const db::config& cfg, st
if (!cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
disabled.insert("KEYSPACE_STORAGE_OPTIONS"s);
}
if (!cfg.check_experimental(db::experimental_features_t::feature::VIEWS_WITH_TABLETS)) {
disabled.insert("VIEWS_WITH_TABLETS"s);
}
if (cfg.force_gossip_topology_changes()) {
if (cfg.enable_tablets_by_default()) {
throw std::runtime_error("Tablets cannot be enabled with gossip topology changes. Use either --tablets-mode-for-new-keyspaces=enabled|enforced or --force-gossip-topology-changes, but not both.");


@@ -754,7 +754,7 @@ tablet_task_type tablet_task_type_from_string(const sstring& name) {
// The names are persisted in system tables so should not be changed.
static const std::unordered_map<locator::tablet_repair_incremental_mode, sstring> tablet_repair_incremental_mode_to_name = {
{locator::tablet_repair_incremental_mode::disabled, "disabled"},
{locator::tablet_repair_incremental_mode::regular, "regular"},
{locator::tablet_repair_incremental_mode::incremental, "incremental"},
{locator::tablet_repair_incremental_mode::full, "full"},
};
@@ -1204,10 +1204,23 @@ effective_replication_map_ptr tablet_aware_replication_strategy::do_make_replica
void tablet_metadata_guard::check() noexcept {
auto erm = _table->get_effective_replication_map();
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_tablet.table);
auto& old_tmap = _erm->get_token_metadata().tablets().get_tablet_map(_tablet.table);
auto* trinfo = tmap.get_tablet_transition_info(_tablet.tablet);
if (bool(_stage) != bool(trinfo) || (_stage && _stage != trinfo->stage)) {
tablet_logger.debug("tablet_metadata_guard::check: table {}.{}, tablet {}, "
"old erm version {}, new erm version {}, old tablet map {}, new tablet map {}",
_table->schema()->ks_name(), _table->schema()->cf_name(),
_tablet,
_erm.get()->get_token_metadata().get_version(),
erm.get()->get_token_metadata().get_version(),
old_tmap,
tmap);
if (bool(_stage) != bool(trinfo) || (_stage && _stage != trinfo->stage) ||
old_tmap.tablet_count() != tmap.tablet_count())
{
tablet_logger.debug("tablet_metadata_guard::check: retain the erm and abort the guard");
_abort_source.request_abort();
} else {
tablet_logger.debug("tablet_metadata_guard::check: refresh the erm");
_erm = std::move(erm);
subscribe();
}


@@ -162,11 +162,11 @@ sstring tablet_task_type_to_string(tablet_task_type);
tablet_task_type tablet_task_type_from_string(const sstring&);
// - regular (regular incremental repair): The incremental repair logic is enabled.
// - incremental (incremental repair): The incremental repair logic is enabled.
// Unrepaired sstables will be included for repair. Repaired sstables will be
// skipped. The incremental repair states will be updated after repair.
// - full (full incremental repair): The incremental repair logic is enabled.
// - full (full repair): The incremental repair logic is enabled.
// Both repaired and unrepaired sstables will be included for repair. The
// incremental repair states will be updated after repair.
@@ -175,12 +175,12 @@ tablet_task_type tablet_task_type_from_string(const sstring&);
// sstables_repaired_at in system.tablets table, will not be updated after
// repair.
enum class tablet_repair_incremental_mode : uint8_t {
regular,
incremental,
full,
disabled,
};
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::regular};
constexpr tablet_repair_incremental_mode default_tablet_repair_incremental_mode{tablet_repair_incremental_mode::incremental};
sstring tablet_repair_incremental_mode_to_string(tablet_repair_incremental_mode);
tablet_repair_incremental_mode tablet_repair_incremental_mode_from_string(const sstring&);
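The three modes documented in the comment above differ in only two effects: whether repaired sstables are included in the repair, and whether the incremental repair state (``repaired_at`` / ``sstables_repaired_at``) is updated afterwards. A sketch of that mapping as flag pairs — this table is my reading of the comment text, not code from the tree:

```python
# mode -> (include_repaired_sstables, update_incremental_repair_state)
REPAIR_MODES = {
    "incremental": (False, True),  # skip repaired sstables, update state after repair
    "full":        (True,  True),  # repair everything, update state after repair
    "disabled":    (True,  False), # incremental logic off: repair everything, touch no state
}
DEFAULT_MODE = "incremental"
```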

main.cc

@@ -2128,7 +2128,7 @@ sharded<locator::shared_token_metadata> token_metadata;
});
checkpoint(stop_signal, "starting sstables loader");
sst_loader.start(std::ref(db), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
sst_loader.start(std::ref(db), std::ref(ss), std::ref(messaging), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(sstm), maintenance_scheduling_group).get();
auto stop_sst_loader = defer_verbose_shutdown("sstables loader", [&sst_loader] {
sst_loader.stop().get();
});
@@ -2208,15 +2208,55 @@ sharded<locator::shared_token_metadata> token_metadata;
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(cfg->rf_rack_valid_keyspaces(), token_metadata.local().get());
// Materialized views and secondary indexes are still restricted and require specific configuration
// options to work. Make sure that if there are existing views or indexes, they don't violate
// the requirements imposed on them.
db.local().validate_tablet_views_indexes();
// Semantic validation of sstable compression parameters from config.
// Adding here (i.e., after `join_cluster`) to ensure that the
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
//
// Also, if the dictionary compression feature is not enabled, use
// LZ4Compressor as the default algorithm instead of LZ4WithDictsCompressor.
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
auto& sstable_compression_options = cfg->sstable_compression_user_table_options;
gms::feature::listener_registration reg_listener;
if (!sstable_compression_options.is_set() && !dicts_feature_enabled) {
if (sstable_compression_options().get_algorithm() != compression_parameters::algorithm::lz4_with_dicts) {
on_internal_error(startlog, "expected LZ4WithDictsCompressor as default algorithm for sstable_compression_user_table_options.");
}
startlog.info("SSTABLE_COMPRESSION_DICTS feature is disabled. Overriding default SSTable compression to use LZ4Compressor instead of LZ4WithDictsCompressor.");
compression_parameters original_params{sstable_compression_options().get_options()};
auto params = sstable_compression_options().get_options();
params[compression_parameters::SSTABLE_COMPRESSION] = sstring(compression_parameters::algorithm_to_name(compression_parameters::algorithm::lz4));
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
sstable_compression_options(compression_parameters{params}, utils::config_file::config_source::None);
}
}).get();
// Register a callback to update the default compression algorithm when the feature is enabled.
// Precondition:
// The callback must run inside seastar::async context:
// - If the listener fires immediately, we are running inside seastar::async already.
// - If the listener is deferred, `feature_service::enable()` runs it inside seastar::async.
reg_listener = feature_service.local().sstable_compression_dicts.when_enabled([&sstable_compression_options, params = std::move(original_params)] {
startlog.info("SSTABLE_COMPRESSION_DICTS feature is now enabled. Overriding default SSTable compression to use LZ4WithDictsCompressor.");
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
sstable_compression_options(params, utils::config_file::config_source::None);
}
}).get();
});
}
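The startup logic above follows a common pattern: while a cluster feature is absent, substitute a fallback default, and register a one-shot listener to restore the preferred default once the feature is negotiated. A minimal Python sketch of that pattern (the `Feature` class and names are illustrative, not Scylla's API; the real code also guards against the option being explicitly set):

```python
class Feature:
    """Toy stand-in for a cluster feature with when_enabled() callbacks."""
    def __init__(self):
        self.enabled = False
        self._listeners = []

    def when_enabled(self, cb):
        # Fire immediately if already enabled, otherwise defer until enable().
        if self.enabled:
            cb()
        else:
            self._listeners.append(cb)

    def enable(self):
        self.enabled = True
        for cb in self._listeners:
            cb()

config = {"sstable_compression": "LZ4WithDictsCompressor"}
dicts = Feature()
if not dicts.enabled:
    config["sstable_compression"] = "LZ4Compressor"  # fallback default
    dicts.when_enabled(lambda: config.__setitem__(
        "sstable_compression", "LZ4WithDictsCompressor"))  # restore on enable
```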
try {
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
const auto& dicts_usage_allowed = cfg->sstable_compression_dictionaries_allow_in_ddl();
cfg->sstable_compression_user_table_options().validate(
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)),
compression_parameters::dicts_usage_allowed(dicts_usage_allowed));
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)));
} catch (const std::exception& e) {
startlog.error("Invalid sstable_compression_user_table_options: {}", e.what());
throw bad_configuration_error();
@@ -2426,7 +2466,7 @@ sharded<locator::shared_token_metadata> token_metadata;
checkpoint(stop_signal, "starting view building worker's background fibers");
with_scheduling_group(maintenance_scheduling_group, [&] {
view_building_worker.local().start_background_fibers();
return view_building_worker.local().init();
}).get();
auto drain_view_buiding_worker = defer_verbose_shutdown("draining view building worker", [&] {
view_building_worker.invoke_on_all(&db::view::view_building_worker::drain).get();


@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:103bd12a1f0feb60d814da074b81ebafaa13059d1267ee3612c48a8bc96798b6
size 6242980
oid sha256:a2233592a0d28e525a0daf9a7f706929d0ae7588ffcaf7b4d76cf209245cbca1
size 6302064


@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:2cb637e741a2b9badc96f3f175f15db257b9273ea43040289de7d72657b5505a
size 6240824
oid sha256:ad9e9cc9364a3f01cfe3f320cde2a8dce74bf2e9ed6dfbc3118a5110f2f720f3
size 6310124


@@ -15,7 +15,6 @@
#include <seastar/core/format.hh>
static const char scylla_product_str[] = SCYLLA_PRODUCT;
static const char scylla_version_str[] = SCYLLA_VERSION;
static const char scylla_release_str[] = SCYLLA_RELEASE;
static const char scylla_build_mode_str[] = SCYLLA_BUILD_MODE_STR;
@@ -31,12 +30,9 @@ std::string scylla_build_mode()
}
std::string doc_link(std::string_view url_tail) {
const std::string_view product = scylla_product_str;
const std::string_view version = scylla_version_str;
const auto prefix = product == "scylla-enterprise" ? "enterprise" : "opensource";
std::string branch = product == "scylla-enterprise" ? "enterprise" : "master";
std::string branch = "master";
if (!version.ends_with("~dev")) {
std::vector<std::string> components;
boost::split(components, version, boost::algorithm::is_any_of("."));
@@ -45,7 +41,7 @@ std::string doc_link(std::string_view url_tail) {
branch = fmt::format("branch-{}.{}", components[0], components[1]);
}
return fmt::format("https://{}.docs.scylladb.com/{}/{}", prefix, branch, url_tail);
return fmt::format("https://docs.scylladb.com/manual/{}/{}", branch, url_tail);
}
// get the version number into writeable memory, so we can grep for it if we get a core dump


@@ -420,7 +420,7 @@ future<std::tuple<bool, gc_clock::time_point>> repair_service::flush_hints(repai
}
if (!nodes_down.empty()) {
rlogger.warn("repair[{}]: Skipped sending repair_flush_hints_batchlog due to nodes_down={}, continue to run repair",
nodes_down, uuid);
uuid, nodes_down);
co_return std::make_tuple(hints_batchlog_flushed, flush_time);
}
co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, &times, &req] (locator::host_id node) -> future<> {


@@ -196,6 +196,7 @@ struct row_level_repair_metrics {
uint64_t rx_hashes_nr{0};
uint64_t inc_sst_skipped_bytes{0};
uint64_t inc_sst_read_bytes{0};
uint64_t tablet_time_ms{0};
row_level_repair_metrics() {
namespace sm = seastar::metrics;
_metrics.add_group("repair", {
@@ -219,6 +220,8 @@ struct row_level_repair_metrics {
sm::description("Total number of bytes skipped from sstables for incremental repair on this shard.")),
sm::make_counter("inc_sst_read_bytes", inc_sst_read_bytes,
sm::description("Total number of bytes read from sstables for incremental repair on this shard.")),
sm::make_counter("tablet_time_ms", tablet_time_ms,
sm::description("Time spent on tablet repair on this shard in milliseconds.")),
});
}
};
@@ -3473,7 +3476,16 @@ future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
service::frozen_topology_guard topo_guard) {
auto start_time = flush_time;
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization, start_time, topo_guard);
co_return co_await repair.run();
bool is_tablet = shard_task.db.local().find_column_family(table_id).uses_tablets();
bool is_tablet_rebuild = shard_task.sched_info.for_tablet_rebuild;
auto t = std::chrono::steady_clock::now();
auto update_time = seastar::defer([&] {
if (is_tablet && !is_tablet_rebuild) {
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t);
_metrics.tablet_time_ms += duration.count();
}
});
co_await repair.run();
}
class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subscriber {


@@ -12,6 +12,7 @@
#include <fmt/ranges.h>
#include <fmt/std.h>
#include <seastar/core/rwlock.hh>
#include "db/view/view.hh"
#include "locator/network_topology_strategy.hh"
#include "locator/tablets.hh"
#include "locator/token_metadata_fwd.hh"
@@ -86,6 +87,7 @@
#include "tracing/trace_keyspace_helper.hh"
#include <algorithm>
#include <flat_set>
using namespace std::chrono_literals;
using namespace db;
@@ -3483,6 +3485,37 @@ void database::check_rf_rack_validity(const bool enforce_rf_rack_valid_keyspaces
}
}
void database::validate_tablet_views_indexes() const {
dblog.info("Verifying that all existing materialized views are valid");
const data_dictionary::database& db = this->as_data_dictionary();
std::flat_set<std::string_view> invalid_keyspaces;
for (const view_ptr& view : get_views()) {
const auto& ks = view->ks_name();
try {
db::view::validate_view_keyspace(db, ks);
} catch (...) {
invalid_keyspaces.emplace(ks);
}
}
if (invalid_keyspaces.empty()) {
dblog.info("All existing materialized views are valid");
return;
}
// `std::flat_set` guarantees iteration in the increasing order.
const std::string ks_list = invalid_keyspaces
| std::views::join_with(std::string_view(", "))
| std::ranges::to<std::string>();
dblog.warn("Some of the existing keyspaces violate the requirements "
"for using materialized views or secondary indexes. Those features require enabling "
"the configuration option `rf_rack_valid_keyspaces` and the cluster feature "
"`VIEWS_WITH_TABLETS`. The keyspaces that violate that condition: {}", ks_list);
}
utils::chunked_vector<uint64_t> compute_random_sorted_ints(uint64_t max_value, uint64_t n_values) {
static thread_local std::minstd_rand rng{std::random_device{}()};
std::uniform_int_distribution<uint64_t> dist(0, max_value);

View File

@@ -2091,6 +2091,20 @@ public:
// * the `locator::topology` instance corresponding to the passed `locator::token_metadata_ptr`
// must contain a complete list of racks and data centers in the cluster.
void check_rf_rack_validity(const bool enforce_rf_rack_valid_keyspaces, const locator::token_metadata_ptr) const;
/// Verify that all existing materialized views are valid.
///
/// We consider a materialized view valid if one of the following
/// conditions is satisfied:
/// * it resides in a vnode-based keyspace,
/// * it resides in a tablet-based keyspace, the cluster feature `VIEWS_WITH_TABLETS`
/// is enabled, and the configuration option `rf_rack_valid_keyspaces` is enabled.
///
/// Result:
/// * Depending on whether there are invalid materialized views, the function will
/// log that either everything's OK, or that there are some keyspaces that violate
/// the requirement.
void validate_tablet_views_indexes() const;
private:
// SSTable sampling might require considerable amounts of memory,
// so we want to limit the number of concurrent sampling operations.

Submodule seastar updated: c8a3515f9b...60e4b3b921


@@ -93,9 +93,9 @@ future<> service::client_state::has_function_access(const sstring& ks, const sst
}
future<> service::client_state::has_column_family_access(const sstring& ks,
const sstring& cf, auth::permission p, auth::command_desc::type t) const {
const sstring& cf, auth::permission p, auth::command_desc::type t, std::optional<bool> is_vector_indexed) const {
auto r = auth::make_data_resource(ks, cf);
co_return co_await has_access(ks, {p, r, t});
co_return co_await has_access(ks, {p, r, t}, is_vector_indexed);
}
future<> service::client_state::has_schema_access(const schema& s, auth::permission p) const {
@@ -148,7 +148,7 @@ future<> service::client_state::check_internal_table_permissions(std::string_vie
return make_ready_future<>();
}
future<> service::client_state::has_access(const sstring& ks, auth::command_desc cmd) const {
future<> service::client_state::has_access(const sstring& ks, auth::command_desc cmd, std::optional<bool> is_vector_indexed) const {
if (ks.empty()) {
throw exceptions::invalid_request_exception("You have not set a keyspace for this session");
}
@@ -224,10 +224,40 @@ future<> service::client_state::has_access(const sstring& ks, auth::command_desc
ks + " can be granted only SELECT or DESCRIBE permissions to a non-superuser.");
}
if (cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) {
co_return co_await ensure_has_permission<auth::command_desc_with_permission_set>({auth::permission_set::of<auth::permission::SELECT, auth::permission::VECTOR_SEARCH_INDEXING>(), cmd.resource});
}
co_return co_await ensure_has_permission(cmd);
}
future<bool> service::client_state::check_has_permission(auth::command_desc cmd) const {
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc_with_permission_set& cmd) {
return permissions.intersects(cmd.permission);
}
static bool intersects_permissions(const auth::permission_set& permissions, const auth::command_desc& cmd) {
return permissions.contains(cmd.permission);
}
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc& cmd) const {
return format("User {} has no {} permission on {} or any of its parents",
*_user,
auth::permissions::to_string(cmd.permission),
cmd.resource);
}
sstring service::client_state::generate_authorization_error_msg(const auth::command_desc_with_permission_set& cmd) const {
sstring perm_names = fmt::format("{}", fmt::join(auth::permissions::to_strings(cmd.permission), ", "));
return format("User {} has none of the permissions ({}) on {} or any of its parents",
*_user,
perm_names,
cmd.resource);
}
template <typename Cmd>
future<bool> service::client_state::check_has_permission(Cmd cmd) const {
if (_is_internal) {
co_return true;
}
@@ -235,27 +265,29 @@ future<bool> service::client_state::check_has_permission(auth::command_desc cmd)
std::optional<auth::resource> parent_r = cmd.resource.parent();
auth::permission_set set = co_await auth::get_permissions(*_auth_service, *_user, cmd.resource);
if (set.contains(cmd.permission)) {
if (intersects_permissions(set, cmd)) {
co_return true;
}
if (parent_r) {
co_return co_await check_has_permission({cmd.permission, *parent_r});
co_return co_await check_has_permission<Cmd>({cmd.permission, *parent_r});
}
co_return false;
}
template future<bool> service::client_state::check_has_permission(auth::command_desc) const;
template future<bool> service::client_state::check_has_permission<auth::command_desc_with_permission_set>(auth::command_desc_with_permission_set) const;
future<> service::client_state::ensure_has_permission(auth::command_desc cmd) const {
template <typename Cmd>
future<> service::client_state::ensure_has_permission(Cmd cmd) const {
return check_has_permission(cmd).then([this, cmd](bool ok) {
if (!ok) {
return make_exception_future<>(exceptions::unauthorized_exception(
format("User {} has no {} permission on {} or any of its parents",
*_user,
auth::permissions::to_string(cmd.permission),
cmd.resource)));
generate_authorization_error_msg(cmd)));
}
return make_ready_future<>();
});
}
template future<> service::client_state::ensure_has_permission(auth::command_desc) const;
template future<> service::client_state::ensure_has_permission<auth::command_desc_with_permission_set>(auth::command_desc_with_permission_set) const;
void service::client_state::set_keyspace(replica::database& db, std::string_view keyspace) {
// Skip keyspace validation for non-authenticated users. Apparently, some client libraries


@@ -332,7 +332,7 @@ public:
future<> has_all_keyspaces_access(auth::permission) const;
future<> has_keyspace_access(const sstring&, auth::permission) const;
future<> has_column_family_access(const sstring&, const sstring&, auth::permission,
auth::command_desc::type = auth::command_desc::type::OTHER) const;
auth::command_desc::type = auth::command_desc::type::OTHER, std::optional<bool> is_vector_indexed = std::nullopt) const;
future<> has_schema_access(const schema& s, auth::permission p) const;
future<> has_schema_access(const sstring&, const sstring&, auth::permission p) const;
@@ -341,11 +341,13 @@ public:
future<> has_function_access(const sstring& ks, const sstring& function_signature, auth::permission p) const;
private:
future<> check_internal_table_permissions(std::string_view ks, std::string_view table_name, const auth::command_desc& cmd) const;
future<> has_access(const sstring& keyspace, auth::command_desc) const;
future<> has_access(const sstring& keyspace, auth::command_desc, std::optional<bool> is_vector_indexed = std::nullopt) const;
sstring generate_authorization_error_msg(const auth::command_desc&) const;
sstring generate_authorization_error_msg(const auth::command_desc_with_permission_set&) const;
public:
future<bool> check_has_permission(auth::command_desc) const;
future<> ensure_has_permission(auth::command_desc) const;
template<typename Cmd = auth::command_desc> future<bool> check_has_permission(Cmd) const;
template<typename Cmd = auth::command_desc> future<> ensure_has_permission(Cmd) const;
future<> maybe_update_per_service_level_params();
void update_per_service_level_params(qos::service_level_options& slo);


@@ -54,29 +54,28 @@ void paxos_state::key_lock_map::release_semaphore_for_key(const dht::token& key)
}
}
future<paxos_state::replica_guard> paxos_state::get_replica_lock(const dht::token& key,
clock_type::time_point timeout, const dht::shard_replica_set& shards)
future<paxos_state::replica_guard> paxos_state::get_replica_lock(const schema& s, const dht::token& token,
clock_type::time_point timeout)
{
if (shards.empty()) {
on_internal_error(logger, "empty shards");
}
// When a tablet is migrated between shards on the same node, during the
// write_both_read_new state we begin switching reads to the new shard.
// Until the corresponding global barrier completes, some requests may still
// use write_both_read_old erm, while others already use the write_both_read_new erm.
// To ensure mutual exclusion between these two types of requests, we must
// acquire locks on both the old and new shards.
// Once the global barrier completes, no requests remain on the old shard,
// so we can safely switch to acquiring locks only on the new shard.
auto shards = s.table().get_effective_replication_map()->shards_ready_for_reads(s, token);
std::ranges::sort(shards);
replica_guard replica_guard;
replica_guard.resize(shards.size());
auto acquire = [&shards, &replica_guard, &key, timeout](unsigned index) {
return smp::submit_to(shards[index], [&key, timeout] {
auto g = make_lw_shared<guard>(_paxos_table_lock, key, timeout);
for (size_t i = 0; i < shards.size(); ++i) {
replica_guard[i] = co_await smp::submit_to(shards[i], [token, timeout] {
auto g = make_lw_shared<guard>(_paxos_table_lock, token, timeout);
return g->lock().then([g]{ return make_foreign(std::move(g)); });
}).then([&replica_guard, index](guard_foreign_ptr g) {
replica_guard[index] = std::move(g);
});
};
co_await acquire(0);
if (shards.size() > 1) {
co_await acquire(1);
}
co_return replica_guard;
}
@@ -86,16 +85,6 @@ future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, cloc
co_return m;
}
static dht::shard_replica_set shards_for_writes(const schema& s, dht::token token) {
auto shards = s.table().shard_for_writes(token);
if (const auto it = std::ranges::find(shards, this_shard_id()); it == shards.end()) {
on_internal_error(paxos_state::logger,
format("invalid shard, this_shard_id {}, shard_for_writes {}", this_shard_id(), shards));
}
std::ranges::sort(shards);
return shards;
}
future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& paxos_store, tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
@@ -111,8 +100,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, paxos_store& pa
}
});
const auto shards = shards_for_writes(*schema, token);
auto guard = co_await get_replica_lock(token, timeout, shards);
auto guard = co_await get_replica_lock(*schema, token, timeout);
// When preparing, we need to use the same time as "now" (that's the time we use to decide if something
// is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
@@ -203,8 +191,7 @@ future<bool> paxos_state::accept(storage_proxy& sp, paxos_store& paxos_store, tr
}
});
const auto shards = shards_for_writes(*schema, token);
auto guard = co_await get_replica_lock(token, timeout, shards);
auto guard = co_await get_replica_lock(*schema, token, timeout);
auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(proposal.ballot);
paxos_state state = co_await paxos_store.load_paxos_state(proposal.update.key(), schema, gc_clock::time_point(now_in_sec), timeout);
@@ -283,6 +270,8 @@ future<> paxos_state::learn(storage_proxy& sp, paxos_store& paxos_store, schema_
tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
}
co_await utils::get_local_injector().inject("paxos_state_learn_after_mutate", utils::wait_for_message(5min));
// We don't need to lock the partition key if there is no gap between loading paxos
// state and saving it, and here we're just blindly updating.
co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout);


@@ -86,8 +86,8 @@ private:
using guard_foreign_ptr = foreign_ptr<lw_shared_ptr<guard>>;
using replica_guard = boost::container::static_vector<guard_foreign_ptr, 2>;
static future<replica_guard> get_replica_lock(const dht::token& key, clock_type::time_point timeout,
const dht::shard_replica_set& shards);
static future<replica_guard> get_replica_lock(const schema& s, const dht::token& token,
clock_type::time_point timeout);
utils::UUID _promised_ballot = utils::UUID_gen::min_time_UUID();
std::optional<proposal> _accepted_proposal;


@@ -319,7 +319,7 @@ future<> service_level_controller::update_service_levels_cache(qos::query_contex
});
}
future<> service_level_controller::auth_integration::reload_cache() {
future<> service_level_controller::auth_integration::reload_cache(qos::query_context ctx) {
SCYLLA_ASSERT(this_shard_id() == global_controller);
const auto _ = _stop_gate.hold();
@@ -336,11 +336,12 @@ future<> service_level_controller::auth_integration::reload_cache() {
}
auto units = co_await get_units(_sl_controller._global_controller_db->notifications_serializer, 1);
auto& qs = qos_query_state(ctx);
auto& role_manager = _auth_service.underlying_role_manager();
const auto all_roles = co_await role_manager.query_all();
const auto hierarchy = co_await role_manager.query_all_directly_granted();
const auto all_roles = co_await role_manager.query_all(qs);
const auto hierarchy = co_await role_manager.query_all_directly_granted(qs);
// includes only roles with attached service level
const auto attributes = co_await role_manager.query_attribute_for_all("service_level");
const auto attributes = co_await role_manager.query_attribute_for_all("service_level", qs);
std::map<sstring, service_level_options> effective_sl_map;
@@ -403,7 +404,7 @@ future<> service_level_controller::update_cache(update_both_cache_levels update_
}
if (_auth_integration) {
co_await _auth_integration->reload_cache();
co_await _auth_integration->reload_cache(ctx);
}
}


@@ -173,7 +173,7 @@ public:
future<std::vector<cql3::description>> describe_attached_service_levels();
/// Must be executed on shard 0.
future<> reload_cache();
future<> reload_cache(qos::query_context ctx);
void clear_cache();
};


@@ -18,26 +18,23 @@ namespace service {
static logging::logger slogger("group0_tombstone_gc_handler");
raft::server* group0_server_accessor::get_server() const {
return _raft_gr.find_server(_group0_id);
}
lowres_clock::duration group0_state_id_handler::get_refresh_interval(const replica::database& db) {
return std::chrono::milliseconds{db.get_config().group0_tombstone_gc_refresh_interval_in_ms()};
}
void group0_state_id_handler::refresh() {
auto* const group0_server = _server_accessor.get_server();
if (!group0_server) {
slogger.debug("Skipping due to group0 server not found");
return;
}
// It's not enough to consider only the current group 0 members. In the Raft-based recovery procedure, there can be
// nodes that haven't joined the current group 0 yet but they have belonged to a different group 0 and thus have
// a non-empty group 0 state ID.
//
// Ignored nodes are permanently banned, so we can safely filter them out even if they belong to the group 0.
const auto& group0_members = std::ranges::join_view(std::to_array({
std::views::all(_topo_sm._topology.normal_nodes),
std::views::all(_topo_sm._topology.transition_nodes)
})) | std::views::keys | std::ranges::views::filter([this] (const raft::server_id& id) {
return !_topo_sm._topology.ignored_nodes.contains(id);
}) | std::ranges::to<std::vector>();
const auto& group0_members = std::invoke([&] {
auto members = group0_server->get_configuration().current;
members.merge(group0_server->get_configuration().previous);
return members;
});
if (group0_members.empty()) {
slogger.info("Skipping due to empty group0");
return;
@@ -48,10 +45,10 @@ void group0_state_id_handler::refresh() {
std::vector<raft::server_id> group0_members_missing_endpoint;
std::vector<raft::server_id> group0_members_missing_state_id;
const auto& group0_members_state_ids = group0_members | std::ranges::views::transform([&](const auto& member) -> std::optional<utils::UUID> {
const auto* state_id_ptr = _gossiper.get_application_state_ptr(locator::host_id{member.addr.id.uuid()}, gms::application_state::GROUP0_STATE_ID);
const auto& group0_members_state_ids = group0_members | std::ranges::views::transform([&](const auto& id) -> std::optional<utils::UUID> {
const auto* state_id_ptr = _gossiper.get_application_state_ptr(locator::host_id{id.uuid()}, gms::application_state::GROUP0_STATE_ID);
if (!state_id_ptr) {
group0_members_missing_state_id.push_back(member.addr.id);
group0_members_missing_state_id.push_back(id);
return std::nullopt;
}
@@ -100,11 +97,10 @@ void group0_state_id_handler::refresh() {
gc_state.update_group0_refresh_time(tombstone_gc_time);
}
group0_state_id_handler::group0_state_id_handler(
replica::database& local_db, gms::gossiper& gossiper, group0_server_accessor server_accessor)
: _local_db(local_db)
group0_state_id_handler::group0_state_id_handler(topology_state_machine& topo_sm, replica::database& local_db, gms::gossiper& gossiper)
: _topo_sm(topo_sm)
, _local_db(local_db)
, _gossiper(gossiper)
, _server_accessor(server_accessor)
, _refresh_interval(get_refresh_interval(local_db)) {
}


@@ -8,6 +8,7 @@
#pragma once
#include "raft/raft.hh"
#include "service/topology_state_machine.hh"
#include "utils/UUID.hh"
#include <seastar/core/future.hh>
#include <seastar/core/timer.hh>
@@ -29,25 +30,11 @@ namespace service {
class raft_group_registry;
class group0_server_accessor {
raft_group_registry& _raft_gr;
raft::group_id _group0_id;
public:
group0_server_accessor(raft_group_registry& raft_gr, raft::group_id group0_id)
: _raft_gr(raft_gr)
, _group0_id(group0_id) {
}
[[nodiscard]] raft::server* get_server() const;
};
class group0_state_id_handler {
topology_state_machine& _topo_sm;
replica::database& _local_db;
gms::gossiper& _gossiper;
group0_server_accessor _server_accessor;
lowres_clock::duration _refresh_interval;
timer<> _timer;
@@ -60,7 +47,7 @@ class group0_state_id_handler {
void refresh();
public:
group0_state_id_handler(replica::database& local_db, gms::gossiper& gossiper, group0_server_accessor server_accessor);
group0_state_id_handler(topology_state_machine& topo_sm, replica::database& local_db, gms::gossiper& gossiper);
void run();


@@ -54,12 +54,13 @@ namespace service {
static logging::logger slogger("group0_raft_sm");
group0_state_machine::group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss,
group0_server_accessor server_accessor, gms::gossiper& gossiper, gms::feature_service& feat,
gms::gossiper& gossiper, gms::feature_service& feat,
bool topology_change_enabled)
: _client(client), _mm(mm), _sp(sp), _ss(ss)
, _gate("group0_state_machine")
, _topology_change_enabled(topology_change_enabled)
, _state_id_handler(sp.local_db(), gossiper, server_accessor), _feature_service(feat)
, _state_id_handler(ss._topology_state_machine, sp.local_db(), gossiper)
, _feature_service(feat)
, _topology_on_raft_support_listener(feat.supports_consistent_topology_changes.when_enabled([this] () noexcept {
// Using features to decide whether to start fetching topology snapshots
// or not is technically not correct because we also use features to guard

View File

@@ -118,7 +118,7 @@ class group0_state_machine : public raft_state_machine {
future<> merge_and_apply(group0_state_machine_merger& merger);
public:
group0_state_machine(raft_group0_client& client, migration_manager& mm, storage_proxy& sp, storage_service& ss,
group0_server_accessor server_accessor, gms::gossiper& gossiper, gms::feature_service& feat, bool topology_change_enabled);
gms::gossiper& gossiper, gms::feature_service& feat, bool topology_change_enabled);
future<> apply(std::vector<raft::command_cref> command) override;
future<raft::snapshot_id> take_snapshot() override;
void drop_snapshot(raft::snapshot_id id) override;

View File

@@ -497,7 +497,15 @@ future<> group0_voter_handler::update_nodes(
};
// Helper for adding a single node to the nodes list
auto add_node = [&nodes, &group0_config, &leader_id](const raft::server_id& id, const replica_state& rs, bool is_alive) {
auto add_node = [this, &nodes, &group0_config, &leader_id](const raft::server_id& id, const replica_state& rs, bool is_alive) {
// Some topology members may not belong to the new group 0 in the Raft-based recovery procedure.
if (!group0_config.contains(id)) {
if (!_gossiper.get_recovery_leader()) {
rvlogger.warn("node {} in state {} is not a part of the group 0 configuration {}, ignoring",
id, rs.state, group0_config);
}
return;
}
const auto is_voter = group0_config.can_vote(id);
const auto is_leader = (id == leader_id);
nodes.emplace(id, group0_voter_calculator::node_descriptor{

View File

@@ -243,7 +243,7 @@ const raft::server_id& raft_group0::load_my_id() {
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
service::migration_manager& mm, bool topology_change_enabled) {
auto state_machine = std::make_unique<group0_state_machine>(
_client, mm, qp.proxy(), ss, group0_server_accessor{_raft_gr, gid}, _gossiper, _feat, topology_change_enabled);
_client, mm, qp.proxy(), ss, _gossiper, _feat, topology_change_enabled);
auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.failure_detector(), gid, my_id);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
@@ -745,7 +745,10 @@ future<> raft_group0::setup_group0_if_exist(db::system_keyspace& sys_ks, service
} else {
// We'll disable them once we complete the upgrade procedure.
}
} else if (!qp.db().get_config().recovery_leader.is_set()) {
} else if (qp.db().get_config().recovery_leader.is_set()) {
group0_log.info("Disabling migration_manager schema pulls in the Raft-based recovery procedure");
co_await mm.disable_schema_pulls();
} else {
// Scylla has bootstrapped earlier but group 0 ID is not present and we are not recovering from majority loss
// using the Raft-based procedure. This means we're upgrading.
// Upgrade will start through a feature listener created after we enter NORMAL state.


@@ -1599,6 +1599,7 @@ protected:
service_permit _permit; // holds admission permit until operation completes
db::per_partition_rate_limit::info _rate_limit_info;
db::view::update_backlog _view_backlog; // max view update backlog of all participating targets
utils::small_vector<gate::holder, 2> _holders;
protected:
virtual bool waited_for(locator::host_id from) = 0;
@@ -1630,6 +1631,11 @@ public:
if (cancellable) {
register_cancellable();
}
attach_to(_proxy->_write_handlers_gate);
}
void attach_to(gate& g) {
_holders.push_back(g.hold());
}
virtual ~abstract_write_response_handler() {
--_stats.writes;
@@ -7143,54 +7149,65 @@ void storage_proxy::on_released(const locator::host_id& hid) {
}
}
void storage_proxy::cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun) {
SCYLLA_ASSERT(thread::running_in_thread());
future<> storage_proxy::cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun) {
gate g;
auto it = _cancellable_write_handlers_list->begin();
while (it != _cancellable_write_handlers_list->end()) {
auto guard = it->shared_from_this();
if (filter_fun(*it) && _response_handlers.contains(it->id())) {
it->timeout_cb();
// timeout_cb() may destroy the current handler. Since the list uses
// bi::link_mode<bi::auto_unlink>, destruction automatically unlinks the
// element. To avoid accessing an invalidated iterator, we cache `next`
// before invoking timeout_cb().
const auto next = std::next(it);
if (filter_fun(*it)) {
it->attach_to(g);
if (_response_handlers.contains(it->id())) {
it->timeout_cb();
}
}
++it;
it = next;
if (need_preempt() && it != _cancellable_write_handlers_list->end()) {
// Save the iterator position. If the handler is destroyed during yield(),
// the iterator will be updated to point to the next item in the list.
//
// Handler destruction triggers sending a response to the client.
// To avoid delaying that, we don't hold a strong reference here; instead,
// iterator_guard handles safe iterator updates while allowing prompt
// handler destruction and client response.
cancellable_write_handlers_list::iterator_guard ig{*_cancellable_write_handlers_list, it};
seastar::thread::yield();
co_await coroutine::maybe_yield();
}
}
co_await g.close();
}
void storage_proxy::on_down(const gms::inet_address& endpoint, locator::host_id id) {
// FIXME: make gossiper notifications pass host ids
return cancel_write_handlers([id] (const abstract_write_response_handler& handler) {
cancel_write_handlers([id] (const abstract_write_response_handler& handler) {
const auto& targets = handler.get_targets();
return std::ranges::find(targets, id) != targets.end();
});
}).get();
};
future<> storage_proxy::drain_on_shutdown() {
//NOTE: the thread is spawned here because there are delicate lifetime issues to consider
// and writing them down with plain futures is error-prone.
return async([this] {
cancel_all_write_response_handlers().get();
_hints_resource_manager.stop().get();
});
co_await cancel_all_write_response_handlers();
co_await _hints_resource_manager.stop();
}
future<> storage_proxy::abort_view_writes() {
return async([this] {
cancel_write_handlers([] (const abstract_write_response_handler& handler) { return handler.is_view(); });
return cancel_write_handlers([] (const abstract_write_response_handler& handler) {
return handler.is_view();
});
}
future<> storage_proxy::abort_batch_writes() {
return async([this] {
cancel_write_handlers([] (const abstract_write_response_handler& handler) { return handler.is_batch(); });
return cancel_write_handlers([] (const abstract_write_response_handler& handler) {
return handler.is_batch();
});
}
future<>
storage_proxy::stop() {
return make_ready_future<>();
co_await utils::get_local_injector().inject("storage_proxy::stop", utils::wait_for_message(5min));
}
locator::token_metadata_ptr storage_proxy::get_token_metadata_ptr() const noexcept {
@@ -7202,12 +7219,11 @@ future<utils::chunked_vector<dht::token_range_endpoints>> storage_proxy::describ
}
future<> storage_proxy::cancel_all_write_response_handlers() {
auto f = _write_handlers_gate.close();
while (!_response_handlers.empty()) {
_response_handlers.begin()->second->timeout_cb();
if (!_response_handlers.empty()) {
co_await maybe_yield();
}
co_await coroutine::maybe_yield();
}
co_await std::move(f);
}
}


@@ -318,6 +318,8 @@ private:
class cancellable_write_handlers_list;
std::unique_ptr<cancellable_write_handlers_list> _cancellable_write_handlers_list;
gate _write_handlers_gate;
/* This is a pointer to the shard-local part of the sharded cdc_service:
* storage_proxy needs access to cdc_service to augment mutations.
*
@@ -500,7 +502,7 @@ private:
future<> mutate_counters(Range&& mutations, db::consistency_level cl, tracing::trace_state_ptr tr_state, service_permit permit, clock_type::time_point timeout);
// Retires (times out) write response handlers which were constructed as `cancellable` and pass the given filter.
void cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun);
future<> cancel_write_handlers(noncopyable_function<bool(const abstract_write_response_handler&)> filter_fun);
/**
* Returns whether for a range query doing a query against merged is likely


@@ -5928,7 +5928,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
}
co_await container().invoke_on_all([version] (storage_service& ss) -> future<> {
const auto current_version = ss._shared_token_metadata.get()->get_version();
rtlogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
rtlogger.info("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
version, current_version);
// This shouldn't happen under normal operation, it's only plausible
@@ -5949,7 +5949,7 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
co_await ss._shared_token_metadata.stale_versions_in_use();
co_await get_topology_session_manager().drain_closing_sessions();
rtlogger.debug("raft_topology_cmd::barrier_and_drain done");
rtlogger.info("raft_topology_cmd::barrier_and_drain done");
});
co_await utils::get_local_injector().inject("raft_topology_barrier_and_drain_fail", [this] (auto& handler) -> future<> {
@@ -6693,11 +6693,14 @@ future<std::unordered_map<sstring, sstring>> storage_service::add_repair_tablet_
// repair can only be requested for the base table, and this will repair the base table's tablets
// and all its colocated tablets as well.
if (!get_token_metadata().tablets().is_base_table(table)) {
auto table_schema = _db.local().find_schema(table);
auto base_schema = _db.local().find_schema(get_token_metadata().tablets().get_base_table(table));
throw std::invalid_argument(::format(
"Cannot set repair request on table {} because it is colocated with the base table {}. "
"Cannot set repair request on table '{}'.'{}' because it is colocated with the base table '{}'.'{}'. "
"Repair requests can be made only on the base table. "
"Repairing the base table will also repair all tables colocated with it.",
table, get_token_metadata().tablets().get_base_table(table)));
table_schema->ks_name(), table_schema->cf_name(), base_schema->ks_name(), base_schema->cf_name()));
}
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
@@ -6777,10 +6780,13 @@ future<> storage_service::del_repair_tablet_request(table_id table, locator::tab
// see add_repair_tablet_request. repair requests can only be added on base tables.
if (!get_token_metadata().tablets().is_base_table(table)) {
auto table_schema = _db.local().find_schema(table);
auto base_schema = _db.local().find_schema(get_token_metadata().tablets().get_base_table(table));
throw std::invalid_argument(::format(
"Cannot delete repair request on table {} because it is colocated with the base table {}. "
"Cannot delete repair request on table '{}'.'{}' because it is colocated with the base table '{}'.'{}'. "
"Repair requests can be added and deleted only on the base table.",
table, get_token_metadata().tablets().get_base_table(table)));
table_schema->ks_name(), table_schema->cf_name(), base_schema->ks_name(), base_schema->cf_name()));
}
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
@@ -7163,6 +7169,20 @@ future<> storage_service::await_topology_quiesced() {
co_await _topology_state_machine.await_not_busy();
}
future<bool> storage_service::verify_topology_quiesced(token_metadata::version_t expected_version) {
auto holder = _async_gate.hold();
if (this_shard_id() != 0) {
// group0 is only set on shard 0.
co_return co_await container().invoke_on(0, [&] (auto& ss) {
return ss.verify_topology_quiesced(expected_version);
});
}
co_await _group0->group0_server().read_barrier(&_group0_as);
co_return _topology_state_machine._topology.version == expected_version && !_topology_state_machine._topology.is_busy();
}
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
join_node_request_result result;
rtlogger.info("received request to join from host_id: {}", params.host_id);
@@ -8135,6 +8155,20 @@ void storage_service::set_topology_change_kind(topology_change_kind kind) {
_gossiper.set_topology_state_machine(kind == topology_change_kind::raft ? & _topology_state_machine : nullptr);
}
bool storage_service::raft_topology_change_enabled() const {
if (this_shard_id() != 0) {
on_internal_error(slogger, "raft_topology_change_enabled() must run on shard 0");
}
return _topology_change_kind_enabled == topology_change_kind::raft;
}
bool storage_service::legacy_topology_change_enabled() const {
if (this_shard_id() != 0) {
on_internal_error(slogger, "legacy_topology_change_enabled() must run on shard 0");
}
return _topology_change_kind_enabled == topology_change_kind::legacy;
}
future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) {
_protocol_servers.push_back(&server);
if (start_instantly) {
@@ -8150,7 +8184,7 @@ future<> storage_service::query_cdc_timestamps(table_id table, bool ascending, n
return _cdc_gens.local().query_cdc_timestamps(table, ascending, std::move(f));
}
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
return _cdc_gens.local().query_cdc_streams(table, std::move(f));
}


@@ -358,7 +358,7 @@ public:
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
private:
inet_address get_broadcast_address() const noexcept {
@@ -870,12 +870,8 @@ private:
topology_change_kind upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const;
public:
bool raft_topology_change_enabled() const {
return _topology_change_kind_enabled == topology_change_kind::raft;
}
bool legacy_topology_change_enabled() const {
return _topology_change_kind_enabled == topology_change_kind::legacy;
}
bool raft_topology_change_enabled() const;
bool legacy_topology_change_enabled() const;
private:
future<> _raft_state_monitor = make_ready_future<>();
@@ -1002,7 +998,11 @@ public:
future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> set_tablet_balancing_enabled(bool);
future<> await_topology_quiesced();
// Verifies topology is not busy, and also that topology version hasn't changed since the one provided
// by the caller.
future<bool> verify_topology_quiesced(token_metadata::version_t expected_version);
// In the maintenance mode, other nodes won't be available thus we disabled joining
// the token ring and the token metadata won't be populated with the local node's endpoint.

View File

@@ -760,20 +760,22 @@ public:
const locator::topology& topo = _tm->get_topology();
migration_plan plan;
// Prepare plans for each DC separately and combine them to be executed in parallel.
for (auto&& dc : topo.get_datacenters()) {
if (_db.get_config().rf_rack_valid_keyspaces()) {
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
auto rack_plan = co_await make_plan(dc, rack);
auto level = rack_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Prepared {} migrations in rack {} in DC {}", rack_plan.size(), rack, dc);
plan.merge(std::move(rack_plan));
if (!utils::get_local_injector().enter("tablet_migration_bypass")) {
// Prepare plans for each DC separately and combine them to be executed in parallel.
for (auto&& dc : topo.get_datacenters()) {
if (_db.get_config().rf_rack_valid_keyspaces()) {
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
auto rack_plan = co_await make_plan(dc, rack);
auto level = rack_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Prepared {} migrations in rack {} in DC {}", rack_plan.size(), rack, dc);
plan.merge(std::move(rack_plan));
}
} else {
auto dc_plan = co_await make_plan(dc);
auto level = dc_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Prepared {} migrations in DC {}", dc_plan.size(), dc);
plan.merge(std::move(dc_plan));
}
} else {
auto dc_plan = co_await make_plan(dc);
auto level = dc_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Prepared {} migrations in DC {}", dc_plan.size(), dc);
plan.merge(std::move(dc_plan));
}
}

View File

@@ -145,6 +145,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
std::unordered_map<locator::host_id, locator::load_stats> _load_stats_per_node;
serialized_action _tablet_load_stats_refresh;
static constexpr std::chrono::seconds cdc_streams_gc_refresh_interval = std::chrono::seconds(60);
std::chrono::milliseconds _ring_delay;
gate::holder _group0_holder;
@@ -765,6 +767,42 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
}
future<> cdc_streams_gc_fiber() {
auto can_proceed = [this] { return !_async_gate.is_closed() && !_as.abort_requested(); };
while (can_proceed()) {
bool sleep = true;
try {
auto guard = co_await start_operation();
utils::chunked_vector<canonical_mutation> updates;
co_await _cdc_gens.garbage_collect_cdc_streams(updates, guard.write_timestamp());
if (!updates.empty()) {
co_await update_topology_state(std::move(guard), std::move(updates), "CDC streams GC");
} else {
release_guard(std::move(guard));
}
} catch (raft::request_aborted&) {
rtlogger.debug("CDC streams GC fiber aborted");
sleep = false;
} catch (seastar::abort_requested_exception&) {
rtlogger.debug("CDC streams GC fiber aborted");
sleep = false;
} catch (...) {
rtlogger.warn("CDC streams GC fiber got error {}", std::current_exception());
}
auto refresh_interval = utils::get_local_injector().is_enabled("short_cdc_streams_gc_refresh_interval") ?
std::chrono::seconds(1) : cdc_streams_gc_refresh_interval;
if (sleep && can_proceed()) {
try {
co_await seastar::sleep_abortable(refresh_interval, _as);
} catch (...) {
rtlogger.debug("CDC streams GC: sleep failed: {}", std::current_exception());
}
}
}
}
// If a node crashes after initiating gossip and before joining group0, it becomes orphan and
// remains in gossip. This background fiber periodically checks for such nodes and purges those
// entries from gossiper by committing the node as left in raft.
@@ -1471,10 +1509,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
utils::get_local_injector().inject("stream_tablet_fail_on_drain",
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
}
utils::get_local_injector().inject("stream_tablet_fail",
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
if (action_failed(tablet_state.streaming)) {
if (action_failed(tablet_state.streaming) || utils::get_local_injector().enter("stream_tablet_fail")) {
const bool cleanup = utils::get_local_injector().enter("stream_tablet_move_to_cleanup");
bool critical_disk_utilization = false;
if (auto stats = _tablet_allocator.get_load_stats()) {
@@ -1619,8 +1655,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.del_transition(last_token)
.del_migration_task_info(last_token, _feature_service)
.build());
if (trinfo.pending_replica) {
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *trinfo.pending_replica, last_token);
auto leaving_replica = get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
if (leaving_replica) {
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *leaving_replica, last_token);
}
}
break;
@@ -1854,6 +1891,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// of token metadata will complete before we update topology.
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));
co_await utils::get_local_injector().inject("tablet_resize_finalization_post_barrier", utils::wait_for_message(std::chrono::minutes(2)));
auto tm = get_token_metadata_ptr();
auto plan = co_await _tablet_allocator.balance_tablets(tm, {}, get_dead_nodes());
@@ -1907,7 +1946,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet split finalization"));
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet resize finalization"));
}
future<> handle_truncate_table(group0_guard guard) {
@@ -3708,6 +3747,7 @@ future<> topology_coordinator::run() {
co_await fence_previous_coordinator();
auto cdc_generation_publisher = cdc_generation_publisher_fiber();
auto cdc_streams_gc = cdc_streams_gc_fiber();
auto tablet_load_stats_refresher = start_tablet_load_stats_refresher();
auto gossiper_orphan_remover = gossiper_orphan_remover_fiber();
auto group0_voter_refresher = group0_voter_refresher_fiber();
@@ -3751,6 +3791,7 @@ future<> topology_coordinator::run() {
co_await std::move(tablet_load_stats_refresher);
co_await _tablet_load_stats_refresh.join();
co_await std::move(cdc_generation_publisher);
co_await std::move(cdc_streams_gc);
co_await std::move(gossiper_orphan_remover);
co_await std::move(group0_voter_refresher);
co_await std::move(vb_coordinator_fiber);

View File

@@ -539,17 +539,13 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
}
}
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) const {
void compression_parameters::validate(dicts_feature_enabled dicts_enabled) const {
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
if (!dicts_enabled) {
throw std::runtime_error(std::format("sstable_compression {} can't be used before "
"all nodes are upgraded to a version which supports it",
algorithm_to_name(_algorithm)));
}
if (!dicts_allowed) {
throw std::runtime_error(std::format("sstable_compression {} has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`",
algorithm_to_name(_algorithm)));
}
}
if (_chunk_length) {
auto chunk_length = _chunk_length.value();

View File

@@ -106,8 +106,7 @@ public:
std::optional<int> zstd_compression_level() const { return _zstd_compression_level; }
using dicts_feature_enabled = bool_class<struct dicts_feature_enabled_tag>;
using dicts_usage_allowed = bool_class<struct dicts_usage_allowed_tag>;
void validate(dicts_feature_enabled, dicts_usage_allowed) const;
void validate(dicts_feature_enabled) const;
std::map<sstring, sstring> get_options() const;

View File

@@ -3280,7 +3280,7 @@ future<uint64_t> sstable::estimated_keys_for_range(const dht::token_range& range
std::vector<unsigned>
sstable::compute_shards_for_this_sstable(const dht::sharder& sharder_) const {
std::unordered_set<unsigned> shards;
dht::partition_range_vector token_ranges;
utils::chunked_vector<dht::partition_range> token_ranges;
const auto* sm = _components->scylla_metadata
? _components->scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>()
: nullptr;
@@ -3298,7 +3298,7 @@ sstable::compute_shards_for_this_sstable(const dht::sharder& sharder_) const {
};
token_ranges = sm->token_ranges.elements
| std::views::transform(disk_token_range_to_ring_position_range)
| std::ranges::to<dht::partition_range_vector>();
| std::ranges::to<utils::chunked_vector<dht::partition_range>>();
}
sstlog.trace("{}: token_ranges={}", get_filename(), token_ranges);
auto sharder = dht::ring_position_range_vector_sharder(sharder_, std::move(token_ranges));
@@ -3642,7 +3642,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
auto cached_partitions_file = caching == use_caching::yes
? _cached_partitions_file
: seastar::make_shared<cached_file>(
_partitions_file,
uncached_partitions_file(),
_manager.get_cache_tracker().get_index_cached_file_stats(),
_manager.get_cache_tracker().get_lru(),
_manager.get_cache_tracker().region(),
@@ -3652,7 +3652,7 @@ std::unique_ptr<abstract_index_reader> sstable::make_index_reader(
auto cached_rows_file = caching == use_caching::yes
? _cached_rows_file
: seastar::make_shared<cached_file>(
_rows_file,
uncached_rows_file(),
_manager.get_cache_tracker().get_index_cached_file_stats(),
_manager.get_cache_tracker().get_lru(),
_manager.get_cache_tracker().region(),

Some files were not shown because too many files have changed in this diff.