Compare commits


59 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
91aab869b8 Fix exception name and remove unnecessary template keyword
- Use correct exception type: bufsize_mismatch_exception instead of bufsize_mismatch_error
- Remove unnecessary template keyword for read<pos_type>()
- Match parameter order with existing pattern (actual, expected)

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-11-06 12:01:01 +00:00
copilot-swe-agent[bot]
168e0a40e3 Fix oversized allocation in sstables::parse by using fragmented buffer
This addresses an issue where reading summary positions could cause
large contiguous memory allocations (249856 bytes reported).
Added read_exactly_fragmented() method to random_access_reader to
support reading into fragmented buffers, avoiding oversized allocations.

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-11-06 11:57:10 +00:00
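The fragmented-buffer idea from the commit above can be sketched outside Scylla (read_fragmented is a hypothetical illustration of the allocation pattern, not the new read_exactly_fragmented() API):

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Split a read of n bytes into fixed-size fragments instead of one
// contiguous buffer, so no single allocation exceeds the chunk size.
std::vector<std::vector<char>> read_fragmented(const char* src, std::size_t n,
                                               std::size_t chunk = 4096) {
    std::vector<std::vector<char>> frags;
    std::size_t off = 0;
    while (off < n) {
        std::size_t len = std::min(chunk, n - off);
        frags.emplace_back(src + off, src + off + len);  // one small allocation per chunk
        off += len;
    }
    return frags;
}
```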
copilot-swe-agent[bot]
ac54f21504 Initial plan 2025-11-06 11:50:13 +00:00
Asias He
dbeca7c14d repair: Add metric for time spent on tablet repair
It is useful to check time spent on tablet repair. It can be used to
compare incremental repair and non-incremental repair. The time does not
include the time waiting for the tablet scheduler to schedule the tablet
repair task.

Fixes #26505

Closes scylladb/scylladb#26502
2025-11-06 10:00:20 +03:00
Wojciech Mitros
0a22ac3c9e mv: don't mark the view as built if the reader produced no partitions
When we build a materialized view we read the entire base table from start to
end to generate all required view updates. If a view is created while another view
is being built on the same base table, this is optimized - we start generating
view updates for the new view from the base table rows that we're currently
reading, and we read the missed initial range again after the previous view
finishes building.
The view building progress is only updated after generating view updates for
some read partitions. However, there are scenarios where we'll generate no
view updates for the entire read range. If this was not handled we could
end up in an infinite view building loop like we did in https://github.com/scylladb/scylladb/issues/17293
To handle this, we mark the view as built if the reader generated no partitions.
However, this is not always the correct conclusion. Another scenario where
the reader won't encounter any partitions is when view building is interrupted,
and then we perform a reshard. In this scenario, we set the reader for all
shards to the last unbuilt token for an existing partition before the reshard.
However, this partition may not exist on a shard after reshard, and if there
are also no partitions with higher tokens, the reader will generate no partitions
even though it hasn't finished view building.
Additionally, we already have a check that prevents infinite view building loops
without taking the partitions generated by the reader into account. At the end
of stream, before looping back to the start, we advance current_key to the end
of the built range and check for built views in that range. This handles the case
where the entire range is empty - the conditions for a built view are:
1. the "next_token" is no greater than "first_token" (the view building process
looped back, so we've built all tokens above "first_token")
2. the "current_token" is no less than "first_token" (after looping back, we've
built all tokens below "first_token")

If the range is empty, we'll pass these conditions on an empty range after advancing
"current_key" to the end because:
1. after looping back, "next_token" will be set to `dht::minimum_token`
2. "current_key" will be set to `dht::ring_position::max()`

In this patch we remove the check for partitions generated by the reader. This fixes
the issue with resharding and it does not resurrect the issue with infinite view building
that the check was introduced for.

Fixes https://github.com/scylladb/scylladb/issues/26523

Closes scylladb/scylladb#26635
2025-11-05 17:02:32 +02:00
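The two built-view conditions listed in the commit above can be sketched as a predicate (a simplification with integer tokens; the names mirror the commit message, not the actual code):

```cpp
// Built-view check: the view is considered built when the build process
// has looped back past first_token and then advanced up to it again.
bool view_built(long next_token, long current_token, long first_token) {
    return next_token <= first_token      // 1. looped back: tokens above first_token built
        && current_token >= first_token;  // 2. advanced past it: tokens below first_token built
}
```

On an empty range, next_token becomes the minimum token and current_key the maximum position after looping back, so both conditions pass without relying on the reader having produced partitions.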
Nadav Har'El
8a07b41ae4 test/cqlpy: add test confirming page_size=0 disables paging
In pull request #26384 a discussion started whether page_size=0 really
disables paging, or maybe one needs page_size=-1 to truly disable paging.

The reason for that discussion was commit 08c81427b that started to
use page_size=-1 for internal unpaged queries, and commit 76b31a3 that
incorrectly claimed that page_size>=0 means paging is enabled.

This patch introduces a test that confirms that with page_size=0, paging
is truly disabled - including the size-based (1MB) paging.

The new test is Scylla-only, because Cassandra is anyway missing the
size-based page cutoff (see CASSANDRA-11745).

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes scylladb/scylladb#26742
2025-11-05 15:52:16 +03:00
Tomasz Grabiec
f8879d797d tablet_allocator: Avoid load balancer failure when replacing the last node in a rack
Introduced in 9ebdeb2

The problem is specific to node replacing and rack-list RF. The
culprit is in the part of the load balancer which determines rack's
shard count. If we're replacing the last node, the rack will contain
no normal nodes, and shards_per_rack will have no entry for a rack
on which the table still has replicas. This throws std::out_of_range,
fails the tablet draining stage, and the node replace fails.

No backport because the problem exists only on master.

Fixes #26768

Closes scylladb/scylladb#26783
2025-11-05 15:49:51 +03:00
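The failure mode described above (std::out_of_range from a missing map entry) and the shape of a tolerant lookup can be illustrated with a hypothetical helper; shards_for_rack and the fallback value are illustrative, not the actual fix:

```cpp
#include <cstddef>
#include <map>
#include <string>

// std::map::at() throws std::out_of_range for a missing key; a find()
// with a fallback tolerates a rack that has no normal nodes left.
std::size_t shards_for_rack(const std::map<std::string, std::size_t>& shards_per_rack,
                            const std::string& rack) {
    auto it = shards_per_rack.find(rack);
    return it != shards_per_rack.end() ? it->second : 0;  // no normal nodes: 0 shards
}
```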
Avi Kivity
8e480110c2 dist: housekeeping: set python.multiprocessing fork mode to "fork"
Python 3.14 changed the multiprocessing fork mode to "forkserver",
presumably for good reasons. However, it conflicts with our
relocatable Python system. "forkserver" forks and execs a Python
process at startup, but it does this without supplying our relocated
ld.so. The system ld.so detects a conflict and crashes.

Fix this by switching back to "fork", which is sufficient for
housekeeping's modest needs.

Closes scylladb/scylladb#26831
2025-11-05 15:47:38 +03:00
Wojciech Mitros
977fa91e3d view_building_coordinator: rollback tasks on the leaving tablet replica
When a tablet migration is started, we abort the corresponding view
building tasks (i.e. we change the state of those tasks to "ABORTED").
However, we don't change the host and shard of these tasks until the
migration successfully completes. When for some reason we have to
rollback the migration, that means the migration didn't finish and
the aborted task still has the host and shard of the migration
source. So when we recreate tasks that should no longer be aborted
due to a rolled-back migration, we should look at the aborted tasks
of the source (leaving) replica. But we don't do it and we look at
the aborted tasks of the target replica.
In this patch we adjust the rollback mechanism to recreate tasks
for the migration source instead of destination. We also fix the
test that should have detected this issue - the injection that
the test was using didn't make us rollback, but we simply retried
a stage of the tablet migration. By using one_shot=False and adding
a second injection, we can now guarantee that the migration will
eventually fail and we'll continue to the 'cleanup_target' and
'revert_migration' stages.

Fixes https://github.com/scylladb/scylladb/issues/26691

Closes scylladb/scylladb#26825
2025-11-05 10:44:06 +01:00
Pavel Emelyanov
2cb98fd612 Merge 'api: storage_service: tasks: unify sync and async compaction APIs' from Aleksandra Martyniuk
Currently, all apis that start a compaction have two versions:
synchronous and asynchronous. They share most of the implementation,
but some checks and params have diverged.

Unify the handlers of synchronous and asynchronous cleanup, major
compaction, and upgrade_sstables.

Fixes: https://github.com/scylladb/scylladb/issues/26715.

Requires backports to all live versions

Closes scylladb/scylladb#26746

* github.com:scylladb/scylladb:
  api: storage_service: tasks: unify upgrade_sstable
  api: storage_service: tasks: force_keyspace_cleanup
  api: storage_service: tasks: unify force_keyspace_compaction
2025-11-05 10:47:14 +03:00
Pavel Emelyanov
59019bc9a9 Merge 'Alternator: allow warning on auth errors before enabling enforcement' from Nadav Har'El
An Alternator user was recently "bit" when switching `alternator_enforce_authorization` from "false" to "true": after the configuration change, all application requests suddenly failed because, unbeknownst to the user, their application used incorrect secret keys.

This series introduces a solution for users who want to **safely** switch `alternator_enforce_authorization`  from "false" to "true": Before switching from "false" to "true", the user can temporarily switch a new option, `alternator_warn_authorization`, to true. In this "warn" mode, authentication and authorization errors are counted in metrics (`scylla_alternator_authentication_failures` and `scylla_alternator_authorization_failures`) and logged as WARNings, but the user's application continues to work. The user can use these metrics or log messages to learn of errors in their application's setup, fix them, and only do the switch of `alternator_enforce_authorization` when the metrics or log messages show there are no more errors.

The first patch is the implementation of the feature - the new configuration option, the metrics, and the log messages; the second patch is a test for the new feature; and the third patch is documentation recommending how to use the warn mode and the associated metrics or log messages to safely switch `alternator_enforce_authorization` from false to true.

Fixes #25308

This is a feature that users need, so it should probably be backported to live branches.

Closes scylladb/scylladb#25457

* github.com:scylladb/scylladb:
  docs/alternator: explain alternator_warn_authorization
  test/alternator: tests for new auth failure metrics and log messages
  alternator: add alternator_warn_authorization config
2025-11-05 10:45:17 +03:00
Pavel Emelyanov
fc37518aff test: Check file existence directly
There's a test that checks if the temporary-statistics file is gone at some
point. It does so by listing the directory it expects the file to be in
and then comparing the names found with the temporary statistics file name.

It looks like a single file_exists() call is enough for that purpose.

As a "sanity" check, this patch adds a validation that the non-temporary
statistics file is there, especially since this file is removed after the
test.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#26743
2025-11-04 19:37:55 +01:00
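The direct-check idea above can be sketched with std::filesystem (the test itself uses file_exists(); this helper is hypothetical):

```cpp
#include <filesystem>

// A single exists() call replaces listing the directory and comparing
// each entry's name against the temporary statistics file name.
bool temp_stats_gone(const std::filesystem::path& temp_stats_file) {
    return !std::filesystem::exists(temp_stats_file);
}
```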
Avi Kivity
95700c5f7f Merge 'Support counters with tablets' from Michael Litvak
Support the counters feature in tablets keyspaces.

The main change is to fix the counter update during tablets intranode migration.

A counter cell is c = map<host_id, value>. A counter update is applied by doing a read-modify-write on a leader replica to retrieve the current host's counter value and transform the mutation to contain the updated value for the host, then apply the mutation and replicate it to other hosts. The read-modify-write is protected against concurrent updates by locking the counter cell.

When the counter is migrated between two shards, it's not enough to lock the counter on the read shard, because in the stage write_both_read_new the read shard is switched, and then we can have concurrent updates reach either the old or the new shard. In order to keep the counter update exclusive we lock both shards when in the stage write_both_read_new.

Also, when applying the transformed mutation we need to respect write_both stages and apply the mutation on both shards. We change it to use `apply_on_shards` similarly to other methods in storage proxy.

The change applies to both tablets and vnodes; they use the same implementation, but for vnodes the behavior should remain equivalent up to some small reordering of the code, since vnodes have no intranode migration and the logic reduces to a single read shard = write shard.

Fixes https://github.com/scylladb/scylladb/issues/18180

no backport - new feature

Closes scylladb/scylladb#26636

* github.com:scylladb/scylladb:
  docs: counters now work with tablets
  pgo: enable counters with tablets
  test: enable counters tests with tablets
  test: add counters with tablets test
  cql3: remove warning when creating keyspace with tablets
  cql3: allow counters with tablets
  storage_proxy: lock all read shards for counter update
  storage_proxy: apply counter mutation on all write shards
  storage_proxy: move counter update coordination to storage proxy
  storage_proxy: refactor mutate_counter_on_leader
  replica/db: add counter update guard
  replica/db: split counter update helper functions
2025-11-03 22:28:10 +01:00
Raphael S. Carvalho
7f34366b9d sstables_loader: Don't bypass synchronization with busy topology
The patch c543059f86 fixed the synchronization issue between tablet
split and load-and-stream. The synchronization worked only with
raft topology, and therefore was disabled with gossip.
The check used storage_service::raft_topology_change_enabled(),
but the topology kind is only available/set on shard 0, so the
synchronization was bypassed when load-and-stream runs on any
shard other than 0.

The reason the reproducer didn't catch it is that it was restricted
to a single CPU. It will now run with multiple CPUs and catch the
observed problem.

Fixes #22707

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes scylladb/scylladb#26730
2025-11-03 18:10:08 +01:00
Michael Litvak
8555fd42df docs: counters now work with tablets
Counters are now supported in tablet-enabled keyspaces, so remove
the documentation that listed counters as an unsupported feature
and the note warning users about the limitation.
2025-11-03 16:04:37 +01:00
Michael Litvak
1337f4213f pgo: enable counters with tablets
Now that counters are supported with tablets, update the keyspace
statement for counters to allow it to run with tablets.
2025-11-03 16:04:37 +01:00
Michael Litvak
1dbf53ca29 test: enable counters tests with tablets
Enable all counters-related tests that were disabled for tablets because
counters were not supported with tablets until now.

Some tests were parametrized to run with both vnodes and tablets, and
the tablets case was skipped, in order to not lose coverage. We change
them to run with the default configuration since counters are now
supported with both vnodes and tablets, and the implementation is the
same, so there is no benefit in running them with both configurations.
2025-11-03 16:04:37 +01:00
Michael Litvak
a6c12ed1ef test: add counters with tablets test
Add a new test for counters with tablets to test things that are
specific to tablets. Test counter updates that are concurrent with
tablet internode and intranode migrations and verify the counter remains
consistent and no updates are lost.
2025-11-03 16:04:37 +01:00
Michael Litvak
60ac13d75d cql3: remove warning when creating keyspace with tablets
When creating a keyspace with tablets, a warning is shown with all the
unsupported features for tablets, which currently is only counters.

Now that counters are also supported with tablets, we can remove this
warning entirely.
2025-11-03 16:04:37 +01:00
Michael Litvak
9208b2f317 cql3: allow counters with tablets
Now that counters work with tablets, allow creating a table with
counters in a tablets-enabled keyspace, and remove the warning about
counters not being supported when creating a keyspace with tablets.

We allow using counters with tablets only when all nodes are upgraded
and support counters with tablets. We add a new feature flag to
determine if this is the case.

Fixes scylladb/scylladb#18180
2025-11-03 16:04:37 +01:00
Michael Litvak
296b116ae2 storage_proxy: lock all read shards for counter update
Previously, in a counter update we locked the read shard to protect the
counter's read-modify-write against concurrent updates.

This is not sufficient when the counter is migrated between different
shards, because there is a stage where the read shard switches from the
old shard to the new shard, and during that switch there can be
concurrent counter updates on both shards. If each shard takes only its
own lock, the operations will not be exclusive anymore, and this can
cause lost counter updates.

To fix this, we acquire the counter lock on both shards in the stage
write_both_read_new, when both shards can serve reads. This guarantees
that counter updates continue to be exclusive during intranode
migration.
2025-11-03 16:04:35 +01:00
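A simplified, single-process sketch of the both-shards locking idea above (the real code locks per-shard structures on seastar reactors; the names here are hypothetical):

```cpp
#include <mutex>

// During write_both_read_new both the old and the new shard can serve
// the counter, so an update takes both shard-local locks;
// std::scoped_lock acquires them in a deadlock-free order.
void update_counter(std::mutex& old_shard_lock, std::mutex& new_shard_lock,
                    long& value, long delta) {
    std::scoped_lock guard(old_shard_lock, new_shard_lock);
    value += delta;  // read-modify-write is now exclusive on both shards
}
```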
Michael Litvak
de321218bc storage_proxy: apply counter mutation on all write shards
When applying a counter mutation, use apply_on_shards to apply the
mutation on all write shards, similarly to the way other mutations are
applied in the storage proxy. Previously the mutation was applied only
on the current shard which is the read shard.

This is needed to respect the write_both stages of intranode migration
where we need to apply the mutation on both the old and the new shards.
2025-11-03 16:03:29 +01:00
Michael Litvak
c7e7a9e120 storage_proxy: move counter update coordination to storage proxy
Refactor the counter update to split the functions and have them called
by the storage proxy to prepare for a later change.

Previously, in mutate_counter the storage proxy called the replica
function apply_counter_update, which does a few things:
1. checks that the operation can be done: check timeout, disk utilization
2. acquire counter locks
3. do read-modify-write and transform the counter mutation
4. apply the mutation in the replica

In this commit we change it so that these functions are split and called
from the storage proxy, so that we have better control from the storage
proxy when we change it later to work across multiple shards. For
example, we will want to acquire locks on multiple shards, transform it
on one shard, and then apply the mutation on multiple shards.

After the change it works as follows in storage proxy:
1. acquire counter locks
2. call replica prepare to check the operation and transform the mutation
3. call replica apply to apply the transformed mutation
2025-11-03 15:59:46 +01:00
Tomasz Grabiec
e878042987 Revert "Revert "tests(lwt): new test for LWT testing during tablet resize""
This reverts commit 6cb14c7793.

The issue causing the previous revert was fixed in 88765f627a.
2025-11-03 10:38:00 +01:00
Michael Litvak
579031cfc8 storage_proxy: refactor mutate_counter_on_leader
Slightly reorganize the mutate counter function to prepare it for a
later change.

Move the code that finds the read shard and invokes the rest of the
function on the read shard to the caller function. This simplifies the
function mutate_counter_on_leader_and_replicate which now runs on the
read shard and will make it easier to extend.
2025-11-03 08:43:11 +01:00
Michael Litvak
7cc6b0d960 replica/db: add counter update guard
Add a RAII guard for counter update that holds the counter locks and the
table operation, and extract the creation of the guard to a separate
function.

This prepares it for a later change where we will want to obtain the
guard externally from the storage proxy.
2025-11-03 08:43:11 +01:00
Michael Litvak
88fd9a34c4 replica/db: split counter update helper functions
Split do_apply_counter_update to a few smaller and simpler functions to
help prepare for a later change.
2025-11-03 08:43:11 +01:00
Avi Kivity
9b6ce030d0 sstables: remove quadratic (and possibly exponential) compile time in parse()
parse() taking a list of elements is quadratic (during compile time) in
that it generates recursive calls to itself, each time with one fewer
parameter. The total size of the parameter lists in all these generated
functions is quadratic in the initial parameter list size.

It's also exponential if we ignore inlining limits, since each .then()
call expands to two branches - a ready future branch and a non-ready
future branch. If the compiler did not give up, we'd have 2^list_len
branches. Of course the compiler does not expand indefinitely, but the
effort spent getting there is wasted.

Simplify by using a fold expression over the comma operator. Instead
of passing the remaining parameter list in each step, we pass only
the parameter we are processing now, making processing linear, and not
generating unnecessary functions.

It would be better expressed using pack expansion statements, but these
are part of C++26.

The largest offender is probably stats_metadata, with 21 elements.

dev-mode sstables.o:

   text	   data	    bss	    dec	    hex	filename
1760059	   1312	   7673	1769044	 1afe54	sstables.o.before
1745533	   1312	   7673	1754518	 1ac596	sstables.o.after

We save about 15k of text with presumably a corresponding (small)
decrease in compile time.

Closes scylladb/scylladb#26735
2025-11-02 13:09:37 +01:00
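The shape of the fix above can be illustrated with a hedged sketch (process_all is a stand-in, not the actual parse()):

```cpp
#include <string>

// A left fold over the comma operator visits each field once, in order,
// without a recursive template instantiation per remaining parameter.
template <typename... Ts>
std::string process_all(const Ts&... fields) {
    std::string out;
    ((out += std::to_string(fields) + ";"), ...);  // fold over ','
    return out;
}
```

Each argument generates one processing expression, so the total work (and code size) is linear in the parameter list length instead of quadratic.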
Jenkins Promoter
cb30eb2e21 Update pgo profiles - aarch64 2025-11-01 05:23:52 +02:00
Jenkins Promoter
e3a0935482 Update pgo profiles - x86_64 2025-11-01 04:54:49 +02:00
Petr Gusev
88765f627a paxos_state: get_replica_lock: remove shard check
This check is incorrect: the current shard may be looking at
the old version of tablets map:
* an accept RPC comes to replica shard 0, which is already at write_both_read_new
* the new shard is shard 1, so paxos_state::accept is called on shard 1
* shard 1 is still at "streaming" -> shards_ready_for_reads() returns old
shard 0

Fixes scylladb/scylladb#26801

Closes scylladb/scylladb#26809
2025-10-31 21:37:39 +01:00
Avi Kivity
7a72155374 Merge 'Introduce nodetool excludenode' from Tomasz Grabiec
If a node is dead and cannot be brought back, tablet migrations are
stuck, until the node is explicitly marked as "permanently dead" /
"ignored node" / "excluded" (name differs in different contexts).

Currently, this is done during removenode and replace operations but
it should be possible to only mark the node as dead, for the purpose
of unblocking migrations or other topology operations, without doing
the actual removenode, because full removal might be currently
impossible, or not desirable due to lack of capacity or priorities.

This patch introduces this kind of API:

```
  nodetool excludenode <host-id> [ ... <host-id> ]
```

Having this kind of API is an improvement in user experience in
several cases. For example, when we lose a rack, the only viable
option for recovery is to run removenode with an extra
--ignore-dead-nodes option. This removenode will fail in the tablet
draining phase, as there is no live node in the rack to rebuild
replicas in. This is confusing to the operator, but necessary before
ALTER KEYSPACE can proceed in order to change replication options to
drop the rack from the RF.

Having this API allows operators to have more unified procedures,
where "nodetool excludenode" is always the first step of recovery,
which unblocks further topology operations, both those which restore
capacity and also auto-scaling, tablet split/merge, load balancing,
etc.

Fixes #21281

The PR also changes "nodetool status" to show excluded nodes,
they have 'X' in their status instead of 'D'.

Closes scylladb/scylladb#26659

* github.com:scylladb/scylladb:
  nodetool: status: Show excluded nodes as having status 'X'
  test: py: Test scenario involving excludenode API
  nodetool: Introduce excludenode command
2025-10-31 22:14:57 +02:00
Avi Kivity
d458dd41c6 Merge 'Avoid input_/output_stream-s default initialization and move-assignment' from Pavel Emelyanov
A recent seastar update deprecated the in/out stream usage pattern where a stream is default-constructed early and then move-assigned with the proper one (see scylladb/seastar#3051). This PR fixes a few places in Scylla that still use it.

Adopting newer seastar API, no need to backport

Closes scylladb/scylladb#26747

* github.com:scylladb/scylladb:
  commitlog: Remove unused work::r stream variable
  ec2_snitch: Fix indentation after previous patch
  ec2_snitch: Coroutinize the aws_api_call_once()
  sstable: Construct output_stream for data instantly
  test: Don't reuse on-stack input stream
2025-10-31 21:22:41 +02:00
Avi Kivity
adf9c426c2 Merge 'db/config: Change default SSTable compressor to LZ4WithDictsCompressor' from Nikos Dragazis
`sstable_compression_user_table_options` allows configuring a node-global SSTable compression algorithm for user tables via scylla.yaml. The current default is LZ4Compressor (inherited from Cassandra).

Make LZ4WithDictsCompressor the new default. Metrics from real datasets in the field have shown significant improvements in compression ratios.

If the dictionary compression feature is not enabled in the cluster (e.g., during an upgrade), fall back to the `LZ4Compressor`. Once the feature is enabled, flip the default back to the dictionary compressor using a listener callback.

Fixes #26610.

Closes scylladb/scylladb#26697

* github.com:scylladb/scylladb:
  test/cluster: Add test for default SSTable compressor
  db/config: Change default SSTable compressor to LZ4WithDictsCompressor
  db/config: Deprecate sstable_compression_dictionaries_allow_in_ddl
  boost/cql_query_test: Get expected compressor from config
2025-10-31 21:15:18 +02:00
Lakshmi Narayanan Sreethar
3eb7193458 backlog_controller: compute backlog even when static shares are set
The compaction manager backlog is exposed via metrics, but if static
shares are set, the backlog is never calculated. As a result, there is
no way to determine the backlog and if the static shares need
adjustment. Fix that by calculating backlog even when static shares are
set.

Fixes #26287

Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>

Closes scylladb/scylladb#26778
2025-10-31 18:18:36 +02:00
Michał Hudobski
fd521cee6f test: fix typo in vector_index test
Unfortunately in https://github.com/scylladb/scylladb/pull/26508
a typo that changes the behavior of a test was introduced.
This patch fixes the typo.

Closes scylladb/scylladb#26803
2025-10-31 18:35:02 +03:00
Tomasz Grabiec
284c73d466 scripts: pull_github_pr.sh: Fix auth problem detection
Before the patch, the script printed:

parse error: Invalid numeric literal at line 2, column 0

Closes scylladb/scylladb#26818
2025-10-31 18:32:58 +03:00
Michael Litvak
e7dbccd59e cdc: use chunked_vector instead of vector for stream ids
Use utils::chunked_vector instead of std::vector to store cdc stream
sets for tablets.

A cdc stream set usually represents all streams for a specific table and
timestamp, and has a stream id per each tablet of the table. Each stream
id is represented by 16 bytes, so the vector could require quite large
contiguous allocations for a table that has many tablets. Change it to
chunked_vector to avoid large contiguous allocations.

Fixes scylladb/scylladb#26791

Closes scylladb/scylladb#26792
2025-10-31 13:02:34 +01:00
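The allocation pattern above can be sketched with std::deque as a stand-in for utils::chunked_vector (illustrative only; deque similarly stores elements in fixed-size blocks rather than one contiguous buffer):

```cpp
#include <array>
#include <cstddef>
#include <deque>

// A 16-byte stream id per tablet; chunked storage means a table with
// many tablets needs many small allocations, not one large one.
using stream_id = std::array<unsigned char, 16>;

std::deque<stream_id> make_stream_set(std::size_t tablet_count) {
    return std::deque<stream_id>(tablet_count);
}
```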
Tomasz Grabiec
1c0d847281 Merge 'load_balancer: load_stats reconcile after tablet migration and table resize' from Ferenc Szili
This change adds the ability to move tablet sizes in load_stats after a tablet migration or table resize (split/merge). This is needed because the size-based load balancer needs tablet size data that is as accurate as possible, in order to work on a fresh tablet size distribution and issue correct tablet migrations.

This is the second part of the size based load balancing changes:

- First part for tablet size collection via load_stats: #26035
- Second part reconcile load_stats: #26152
- The third part for load_sketch changes: #26153
- The fourth part which performs tablet load balancing based on tablet size: #26254

This is a new feature and backport is not needed.

Closes scylladb/scylladb#26152

* github.com:scylladb/scylladb:
  load_balancer: load_stats reconcile after tablet migration and table resize
  load_stats: change data structure which contains tablet sizes
2025-10-31 09:58:25 +01:00
Tomasz Grabiec
2bd173da97 nodetool: status: Show excluded nodes as having status 'X'
Example:

$ build/dev/scylla nodetool status
Datacenter: dc1
===============
Status=Up/Down/eXcluded
|/ State=Normal/Leaving/Joining/Moving
-- Address   Load      Tokens Owns Host ID                              Rack
UN 127.0.0.1 783.42 KB 1      ?    753cb7b0-1b90-4614-ae17-2cfe470f5104 rack1
XN 127.0.0.2 785.10 KB 1      ?    92ccdd23-5526-4863-844a-5c8e8906fa55 rack2
UN 127.0.0.3 708.91 KB 1      ?    781646ad-c85b-4d77-b7e3-8d50c34f1f17 rack3
2025-10-31 09:03:20 +01:00
Tomasz Grabiec
87492d3073 test: py: Test scenario involving excludenode API 2025-10-31 09:03:20 +01:00
Tomasz Grabiec
55ecd92feb nodetool: Introduce excludenode command
If a node is dead and cannot be brought back, tablet migrations are
stuck, until the node is explicitly marked as "permanently dead" /
"ignored node" / "excluded" (name differs in different contexts).

Currently, this is done during removenode and replace operations but
it should be possible to only mark the node as dead, for the purpose
of unblocking migrations or other topology operations, without doing
the actual removenode, because full removal might be currently
impossible, or not desirable due to lack of capacity or priorities.

This patch introduces this kind of API:

  nodetool excludenode <host-id> [ ... <host-id> ]

Having this kind of API is an improvement in user experience in
several cases. For example, when we lose a rack, the only viable
option for recovery is to run removenode with an extra
--ignore-dead-nodes option. This removenode will fail in the tablet
draining phase, as there is no live node in the rack to rebuild
replicas in. This is confusing to the operator, but necessary before
ALTER KEYSPACE can proceed in order to change replication options to
drop the rack from the RF.

Having this API allows operators to have more unified procedures,
where "nodetool excludenode" is always the first step of recovery,
which unblocks further topology operations, both those which restore
capacity and also auto-scaling, tablet split/merge, load balancing,
etc.

Fixes #21281
2025-10-31 09:03:20 +01:00
Nikos Dragazis
a0bf932caa test/cluster: Add test for default SSTable compressor
The previous patch made the default compressor dependent on the
SSTABLE_COMPRESSION_DICTS feature:
* LZ4Compressor if the feature is disabled
* LZ4WithDictsCompressor if the feature is enabled

Add a test to verify that the cluster uses the right default in every
case.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
2025-10-30 15:53:54 +02:00
Nikos Dragazis
2fc812a1b9 db/config: Change default SSTable compressor to LZ4WithDictsCompressor
`sstable_compression_user_table_options` allows configuring a
node-global SSTable compression algorithm for user tables via
scylla.yaml. The current default is `LZ4Compressor` (inherited from
Cassandra).

Make `LZ4WithDictsCompressor` the new default. Metrics from real datasets
in the field have shown significant improvements in compression ratios.

If the dictionary compression feature is not enabled in the cluster
(e.g., during an upgrade), fall back to the `LZ4Compressor`. Once the
feature is enabled, flip the default back to the dictionary compressor
using a listener callback.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
2025-10-30 15:53:49 +02:00
Aleksandra Martyniuk
fdd623e6bc api: storage_service: tasks: unify upgrade_sstable
Currently, all apis that start a compaction have two versions:
synchronous and asynchronous. They share most of the implementation,
but some checks and params have diverged.

Unify the handlers of /storage_service/keyspace_upgrade_sstables/{keyspace}
and /tasks/compaction/keyspace_upgrade_sstables/{keyspace}.
2025-10-30 11:42:48 +01:00
Aleksandra Martyniuk
044b001bb4 api: storage_service: tasks: force_keyspace_cleanup
Currently, all apis that start a compaction have two versions:
synchronous and asynchronous. They share most of the implementation,
but some checks and params have diverged.

Unify the handlers of /storage_service/keyspace_cleanup/{keyspace}
and /tasks/compaction/keyspace_cleanup/{keyspace}.
2025-10-30 11:42:47 +01:00
Aleksandra Martyniuk
12dabdec66 api: storage_service: tasks: unify force_keyspace_compaction
Currently, all apis that start a compaction have two versions:
synchronous and asynchronous. They share most of the implementation,
but some checks and params have diverged.

Add consider_only_existing_data parameter to /tasks/compaction/keyspace_compaction/{keyspace},
to match the synchronous version of the api (/storage_service/keyspace_compaction/{keyspace}).

Unify the handlers of both apis.
2025-10-30 11:33:17 +01:00
Nikos Dragazis
96e727d7b9 db/config: Deprecate sstable_compression_dictionaries_allow_in_ddl
The option is a knob that allows rejecting dictionary-aware compressors
in the validation stage of CREATE/ALTER statements, and in the
validation of `sstable_compression_user_table_options`. It was
introduced in 7d26d3c7cb to allow the admins of Scylla Cloud to
selectively enable it in certain clusters. For more details, check:
https://github.com/scylladb/scylla-enterprise/issues/5435

As of this series, we want to start offering dictionary compression as
the default option in all clusters, i.e., treat it as a generally
available feature. This makes the knob redundant.

Additionally, making dictionary compression the default choice in
`sstable_compression_user_table_options` creates an awkward dependency
with the knob (disabling the knob should cause
`sstable_compression_user_table_options` to fall back to a non-dict
compressor as default). That may not be very clear to the end user.

For these reasons, mark the option as "Deprecated", remove all relevant
tests, and adjust the business logic as if dictionary compression is
always available.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
2025-10-29 20:13:08 +02:00
Nikos Dragazis
d95ebe7058 boost/cql_query_test: Get expected compressor from config
Since 5b6570be52, the default SSTable compression algorithm for user
tables is no longer hardcoded; it can be configured via the
`sstable_compression_user_table_options.sstable_compression` option in
scylla.yaml.

Modify the `test_table_compression` test to get the expected value from
the configuration.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
2025-10-29 14:52:43 +02:00
Nadav Har'El
492c664fbb docs/alternator: explain alternator_warn_authorization
The previous patches added the ability to set
alternator_warn_authorization. In this patch we add to our
documentation a recommendation that this setting be used as an
intermediate step when wanting to change alternator_enforce_authorization
from "false" to "true". We explain why this is useful and important.

The new documentation is in docs/alternator/compatibility.md, where
we previously explained the alternator_enforce_authorization configuration.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2025-10-29 11:16:29 +02:00
Nadav Har'El
2dbd1a85a3 test/alternator: tests for new auth failure metrics and log messages
This patch adds to test_metrics.py tests that authentication and
authorization errors increment, respectively, the new metrics

    scylla_alternator_authentication_failures
    scylla_alternator_authorization_failures

This patch also adds in test_logs.py tests that verify that log
messages are generated on different types of authentication/authorization
failures.

The tests also check how configuring alternator_enforce_authorization
and alternator_warn_authorization changes these behaviors:
  * alternator_enforce_authorization determines whether an auth error
    will cause the request to fail, or the failure is counted but then
    ignored.
  * alternator_warn_authorization determines whether an auth error will
    cause a WARN-level log message to be generated (and also the failure
    is counted).
  * If both configuration flags are false, Alternator doesn't even
    attempt to check authentication or authorization - so errors aren't
    even counted.

Because the new tests live-update the alternator_*_authorization
configuration options, they also serve as a test that live-updating
this option works correctly.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2025-10-29 11:16:29 +02:00
Nadav Har'El
51186b2f2c alternator: add alternator_warn_authorization config
Before this patch, the configuration alternator_enforce_authorization
is a boolean: true means enforce authentication checks (i.e., each
request is signed by a valid user) and authorization checks (the user
who signed the request is allowed by RBAC to perform this request).

This patch adds a second boolean configuration option,
alternator_warn_authorization. When alternator_enforce_authorization
is false but alternator_warn_authorization is true, authentication and
authorization checks are performed as in enforce mode, but failures
are ignored and counted in two new metrics:

    scylla_alternator_authentication_failures
    scylla_alternator_authorization_failures

Additionally, each authentication or authorization error is logged as
a WARN-level log message. Some users prefer those log messages over
metrics, as the log messages contain additional information about the
failure that can be useful - such as the address of the misconfigured
client, or the username attempted in the request.

All combinations of the two configuration options are allowed:
 * If just "enforce" is true, auth failures cause a request failure.
   The failures are counted, but not logged.
 * If both "enforce" and "warn" are true, auth failures cause a request
   failure. The failures are both counted and logged.
 * If just "warn" is true, auth failures are ignored (the request
   is allowed to complete) but are counted and logged.
 * If neither "enforce" nor "warn" is true, no authentication or
   authorization checks are done at all. We don't know about failures,
   so naturally we don't count them and don't log them.

This patch is fairly straightforward, doing mainly the following
things:

1. Add an alternator_warn_authorization config parameter.

2. Make sure alternator_enforce_authorization is live-updatable (we'll
   use this in a test in the next patch). It "almost" was, but a typo
   prevented the live update from working properly.

3. Add the two new metrics, and increment them in every type of
   authentication or authorization error.
   Some code that needs to increment these new metrics didn't have
   access to the "stats" object, so we had to pass it around more.

4. Add log messages when alternator_warn_authorization is true.

5. If alternator_enforce_authorization is false, allow the auth check
   to allow the request to proceed (after having counted and/or logged
   the auth error).

A separate patch will follow and add documentation suggesting to users
how to use the new "warn" option to safely switch from non-enforcing
to enforcing mode. Another patch will add tests for the new configuration
options, new metrics and new log messages.

Fixes #25308.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2025-10-29 11:16:26 +02:00
Pavel Emelyanov
e99c8eee08 commitlog: Remove unused work::r stream variable
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-10-28 19:46:29 +03:00
Pavel Emelyanov
92462e502f ec2_snitch: Fix indentation after previous patch
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-10-28 19:31:08 +03:00
Pavel Emelyanov
7640ade04d ec2_snitch: Coroutinize the aws_api_call_once()
The method connects a socket, grabs in/out streams from it, then writes
the HTTP request and reads+parses the response. For that it uses class
variables for the socket and streams, but there's no real need for that --
all three actually exist only throughout the method's lifetime.

To fix it, coroutinize the method. The same could be achieved by moving
the connected socket and streams into a do_with() context, but a coroutine
is better than that.

(indentation is left broken)

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-10-28 19:29:25 +03:00
Pavel Emelyanov
5d89816fed sstable: Construct output_stream for data instantly
This change makes the local output_stream variable be constructed in its
declaration statement with the help of a ternary operator, thus avoiding
both default-initialization and move-assignment that would otherwise hinge
on a standalone condition check.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-10-28 19:27:22 +03:00
Pavel Emelyanov
37b9cccc1c test: Don't reuse on-stack input stream
The test consists of several snippets, each creating an input_stream for
some short operation and checking the result. Each snippet overwrites
the local `input_stream in` variable with a new one.

This change wraps each of those snippets in its own code block so that
each has its own new `input_stream in` variable.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2025-10-28 19:25:07 +03:00
Ferenc Szili
10f07fb95a load_balancer: load_stats reconcile after tablet migration and table resize
This change adds the ability to move tablet sizes in load_stats after a
tablet migration or table resize (split/merge). This is needed because
the size based load balancer needs to have tablet size data which is as
accurate as possible, in order to issue migrations which improve
load balance.
2025-10-28 12:12:09 +01:00
Ferenc Szili
b4ca12b39a load_stats: change data structure which contains tablet sizes
This patch changes the tablet size map in load_stats. Previously, this
data structure was:

std::unordered_map<range_based_tablet_id, uint64_t> tablet_sizes;

and is changed into:

std::unordered_map<table_id, std::unordered_map<dht::token_range, uint64_t>> tablet_sizes;

This allows for improved performance of tablet size reconciliation.
2025-10-24 14:37:00 +02:00
109 changed files with 2233 additions and 740 deletions

.gitignore

@@ -37,4 +37,3 @@ clang_build
.idea/
nuke
rust/target


@@ -136,6 +136,7 @@ future<> controller::start_server() {
[this, addr, alternator_port, alternator_https_port, creds = std::move(creds)] (server& server) mutable {
return server.init(addr, alternator_port, alternator_https_port, creds,
_config.alternator_enforce_authorization,
_config.alternator_warn_authorization,
_config.alternator_max_users_query_size_in_trace_output,
&_memory_limiter.local().get_semaphore(),
_config.max_concurrent_requests_per_shard);


@@ -245,7 +245,8 @@ executor::executor(gms::gossiper& gossiper,
_mm(mm),
_sdks(sdks),
_cdc_metadata(cdc_metadata),
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization()),
_enforce_authorization(_proxy.data_dictionary().get_config().alternator_enforce_authorization),
_warn_authorization(_proxy.data_dictionary().get_config().alternator_warn_authorization),
_ssg(ssg),
_parsed_expression_cache(std::make_unique<parsed::expression_cache>(
parsed::expression_cache::config{_proxy.data_dictionary().get_config().alternator_max_expression_cache_entries_per_shard},
@@ -881,15 +882,37 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
co_return rjson::print(std::move(response));
}
// This function increments the authorization_failures counter, and may also
// log a warn-level message and/or throw an access_denied exception, depending
// on what enforce_authorization and warn_authorization are set to.
// Note that if enforce_authorization is false, this function will return
// without throwing. So a caller that doesn't want to continue after an
// authorization_error must explicitly return after calling this function.
static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) {
stats.authorization_failures++;
if (enforce_authorization) {
if (warn_authorization) {
elogger.warn("alternator_warn_authorization=true: {}", msg);
}
throw api_error::access_denied(std::move(msg));
} else {
if (warn_authorization) {
elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg);
}
}
}
// Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(
bool enforce_authorization,
bool warn_authorization,
const service::client_state& client_state,
const schema_ptr& schema,
auth::permission permission_to_check) {
if (!enforce_authorization) {
auth::permission permission_to_check,
alternator::stats& stats) {
if (!enforce_authorization && !warn_authorization) {
co_return;
}
// Unfortunately, the fix for issue #23218 did not modify the function
@@ -904,31 +927,33 @@ future<> verify_permission(
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
throw api_error::access_denied(fmt::format(
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
"Write access denied on internal table {}.{} to role {} because it is not a superuser",
schema->ks_name(), schema->cf_name(), username));
co_return;
}
}
auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name());
if (!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
if (!client_state.user() || !client_state.user()->name ||
!co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
sstring username = "<anonymous>";
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
// Using exceptions for errors makes this function faster in the
// success path (when the operation is allowed).
throw api_error::access_denied(format(
"{} access on table {}.{} is denied to role {}",
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
"{} access on table {}.{} is denied to role {}, client address {}",
auth::permissions::to_string(permission_to_check),
schema->ks_name(), schema->cf_name(), username));
schema->ks_name(), schema->cf_name(), username, client_state.get_client_address()));
}
}
// Similar to verify_permission() above, but just for CREATE operations.
// Those do not operate on any specific table, so require permissions on
// ALL KEYSPACES instead of any specific table.
future<> verify_create_permission(bool enforce_authorization, const service::client_state& client_state) {
if (!enforce_authorization) {
static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) {
if (!enforce_authorization && !warn_authorization) {
co_return;
}
auto resource = auth::resource(auth::resource_kind::data);
@@ -937,7 +962,7 @@ future<> verify_create_permission(bool enforce_authorization, const service::cli
if (client_state.user() && client_state.user()->name) {
username = client_state.user()->name.value();
}
throw api_error::access_denied(format(
authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
"CREATE access on ALL KEYSPACES is denied to role {}", username));
}
}
@@ -954,7 +979,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
schema_ptr schema = get_table(_proxy, request);
rjson::value table_description = co_await fill_table_description(schema, table_status::deleting, _proxy, client_state, trace_state, permit);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::DROP);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::DROP, _stats);
co_await _mm.container().invoke_on(0, [&, cs = client_state.move_to_other_shard()] (service::migration_manager& mm) -> future<> {
size_t retries = mm.get_concurrent_ddl_retries();
for (;;) {
@@ -1292,7 +1317,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
if (tags->Size() < 1) {
co_return api_error::validation("The number of tags must be at least 1") ;
}
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
update_tags_map(*tags, tags_map, update_tags_action::add_tags);
});
@@ -1313,7 +1338,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
schema_ptr schema = get_table_from_arn(_proxy, rjson::to_string_view(*arn));
get_stats_from_schema(_proxy, *schema)->api_operations.untag_resource++;
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [tags](std::map<sstring, sstring>& tags_map) {
update_tags_map(*tags, tags_map, update_tags_action::delete_tags);
});
@@ -1516,7 +1541,7 @@ static future<> mark_view_schemas_as_built(utils::chunked_vector<mutation>& out,
}
}
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization) {
static future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, bool enforce_authorization, bool warn_authorization, stats& stats) {
SCYLLA_ASSERT(this_shard_id() == 0);
// We begin by parsing and validating the content of the CreateTable
@@ -1722,7 +1747,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
set_table_creation_time(tags_map, db_clock::now());
builder.add_extension(db::tags_extension::NAME, ::make_shared<db::tags_extension>(tags_map));
co_await verify_create_permission(enforce_authorization, client_state);
co_await verify_create_permission(enforce_authorization, warn_authorization, client_state, stats);
schema_ptr schema = builder.build();
for (auto& view_builder : view_builders) {
@@ -1823,9 +1848,9 @@ future<executor::request_return_type> executor::create_table(client_state& clien
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization)]
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container(), client_state_other_shard = client_state.move_to_other_shard(), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization)]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization);
co_return co_await create_table_on_shard0(client_state_other_shard.get(), tr, std::move(request), sp.local(), mm, g.local(), enforce_authorization, warn_authorization, _stats);
});
}
@@ -1878,7 +1903,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
verify_billing_mode(request);
}
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request]
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state)), enforce_authorization = bool(_enforce_authorization), warn_authorization = bool(_warn_authorization), client_state_other_shard = client_state.move_to_other_shard(), empty_request, &e = this->container()]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
schema_ptr schema;
size_t retries = mm.get_concurrent_ddl_retries();
@@ -2049,7 +2074,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
co_return api_error::validation("UpdateTable requires one of GlobalSecondaryIndexUpdates, StreamSpecification or BillingMode to be specified");
}
co_await verify_permission(enforce_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER);
co_await verify_permission(enforce_authorization, warn_authorization, client_state_other_shard.get(), schema, auth::permission::ALTER, e.local()._stats);
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, std::vector<view_ptr>(), group0_guard.write_timestamp());
for (view_ptr view : new_views) {
auto m2 = co_await service::prepare_new_view_announcement(p.local(), view, group0_guard.write_timestamp());
@@ -2817,7 +2842,7 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -2921,7 +2946,7 @@ future<executor::request_return_type> executor::delete_item(client_state& client
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -3199,7 +3224,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
per_table_wcu.emplace_back(std::make_pair(per_table_stats, schema));
}
for (const auto& b : mutation_builders) {
co_await verify_permission(_enforce_authorization, client_state, b.first, auth::permission::MODIFY);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, b.first, auth::permission::MODIFY, _stats);
}
// If alternator_force_read_before_write is true we will first get the previous item size
// and only then do send the mutation.
@@ -4425,7 +4450,7 @@ future<executor::request_return_type> executor::update_item(client_state& client
tracing::add_table_name(trace_state, op->schema()->ks_name(), op->schema()->cf_name());
const bool needs_read_before_write = _proxy.data_dictionary().get_config().alternator_force_read_before_write() || op->needs_read_before_write();
co_await verify_permission(_enforce_authorization, client_state, op->schema(), auth::permission::MODIFY);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, op->schema(), auth::permission::MODIFY, _stats);
auto cas_shard = op->shard_for_execute(needs_read_before_write);
@@ -4536,7 +4561,7 @@ future<executor::request_return_type> executor::get_item(client_state& client_st
const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "GetItem");
rcu_consumed_capacity_counter add_capacity(request, cl == db::consistency_level::LOCAL_QUORUM);
co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
service::storage_proxy::coordinator_query_result qr =
co_await _proxy.query(
schema, std::move(command), std::move(partition_ranges), cl,
@@ -4668,7 +4693,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
}
for (const table_requests& tr : requests) {
co_await verify_permission(_enforce_authorization, client_state, tr.schema, auth::permission::SELECT);
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats);
}
_stats.api_operations.batch_get_item_batch_total += batch_size;
@@ -5128,10 +5153,11 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
filter filter,
query::partition_slice::option_set custom_opts,
service::client_state& client_state,
cql3::cql_stats& cql_stats,
alternator::stats& stats,
tracing::trace_state_ptr trace_state,
service_permit permit,
bool enforce_authorization) {
bool enforce_authorization,
bool warn_authorization) {
lw_shared_ptr<service::pager::paging_state> old_paging_state = nullptr;
tracing::trace(trace_state, "Performing a database query");
@@ -5158,7 +5184,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
old_paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
}
co_await verify_permission(enforce_authorization, client_state, table_schema, auth::permission::SELECT);
co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats);
auto regular_columns =
table_schema->regular_columns() | std::views::transform(&column_definition::id)
@@ -5194,9 +5220,9 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state));
}
if (has_filter) {
cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
// update our "filtered_rows_matched_total" for all the rows matched, despite the filter
cql_stats.filtered_rows_matched_total += size;
stats.cql_stats.filtered_rows_matched_total += size;
}
if (opt_items) {
if (opt_items->size() >= max_items_for_rapidjson_array) {
@@ -5320,7 +5346,7 @@ future<executor::request_return_type> executor::scan(client_state& client_state,
verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan");
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
std::move(filter), query::partition_slice::option_set(), client_state, _stats.cql_stats, trace_state, std::move(permit), _enforce_authorization);
std::move(filter), query::partition_slice::option_set(), client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization);
}
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) {
@@ -5801,7 +5827,7 @@ future<executor::request_return_type> executor::query(client_state& client_state
query::partition_slice::option_set opts;
opts.set_if<query::partition_slice::option::reversed>(!forward);
return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
std::move(filter), opts, client_state, _stats.cql_stats, std::move(trace_state), std::move(permit), _enforce_authorization);
std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization);
}
future<executor::request_return_type> executor::list_tables(client_state& client_state, service_permit permit, rjson::value request) {


@@ -139,6 +139,7 @@ class executor : public peering_sharded_service<executor> {
db::system_distributed_keyspace& _sdks;
cdc::metadata& _cdc_metadata;
utils::updateable_value<bool> _enforce_authorization;
utils::updateable_value<bool> _warn_authorization;
// An smp_service_group to be used for limiting the concurrency when
// forwarding Alternator request between shards - if necessary for LWT.
smp_service_group _ssg;
@@ -264,7 +265,7 @@ bool is_big(const rjson::value& val, int big_size = 100'000);
// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(bool enforce_authorization, const service::client_state&, const schema_ptr&, auth::permission);
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);
/**
* Make return type for serializing the object "streamed",


@@ -31,6 +31,7 @@
#include "utils/overloaded_functor.hh"
#include "utils/aws_sigv4.hh"
#include "client_data.hh"
#include "utils/updateable_value.hh"
static logging::logger slogger("alternator-server");
@@ -270,24 +271,57 @@ protected:
}
};
// This function increments the authentication_failures counter, and may also
// log a warn-level message and/or throw an exception, depending on what
// enforce_authorization and warn_authorization are set to.
// The username and client address are only used for logging purposes -
// they are not included in the error message returned to the client, since
// the client knows who it is.
// Note that if enforce_authorization is false, this function will return
// without throwing. So a caller that doesn't want to continue after an
// authentication_error must explicitly return after calling this function.
template<typename Exception>
static void authentication_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, Exception&& e, std::string_view user, gms::inet_address client_address) {
stats.authentication_failures++;
if (enforce_authorization) {
if (warn_authorization) {
slogger.warn("alternator_warn_authorization=true: {} for user {}, client address {}", e.what(), user, client_address);
}
throw std::move(e);
} else {
if (warn_authorization) {
slogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {} for user {}, client address {}", e.what(), user, client_address);
}
}
}
future<std::string> server::verify_signature(const request& req, const chunked_content& content) {
if (!_enforce_authorization) {
if (!_enforce_authorization.get() && !_warn_authorization.get()) {
slogger.debug("Skipping authorization");
return make_ready_future<std::string>();
}
auto host_it = req._headers.find("Host");
if (host_it == req._headers.end()) {
throw api_error::invalid_signature("Host header is mandatory for signature verification");
authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
api_error::invalid_signature("Host header is mandatory for signature verification"),
"", req.get_client_address());
return make_ready_future<std::string>();
}
auto authorization_it = req._headers.find("Authorization");
if (authorization_it == req._headers.end()) {
- throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
+ authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
+         api_error::missing_authentication_token("Authorization header is mandatory for signature verification"),
+         "", req.get_client_address());
+ return make_ready_future<std::string>();
}
std::string host = host_it->second;
std::string_view authorization_header = authorization_it->second;
auto pos = authorization_header.find_first_of(' ');
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
- throw api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
+ authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
+         api_error::invalid_signature(fmt::format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header)),
+         "", req.get_client_address());
+ return make_ready_future<std::string>();
}
authorization_header.remove_prefix(pos+1);
std::string credential;
@@ -322,7 +356,9 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
std::vector<std::string_view> credential_split = split(credential, '/');
if (credential_split.size() != 5) {
- throw api_error::validation(fmt::format("Incorrect credential information format: {}", credential));
+ authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
+         api_error::validation(fmt::format("Incorrect credential information format: {}", credential)), "", req.get_client_address());
+ return make_ready_future<std::string>();
}
std::string user(credential_split[0]);
std::string datestamp(credential_split[1]);
@@ -346,7 +382,7 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
return get_key_from_roles(proxy, as, std::move(username));
};
- return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content,
+ return _key_cache.get_ptr(user, cache_getter).then_wrapped([this, &req, &content,
user = std::move(user),
host = std::move(host),
datestamp = std::move(datestamp),
@@ -354,18 +390,32 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
signed_headers_map = std::move(signed_headers_map),
region = std::move(region),
service = std::move(service),
- user_signature = std::move(user_signature)] (key_cache::value_ptr key_ptr) {
+ user_signature = std::move(user_signature)] (future<key_cache::value_ptr> key_ptr_fut) {
+ key_cache::value_ptr key_ptr(nullptr);
+ try {
+     key_ptr = key_ptr_fut.get();
+ } catch (const api_error& e) {
+     authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
+             e, user, req.get_client_address());
+     return std::string();
+ }
std::string signature;
try {
signature = utils::aws::get_signature(user, *key_ptr, std::string_view(host), "/", req._method,
datestamp, signed_headers_str, signed_headers_map, &content, region, service, "");
} catch (const std::exception& e) {
- throw api_error::invalid_signature(e.what());
+ authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
+         api_error::invalid_signature(fmt::format("invalid signature: {}", e.what())),
+         user, req.get_client_address());
+ return std::string();
}
if (signature != std::string_view(user_signature)) {
_key_cache.remove(user);
- throw api_error::unrecognized_client("The security token included in the request is invalid.");
+ authentication_error(_executor._stats, _enforce_authorization.get(), _warn_authorization.get(),
+         api_error::unrecognized_client("wrong signature"),
+         user, req.get_client_address());
+ return std::string();
}
return user;
});
@@ -618,7 +668,6 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
, _auth_service(auth_service)
, _sl_controller(sl_controller)
, _key_cache(1024, 1min, slogger)
- , _enforce_authorization(false)
, _max_users_query_size_in_trace_output(1024)
, _enabled_servers{}
, _pending_requests("alternator::server::pending_requests")
@@ -700,10 +749,11 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos
}
future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
- utils::updateable_value<bool> enforce_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
+ utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests) {
_memory_limiter = memory_limiter;
_enforce_authorization = std::move(enforce_authorization);
_warn_authorization = std::move(warn_authorization);
_max_concurrent_requests = std::move(max_concurrent_requests);
_max_users_query_size_in_trace_output = std::move(max_users_query_size_in_trace_output);
if (!port && !https_port) {

View File
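The enforce/warn semantics of the `authentication_error()` helper above can be sketched in isolation. This is a minimal illustration, not the ScyllaDB code: `stats` and `auth_failure` are simplified stand-ins. The failure is always counted; it is thrown only when enforcement is on, and merely flagged for a warning otherwise — which is why callers in the hunk above must `return` explicitly after calling the helper.

```cpp
#include <stdexcept>
#include <string>

// Minimal stand-in for alternator::stats.
struct stats {
    unsigned long authentication_failures = 0;
};

// Sketch of authentication_error()'s contract: the failure is always
// counted; it is thrown only when enforcement is on. With enforcement
// off, the return value tells the caller whether to log a
// "would be enforced" warning before continuing.
bool auth_failure(stats& st, bool enforce, bool warn, const std::string& msg) {
    st.authentication_failures++;
    if (enforce) {
        throw std::runtime_error(msg);
    }
    return warn;
}
```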

@@ -47,6 +47,7 @@ class server : public peering_sharded_service<server> {
key_cache _key_cache;
utils::updateable_value<bool> _enforce_authorization;
utils::updateable_value<bool> _warn_authorization;
utils::updateable_value<uint64_t> _max_users_query_size_in_trace_output;
utils::small_vector<std::reference_wrapper<seastar::httpd::http_server>, 2> _enabled_servers;
named_gate _pending_requests;
@@ -99,7 +100,7 @@ public:
server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller);
future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
- utils::updateable_value<bool> enforce_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
+ utils::updateable_value<bool> enforce_authorization, utils::updateable_value<bool> warn_authorization, utils::updateable_value<uint64_t> max_users_query_size_in_trace_output,
semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
future<> stop();
// get_client_data() is called (on each shard separately) when the virtual

View File

@@ -188,6 +188,16 @@ static void register_metrics_with_optional_table(seastar::metrics::metric_groups
seastar::metrics::make_total_operations("expression_cache_misses", stats.expression_cache.requests[stats::expression_types::PROJECTION_EXPRESSION].misses,
seastar::metrics::description("Counts number of misses of cached expressions"), labels)(expression_label("ProjectionExpression")).aggregate(aggregate_labels).set_skip_when_empty()
});
// Only register the following metrics for the global metrics, not per-table
if (!has_table) {
metrics.add_group("alternator", {
seastar::metrics::make_counter("authentication_failures", stats.authentication_failures,
seastar::metrics::description("total number of authentication failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
seastar::metrics::make_counter("authorization_failures", stats.authorization_failures,
seastar::metrics::description("total number of authorization failures"), labels).aggregate({seastar::metrics::shard_label}).set_skip_when_empty(),
});
}
}
void register_metrics(seastar::metrics::metric_groups& metrics, const stats& stats) {

View File

@@ -105,6 +105,17 @@ public:
// The sizes are the written items' sizes grouped per table.
utils::estimated_histogram batch_write_item_op_size_kb{30};
} operation_sizes;
// Count of authentication and authorization failures, counted if either
// alternator_enforce_authorization or alternator_warn_authorization are
// set to true. If both are false, no authentication or authorization
// checks are performed, so failures are not recognized or counted.
// "authentication" failure means the request was not signed with a valid
// user and key combination. "authorization" failure means the request was
// authenticated to a valid user - but this user did not have permissions
// to perform the operation (considering RBAC settings and the user's
// superuser status).
uint64_t authentication_failures = 0;
uint64_t authorization_failures = 0;
// Miscellaneous event counters
uint64_t total_operations = 0;
uint64_t unsupported_operations = 0;

View File

@@ -827,7 +827,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
- co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::SELECT);
+ co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
db::consistency_level cl = db::consistency_level::LOCAL_QUORUM;
partition_key pk = iter.shard.id.to_partition_key(*schema);

View File

@@ -95,7 +95,7 @@ future<executor::request_return_type> executor::update_time_to_live(client_state
}
sstring attribute_name(v->GetString(), v->GetStringLength());
- co_await verify_permission(_enforce_authorization, client_state, schema, auth::permission::ALTER);
+ co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::ALTER, _stats);
co_await db::modify_tags(_mm, schema->ks_name(), schema->cf_name(), [&](std::map<sstring, sstring>& tags_map) {
if (enabled) {
if (tags_map.contains(TTL_TAG_KEY)) {

View File

@@ -220,6 +220,25 @@
}
]
},
{
"path":"/storage_service/nodes/excluded",
"operations":[
{
"method":"GET",
"summary":"Retrieve host ids of nodes which are marked as excluded",
"type":"array",
"items":{
"type":"string"
},
"nickname":"get_excluded_nodes",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/storage_service/nodes/joining",
"operations":[
@@ -1571,6 +1590,30 @@
}
]
},
{
"path":"/storage_service/exclude_node",
"operations":[
{
"method":"POST",
"summary":"Marks the node as permanently down (excluded).",
"type":"void",
"nickname":"exclude_node",
"produces":[
"application/json"
],
"parameters":[
{
"name":"hosts",
"description":"Comma-separated list of host ids to exclude",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/removal_status",
"operations":[

View File

@@ -42,6 +42,14 @@
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"consider_only_existing_data",
"description":"Set to \"true\" to flush all memtables and force tombstone garbage collection to check only the sstables being compacted (false by default). The memtable, commitlog and other uncompacted sstables will not be checked during tombstone garbage collection.",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
}
]
}

View File

@@ -844,6 +844,25 @@ rest_remove_node(sharded<service::storage_service>& ss, std::unique_ptr<http::re
});
}
static
future<json::json_return_type>
rest_exclude_node(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto hosts = utils::split_comma_separated_list(req->get_query_param("hosts"))
| std::views::transform([] (const sstring& s) { return locator::host_id(utils::UUID(s)); })
| std::ranges::to<std::vector<locator::host_id>>();
auto& topo = ss.local().get_token_metadata().get_topology();
for (auto host : hosts) {
if (!topo.has_node(host)) {
throw bad_param_exception(fmt::format("Host ID {} does not belong to this cluster", host));
}
}
apilog.info("exclude_node: hosts={}", hosts);
co_await ss.local().mark_excluded(hosts);
co_return json_void();
}
static
future<json::json_return_type>
rest_get_removal_status(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
@@ -1769,6 +1788,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::decommission.set(r, rest_bind(rest_decommission, ss));
ss::move.set(r, rest_bind(rest_move, ss));
ss::remove_node.set(r, rest_bind(rest_remove_node, ss));
ss::exclude_node.set(r, rest_bind(rest_exclude_node, ss));
ss::get_removal_status.set(r, rest_bind(rest_get_removal_status, ss));
ss::force_remove_completion.set(r, rest_bind(rest_force_remove_completion, ss));
ss::set_logging_level.set(r, rest_bind(rest_set_logging_level));
@@ -1846,6 +1866,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::decommission.unset(r);
ss::move.unset(r);
ss::remove_node.unset(r);
ss::exclude_node.unset(r);
ss::get_removal_status.unset(r);
ss::force_remove_completion.unset(r);
ss::set_logging_level.unset(r);

View File
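The `hosts` query-parameter handling in `rest_exclude_node()` above relies on `utils::split_comma_separated_list`. Its effect can be sketched with a plain splitter — `split_hosts` is a hypothetical stand-in, and the real handler additionally converts each token to a `locator::host_id` and validates it against the topology before calling `mark_excluded()`:

```cpp
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stand-in for utils::split_comma_separated_list:
// split "a,b,c" into {"a", "b", "c"}.
std::vector<std::string> split_hosts(std::string_view hosts) {
    std::vector<std::string> out;
    size_t pos = 0;
    for (;;) {
        size_t comma = hosts.find(',', pos);
        if (comma == std::string_view::npos) {
            // Last (or only) token.
            out.emplace_back(hosts.substr(pos));
            return out;
        }
        out.emplace_back(hosts.substr(pos, comma - pos));
        pos = comma + 1;
    }
}
```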

@@ -38,76 +38,78 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
};
}
+ static future<shared_ptr<compaction::major_keyspace_compaction_task_impl>> force_keyspace_compaction(http_context& ctx, std::unique_ptr<http::request> req) {
+     auto& db = ctx.db;
+     auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
+     auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
+     auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
+     apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
+     auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
+     std::optional<compaction::flush_mode> fmopt;
+     if (!flush && !consider_only_existing_data) {
+         fmopt = compaction::flush_mode::skip;
+     }
+     return compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
+ }
+ static future<shared_ptr<compaction::upgrade_sstables_compaction_task_impl>> upgrade_sstables(http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) {
+     auto& db = ctx.db;
+     bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
+     apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
+     auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
+     return compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
+ }
+ static future<shared_ptr<compaction::cleanup_keyspace_compaction_task_impl>> force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
+     auto& db = ctx.db;
+     auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
+     const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
+     if (rs.is_local() || !rs.is_vnode_based()) {
+         auto reason = rs.is_local() ? "require" : "support";
+         apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
+         co_return nullptr;
+     }
+     apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
+     if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
+         auto msg = "Can not perform cleanup operation when topology changes";
+         apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
+         co_await coroutine::return_exception(std::runtime_error(msg));
+     }
+     auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
+     co_return co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
+         {}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
+ }
void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& snap_ctl) {
t::force_keyspace_compaction_async.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
- auto& db = ctx.db;
- auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
- auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
- apilog.debug("force_keyspace_compaction_async: keyspace={} tables={}, flush={}", keyspace, table_infos, flush);
- auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
- std::optional<compaction::flush_mode> fmopt;
- if (!flush) {
-     fmopt = compaction::flush_mode::skip;
- }
- auto task = co_await compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt);
+ auto task = co_await force_keyspace_compaction(ctx, std::move(req));
co_return json::json_return_type(task->get_status().id.to_sstring());
});
ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
- auto& db = ctx.db;
- auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf");
- auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true);
- auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false);
- apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data);
- auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
- std::optional<compaction::flush_mode> fmopt;
- if (!flush && !consider_only_existing_data) {
-     fmopt = compaction::flush_mode::skip;
- }
- auto task = co_await compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
+ auto task = co_await force_keyspace_compaction(ctx, std::move(req));
co_await task->done();
co_return json_void();
});
t::force_keyspace_cleanup_async.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
- auto& db = ctx.db;
- auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
- apilog.info("force_keyspace_cleanup_async: keyspace={} tables={}", keyspace, table_infos);
- if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
-     auto msg = "Can not perform cleanup operation when topology changes";
-     apilog.warn("force_keyspace_cleanup_async: keyspace={} tables={}: {}", keyspace, table_infos, msg);
-     co_await coroutine::return_exception(std::runtime_error(msg));
+ tasks::task_id id = tasks::task_id::create_null_id();
+ auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
+ if (task) {
+     id = task->get_status().id;
}
- auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
- auto task = co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>({}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
- co_return json::json_return_type(task->get_status().id.to_sstring());
+ co_return json::json_return_type(id.to_sstring());
});
ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
- auto& db = ctx.db;
- auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
- const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
- if (rs.is_local() || !rs.is_vnode_based()) {
-     auto reason = rs.is_local() ? "require" : "support";
-     apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
-     co_return json::json_return_type(0);
+ auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
+ if (task) {
+     co_await task->done();
}
- apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
- if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) {
-     auto msg = "Can not perform cleanup operation when topology changes";
-     apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
-     co_await coroutine::return_exception(std::runtime_error(msg));
- }
- auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
- auto task = co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
-     {}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
- co_await task->done();
co_return json::json_return_type(0);
});
@@ -129,25 +131,12 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
}));
t::upgrade_sstables_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
- auto& db = ctx.db;
- bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
- apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
- auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
- auto task = co_await compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
+ auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
co_return json::json_return_type(task->get_status().id.to_sstring());
}));
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {
- auto& db = ctx.db;
- bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
- apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
- auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
- auto task = co_await compaction_module.make_and_start_task<compaction::upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
+ auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos));
co_await task->done();
co_return json::json_return_type(0);
}));

View File

@@ -62,6 +62,17 @@ void set_token_metadata(http_context& ctx, routes& r, sharded<locator::shared_to
return addr | std::ranges::to<std::vector>();
});
ss::get_excluded_nodes.set(r, [&tm](const_req req) {
const auto& local_tm = *tm.local().get();
std::vector<sstring> eps;
local_tm.get_topology().for_each_node([&] (auto& node) {
if (node.is_excluded()) {
eps.push_back(node.host_id().to_sstring());
}
});
return eps;
});
ss::get_joining_nodes.set(r, [&tm, &g](const_req req) {
const auto& local_tm = *tm.local().get();
const auto& points = local_tm.get_bootstrap_tokens();
@@ -130,6 +141,7 @@ void unset_token_metadata(http_context& ctx, routes& r) {
ss::get_leaving_nodes.unset(r);
ss::get_moving_nodes.unset(r);
ss::get_joining_nodes.unset(r);
ss::get_excluded_nodes.unset(r);
ss::get_host_id_map.unset(r);
httpd::endpoint_snitch_info_json::get_datacenter.unset(r);
httpd::endpoint_snitch_info_json::get_rack.unset(r);

View File

@@ -1209,7 +1209,7 @@ future<mutation> create_table_streams_mutation(table_id table, db_clock::time_po
co_return std::move(m);
}
- future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const std::vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
+ future<mutation> create_table_streams_mutation(table_id table, db_clock::time_point stream_ts, const utils::chunked_vector<cdc::stream_id>& stream_ids, api::timestamp_type ts) {
auto s = db::system_keyspace::cdc_streams_state();
mutation m(s, partition_key::from_single_value(*s,
@@ -1252,24 +1252,24 @@ future<> generation_service::load_cdc_tablet_streams(std::optional<std::unordere
tables_to_process = _cdc_metadata.get_tables_with_cdc_tablet_streams() | std::ranges::to<std::unordered_set<table_id>>();
}
- auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) -> future<> {
+ auto read_streams_state = [this] (const std::optional<std::unordered_set<table_id>>& tables, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) -> future<> {
if (tables) {
for (auto table : *tables) {
- co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
+ co_await _sys_ks.local().read_cdc_streams_state(table, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
return f(table, base_ts, std::move(base_stream_set));
});
}
} else {
- co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
+ co_await _sys_ks.local().read_cdc_streams_state(std::nullopt, [&] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
return f(table, base_ts, std::move(base_stream_set));
});
}
};
- co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, std::vector<cdc::stream_id> base_stream_set) -> future<> {
+ co_await read_streams_state(changed_tables, [this, &tables_to_process] (table_id table, db_clock::time_point base_ts, utils::chunked_vector<cdc::stream_id> base_stream_set) -> future<> {
table_streams new_table_map;
- auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, std::vector<cdc::stream_id> stream_set) {
+ auto append_stream = [&new_table_map] (db_clock::time_point stream_tp, utils::chunked_vector<cdc::stream_id> stream_set) {
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(stream_tp.time_since_epoch()).count();
new_table_map[ts] = committed_stream_set {stream_tp, std::move(stream_set)};
};
@@ -1345,7 +1345,7 @@ future<> generation_service::query_cdc_timestamps(table_id table, bool ascending
}
}
- future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
+ future<> generation_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
const auto& all_tables = _cdc_metadata.get_all_tablet_streams();
auto table_it = all_tables.find(table);
if (table_it == all_tables.end()) {
@@ -1402,8 +1402,8 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
co_return;
}
- std::vector<cdc::stream_id> new_streams;
- new_streams.reserve(new_tablet_map.tablet_count());
+ utils::chunked_vector<cdc::stream_id> new_streams;
+ co_await utils::reserve_gently(new_streams, new_tablet_map.tablet_count());
for (auto tid : new_tablet_map.tablet_ids()) {
new_streams.emplace_back(new_tablet_map.get_last_token(tid), 0);
co_await coroutine::maybe_yield();
@@ -1425,7 +1425,7 @@ future<> generation_service::generate_tablet_resize_update(utils::chunked_vector
muts.emplace_back(std::move(mut));
}
- future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
+ future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts) {
utils::chunked_vector<mutation> muts;
muts.reserve(2);

View File

@@ -143,12 +143,12 @@ stream_state read_stream_state(int8_t val);
struct committed_stream_set {
db_clock::time_point ts;
- std::vector<cdc::stream_id> streams;
+ utils::chunked_vector<cdc::stream_id> streams;
};
struct cdc_stream_diff {
- std::vector<stream_id> closed_streams;
- std::vector<stream_id> opened_streams;
+ utils::chunked_vector<stream_id> closed_streams;
+ utils::chunked_vector<stream_id> opened_streams;
};
using table_streams = std::map<api::timestamp_type, committed_stream_set>;
@@ -220,11 +220,11 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);
future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const locator::tablet_map&, api::timestamp_type);
- future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const std::vector<cdc::stream_id>&, api::timestamp_type);
+ future<mutation> create_table_streams_mutation(table_id, db_clock::time_point, const utils::chunked_vector<cdc::stream_id>&, api::timestamp_type);
utils::chunked_vector<mutation> make_drop_table_streams_mutations(table_id, api::timestamp_type ts);
future<mutation> get_switch_streams_mutation(table_id table, db_clock::time_point stream_ts, cdc_stream_diff diff, api::timestamp_type ts);
- future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const std::vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
+ future<utils::chunked_vector<mutation>> get_cdc_stream_gc_mutations(table_id table, db_clock::time_point base_ts, const utils::chunked_vector<cdc::stream_id>& base_stream_set, api::timestamp_type ts);
table_streams::const_iterator get_new_base_for_gc(const table_streams&, std::chrono::seconds ttl);
} // namespace cdc

View File

@@ -149,7 +149,7 @@ public:
future<> load_cdc_tablet_streams(std::optional<std::unordered_set<table_id>> changed_tables);
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
- future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
+ future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
future<> generate_tablet_resize_update(utils::chunked_vector<canonical_mutation>& muts, table_id table, const locator::tablet_map& new_tablet_map, api::timestamp_type ts);

View File

@@ -54,7 +54,7 @@ cdc::stream_id get_stream(
}
static cdc::stream_id get_stream(
- const std::vector<cdc::stream_id>& streams,
+ const utils::chunked_vector<cdc::stream_id>& streams,
dht::token tok) {
if (streams.empty()) {
on_internal_error(cdc_log, "get_stream: streams empty");
@@ -159,7 +159,7 @@ cdc::stream_id cdc::metadata::get_vnode_stream(api::timestamp_type ts, dht::toke
return ret;
}
- const std::vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
+ const utils::chunked_vector<cdc::stream_id>& cdc::metadata::get_tablet_stream_set(table_id tid, api::timestamp_type ts) const {
auto now = api::new_timestamp();
if (ts > now + get_generation_leeway().count()) {
throw exceptions::invalid_request_exception(seastar::format(
@@ -259,10 +259,10 @@ bool cdc::metadata::prepare(db_clock::time_point tp) {
return !it->second;
}
- future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
-     const std::vector<cdc::stream_id>& prev_stream_set,
-     std::vector<cdc::stream_id> opened,
-     const std::vector<cdc::stream_id>& closed) {
+ future<utils::chunked_vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
+     const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
+     utils::chunked_vector<cdc::stream_id> opened,
+     const utils::chunked_vector<cdc::stream_id>& closed) {
if (closed.size() == prev_stream_set.size()) {
// all previous streams are closed, so the next stream set is just the opened streams.
@@ -273,8 +273,8 @@ future<std::vector<cdc::stream_id>> cdc::metadata::construct_next_stream_set(
// streams and removing the closed streams. we assume each stream set is
// sorted by token, and the result is sorted as well.
std::vector<cdc::stream_id> next_stream_set;
next_stream_set.reserve(prev_stream_set.size() + opened.size() - closed.size());
utils::chunked_vector<cdc::stream_id> next_stream_set;
co_await utils::reserve_gently(next_stream_set, prev_stream_set.size() + opened.size() - closed.size());
auto next_prev = prev_stream_set.begin();
auto next_closed = closed.begin();
@@ -318,8 +318,8 @@ std::vector<table_id> cdc::metadata::get_tables_with_cdc_tablet_streams() const
return _tablet_streams | std::views::keys | std::ranges::to<std::vector<table_id>>();
}
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const std::vector<stream_id>& before, const std::vector<stream_id>& after) {
std::vector<stream_id> closed, opened;
future<cdc::cdc_stream_diff> cdc::metadata::generate_stream_diff(const utils::chunked_vector<stream_id>& before, const utils::chunked_vector<stream_id>& after) {
utils::chunked_vector<stream_id> closed, opened;
auto before_it = before.begin();
auto after_it = after.begin();
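The sorted-merge logic in `construct_next_stream_set` above can be sketched as a standalone function (a simplified sketch: plain `int` stands in for `cdc::stream_id`, `std::vector` for `utils::chunked_vector`, and the preemption points of the real coroutine are omitted):

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

// Given the previous (sorted) stream set, the streams opened by the new
// generation, and the streams closed, produce the next sorted stream set.
std::vector<int> next_stream_set(const std::vector<int>& prev,
                                 std::vector<int> opened,
                                 const std::vector<int>& closed) {
    if (closed.size() == prev.size()) {
        // All previous streams are closed: the next set is just the opened streams.
        std::sort(opened.begin(), opened.end());
        return opened;
    }
    std::vector<int> next;
    next.reserve(prev.size() + opened.size() - closed.size());
    // Keep the previous streams that were not closed (both inputs sorted).
    std::set_difference(prev.begin(), prev.end(),
                        closed.begin(), closed.end(),
                        std::back_inserter(next));
    // Merge in the newly opened streams, keeping the result sorted.
    std::sort(opened.begin(), opened.end());
    size_t mid = next.size();
    next.insert(next.end(), opened.begin(), opened.end());
    std::inplace_merge(next.begin(), next.begin() + mid, next.end());
    return next;
}
```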

View File

@@ -49,7 +49,7 @@ class metadata final {
container_t::const_iterator gen_used_at(api::timestamp_type ts) const;
const std::vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
const utils::chunked_vector<stream_id>& get_tablet_stream_set(table_id tid, api::timestamp_type ts) const;
public:
/* Is a generation with the given timestamp already known or obsolete? It is obsolete if and only if
@@ -111,14 +111,14 @@ public:
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
static future<std::vector<stream_id>> construct_next_stream_set(
const std::vector<cdc::stream_id>& prev_stream_set,
std::vector<cdc::stream_id> opened,
const std::vector<cdc::stream_id>& closed);
static future<utils::chunked_vector<stream_id>> construct_next_stream_set(
const utils::chunked_vector<cdc::stream_id>& prev_stream_set,
utils::chunked_vector<cdc::stream_id> opened,
const utils::chunked_vector<cdc::stream_id>& closed);
static future<cdc_stream_diff> generate_stream_diff(
const std::vector<stream_id>& before,
const std::vector<stream_id>& after);
const utils::chunked_vector<stream_id>& before,
const utils::chunked_vector<stream_id>& after);
};

View File

@@ -145,9 +145,7 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
throw exceptions::configuration_exception(sstring("Missing sub-option '") + compression_parameters::SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
}
compression_parameters cp(*compression_options);
cp.validate(
compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)),
compression_parameters::dicts_usage_allowed(db.get_config().sstable_compression_dictionaries_allow_in_ddl()));
cp.validate(compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)));
}
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);

View File

@@ -112,14 +112,8 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, utils::chun
ksm->strategy_name(),
locator::replication_strategy_params(ksm->strategy_options(), ksm->initial_tablets(), ksm->consistency_option()),
tmptr->get_topology());
if (rs->uses_tablets()) {
warnings.push_back(
"Tables in this keyspace will be replicated using Tablets "
"and will not support counters features. To use counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
if (ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}
if (rs->uses_tablets() && ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}
// If `rf_rack_valid_keyspaces` is enabled, it's forbidden to create an RF-rack-invalid keyspace.

View File

@@ -222,7 +222,7 @@ std::unique_ptr<prepared_statement> create_table_statement::raw_statement::prepa
throw exceptions::invalid_request_exception("Cannot set default_time_to_live on a table with counters");
}
if (ks_uses_tablets && pt.is_counter()) {
if (ks_uses_tablets && pt.is_counter() && !db.features().counters_with_tablets) {
throw exceptions::invalid_request_exception(format("Cannot use the 'counter' type for table {}.{}: Counters are not yet supported with tablets", keyspace(), cf_name));
}

View File

@@ -3329,7 +3329,6 @@ db::commitlog::read_log_file(const replay_state& state, sstring filename, sstrin
commit_load_reader_func func;
input_stream<char> fin;
replay_state::impl& state;
input_stream<char> r;
uint64_t id = 0;
size_t pos = 0;
size_t next = 0;

View File

@@ -1315,15 +1315,15 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, sstable_format(this, "sstable_format", liveness::LiveUpdate, value_status::Used, "me", "Default sstable file format", {"md", "me", "ms"})
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{},
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{compression_parameters::algorithm::lz4_with_dicts},
"Server-global user table compression options. If enabled, all user tables"
"will be compressed using the provided options, unless overridden"
"by compression options in the table schema. The available options are:\n"
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor (default), LZ4WithDictsCompressor, SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor, LZ4WithDictsCompressor (default), SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
"* compression_level: (Default: 3) Compression level for ZstdCompressor and ZstdWithDictsCompressor. Higher levels provide better compression ratios at the cost of speed. Allowed values are integers between 1 and 22.")
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Deprecated, true,
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"
"This is only enforced when this node validates a new DDL statement; disabling the option won't disable dictionary-based compression "
@@ -1425,7 +1425,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, alternator_port(this, "alternator_port", value_status::Used, 0, "Alternator API port.")
, alternator_https_port(this, "alternator_https_port", value_status::Used, 0, "Alternator API HTTPS port.")
, alternator_address(this, "alternator_address", value_status::Used, "0.0.0.0", "Alternator API listening address.")
, alternator_enforce_authorization(this, "alternator_enforce_authorization", value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
, alternator_enforce_authorization(this, "alternator_enforce_authorization", liveness::LiveUpdate, value_status::Used, false, "Enforce checking the authorization header for every request in Alternator.")
, alternator_warn_authorization(this, "alternator_warn_authorization", liveness::LiveUpdate, value_status::Used, false, "Count and log warnings about failed authentication or authorization")
, alternator_write_isolation(this, "alternator_write_isolation", value_status::Used, "", "Default write isolation policy for Alternator.")
, alternator_streams_time_window_s(this, "alternator_streams_time_window_s", value_status::Used, 10, "CDC query confidence window for alternator streams.")
, alternator_timeout_in_ms(this, "alternator_timeout_in_ms", liveness::LiveUpdate, value_status::Used, 10000,

View File

@@ -458,6 +458,7 @@ public:
named_value<uint16_t> alternator_https_port;
named_value<sstring> alternator_address;
named_value<bool> alternator_enforce_authorization;
named_value<bool> alternator_warn_authorization;
named_value<sstring> alternator_write_isolation;
named_value<uint32_t> alternator_streams_time_window_s;
named_value<uint32_t> alternator_timeout_in_ms;

View File

@@ -2463,14 +2463,14 @@ future<bool> system_keyspace::cdc_is_rewritten() {
}
future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f) {
noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f) {
static const sstring all_tables_query = format("SELECT table_id, timestamp, stream_id FROM {}.{}", NAME, CDC_STREAMS_STATE);
static const sstring single_table_query = format("SELECT table_id, timestamp, stream_id FROM {}.{} WHERE table_id = ?", NAME, CDC_STREAMS_STATE);
struct cur_t {
table_id tid;
db_clock::time_point ts;
std::vector<cdc::stream_id> streams;
utils::chunked_vector<cdc::stream_id> streams;
};
std::optional<cur_t> cur;
@@ -2487,7 +2487,7 @@ future<> system_keyspace::read_cdc_streams_state(std::optional<table_id> table,
if (cur) {
co_await f(cur->tid, cur->ts, std::move(cur->streams));
}
cur = { tid, ts, std::vector<cdc::stream_id>() };
cur = { tid, ts, utils::chunked_vector<cdc::stream_id>() };
}
cur->streams.push_back(std::move(stream_id));

View File

@@ -601,7 +601,7 @@ public:
future<bool> cdc_is_rewritten();
future<> cdc_set_rewritten(std::optional<cdc::generation_id_v1>);
future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, std::vector<cdc::stream_id>)> f);
future<> read_cdc_streams_state(std::optional<table_id> table, noncopyable_function<future<>(table_id, db_clock::time_point, utils::chunked_vector<cdc::stream_id>)> f);
future<> read_cdc_streams_history(table_id table, std::optional<db_clock::time_point> from, noncopyable_function<future<>(table_id, db_clock::time_point, cdc::cdc_stream_diff)> f);
// Load Raft Group 0 id from scylla.local

View File

@@ -3311,15 +3311,6 @@ public:
_step.base->schema()->cf_name(), _step.current_token(), view_names);
}
if (_step.reader.is_end_of_stream() && _step.reader.is_buffer_empty()) {
if (_step.current_key.key().is_empty()) {
// consumer got end-of-stream without consuming a single partition
vlogger.debug("Reader didn't produce anything, marking views as built");
while (!_step.build_status.empty()) {
_built_views.views.push_back(std::move(_step.build_status.back()));
_step.build_status.pop_back();
}
}
// before going back to the minimum token, advance current_key to the end
// and check for built views in that range.
_step.current_key = { _step.prange.end().value_or(dht::ring_position::max()).value().token(), partition_key::make_empty()};
@@ -3338,6 +3329,7 @@ public:
// Called in the context of a seastar::thread.
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
inject_failure("dont_start_build_step");
gc_clock::time_point now = gc_clock::now();
auto compaction_state = make_lw_shared<compact_for_query_state>(
*step.reader.schema(),
@@ -3372,6 +3364,7 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
seastar::when_all_succeed(bookkeeping_ops.begin(), bookkeeping_ops.end()).handle_exception([] (std::exception_ptr ep) {
vlogger.warn("Failed to update materialized view bookkeeping ({}), continuing anyway.", ep);
}).get();
utils::get_local_injector().inject("delay_finishing_build_step", utils::wait_for_message(60s)).get();
}
future<> view_builder::mark_as_built(view_ptr view) {

View File

@@ -1278,7 +1278,7 @@ public:
static_assert(int(cdc::stream_state::current) < int(cdc::stream_state::closed));
static_assert(int(cdc::stream_state::closed) < int(cdc::stream_state::opened));
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
co_await _ss.query_cdc_streams(table, [&] (db_clock::time_point ts, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff diff) -> future<> {
co_await emit_stream_set(ts, cdc::stream_state::current, current);
co_await emit_stream_set(ts, cdc::stream_state::closed, diff.closed_streams);
co_await emit_stream_set(ts, cdc::stream_state::opened, diff.opened_streams);

View File

@@ -21,6 +21,11 @@ import urllib.request
from pkg_resources import parse_version
import multiprocessing as mp
# Python 3.14 changed the default to 'forkserver', which is not compatible
# with our relocatable python. It execs our Python binary, but without our
# ld.so. Change it back to 'fork' to avoid issues.
mp.set_start_method('fork')
VERSION = "1.0"
quiet = False
# Temporary url for the review

View File

@@ -109,6 +109,32 @@ to do what, configure the following in ScyllaDB's configuration:
alternator_enforce_authorization: true
```
Note: switching `alternator_enforce_authorization` from `false` to `true`
before the client application has the proper secret keys and permission
tables set up will cause the application's requests to fail immediately.
Therefore, we recommend beginning by keeping `alternator_enforce_authorization`
set to `false` and setting `alternator_warn_authorization` to `true`.
This setting will continue to allow all requests without failing on
authentication or authorization errors - but will _count_ would-be
authentication and authorization failures in the two metrics:
* `scylla_alternator_authentication_failures`
* `scylla_alternator_authorization_failures`
`alternator_warn_authorization=true` also generates a WARN-level log message
on each authentication or authorization failure. Each of these log messages
includes the string `alternator_enforce_authorization=true` and information
that can help pinpoint the source of the error - such as the username
involved in the attempt, and the address of the client sending the request.
When you see that both metrics are not increasing (or, alternatively, that no
more log messages appear), you can be sure that the application is properly
set up and can finally set `alternator_enforce_authorization` to `true`.
You can leave `alternator_warn_authorization` set or unset, depending on
whether or not you want to see log messages when requests fail on
authentication/authorization (in any case, the metrics count these failures,
and the client will also get the error).
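The staged rollout described above corresponds to a configuration like the following (a sketch using the two options discussed in this section):

```yaml
# Phase 1: observe only. Requests are still allowed, but would-be
# authentication/authorization failures are counted in the
# scylla_alternator_authentication_failures and
# scylla_alternator_authorization_failures metrics and logged at WARN level.
alternator_enforce_authorization: false
alternator_warn_authorization: true

# Phase 2: once the failure metrics stop increasing, enforce for real:
#   alternator_enforce_authorization: true
#   alternator_warn_authorization: true   # optional; keep for logging
```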
Alternator implements the same [signature protocol](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html)
as DynamoDB and the rest of AWS. Clients use, as usual, an access key ID and
a secret access key to prove their identity and the authenticity of their

View File

@@ -197,12 +197,6 @@ Limitations and Unsupported Features
throughout its lifetime. Failing to keep that invariant satisfied may result in data inconsistencies,
performance problems, or other issues.
The following ScyllaDB features are not supported if a keyspace has tablets
enabled. If you plan to use any of the features listed below, CREATE your keyspace
:ref:`with tablets disabled <tablets-enable-tablets>`.
* Counters
To enable materialized views and secondary indexes for tablet keyspaces, use
the `--rf-rack-valid-keyspaces` option. See :ref:`Views with tablets <admin-views-with-tablets>` for details.

View File

@@ -3,8 +3,6 @@
ScyllaDB Counters
==================
.. note:: Counters are not supported in keyspaces with :doc:`tablets</architecture/tablets>` enabled.
Counters are useful for any application where you need to increment a count, such as keeping track of:
* The number of web page views on a website.

View File

@@ -0,0 +1,47 @@
Nodetool excludenode
====================
.. warning::
You must never use ``nodetool excludenode`` on a running node that can be reached by other nodes in the cluster.
Before using the command, make sure the node is permanently down and cannot be recovered.
Running ``excludenode`` will mark the given nodes as permanently down (excluded).
The cluster will no longer attempt to contact excluded nodes, which unblocks
tablet load balancing, replication changes, etc.
The nodes will be permanently banned from the cluster, meaning you won't be able to bring them back.
Data ownership is not changed, and the nodes are still cluster members,
so they eventually have to be removed or replaced.
After nodes are excluded, there is no need to pass them in the list of ignored
nodes to removenode, replace, or repair.
Prerequisites
------------------------
* Using ``excludenode`` requires at least a quorum of nodes in a cluster to be available.
If the quorum is lost, it must be restored before you change the cluster topology.
See :doc:`Handling Node Failures </troubleshooting/handling-node-failures>` for details.
Usage
--------
Provide the Host IDs of the nodes you want to mark as permanently down.
.. code-block:: console
nodetool excludenode <Host ID> [ ... <Host ID>]
Examples:
.. code-block:: console
nodetool excludenode 2d1e1b0a-4ecb-4128-ba45-36ba558f7aee
.. code-block:: console
nodetool excludenode 2d1e1b0a-4ecb-4128-ba45-36ba558f7aee 73adf19e-2912-4cf6-b9ab-bbc74297b8de
.. include:: nodetool-index.rst

View File

@@ -47,6 +47,12 @@ Example:
nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
To only mark a node as permanently down without actually removing it, use :doc:`nodetool excludenode </operating-scylla/nodetool-commands/excludenode>`:
.. code-block:: console
nodetool excludenode <Host ID of the node>
.. _removenode-ignore-dead-nodes:

View File

@@ -18,84 +18,93 @@ Example output:
Datacenter: datacenter1
=======================
Status=Up/Down
Status=Up/Down/eXcluded
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 127.0.0.1 394.97 MB 256 33.4% 292a6c7f-2063-484c-b54d-9015216f1750 rack1
UN 127.0.0.2 151.07 MB 256 34.3% 102b6ecd-2081-4073-8172-bf818c35e27b rack1
UN 127.0.0.3 249.07 MB 256 32.3% 20db6ecd-2981-447s-l172-jf118c17o27y rack1
XN 127.0.0.4 149.07 MB 256 32.3% dd961642-c7c6-4962-9f5a-ea774dbaed77 rack1
+----------+---------------------------------------+
|Parameter |Description |
| | |
| | |
| | |
| | |
+==========+=======================================+
|Datacenter|The data center that holds |
| |the information. |
| | |
| | |
| | |
| | |
+----------+---------------------------------------+
|Status |``U`` - The node is up. |
| | |
| |``D`` - The node is down. |
+----------+---------------------------------------+
|State |``N`` - Normal |
| | |
| |``L`` - Leaving |
| | |
| |``J`` - Joining |
| | |
| |``M`` - Moving |
+----------+---------------------------------------+
|Address |The IP address of the node. |
| | |
+----------+---------------------------------------+
|Load |The size on disk the ScyllaDB data |
| | takes up (updates every 60 seconds). |
| | |
| | |
| | |
| | |
+----------+---------------------------------------+
|Tokens |The number of tokens per node. |
| | |
| | |
| | |
+----------+---------------------------------------+
|Owns |The percentage of data owned by |
| |the node (per datacenter) multiplied by|
| |the replication factor you are using. |
| | |
| |For example, if the node owns 25% of |
| |the data and the replication factor |
| |is 4, the value will equal 100%. |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
+----------+---------------------------------------+
|Host ID |The unique identifier (UUID) |
| |automatically assigned to the node. |
| | |
+----------+---------------------------------------+
|Rack |The name of the rack. |
+----------+---------------------------------------+
+----------+---------------------------------------------------------------+
|Parameter |Description |
| | |
| | |
| | |
| | |
+==========+===============================================================+
|Datacenter|The data center that holds |
| |the information. |
| | |
| | |
| | |
| | |
+----------+---------------------------------------------------------------+
|Status |``U`` - The node is up. |
| | |
| |``D`` - The node is down. |
| | |
| |``X`` - The node is :ref:`excluded <status-excluded>`. |
+----------+---------------------------------------------------------------+
|State |``N`` - Normal |
| | |
| |``L`` - Leaving |
| | |
| |``J`` - Joining |
| | |
| |``M`` - Moving |
+----------+---------------------------------------------------------------+
|Address |The IP address of the node. |
| | |
+----------+---------------------------------------------------------------+
|Load |The size on disk the ScyllaDB data |
| | takes up (updates every 60 seconds). |
| | |
| | |
| | |
| | |
+----------+---------------------------------------------------------------+
|Tokens |The number of tokens per node. |
| | |
| | |
| | |
+----------+---------------------------------------------------------------+
|Owns |The percentage of data owned by |
| |the node (per datacenter) multiplied by |
| |the replication factor you are using. |
| | |
| |For example, if the node owns 25% of |
| |the data and the replication factor |
| |is 4, the value will equal 100%. |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
+----------+---------------------------------------------------------------+
|Host ID |The unique identifier (UUID) |
| |automatically assigned to the node. |
| | |
+----------+---------------------------------------------------------------+
|Rack |The name of the rack. |
+----------+---------------------------------------------------------------+
.. _status-excluded:
Nodes in the excluded status (``X``) are down nodes that were marked as excluded
by ``removenode``, ``excludenode``, or node replace; such nodes are considered permanently lost.
See :doc:`nodetool excludenode </operating-scylla/nodetool-commands/excludenode>` for more information.
.. include:: nodetool-index.rst

View File

@@ -30,6 +30,7 @@ Nodetool
nodetool-commands/enablebackup
nodetool-commands/enablebinary
nodetool-commands/enablegossip
nodetool-commands/excludenode
nodetool-commands/flush
nodetool-commands/getcompactionthroughput
nodetool-commands/getendpoints
@@ -104,6 +105,7 @@ Operations that are not listed below are currently not available.
* :doc:`enablebackup </operating-scylla/nodetool-commands/enablebackup/>` - Enable incremental backup.
* :doc:`enablebinary </operating-scylla/nodetool-commands/enablebinary/>` - Re-enable native transport (binary protocol).
* :doc:`enablegossip </operating-scylla/nodetool-commands/enablegossip/>` - Re-enable gossip.
* :doc:`excludenode </operating-scylla/nodetool-commands/excludenode/>` - Mark nodes as permanently down.
* :doc:`flush </operating-scylla/nodetool-commands/flush/>` - Flush one or more column families.
* :doc:`getcompactionthroughput </operating-scylla/nodetool-commands/getcompactionthroughput>` - Print the throughput cap for compaction in the system
* :doc:`getendpoints <nodetool-commands/getendpoints/>` :code:`<keyspace>` :code:`<table>` :code:`<key>` - Print the endpoints that own the key.

View File

@@ -159,6 +159,7 @@ public:
gms::feature workload_prioritization { *this, "WORKLOAD_PRIORITIZATION"sv };
gms::feature colocated_tablets { *this, "COLOCATED_TABLETS"sv };
gms::feature cdc_with_tablets { *this, "CDC_WITH_TABLETS"sv };
gms::feature counters_with_tablets { *this, "COUNTERS_WITH_TABLETS"sv };
gms::feature file_stream { *this, "FILE_STREAM"sv };
gms::feature compression_dicts { *this, "COMPRESSION_DICTS"sv };
gms::feature tablet_options { *this, "TABLET_OPTIONS"sv };

View File

@@ -37,7 +37,9 @@ struct tablet_load_stats final {
// Sum of all tablet sizes on a node and available disk space.
uint64_t effective_capacity;
std::unordered_map<locator::range_based_tablet_id, uint64_t> tablet_sizes;
// Contains tablet sizes per table. The token ranges must be in the form
// (a, b] and only such ranges are allowed
std::unordered_map<::table_id, std::unordered_map<dht::token_range, uint64_t>> tablet_sizes;
};
struct load_stats {

View File

@@ -90,60 +90,53 @@ future<sstring> ec2_snitch::aws_api_call(sstring addr, uint16_t port, sstring cm
}
future<sstring> ec2_snitch::aws_api_call_once(sstring addr, uint16_t port, sstring cmd, std::optional<sstring> token) {
return connect(socket_address(inet_address{addr}, port))
.then([this, addr, cmd, token] (connected_socket fd) {
_sd = std::move(fd);
_in = _sd.input();
_out = _sd.output();
connected_socket fd = co_await connect(socket_address(inet_address{addr}, port));
auto in = fd.input();
auto out = fd.output();
if (token) {
_req = sstring("GET ") + cmd +
sstring(" HTTP/1.1\r\nHost: ") +addr +
sstring("\r\nX-aws-ec2-metadata-token: ") + *token +
sstring("\r\n\r\n");
} else {
_req = sstring("PUT ") + cmd +
sstring(" HTTP/1.1\r\nHost: ") + addr +
sstring("\r\nX-aws-ec2-metadata-token-ttl-seconds: 60") +
sstring("\r\n\r\n");
}
if (token) {
_req = sstring("GET ") + cmd +
sstring(" HTTP/1.1\r\nHost: ") +addr +
sstring("\r\nX-aws-ec2-metadata-token: ") + *token +
sstring("\r\n\r\n");
} else {
_req = sstring("PUT ") + cmd +
sstring(" HTTP/1.1\r\nHost: ") + addr +
sstring("\r\nX-aws-ec2-metadata-token-ttl-seconds: 60") +
sstring("\r\n\r\n");
}
return _out.write(_req.c_str()).then([this] {
return _out.flush();
});
}).then([this] {
_parser.init();
return _in.consume(_parser).then([this] {
if (_parser.eof()) {
return make_exception_future<sstring>("Bad HTTP response");
}
co_await out.write(_req.c_str());
co_await out.flush();
// Read HTTP response header first
auto _rsp = _parser.get_parsed_response();
auto rc = _rsp->_status;
// Verify EC2 instance metadata access
if (rc == http::reply::status_type(403)) {
return make_exception_future<sstring>(std::runtime_error("Error: Unauthorized response received when trying to communicate with instance metadata service."));
}
if (_rsp->_status != http::reply::status_type::ok) {
return make_exception_future<sstring>(std::runtime_error(format("Error: HTTP response status {}", _rsp->_status)));
}
_parser.init();
co_await in.consume(_parser);
if (_parser.eof()) {
co_await coroutine::return_exception(std::runtime_error("Bad HTTP response"));
}
auto it = _rsp->_headers.find("Content-Length");
if (it == _rsp->_headers.end()) {
return make_exception_future<sstring>("Error: HTTP response does not contain: Content-Length\n");
}
// Read HTTP response header first
auto _rsp = _parser.get_parsed_response();
auto rc = _rsp->_status;
// Verify EC2 instance metadata access
if (rc == http::reply::status_type(403)) {
co_await coroutine::return_exception(std::runtime_error("Error: Unauthorized response received when trying to communicate with instance metadata service."));
}
if (_rsp->_status != http::reply::status_type::ok) {
co_await coroutine::return_exception(std::runtime_error(format("Error: HTTP response status {}", _rsp->_status)));
}
auto content_len = std::stoi(it->second);
auto it = _rsp->_headers.find("Content-Length");
if (it == _rsp->_headers.end()) {
co_await coroutine::return_exception(std::runtime_error("Error: HTTP response does not contain: Content-Length\n"));
}
// Read HTTP response body
return _in.read_exactly(content_len).then([] (temporary_buffer<char> buf) {
sstring res(buf.get(), buf.size());
auto content_len = std::stoi(it->second);
return make_ready_future<sstring>(std::move(res));
});
});
});
// Read HTTP response body
temporary_buffer<char> buf = co_await in.read_exactly(content_len);
sstring res(buf.get(), buf.size());
co_return res;
}
future<sstring> ec2_snitch::read_property_file() {

View File

@@ -30,9 +30,6 @@ protected:
future<sstring> aws_api_call(sstring addr, uint16_t port, const sstring cmd, std::optional<sstring> token);
future<sstring> read_property_file();
private:
connected_socket _sd;
input_stream<char> _in;
output_stream<char> _out;
http_response_parser _parser;
sstring _req;
exponential_backoff_retry _ec2_api_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(2560));

View File

@@ -865,9 +865,11 @@ table_load_stats& table_load_stats::operator+=(const table_load_stats& s) noexce
uint64_t tablet_load_stats::add_tablet_sizes(const tablet_load_stats& tls) {
uint64_t table_sizes_sum = 0;
for (auto& [rb_tid, tablet_size] : tls.tablet_sizes) {
tablet_sizes[rb_tid] = tablet_size;
table_sizes_sum += tablet_size;
for (auto& [table, sizes] : tls.tablet_sizes) {
for (auto& [range, tablet_size] : sizes) {
tablet_sizes[table][range] = tablet_size;
table_sizes_sum += tablet_size;
}
}
return table_sizes_sum;
}
@@ -894,16 +896,87 @@ load_stats& load_stats::operator+=(const load_stats& s) {
}
std::optional<uint64_t> load_stats::get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const {
if (auto node_i = tablet_stats.find(host); node_i != tablet_stats.end()) {
const tablet_load_stats& tls = node_i->second;
if (auto ts_i = tls.tablet_sizes.find(rb_tid); ts_i != tls.tablet_sizes.end()) {
return ts_i->second;
if (auto host_i = tablet_stats.find(host); host_i != tablet_stats.end()) {
auto& sizes_per_table = host_i->second.tablet_sizes;
if (auto table_i = sizes_per_table.find(rb_tid.table); table_i != sizes_per_table.end()) {
auto& tablet_sizes = table_i->second;
if (auto size_i = tablet_sizes.find(rb_tid.range); size_i != tablet_sizes.end()) {
return size_i->second;
}
}
}
tablet_logger.debug("Unable to find tablet size on host: {} for tablet: {}", host, rb_tid);
return std::nullopt;
}
lw_shared_ptr<load_stats> load_stats::reconcile_tablets_resize(const std::unordered_set<table_id>& tables, const token_metadata& old_tm, const token_metadata& new_tm) const {
lw_shared_ptr<load_stats> reconciled_stats { make_lw_shared<load_stats>(*this) };
load_stats& new_stats = *reconciled_stats;
for (table_id table : tables) {
if (!new_tm.tablets().has_tablet_map(table)) {
// Table has been dropped, remove it from stats
for (auto& [host, tls] : new_stats.tablet_stats) {
tls.tablet_sizes.erase(table);
}
continue;
}
const auto& old_tmap = old_tm.tablets().get_tablet_map(table);
const auto& new_tmap = new_tm.tablets().get_tablet_map(table);
size_t old_tablet_count = old_tmap.tablet_count();
size_t new_tablet_count = new_tmap.tablet_count();
if (old_tablet_count == new_tablet_count * 2) {
// Reconcile for merge
for (size_t i = 0; i < old_tablet_count; i += 2) {
range_based_tablet_id rb_tid1 { table, old_tmap.get_token_range(tablet_id(i)) };
range_based_tablet_id rb_tid2 { table, old_tmap.get_token_range(tablet_id(i + 1)) };
auto& tinfo = old_tmap.get_tablet_info(tablet_id(i));
for (auto& replica : tinfo.replicas) {
auto tablet_size_opt1 = new_stats.get_tablet_size(replica.host, rb_tid1);
auto tablet_size_opt2 = new_stats.get_tablet_size(replica.host, rb_tid2);
if (!tablet_size_opt1 || !tablet_size_opt2) {
if (!tablet_size_opt1) {
tablet_logger.debug("Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid1, replica.host);
}
if (!tablet_size_opt2) {
tablet_logger.debug("Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid2, replica.host);
}
return nullptr;
}
dht::token_range new_range { new_tmap.get_token_range(tablet_id(i / 2)) };
auto& sizes_for_table = new_stats.tablet_stats.at(replica.host).tablet_sizes.at(table);
uint64_t merged_tablet_size = *tablet_size_opt1 + *tablet_size_opt2;
sizes_for_table[new_range] = merged_tablet_size;
sizes_for_table.erase(rb_tid1.range);
sizes_for_table.erase(rb_tid2.range);
}
}
} else if (old_tablet_count == new_tablet_count / 2) {
// Reconcile for split
for (size_t i = 0; i < old_tablet_count; i++) {
range_based_tablet_id rb_tid { table, old_tmap.get_token_range(tablet_id(i)) };
auto& tinfo = old_tmap.get_tablet_info(tablet_id(i));
for (auto& replica : tinfo.replicas) {
auto tablet_size_opt = new_stats.get_tablet_size(replica.host, rb_tid);
if (!tablet_size_opt) {
tablet_logger.debug("Unable to find tablet size in stats for table resize reconcile for tablet {} on host {}", rb_tid, replica.host);
return nullptr;
}
dht::token_range new_range1 { new_tmap.get_token_range(tablet_id(i * 2)) };
dht::token_range new_range2 { new_tmap.get_token_range(tablet_id(i * 2 + 1)) };
auto& sizes_for_table = new_stats.tablet_stats.at(replica.host).tablet_sizes.at(table);
uint64_t split_tablet_size = *tablet_size_opt / 2;
sizes_for_table[new_range1] = split_tablet_size;
sizes_for_table[new_range2] = split_tablet_size;
sizes_for_table.erase(rb_tid.range);
}
}
}
}
return reconciled_stats;
}
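The merge and split arithmetic above (sum two adjacent tablet sizes on merge, halve each size on split) can be sketched without the real token-range keyed maps. `tablet_sizes_t`, `reconcile_merge`, and `reconcile_split` below are hypothetical stand-ins keyed by tablet index rather than by `dht::token_range`:

```cpp
#include <cstdint>
#include <map>
#include <optional>

// Hypothetical flat model: tablet index -> size in bytes.
using tablet_sizes_t = std::map<size_t, uint64_t>;

// Merge: 2N tablets become N; new tablet i gets the sum of old tablets 2i and 2i+1.
// Returns std::nullopt when any pre-merge size is missing, mirroring the nullptr
// return in reconcile_tablets_resize.
std::optional<tablet_sizes_t> reconcile_merge(const tablet_sizes_t& old_sizes, size_t old_count) {
    tablet_sizes_t merged;
    for (size_t i = 0; i < old_count; i += 2) {
        auto a = old_sizes.find(i);
        auto b = old_sizes.find(i + 1);
        if (a == old_sizes.end() || b == old_sizes.end()) {
            return std::nullopt;
        }
        merged[i / 2] = a->second + b->second;
    }
    return merged;
}

// Split: N tablets become 2N; each half is estimated at half the old size.
tablet_sizes_t reconcile_split(const tablet_sizes_t& old_sizes) {
    tablet_sizes_t split;
    for (const auto& [i, size] : old_sizes) {
        split[i * 2] = size / 2;
        split[i * 2 + 1] = size / 2;
    }
    return split;
}
```

The split path is only an estimate (integer halving), which is acceptable for load stats that are re-collected on the next refresh anyway.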
tablet_range_splitter::tablet_range_splitter(schema_ptr schema, const tablet_map& tablets, host_id host, const dht::partition_range_vector& ranges)
: _schema(std::move(schema))
, _ranges(ranges)

View File

@@ -66,6 +66,9 @@ struct global_tablet_id {
struct range_based_tablet_id {
table_id table;
// This represents the token range of the tablet in the form (a, b]
// and only such ranges are allowed
dht::token_range range;
bool operator==(const range_based_tablet_id&) const = default;
@@ -445,7 +448,9 @@ struct tablet_load_stats {
// Sum of all tablet sizes on a node and available disk space.
uint64_t effective_capacity = 0;
// Contains tablet sizes per table.
// The token ranges must be in the form (a, b] and only such ranges are allowed
std::unordered_map<table_id, std::unordered_map<dht::token_range, uint64_t>> tablet_sizes;
// returns the aggregated size of all the tablets added
uint64_t add_tablet_sizes(const tablet_load_stats& tls);
@@ -479,6 +484,12 @@ struct load_stats {
}
std::optional<uint64_t> get_tablet_size(host_id host, const range_based_tablet_id& rb_tid) const;
// Modifies the tablet sizes in load_stats for the given table after a split or merge. The old_tm argument has
// to contain the token_metadata pre-resize. The function returns load_stats with tablet token ranges
// corresponding to the post-resize tablet_map.
// In case any pre-resize tablet replica is not found, the function returns nullptr
lw_shared_ptr<load_stats> reconcile_tablets_resize(const std::unordered_set<table_id>& tables, const token_metadata& old_tm, const token_metadata& new_tm) const;
};
using load_stats_v2 = load_stats;

main.cc
View File

@@ -2221,12 +2221,47 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// Semantic validation of sstable compression parameters from config.
// Adding here (i.e., after `join_cluster`) to ensure that the
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
//
// Also, if the dictionary compression feature is not enabled, use
// LZ4Compressor as the default algorithm instead of LZ4WithDictsCompressor.
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
auto& sstable_compression_options = cfg->sstable_compression_user_table_options;
gms::feature::listener_registration reg_listener;
if (!sstable_compression_options.is_set() && !dicts_feature_enabled) {
if (sstable_compression_options().get_algorithm() != compression_parameters::algorithm::lz4_with_dicts) {
on_internal_error(startlog, "expected LZ4WithDictsCompressor as default algorithm for sstable_compression_user_table_options.");
}
startlog.info("SSTABLE_COMPRESSION_DICTS feature is disabled. Overriding default SSTable compression to use LZ4Compressor instead of LZ4WithDictsCompressor.");
compression_parameters original_params{sstable_compression_options().get_options()};
auto params = sstable_compression_options().get_options();
params[compression_parameters::SSTABLE_COMPRESSION] = sstring(compression_parameters::algorithm_to_name(compression_parameters::algorithm::lz4));
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
sstable_compression_options(compression_parameters{params}, utils::config_file::config_source::None);
}
}).get();
// Register a callback to update the default compression algorithm when the feature is enabled.
// Precondition:
// The callback must run inside seastar::async context:
// - If the listener fires immediately, we are running inside seastar::async already.
// - If the listener is deferred, `feature_service::enable()` runs it inside seastar::async.
reg_listener = feature_service.local().sstable_compression_dicts.when_enabled([&sstable_compression_options, params = std::move(original_params)] {
startlog.info("SSTABLE_COMPRESSION_DICTS feature is now enabled. Overriding default SSTable compression to use LZ4WithDictsCompressor.");
smp::invoke_on_all([&sstable_compression_options, params = std::move(params)] {
if (!sstable_compression_options.is_set()) { // guard check; in case we ever make the option live updateable
sstable_compression_options(params, utils::config_file::config_source::None);
}
}).get();
});
}
try {
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
const auto& dicts_usage_allowed = cfg->sstable_compression_dictionaries_allow_in_ddl();
cfg->sstable_compression_user_table_options().validate(
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)),
compression_parameters::dicts_usage_allowed(dicts_usage_allowed));
} catch (const std::exception& e) {
startlog.error("Invalid sstable_compression_user_table_options: {}", e.what());
throw bad_configuration_error();

View File

@@ -1,6 +1,5 @@
DROP KEYSPACE IF EXISTS counters;
CREATE KEYSPACE IF NOT EXISTS counters WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '3'};
CREATE TABLE IF NOT EXISTS counters.counter1 (key blob PRIMARY KEY, "C0" counter, "C1" counter, "C2" counter, "C3" counter, "C4" counter);

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:012ccbeb5c93878bf260f751ff55faa723f235ee796dc7e31c4e14c1bcc0efae
size 6408088

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:acb4310f476a7dac4a645ae6babae22af2541c37ed368eac666ff3bd24f1a56a
size 6406700

View File

@@ -196,6 +196,7 @@ struct row_level_repair_metrics {
uint64_t rx_hashes_nr{0};
uint64_t inc_sst_skipped_bytes{0};
uint64_t inc_sst_read_bytes{0};
uint64_t tablet_time_ms{0};
row_level_repair_metrics() {
namespace sm = seastar::metrics;
_metrics.add_group("repair", {
@@ -219,6 +220,8 @@ struct row_level_repair_metrics {
sm::description("Total number of bytes skipped from sstables for incremental repair on this shard.")),
sm::make_counter("inc_sst_read_bytes", inc_sst_read_bytes,
sm::description("Total number of bytes read from sstables for incremental repair on this shard.")),
sm::make_counter("tablet_time_ms", tablet_time_ms,
sm::description("Time spent on tablet repair on this shard in milliseconds.")),
});
}
};
@@ -3477,7 +3480,16 @@ future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
service::frozen_topology_guard topo_guard) {
auto start_time = flush_time;
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization, start_time, topo_guard);
bool is_tablet = shard_task.db.local().find_column_family(table_id).uses_tablets();
bool is_tablet_rebuild = shard_task.sched_info.for_tablet_rebuild;
auto t = std::chrono::steady_clock::now();
auto update_time = seastar::defer([&] {
if (is_tablet && !is_tablet_rebuild) {
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t);
_metrics.tablet_time_ms += duration.count();
}
});
co_await repair.run();
}
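The `seastar::defer` above charges the elapsed wall-clock time to the metric on scope exit, so the timing also covers early returns and exceptions. A minimal standalone sketch of the same pattern using plain RAII instead of Seastar (`metrics`, `scoped_ms_timer`, and `run_timed` are invented names for illustration):

```cpp
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical stand-in for the repair metrics counter.
struct metrics {
    uint64_t tablet_time_ms = 0;
};

// RAII timer: measures wall time from construction to destruction and adds the
// elapsed milliseconds to the counter, like the seastar::defer in the patch.
class scoped_ms_timer {
    std::chrono::steady_clock::time_point _start = std::chrono::steady_clock::now();
    uint64_t& _counter;
    bool _enabled;
public:
    scoped_ms_timer(uint64_t& counter, bool enabled) : _counter(counter), _enabled(enabled) {}
    ~scoped_ms_timer() {
        if (_enabled) {
            auto d = std::chrono::steady_clock::now() - _start;
            _counter += std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
        }
    }
};

// Times `work` only when the condition holds (is_tablet && !is_tablet_rebuild
// in the real code); the destructor runs even if `work` throws.
void run_timed(metrics& m, bool enabled, const std::function<void()>& work) {
    scoped_ms_timer t(m.tablet_time_ms, enabled);
    work();
}
```

`std::chrono::steady_clock` is the right clock here for the same reason the patch uses it: it is monotonic, so the measured duration cannot go negative on wall-clock adjustments.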
class row_level_repair_gossip_helper : public gms::i_endpoint_state_change_subscriber {

View File

@@ -483,13 +483,14 @@ locator::static_effective_replication_map_ptr keyspace::get_static_effective_rep
} // namespace replica
void backlog_controller::adjust() {
// Compute and update the backlog even when static shares are set to
// ensure that the backlog metrics reflect the current state.
auto backlog = _current_backlog();
if (controller_disabled()) {
update_controller(_static_shares);
return;
}
if (backlog >= _control_points.back().input) {
update_controller(_control_points.back().output);
return;
@@ -510,7 +511,7 @@ void backlog_controller::adjust() {
float backlog_controller::backlog_of_shares(float shares) const {
size_t idx = 1;
if (_control_points.size() == 0) {
return 1.0f;
}
while ((idx < _control_points.size() - 1) && (_control_points[idx].output < shares)) {
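The backlog controller maps a backlog measure to CPU shares through sorted control points, clamping at the last point and walking the points the same way `backlog_of_shares` does. A hedged sketch of that lookup, assuming linear interpolation between points (the interpolation itself is in the elided middle of `adjust()`; `control_point` and `shares_for_backlog` are illustrative names, and at least two sorted points are assumed):

```cpp
#include <cstddef>
#include <vector>

// Hypothetical control point: input is a backlog measure, output is CPU shares.
struct control_point {
    float input;
    float output;
};

// Piecewise-linear interpolation over sorted control points, clamped at both
// ends, mirroring how the controller maps backlog to shares.
float shares_for_backlog(const std::vector<control_point>& cp, float backlog) {
    if (backlog <= cp.front().input) {
        return cp.front().output;
    }
    if (backlog >= cp.back().input) {
        return cp.back().output;
    }
    // Find the first point whose input is >= backlog (same walk as backlog_of_shares).
    size_t idx = 1;
    while ((idx < cp.size() - 1) && (cp[idx].input < backlog)) {
        idx++;
    }
    const auto& a = cp[idx - 1];
    const auto& b = cp[idx];
    float t = (backlog - a.input) / (b.input - a.input);
    return a.output + t * (b.output - a.output);
}
```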
@@ -1896,12 +1897,7 @@ std::ostream& operator<<(std::ostream& out, const database& db) {
return out;
}
// prepare partition slice
static query::partition_slice partition_slice_for_counter_update(const mutation& m) {
query::column_id_vector static_columns;
static_columns.reserve(m.partition().static_row().size());
m.partition().static_row().for_each_cell([&] (auto id, auto&&) {
@@ -1924,20 +1920,18 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
regular_columns.erase(std::unique(regular_columns.begin(), regular_columns.end()),
regular_columns.end());
return query::partition_slice(std::move(cr_ranges), std::move(static_columns),
std::move(regular_columns), { }, { }, query::max_rows);
}
future<mutation> database::read_and_transform_counter_mutation_to_shards(mutation m, column_family& cf, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
// Before counter update is applied it needs to be transformed from
// deltas to counter shards. To do that, we need to read the current
// counter state for each modified cell...
tracing::trace(trace_state, "Reading counter values from the CF");
auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(cf.schema(), "counter-read-before-write", timeout, trace_state);
auto slice = partition_slice_for_counter_update(m);
auto mopt = co_await counter_write_query(cf.schema(), cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state);
if (utils::get_local_injector().enter("apply_counter_update_delay_100ms")) {
@@ -1948,14 +1942,8 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz
// cells we can look for our shard in each of them, increment
// its clock and apply the delta.
transform_counter_updates_to_shards(m, mopt ? &*mopt : nullptr, cf.failed_counter_applies_to_memtable(), get_token_metadata().get_my_id());
tracing::trace(trace_state, "Applying counter update");
co_await apply_with_commitlog(cf, m, timeout);
if (utils::get_local_injector().enter("apply_counter_update_delay_5s")) {
co_await seastar::sleep(std::chrono::seconds(5));
}
co_return std::move(m);
}
max_purgeable memtable_list::get_max_purgeable(const dht::decorated_key& dk, is_shadowable is, api::timestamp_type max_seen_timestamp) const noexcept {
@@ -2050,29 +2038,70 @@ future<> database::apply_in_memory(const mutation& m, column_family& cf, db::rp_
return cf.apply(m, std::move(h), timeout);
}
future<counter_update_guard> database::acquire_counter_locks(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state) {
auto& cf = find_column_family(fm.column_family_id());
auto m = fm.unfreeze(s);
m.upgrade(cf.schema());
auto op = cf.write_in_progress();
tracing::trace(trace_state, "Acquiring counter locks");
return do_with(std::move(m), [this, &cf, op = std::move(op), timeout] (mutation& m) mutable {
return update_write_metrics_if_failed([&m, &cf, op = std::move(op), timeout] mutable -> future<counter_update_guard> {
return cf.lock_counter_cells(m, timeout).then([op = std::move(op)] (std::vector<locked_cell> locks) mutable {
return counter_update_guard{std::move(op), std::move(locks)};
});
}());
});
}
future<mutation> database::prepare_counter_update(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state) {
if (timeout <= db::timeout_clock::now() || utils::get_local_injector().is_enabled("database_apply_counter_update_force_timeout")) {
update_write_metrics_for_timed_out_write();
return make_exception_future<mutation>(timed_out_error{});
}
auto& cf = find_column_family(fm.column_family_id());
if (is_in_critical_disk_utilization_mode() && cf.is_eligible_to_write_rejection_on_critical_disk_utilization()) {
update_write_metrics_for_rejected_writes();
return make_exception_future<mutation>(replica::critical_disk_utilization_exception{"rejected counter update mutation"});
}
auto m = fm.unfreeze(s);
m.upgrade(cf.schema());
return update_write_metrics_if_failed(
read_and_transform_counter_mutation_to_shards(std::move(m), cf, std::move(trace_state), timeout));
}
future<> database::apply_counter_update(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state) {
auto& cf = find_column_family(fm.column_family_id());
auto m = fm.unfreeze(s);
m.upgrade(cf.schema());
tracing::trace(trace_state, "Applying counter update");
auto f = co_await coroutine::as_future(update_write_metrics(seastar::futurize_invoke([&] {
if (!s->is_synced()) {
throw std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}",
s->ks_name(), s->cf_name(), s->version()));
}
try {
return apply_with_commitlog(cf, m, timeout);
} catch (no_such_column_family&) {
dblog.error("Attempting to mutate non-existent table {}", m.column_family_id());
throw;
}
})));
if (f.failed()) {
co_await coroutine::return_exception_ptr(f.get_exception());
}
if (utils::get_local_injector().enter("apply_counter_update_delay_5s")) {
co_await seastar::sleep(std::chrono::seconds(5));
}
}
// #9919 etc. The initiative to wrap exceptions here
@@ -2309,6 +2338,24 @@ Future database::update_write_metrics(Future&& f) {
});
}
template<typename Future>
Future database::update_write_metrics_if_failed(Future&& f) {
return f.then_wrapped([s = _stats] (auto f) {
if (f.failed()) {
++s->total_writes;
++s->total_writes_failed;
auto ep = f.get_exception();
if (is_timeout_exception(ep)) {
++s->total_writes_timedout;
} else if (try_catch<replica::rate_limit_exception>(ep)) {
++s->total_writes_rate_limited;
}
return futurize<Future>::make_exception_future(std::move(ep));
}
return f;
});
}
void database::update_write_metrics_for_timed_out_write() {
++_stats->total_writes;
++_stats->total_writes_failed;

View File

@@ -19,6 +19,7 @@
#include "types/user.hh"
#include "utils/assert.hh"
#include "utils/hash.hh"
#include "cell_locking.hh"
#include "db_clock.hh"
#include "gc_clock.hh"
#include <chrono>
@@ -1500,6 +1501,11 @@ struct string_pair_eq {
bool operator()(spair lhs, spair rhs) const;
};
struct counter_update_guard {
utils::phased_barrier::operation op;
std::vector<locked_cell> locks;
};
class db_user_types_storage;
// Policy for sharded<database>:
@@ -1728,11 +1734,12 @@ private:
future<> do_apply_many(const utils::chunked_vector<frozen_mutation>&, db::timeout_clock::time_point timeout);
future<> apply_with_commitlog(column_family& cf, const mutation& m, db::timeout_clock::time_point timeout);
future<mutation> read_and_transform_counter_mutation_to_shards(mutation m, column_family& cf, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout);
template<typename Future>
Future update_write_metrics(Future&& f);
template<typename Future>
Future update_write_metrics_if_failed(Future&& f);
void update_write_metrics_for_timed_out_write();
void update_write_metrics_for_rejected_writes();
future<std::unique_ptr<keyspace>> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, const locator::token_metadata_ptr& token_metadata, system_keyspace system);
@@ -1912,7 +1919,11 @@ public:
// Mutations may be partially visible to reads until restart on exception (FIXME).
future<> apply(const utils::chunked_vector<frozen_mutation>&, db::timeout_clock::time_point timeout);
future<> apply_hint(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout);
future<counter_update_guard> acquire_counter_locks(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
future<mutation> prepare_counter_update(schema_ptr s, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
future<> apply_counter_update(schema_ptr, const frozen_mutation& fm, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_state);
const sstring& get_snitch_name() const;
/*!
* \brief clear snapshot based on a tag

View File

@@ -2811,8 +2811,10 @@ locator::combined_load_stats tablet_storage_group_manager::table_load_stats(std:
if (tablet_filter(*_tablet_map, gid)) {
const uint64_t tablet_size = sg.live_disk_space_used();
table_stats.size_in_bytes += tablet_size;
const dht::token_range trange = _tablet_map->get_token_range(gid.tablet);
// Make sure the token range is in the form (a, b]
SCYLLA_ASSERT(!trange.start()->is_inclusive() && trange.end()->is_inclusive());
tablet_stats.tablet_sizes[gid.table][trange] = tablet_size;
}
});
return locator::combined_load_stats{

View File

@@ -192,9 +192,7 @@ tablet_map_to_mutations(const tablet_map& tablets, table_id id, const sstring& k
m.set_clustered_cell(ck, "session", data_value(tr_info->session_id.uuid()), ts);
}
}
tid = *tablets.next_tablet(tid);
}
co_await process_mutation(std::move(m));
}
@@ -577,7 +575,7 @@ void update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint& hi
namespace {
tablet_id process_one_row(replica::database* db, table_id table, tablet_map& map, tablet_id tid, const cql3::untyped_result_set_row& row) {
tablet_replica_set tablet_replicas;
if (row.has("replicas")) {
tablet_replicas = deserialize_replica_set(row.get_view("replicas"));
@@ -661,7 +659,7 @@ std::optional<tablet_id> process_one_row(replica::database* db, table_id table,
persisted_last_token, current_last_token, table, tid));
}
return *map.next_tablet(tid);
}
struct tablet_metadata_builder {
@@ -716,9 +714,7 @@ struct tablet_metadata_builder {
}
if (row.has("last_token")) {
current->tid = process_one_row(db, current->table, current->map, current->tid, row);
}
}

View File

@@ -133,10 +133,11 @@ check_jenkins_job_status() {
lastCompletedJobName="$jenkins_url/job/$jenkins_job/lastCompletedBuild"
getBuildResult=$(curl -s --user $JENKINS_USERNAME:$JENKINS_API_TOKEN $lastCompletedJobName/api/json?tree=result)
if [[ $getBuildResult =~ (Access Denied|401 Unauthorized) ]]; then
echo -e "${ORANGE}WARNING:${NC} Access Denied to $lastCompletedJobName. \nPlease check your JENKINS_USERNAME and JENKINS_API_TOKEN setting"
exit 1
fi
lastCompleted=$(echo "$getBuildResult" | jq -r '.result')
if [[ "$lastCompleted" == "SUCCESS" ]]; then

View File

@@ -66,19 +66,6 @@ future<paxos_state::replica_guard> paxos_state::get_replica_lock(const schema& s
// Once the global barrier completes, no requests remain on the old shard,
// so we can safely switch to acquiring locks only on the new shard.
auto shards = s.table().get_effective_replication_map()->shards_ready_for_reads(s, token);
if (const auto it = std::ranges::find(shards, this_shard_id()); it == shards.end()) {
const auto& erm = s.table().get_effective_replication_map();
const auto& rs = erm->get_replication_strategy();
sstring tablet_map_desc;
if (rs.uses_tablets()) {
const auto& tablet_map = erm->get_token_metadata().tablets().get_tablet_map(s.id());
tablet_map_desc = ::format(", tablet id {}, tablet map {}",
tablet_map.get_tablet_id(token), tablet_map);
}
on_internal_error(paxos_state::logger,
format("invalid shard, shards {}, token {}{}", shards, token, tablet_map_desc));
}
std::ranges::sort(shards);
replica_guard replica_guard;

View File

@@ -3582,32 +3582,60 @@ storage_proxy::mutate_counters_on_leader(utils::chunked_vector<frozen_mutation_a
{
auto& update_ms = mutations;
co_await coroutine::parallel_for_each(update_ms, [&] (frozen_mutation_and_schema& fm_a_s) -> future<> {
auto erm = _db.local().find_column_family(fm_a_s.s).get_effective_replication_map();
auto shard = erm->get_sharder(*fm_a_s.s).shard_for_reads(fm_a_s.fm.token(*fm_a_s.s));
bool local = shard == this_shard_id();
get_stats().replica_cross_shard_ops += !local;
return container().invoke_on(shard, {_write_smp_service_group, timeout}, [gs = global_schema_ptr(fm_a_s.s), &fm = fm_a_s.fm, cl, timeout, gt = tracing::global_trace_state_ptr(trace_state), permit, local, fence, caller] (storage_proxy& sp) mutable -> future<> {
auto p = local ? std::move(permit) : empty_service_permit(); // FIXME: either obtain a real permit on this shard or hold original one across shard
return sp.mutate_counter_on_leader_and_replicate(gs, fm, cl, timeout, gt.get(), std::move(p), fence, caller);
});
});
}
}
future<>
storage_proxy::mutate_counter_on_leader_and_replicate(schema_ptr s, const frozen_mutation& fm, db::consistency_level cl, clock_type::time_point timeout,
tracing::trace_state_ptr trace_state, service_permit permit,
fencing_token fence, locator::host_id caller) {
auto erm = _db.local().find_column_family(s).get_effective_replication_map();
const auto& rs = s->table().get_effective_replication_map()->get_replication_strategy();
return run_fenceable_write(rs, fence, caller, [this, erm, s, &fm, timeout, trace_state] (this auto) -> future<mutation> {
// lock the reading shards in sorted order.
// usually there is only a single read shard, which is the current shard. we need to lock the
// counter on this shard to protect the counter's read-modify-write operation against concurrent updates.
// during intranode migration, the read shard switches, creating a phase where both shards
// may receive updates concurrently for the same counter. therefore, we need to lock both shards.
auto lock_shards = erm->shards_ready_for_reads(*s, fm.token(*s));
std::ranges::sort(lock_shards);
using foreign_counter_guard = foreign_ptr<lw_shared_ptr<replica::counter_update_guard>>;
utils::small_vector<foreign_counter_guard, 2> counter_locks;
for (auto shard : lock_shards) {
counter_locks.push_back(co_await _db.invoke_on(shard, {_write_smp_service_group, timeout},
[s = global_schema_ptr(s), &fm, gtr = tracing::global_trace_state_ptr(trace_state), timeout] (replica::database& db) mutable -> future<foreign_counter_guard> {
return db.acquire_counter_locks(s, fm, timeout, gtr.get()).then([] (replica::counter_update_guard g) {
return make_foreign(make_lw_shared<replica::counter_update_guard>(std::move(g)));
});
}));
}
// read the current counter value and transform the counter update mutation
auto m = co_await _db.local().prepare_counter_update(s, fm, timeout, trace_state);
auto apply = [this, erm, &m, trace_state, timeout] (shard_id shard) {
return _db.invoke_on(shard, {_write_smp_service_group, timeout},
[s = global_schema_ptr(m.schema()), fm = freeze(m), gtr = tracing::global_trace_state_ptr(trace_state), timeout] (replica::database& db) mutable -> future<> {
return db.apply_counter_update(s, std::move(fm), timeout, gtr.get());
});
};
co_await apply_on_shards(erm, *s, m.token(), std::move(apply));
co_return std::move(m);
}).then([this, cl, timeout, trace_state, permit = std::move(permit)] (mutation m) mutable {
return replicate_counter_from_leader(std::move(m), cl, std::move(trace_state), timeout, std::move(permit));
});
}
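The comment block above explains why counter locks are taken on every shard that may serve reads, and why the shard list is sorted first: two concurrent updates that acquire the same set of locks in different orders can deadlock, while a globally consistent order cannot. That ordering argument can be sketched with plain `std::mutex` (`lock_in_order` is an invented helper; Seastar shards do not use mutexes, this only illustrates the acquisition order):

```cpp
#include <algorithm>
#include <mutex>
#include <vector>

// Deadlock-free multi-lock: always acquire locks in a globally consistent
// (sorted) order, like the sorted shard list used when locking counter cells
// on every shard that may serve reads during an intra-node migration.
std::vector<std::unique_lock<std::mutex>> lock_in_order(std::vector<std::mutex*> locks) {
    // Sort by address to impose one global order, and drop duplicates so the
    // same lock is never acquired twice.
    std::sort(locks.begin(), locks.end());
    locks.erase(std::unique(locks.begin(), locks.end()), locks.end());
    std::vector<std::unique_lock<std::mutex>> held;
    held.reserve(locks.size());
    for (auto* m : locks) {
        held.emplace_back(*m);  // blocks until this lock is acquired
    }
    return held;  // RAII: all locks release when the vector is destroyed
}
```

With every participant sorting first, a lock cycle is impossible: each thread only ever waits for a lock that is "greater" than all locks it already holds.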

View File

@@ -495,7 +495,7 @@ private:
future<> mutate_counters_on_leader(utils::chunked_vector<frozen_mutation_and_schema> mutations, db::consistency_level cl, clock_type::time_point timeout,
tracing::trace_state_ptr trace_state, service_permit permit,
fencing_token fence, locator::host_id caller);
future<> mutate_counter_on_leader_and_replicate(schema_ptr s, const frozen_mutation& m, db::consistency_level cl, clock_type::time_point timeout,
tracing::trace_state_ptr trace_state, service_permit permit,
fencing_token fence, locator::host_id caller);

View File

@@ -4293,6 +4293,36 @@ future<> storage_service::raft_removenode(locator::host_id host_id, locator::hos
rtlogger.info("Removenode succeeded. Request ID: {}", request_id);
}
future<> storage_service::mark_excluded(const std::vector<locator::host_id>& hosts) {
while (true) {
auto guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
std::unordered_set<raft::server_id> raft_hosts;
for (auto host : hosts) {
if (_gossiper.is_alive(host)) {
const std::string message = ::format("Cannot mark host {} as excluded because it's alive", host);
rtlogger.warn("{}", message);
throw std::runtime_error(message);
}
raft_hosts.insert(raft::server_id(host.uuid()));
}
topology_mutation_builder builder(guard.write_timestamp());
builder.add_ignored_nodes(raft_hosts);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("Mark as excluded: {}", hosts));
rtlogger.info("Marking nodes as excluded: {}, previous set: {}", hosts, _topology_state_machine._topology.ignored_nodes);
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
} catch (group0_concurrent_modification&) {
rtlogger.info("mark_excluded: concurrent operation is detected, retrying.");
continue;
}
rtlogger.info("Nodes marked as excluded: {}", hosts);
break;
}
}
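`mark_excluded` follows the usual group0 optimistic-concurrency shape: take a guard, build the command, and retry from the top when `group0_concurrent_modification` signals that another writer committed first. A standalone sketch of that retry shape (`concurrent_modification` and `retry_until_committed` are hypothetical stand-ins, and the attempt cap is an addition the original unbounded loop does not have):

```cpp
#include <functional>
#include <stdexcept>

// Hypothetical stand-in for group0_concurrent_modification.
struct concurrent_modification : std::runtime_error {
    concurrent_modification() : std::runtime_error("concurrent modification") {}
};

// Optimistic-concurrency retry loop: re-read state (take a fresh guard) and
// retry the commit when another writer got in first. Returns the number of
// attempts taken.
int retry_until_committed(const std::function<void(int attempt)>& commit, int max_attempts = 10) {
    for (int attempt = 1; attempt <= max_attempts; attempt++) {
        try {
            commit(attempt);
            return attempt;
        } catch (const concurrent_modification&) {
            continue;  // someone else committed first: retry with fresh state
        }
    }
    throw std::runtime_error("too many concurrent modifications");
}
```

The important property is that each iteration re-reads the current topology state under a new guard before rebuilding the command, so a retry never commits a decision based on stale state.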
future<> storage_service::removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params) {
return run_with_api_lock_in_gossiper_mode_only(sstring("removenode"), [host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable {
@@ -8236,6 +8266,20 @@ void storage_service::set_topology_change_kind(topology_change_kind kind) {
_gossiper.set_topology_state_machine(kind == topology_change_kind::raft ? & _topology_state_machine : nullptr);
}
bool storage_service::raft_topology_change_enabled() const {
if (this_shard_id() != 0) {
on_internal_error(slogger, "raft_topology_change_enabled() must run on shard 0");
}
return _topology_change_kind_enabled == topology_change_kind::raft;
}
bool storage_service::legacy_topology_change_enabled() const {
if (this_shard_id() != 0) {
on_internal_error(slogger, "legacy_topology_change_enabled() must run on shard 0");
}
return _topology_change_kind_enabled == topology_change_kind::legacy;
}
future<> storage_service::register_protocol_server(protocol_server& server, bool start_instantly) {
_protocol_servers.push_back(&server);
if (start_instantly) {
@@ -8251,7 +8295,7 @@ future<> storage_service::query_cdc_timestamps(table_id table, bool ascending, n
return _cdc_gens.local().query_cdc_timestamps(table, ascending, std::move(f));
}
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
future<> storage_service::query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f) {
return _cdc_gens.local().query_cdc_streams(table, std::move(f));
}

View File

@@ -390,7 +390,7 @@ public:
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const std::vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
private:
inet_address get_broadcast_address() const noexcept {
@@ -785,6 +785,7 @@ public:
* @param hostIdString token for the node
*/
future<> removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes);
future<> mark_excluded(const std::vector<locator::host_id>&);
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req);
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
@@ -914,12 +915,8 @@ private:
topology_change_kind upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const;
public:
bool raft_topology_change_enabled() const {
return _topology_change_kind_enabled == topology_change_kind::raft;
}
bool legacy_topology_change_enabled() const {
return _topology_change_kind_enabled == topology_change_kind::legacy;
}
bool raft_topology_change_enabled() const;
bool legacy_topology_change_enabled() const;
private:
future<> _raft_state_monitor = make_ready_future<>();

View File

@@ -1263,7 +1263,13 @@ public:
}
} else {
for (auto rack : rf_in_dc->get_rack_list()) {
auto shards = shards_per_rack.at(endpoint_dc_rack{dc, rack});
size_t shards = 0;
auto dc_rack = endpoint_dc_rack{dc, rack};
if (!shards_per_rack.contains(dc_rack)) {
lblogger.warn("No shards for rack {}, but table {}.{} replicates there", rack, s.ks_name(), s.cf_name());
} else {
shards = shards_per_rack.at(dc_rack);
}
size_t tablets_in_rack = std::ceil(min_per_shard_tablet_count * shards);
lblogger.debug("Estimated {} tablets due to min_per_shard_tablet_count={:.3f} for table={}.{} in rack {} ({} shards) in DC {}",
tablets_in_rack, min_per_shard_tablet_count, s.ks_name(), s.cf_name(), rack, shards, dc);

View File

@@ -1511,10 +1511,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
utils::get_local_injector().inject("stream_tablet_fail_on_drain",
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
}
utils::get_local_injector().inject("stream_tablet_fail",
[] { throw std::runtime_error("stream_tablet failed due to error injection"); });
if (action_failed(tablet_state.streaming)) {
if (action_failed(tablet_state.streaming) || utils::get_local_injector().enter("stream_tablet_fail")) {
const bool cleanup = utils::get_local_injector().enter("stream_tablet_move_to_cleanup");
bool critical_disk_utilization = false;
if (auto stats = _tablet_allocator.get_load_stats()) {
@@ -1659,12 +1657,19 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.del_transition(last_token)
.del_migration_task_info(last_token, _feature_service)
.build());
if (trinfo.pending_replica) {
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *trinfo.pending_replica, last_token);
auto leaving_replica = get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
if (leaving_replica) {
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *leaving_replica, last_token);
}
}
break;
case locator::tablet_transition_stage::end_migration: {
// Move the tablet size in load_stats
auto leaving = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
auto pending = trinfo.pending_replica;
const dht::token_range trange {tmap.get_token_range(gid.tablet)};
migrate_tablet_size(leaving->host, pending->host, gid, trange);
// Need a separate stage and a barrier after cleanup RPC to cut off stale RPCs.
// See do_tablet_operation() doc.
bool defer_transition = utils::get_local_injector().enter("handle_tablet_migration_end_migration");
@@ -1884,6 +1889,39 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration");
}
void migrate_tablet_size(locator::host_id leaving, locator::host_id pending, locator::global_tablet_id gid, const dht::token_range trange) {
auto has_tablet_size = [&] (const locator::load_stats& stats, locator::host_id host) {
if (auto host_i = stats.tablet_stats.find(host); host_i != stats.tablet_stats.end()) {
auto& tables = host_i->second.tablet_sizes;
if (auto table_i = tables.find(gid.table); table_i != tables.end()) {
if (auto size_i = table_i->second.find(trange); size_i != table_i->second.end()) {
return true;
}
}
}
return false;
};
if (leaving != pending) {
auto old_load_stats = _tablet_allocator.get_load_stats();
if (old_load_stats) {
const locator::load_stats& stats = *old_load_stats;
if (has_tablet_size(stats, leaving) && !has_tablet_size(stats, pending)) {
rtlogger.debug("Moving tablet size for tablet: {} from: {} to: {}", gid, leaving, pending);
auto new_load_stats = make_lw_shared<locator::load_stats>(*old_load_stats);
auto& new_leaving_ts = new_load_stats->tablet_stats.at(leaving);
auto& new_pending_ts = new_load_stats->tablet_stats.at(pending);
auto map_node = new_leaving_ts.tablet_sizes.at(gid.table).extract(trange);
new_pending_ts.tablet_sizes[gid.table].insert(std::move(map_node));
if (new_leaving_ts.tablet_sizes.at(gid.table).empty()) {
new_leaving_ts.tablet_sizes.erase(gid.table);
}
_tablet_allocator.set_load_stats(std::move(new_load_stats));
}
}
}
}
future<> handle_tablet_resize_finalization(group0_guard g) {
co_await utils::get_local_injector().inject("handle_tablet_resize_finalization_wait", [] (auto& handler) -> future<> {
rtlogger.info("handle_tablet_resize_finalization: waiting");
@@ -1953,6 +1991,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet resize finalization"));
if (auto old_load_stats = _tablet_allocator.get_load_stats()) {
guard = co_await start_operation();
auto new_tm = get_token_metadata_ptr();
auto reconciled_stats = old_load_stats->reconcile_tablets_resize(plan.resize_plan().finalize_resize, *tm, *new_tm);
if (reconciled_stats) {
_tablet_allocator.set_load_stats(reconciled_stats);
}
}
}
future<> handle_truncate_table(group0_guard guard) {

View File

@@ -539,17 +539,13 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
}
}
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) const {
void compression_parameters::validate(dicts_feature_enabled dicts_enabled) const {
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
if (!dicts_enabled) {
throw std::runtime_error(std::format("sstable_compression {} can't be used before "
"all nodes are upgraded to a versions which supports it",
algorithm_to_name(_algorithm)));
}
if (!dicts_allowed) {
throw std::runtime_error(std::format("sstable_compression {} has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`",
algorithm_to_name(_algorithm)));
}
}
if (_chunk_length) {
auto chunk_length = _chunk_length.value();

View File

@@ -106,8 +106,7 @@ public:
std::optional<int> zstd_compression_level() const { return _zstd_compression_level; }
using dicts_feature_enabled = bool_class<struct dicts_feature_enabled_tag>;
using dicts_usage_allowed = bool_class<struct dicts_usage_allowed_tag>;
void validate(dicts_feature_enabled, dicts_usage_allowed) const;
void validate(dicts_feature_enabled) const;
std::map<sstring, sstring> get_options() const;

View File

@@ -11,6 +11,7 @@
#include "sstables/random_access_reader.hh"
#include "utils/disk-error-handler.hh"
#include "utils/log.hh"
#include "utils/fragmented_temporary_buffer.hh"
namespace sstables {
@@ -24,6 +25,15 @@ future <temporary_buffer<char>> random_access_reader::read_exactly(size_t n) noe
}
}
future<fragmented_temporary_buffer> random_access_reader::read_exactly_fragmented(size_t n) noexcept {
try {
fragmented_temporary_buffer::reader reader;
return reader.read_exactly(*_in, n);
} catch (...) {
return current_exception_as_future<fragmented_temporary_buffer>();
}
}
static future<> close_if_needed(std::unique_ptr<input_stream<char>> in) {
if (!in) {
return make_ready_future<>();

View File

@@ -17,6 +17,7 @@
#include <seastar/core/iostream.hh>
#include <seastar/core/temporary_buffer.hh>
#include "seastarx.hh"
#include "utils/fragmented_temporary_buffer.hh"
namespace sstables {
@@ -33,6 +34,8 @@ protected:
public:
future <temporary_buffer<char>> read_exactly(size_t n) noexcept;
future<fragmented_temporary_buffer> read_exactly_fragmented(size_t n) noexcept;
future<> seek(uint64_t pos) noexcept;
bool eof() const noexcept { return _in->eof(); }

View File

@@ -36,6 +36,7 @@
#include "utils/error_injection.hh"
#include "utils/to_string.hh"
#include "utils/fragmented_temporary_buffer.hh"
#include "data_dictionary/storage_options.hh"
#include "dht/sharder.hh"
#include "writer.hh"
@@ -275,9 +276,11 @@ future<> parse(const schema&, sstable_version_types, random_access_reader& in, T
// All composite parsers must come after this
template<typename First, typename... Rest>
future<> parse(const schema& s, sstable_version_types v, random_access_reader& in, First& first, Rest&&... rest) {
return parse(s, v, in, first).then([v, &s, &in, &rest...] {
return parse(s, v, in, std::forward<Rest>(rest)...);
});
auto fut = parse(s, v, in, first);
(..., (void)(fut = fut.then([&s, v, &in, &rest] () mutable {
return parse(s, v, in, std::forward<Rest>(rest));
})));
return fut;
}
// Intended to be used for a type that describes itself through describe_type().
@@ -516,16 +519,26 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read
s.header.memory_size,
s.header.sampling_level,
s.header.size_at_full_sampling);
auto buf = co_await in.read_exactly(s.header.size * sizeof(pos_type));
auto len = s.header.size * sizeof(pos_type);
check_buf_size(buf, len);
// Use fragmented buffer to avoid large contiguous allocations
auto frag_buf = co_await in.read_exactly_fragmented(len);
if (frag_buf.empty()) {
throw bufsize_mismatch_exception(0, len);
}
if (frag_buf.size_bytes() != len) {
throw bufsize_mismatch_exception(frag_buf.size_bytes(), len);
}
// Positions are encoded in little-endian.
auto b = buf.get();
auto stream = frag_buf.get_istream();
s.positions.reserve(s.header.size + 1);
while (s.positions.size() != s.header.size) {
s.positions.push_back(seastar::read_le<pos_type>(b));
b += sizeof(pos_type);
auto pos_result = stream.read<pos_type>();
if (!pos_result) {
std::rethrow_exception(pos_result.assume_error());
}
s.positions.push_back(seastar::le_to_cpu(*pos_result));
co_await coroutine::maybe_yield();
}
// Since the keys in the index are not sized, we need to calculate
@@ -2895,18 +2908,16 @@ future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_
co_return ret;
}
input_stream<char> data_stream;
if (sst->get_compression()) {
data_stream = co_await sst->data_stream(0, sst->ondisk_data_size(), permit,
nullptr, nullptr, sstable::raw_stream::yes);
} else {
data_stream = co_await sst->data_stream(0, sst->data_size(), permit,
input_stream<char> data_stream = co_await (sst->get_compression()
? sst->data_stream(0, sst->ondisk_data_size(), permit,
nullptr, nullptr, sstable::raw_stream::yes)
: sst->data_stream(0, sst->data_size(), permit,
nullptr, nullptr, sstable::raw_stream::no,
integrity_check::yes, [&ret](sstring msg) {
sstlog.error("{}", msg);
ret.status = validate_checksums_status::invalid;
});
}
})
);
auto valid = true;
std::exception_ptr ex;

View File

@@ -550,10 +550,13 @@ future<locator::effective_replication_map_ptr> sstables_loader::await_topology_q
auto expected_topology_version = erm->get_token_metadata().get_version();
auto& ss = _ss.local();
// The awaiting only works with raft enabled, and we only need it with tablets,
// so let's bypass the awaiting when tablet is disabled.
if (!t.uses_tablets()) {
break;
}
// optimistically attempt to grab an erm on quiesced topology
// The awaiting is only needed with tablet over raft, so we're bypassing the check
// when raft is disabled.
if (!ss.raft_topology_change_enabled() || co_await ss.verify_topology_quiesced(expected_topology_version)) {
if (co_await ss.verify_topology_quiesced(expected_topology_version)) {
break;
}
erm = nullptr;

View File

@@ -0,0 +1,273 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
###############################################################################
# Most Alternator tests should limit themselves to the DynamoDB API provided by
# the AWS SDK, and should work when running against any DynamoDB-compatible
# database, including DynamoDB and Scylla - the latter running locally or
# remotely. In particular, tests should generally *not* attempt to look at
# Scylla's log file - as this log file is not available at all when testing
# a remote server.
#
# Nevertheless, in some cases we might want to verify that some specific
# log messages do appear in Scylla's log file. These tests should be
# concentrated in this source file. All these tests are skipped when the
# test is not running against Scylla, or Scylla's log file cannot be
# found - e.g., because Scylla is running on a remote machine, or configured
# not to write to a log file.
#
# The code below automatically figures out where the log file is - when
# Scylla is running locally. First the local Scylla process is detected
# (it is the process listening to our HTTP requests, if we can find
# one), then its standard output is guessed to be the log file - and then
# we verify that it really is.
#############################################################################
import pytest
import os
import io
import time
import re
import urllib.parse
from contextlib import contextmanager
from botocore.exceptions import ClientError
from .util import new_test_table, scylla_config_temporary
from .test_cql_rbac import new_dynamodb, new_role
# Utility function for trying to find a local process which is listening to
# the given local IP address and port. If such a process exists, return its
# process id (as a string). Otherwise, return None. Note that the local
# process needs to belong to the same user running this test, or it cannot
# be found.
def local_process_id(ip, port):
# Implement something like the shell "lsof -Pni @{ip}:{port}", just
# using /proc without any external shell command.
# First, we look in /proc/net/tcp for a LISTEN socket (state 0x0A) at the
# desired local address. The address is specially-formatted hex of the ip
# and port, with 0100007F:2352 for 127.0.0.1:9042. We check for two
# listening addresses: one is the specific IP address given, and the
# other is listening on address 0 (INADDR_ANY).
ip2hex = lambda ip: ''.join([f'{int(x):02X}' for x in reversed(ip.split('.'))])
port2hex = lambda port: f'{int(port):04X}'
try:
addr1 = ip2hex(ip) + ':' + port2hex(port)
except:
return None
addr2 = ip2hex('0.0.0.0') + ':' + port2hex(port)
LISTEN = '0A'
with open('/proc/net/tcp', 'r') as f:
for line in f:
cols = line.split()
if cols[3] == LISTEN and (cols[1] == addr1 or cols[1] == addr2):
inode = cols[9]
break
else:
# Didn't find a process listening on the given address
return None
# Now look in /proc/*/fd/* for processes that have this socket "inode"
# as one of its open files. We can only find a process that belongs to
# the same user.
target = f'socket:[{inode}]'
for proc in os.listdir('/proc'):
if not proc.isnumeric():
continue
dir = f'/proc/{proc}/fd/'
try:
for fd in os.listdir(dir):
if os.readlink(dir + fd) == target:
# Found the process!
return proc
except:
# Ignore errors. We can't check processes we don't own.
pass
return None
# A fixture to find the Scylla log file, returning the log file's path.
# If the log file cannot be found, or it's not Scylla, the fixture calls
# pytest.skip() to skip any test which uses it. The fixture has module
# scope, so looking for the log file only happens once. Individual tests
# should use the function-scope fixture "logfile" below, which takes care
# of opening the log file for reading in the right place.
# We look for the log file by looking for a local process listening to the
# given DynamoDB API connection, assuming its standard output is the log file,
# and then verifying that this file is a proper Scylla log file.
@pytest.fixture(scope="module")
def logfile_path(dynamodb):
# Split the endpoint URL into host and port part. If the port is not
# explicitly specified, we need to assume it is the default http port
# (80) or default https (443) port, depending on the scheme.
endpoint_url = dynamodb.meta.client._endpoint.host
p = urllib.parse.urlparse(endpoint_url)
# If the hostname is a name rather than an IP address, it's unlikely
# to be a local process anyway... test/alternator/run uses an IP address.
ip = p.hostname
port = p.port
if port is None:
port = 443 if p.scheme == 'https' else 80
pid = local_process_id(ip, port)
if not pid:
pytest.skip("Can't find local process")
# Now that we know the process id, use /proc to find if its standard
# output is redirected to a file. If it is, that's the log file. If it
# isn't a file, we don't know where the user is writing the log...
try:
log = os.readlink(f'/proc/{pid}/fd/1')
except:
pytest.skip("Can't find local log file")
# If the process's standard output is some pipe or device, it's
# not the log file we were hoping for...
if not log.startswith('/') or not os.path.isfile(log):
pytest.skip("Can't find local log file")
# Scylla can be configured to put the log in syslog, not in the standard
# output. So let's verify that the file which we found actually looks
# like a Scylla log and isn't just empty or something... The Scylla log
# file always starts with the line: "Scylla version ... starting ..."
with open(log, 'r') as f:
head = f.read(7)
if head != 'Scylla ':
pytest.skip("Not a Scylla log file")
yield log
# The "logfile" fixture returns the log file open for reading at the end.
# Testing if something appears in the log usually involves taking this
# fixture, and then checking with wait_for_log() for the desired message to
# have appeared. Because each logfile fixture opens the log file separately,
# it is ok if several tests are run in parallel - but they will see each
# other's log messages so should try to ensure that unique strings (e.g.,
# random table names) appear in the log message.
@pytest.fixture(scope="function")
def logfile(logfile_path):
with open(logfile_path, 'r') as f:
f.seek(0, io.SEEK_END)
yield f
# wait_for_log() checks if the log, starting at its current position
# (probably set by the logfile fixture), contains the given message -
# and if it doesn't, calls pytest.fail().
# Because it may take time for the log message to be flushed, and sometimes
# we may want to look for messages about various delayed events, this
# function doesn't give up when it reaches the end of file, and rather
# retries until a given timeout. The timeout may be long, because successful
# tests will not wait for it. Note, however, that long timeouts will make
# xfailing tests slow.
def wait_for_log(logfile, pattern, re_flags=re.MULTILINE, timeout=5):
contents = logfile.read()
prog = re.compile(pattern, re_flags)
if prog.search(contents):
return
end = time.time() + timeout
while time.time() < end:
s = logfile.read()
if s:
# Though unlikely, it is possible that a single message got
# split into two reads, so we need to check (and recheck)
# the entire content since the beginning of this wait :-(
contents = contents + s
if prog.search(contents):
return
time.sleep(0.1)
pytest.fail(f'Timed out ({timeout} seconds) looking for {pattern} in log file. Got:\n' + contents)
# A simple example of testing the log file - we check that a table creation,
# and table deletion, both cause messages to appear on the log.
def test_log_table_operations(dynamodb, logfile):
schema = {
'KeySchema': [{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
'AttributeDefinitions': [{ 'AttributeName': 'p', 'AttributeType': 'S' }]
}
with new_test_table(dynamodb, **schema) as table:
wait_for_log(logfile, f'Creating keyspace alternator_{table.name}')
wait_for_log(logfile, f'Dropping keyspace alternator_{table.name}')
# Test that when alternator_warn_authorization is set to true, WARN-level
# log messages are generated on authentication or authorization errors.
# This is in addition to the metric counting these errors, which are tested
# in test_metrics.py. These are tests for issue #25308.
# We check that alternator_warn_authorization enables log messages regardless
# of what alternator_enforce_authorization is set to. If enforce_authorization
# is also true, a message is logged and the request failed - and if it is
# false the same message is logged and the request succeeds.
#
# It's important to have a regression test that these log messages appear,
# because it is documented that they appear in "warn" mode and users may
# rely on them instead of metrics to learn about auth setup problems.
#
# Note that we do not test that when alternator_warn_authorization is
# set false, warnings are NOT logged - this is less important, and also
# trickier to test what does NOT appear on the log (we definitely don't want
# to wait for a timeout).
#
# We have several tests here, for several kinds of authentication and
# authorization errors.
@contextmanager
def scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
with scylla_config_temporary(dynamodb, 'alternator_enforce_authorization', 'true' if enforce_auth else 'false'):
with scylla_config_temporary(dynamodb, 'alternator_warn_authorization', 'true' if warn_auth else 'false'):
yield
# authentication failure 1: bogus username and secret key
@pytest.mark.parametrize("enforce_auth", [True, False])
def test_log_authentication_failure_1(dynamodb, logfile, test_table_s, enforce_auth):
with scylla_config_auth_temporary(dynamodb, enforce_auth, True):
with new_dynamodb(dynamodb, 'bogus_username', 'bogus_secret_key') as d:
tab = d.Table(test_table_s.name)
# We don't expect get_item() to find any item, it should either
# pass or fail depending on enforce_auth - but in any case it
# should log the error.
try:
tab.get_item(Key={'p': 'dog'})
operation_succeeded = True
except ClientError as e:
assert 'UnrecognizedClientException' in str(e)
operation_succeeded = False
if enforce_auth:
assert not operation_succeeded
else:
assert operation_succeeded
wait_for_log(logfile, '^WARN .*user bogus_username.*client address')
# authentication failure 2: real username, wrong secret key
# Unfortunately, tests that create a new role need to use CQL too.
@pytest.mark.parametrize("enforce_auth", [True, False])
def test_log_authentication_failure_2(dynamodb, cql, logfile, test_table_s, enforce_auth):
with scylla_config_auth_temporary(dynamodb, enforce_auth, True):
with new_role(cql) as (role, key):
with new_dynamodb(dynamodb, role, 'bogus_secret_key') as d:
tab = d.Table(test_table_s.name)
try:
tab.get_item(Key={'p': 'dog'})
operation_succeeded = True
except ClientError as e:
assert 'UnrecognizedClientException' in str(e)
operation_succeeded = False
if enforce_auth:
assert not operation_succeeded
else:
assert operation_succeeded
wait_for_log(logfile, f'^WARN .*wrong signature for user {role}.*client address')
# Authorization failure - a valid user but without permissions to do a
# given operation.
@pytest.mark.parametrize("enforce_auth", [True, False])
def test_log_authorization_failure(dynamodb, cql, logfile, test_table_s, enforce_auth):
with scylla_config_auth_temporary(dynamodb, enforce_auth, True):
with new_role(cql) as (role, key):
with new_dynamodb(dynamodb, role, key) as d:
tab = d.Table(test_table_s.name)
# The new role is not a superuser, so should not have
# permissions to read from this table created earlier by
# the superuser.
try:
tab.get_item(Key={'p': 'dog'})
operation_succeeded = True
except ClientError as e:
assert 'AccessDeniedException' in str(e)
operation_succeeded = False
if enforce_auth:
assert not operation_succeeded
else:
assert operation_succeeded
wait_for_log(logfile, f'^WARN .*SELECT access on table.*{test_table_s.name} is denied to role {role}.*client address')

View File

@@ -34,6 +34,7 @@ import requests
from botocore.exceptions import ClientError
from test.alternator.test_manual_requests import get_signed_request
from test.alternator.test_cql_rbac import new_dynamodb, new_role
from test.alternator.util import random_string, new_test_table, is_aws, scylla_config_read, scylla_config_temporary
# Fixture for checking if we are able to test Scylla metrics. Scylla metrics
@@ -861,6 +862,119 @@ def test_ttl_stats(dynamodb, metrics, alternator_ttl_period_in_seconds):
time.sleep(0.1)
assert not 'Item' in table.get_item(Key={'p': p0})
# The following tests check the authentication and authorization failure
# counters:
# * scylla_alternator_authentication_failures
# * scylla_alternator_authorization_failures
# as well as their interaction with the alternator_enforce_authorization
# and alternator_warn_authorization configuration options:
#
# 1. When alternator_enforce_authorization and alternator_warn_authorization
# are both set to "false", these two metrics aren't incremented (and
# operations are allowed).
# 2. When alternator_enforce_authorization is set to false but
# alternator_warn_authorization is set to true, the two metrics are
# incremented but the operations are still allowed.
# 3. When alternator_enforce_authorization is set to "true", the two metrics
# are incremented and the operations are not allowed.
#
# We have several tests here, for several kinds of authentication and
# authorization errors. These are tests for issue #25308.
@contextmanager
def scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
with scylla_config_temporary(dynamodb, 'alternator_enforce_authorization', 'true' if enforce_auth else 'false'):
with scylla_config_temporary(dynamodb, 'alternator_warn_authorization', 'true' if warn_auth else 'false'):
yield
# authentication failure 1: bogus username and secret key
@pytest.mark.parametrize("enforce_auth", [True, False])
@pytest.mark.parametrize("warn_auth", [True, False])
def test_authentication_failure_1(dynamodb, metrics, test_table_s, enforce_auth, warn_auth):
with scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
with new_dynamodb(dynamodb, 'bogus_username', 'bogus_secret_key') as d:
# We don't expect get_item() to find any item; we just want to see
# whether it experiences an authentication failure, and whether
# it increments the authentication failure metric.
saved_auth_failures = get_metric(metrics, 'scylla_alternator_authentication_failures')
tab = d.Table(test_table_s.name)
try:
tab.get_item(Key={'p': 'dog'})
operation_succeeded = True
except ClientError as e:
assert 'UnrecognizedClientException' in str(e)
operation_succeeded = False
if enforce_auth:
assert not operation_succeeded
else:
assert operation_succeeded
new_auth_failures = get_metric(metrics, 'scylla_alternator_authentication_failures')
if warn_auth or enforce_auth:
# If Alternator has any reason to check the authentication
# headers (i.e., either warn_auth or enforce_auth is enabled)
# then it will count the errors.
assert new_auth_failures == saved_auth_failures + 1
else:
# If Alternator has no reason to check the authentication
# headers (i.e., both warn_auth and enforce_auth are off)
# it won't check - and won't find or count auth errors.
assert new_auth_failures == saved_auth_failures
# authentication failure 2: real username, wrong secret key
# Unfortunately, tests that create a new role need to use CQL too.
@pytest.mark.parametrize("enforce_auth", [True, False])
@pytest.mark.parametrize("warn_auth", [True, False])
def test_authentication_failure_2(dynamodb, cql, metrics, test_table_s, enforce_auth, warn_auth):
with scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
with new_role(cql) as (role, key):
with new_dynamodb(dynamodb, role, 'bogus_secret_key') as d:
saved_auth_failures = get_metric(metrics, 'scylla_alternator_authentication_failures')
tab = d.Table(test_table_s.name)
try:
tab.get_item(Key={'p': 'dog'})
operation_succeeded = True
except ClientError as e:
assert 'UnrecognizedClientException' in str(e)
operation_succeeded = False
if enforce_auth:
assert not operation_succeeded
else:
assert operation_succeeded
new_auth_failures = get_metric(metrics, 'scylla_alternator_authentication_failures')
if enforce_auth or warn_auth:
assert new_auth_failures == saved_auth_failures + 1
else:
assert new_auth_failures == saved_auth_failures
# Authorization failure - a valid user but without permissions to do a
# given operation.
@pytest.mark.parametrize("enforce_auth", [True, False])
@pytest.mark.parametrize("warn_auth", [True, False])
def test_authorization_failure(dynamodb, cql, metrics, test_table_s, enforce_auth, warn_auth):
with scylla_config_auth_temporary(dynamodb, enforce_auth, warn_auth):
with new_role(cql) as (role, key):
with new_dynamodb(dynamodb, role, key) as d:
saved_auth_failures = get_metric(metrics, 'scylla_alternator_authorization_failures')
tab = d.Table(test_table_s.name)
try:
# Note that the new role is not a superuser, so should
# not have permissions to read from this table created
# earlier by the superuser.
tab.get_item(Key={'p': 'dog'})
operation_succeeded = True
except ClientError as e:
assert 'AccessDeniedException' in str(e)
operation_succeeded = False
if enforce_auth:
assert not operation_succeeded
else:
assert operation_succeeded
new_auth_failures = get_metric(metrics, 'scylla_alternator_authorization_failures')
if enforce_auth or warn_auth:
assert new_auth_failures == saved_auth_failures + 1
else:
assert new_auth_failures == saved_auth_failures
# TODO: there are additional metrics which we don't yet test here. At the
# time of this writing they are:
# reads_before_write, write_using_lwt, shard_bounce_for_lwt,

View File

@@ -2202,7 +2202,7 @@ SEASTAR_THREAD_TEST_CASE(test_construct_next_stream_set) {
};
auto tokens_to_stream_ids = [&stream_id_for_token] (const stream_set& tokens) {
std::vector<cdc::stream_id> stream_ids;
utils::chunked_vector<cdc::stream_id> stream_ids;
for (auto t : tokens) {
stream_ids.push_back(stream_id_for_token(t));
}
@@ -2311,7 +2311,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_generate_stream_diff) {
};
auto tokens_to_stream_ids = [&stream_id_for_token] (const stream_set& tokens) {
std::vector<cdc::stream_id> stream_ids;
utils::chunked_vector<cdc::stream_id> stream_ids;
for (auto t : tokens) {
stream_ids.push_back(stream_id_for_token(t));
}
@@ -2406,7 +2406,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_generate_stream_diff) {
struct cdc_gc_test_config {
table_id table;
-std::vector<std::vector<cdc::stream_id>> streams;
+std::vector<utils::chunked_vector<cdc::stream_id>> streams;
size_t new_base_stream;
};
@@ -2522,11 +2522,11 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_mutations) {
// as the base and the history is empty
auto table = table_id(utils::UUID_gen::get_time_UUID());
-std::vector<cdc::stream_id> streams0;
+utils::chunked_vector<cdc::stream_id> streams0;
for (auto t : {10, 20, 30}) {
streams0.emplace_back(dht::token(t), 0);
}
-std::vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
+utils::chunked_vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
cdc_gc_test_config test1 = {
.table = table,
@@ -2551,12 +2551,12 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_mutations) {
// as the base and one history entry for open 50
auto table = table_id(utils::UUID_gen::get_time_UUID());
-std::vector<cdc::stream_id> streams0;
+utils::chunked_vector<cdc::stream_id> streams0;
for (auto t : {10, 20, 30}) {
streams0.emplace_back(dht::token(t), 0);
}
-std::vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
-std::vector<cdc::stream_id> streams2 = {streams0[0], streams0[2], streams1[2], cdc::stream_id(dht::token(50), 0)};
+utils::chunked_vector<cdc::stream_id> streams1 = {streams0[0], streams0[2], cdc::stream_id(dht::token(40), 0)};
+utils::chunked_vector<cdc::stream_id> streams2 = {streams0[0], streams0[2], streams1[2], cdc::stream_id(dht::token(50), 0)};
cdc_gc_test_config test2 = {
.table = table,
@@ -2584,7 +2584,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_gc_get_new_base) {
auto tp = base_time + offset;
auto ts = std::chrono::duration_cast<api::timestamp_clock::duration>(tp.time_since_epoch()).count();
-streams_map[ts] = cdc::committed_stream_set{tp, std::vector<cdc::stream_id>{}};
+streams_map[ts] = cdc::committed_stream_set{tp, utils::chunked_vector<cdc::stream_id>{}};
}
return streams_map;
};
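Aside: the `std::vector` → `utils::chunked_vector` replacements above trade one large contiguous allocation for many small fixed-size chunks. A minimal Python sketch of the idea (illustrative only; this hypothetical `ChunkedVector` is not Scylla's implementation):

```python
# Toy illustration of chunked storage: elements live in many small
# fixed-size chunks instead of one large contiguous array, so growing
# the container never requires one oversized allocation.
class ChunkedVector:
    CHUNK_SIZE = 4  # tiny for illustration; real chunks are much larger

    def __init__(self):
        self._chunks = []  # list of small lists, each at most CHUNK_SIZE long

    def append(self, value):
        if not self._chunks or len(self._chunks[-1]) == self.CHUNK_SIZE:
            self._chunks.append([])  # grow by one small chunk, never move everything
        self._chunks[-1].append(value)

    def __getitem__(self, i):
        return self._chunks[i // self.CHUNK_SIZE][i % self.CHUNK_SIZE]

    def __len__(self):
        return sum(len(c) for c in self._chunks)
```

Ten appended elements end up as chunks of 4, 4 and 2; random access stays O(1) via a divide and a modulo.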

View File

@@ -2015,7 +2015,7 @@ SEASTAR_TEST_CASE(test_table_compression) {
e.execute_cql("create table tb6 (foo text PRIMARY KEY, bar text);").get();
BOOST_REQUIRE(e.local_db().has_schema("ks", "tb6"));
-BOOST_REQUIRE(e.local_db().find_schema("ks", "tb6")->get_compressor_params().get_algorithm() == compression_parameters::algorithm::lz4);
+BOOST_REQUIRE(e.local_db().find_schema("ks", "tb6")->get_compressor_params().get_algorithm() == e.local_db().get_config().sstable_compression_user_table_options().get_algorithm());
});
}

View File

@@ -270,19 +270,16 @@ SEASTAR_THREAD_TEST_CASE(sstable_directory_test_temporary_statistics) {
auto sst = make_sstable_for_this_shard(std::bind(new_env_sstable, std::ref(env.local())));
auto tempstr = test(sst).filename(component_type::TemporaryStatistics);
tests::touch_file(tempstr.native()).get();
-auto tempstat = fs::canonical(tempstr);
+auto statstr = test(sst).filename(component_type::Statistics);
with_sstable_directory(env, [&] (sharded<sstables::sstable_directory>& sstdir_ok) {
auto expect_ok = distributed_loader_for_tests::process_sstable_dir(sstdir_ok, {});
BOOST_REQUIRE_NO_THROW(expect_ok.get());
-const auto& dir = env.local().tempdir();
-lister::scan_dir(dir.path(), lister::dir_entry_types::of<directory_entry_type::regular>(), [tempstat] (fs::path parent_dir, directory_entry de) {
-BOOST_REQUIRE(fs::canonical(parent_dir / fs::path(de.name)) != tempstat);
-return make_ready_future<>();
-}).get();
+BOOST_REQUIRE(!file_exists(tempstr.native()).get());
+BOOST_REQUIRE(file_exists(statstr.native()).get()); // sanity check that we didn't miss the directory itself
});
-remove_file(test(sst).filename(sstables::component_type::Statistics).native()).get();
+remove_file(statstr.native()).get();
with_sstable_directory(env, [] (sharded<sstables::sstable_directory>& sstdir_fatal) {
auto expect_malformed_sstable = distributed_loader_for_tests::process_sstable_dir(sstdir_fatal, {});

View File

@@ -692,36 +692,48 @@ SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) {
BOOST_REQUIRE(b.empty());
};
+{
    auto in = make_is();
    expect(in, buf1);
    expect(in, buf2);
    expect_eof(in);
+}
-in = make_is();
+{
+   auto in = make_is();
    in.skip(0).get();
    expect(in, buf1);
    expect(in, buf2);
    expect_eof(in);
+}
-in = make_is();
+{
+   auto in = make_is();
    expect(in, buf1);
    in.skip(0).get();
    expect(in, buf2);
    expect_eof(in);
+}
-in = make_is();
+{
+   auto in = make_is();
    expect(in, buf1);
    in.skip(opts.buffer_size).get();
    expect_eof(in);
+}
-in = make_is();
+{
+   auto in = make_is();
    in.skip(opts.buffer_size * 2).get();
    expect_eof(in);
+}
-in = make_is();
+{
+   auto in = make_is();
    in.skip(opts.buffer_size).get();
    in.skip(opts.buffer_size).get();
    expect_eof(in);
+}
});
}
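The scenarios exercised above (skip zero bytes, skip exactly one buffer, skip past the end) can be modeled with a toy skip-aware reader. A sketch under stated assumptions: `SkippingReader` is hypothetical and stands in for the compressed input stream, with `io.BytesIO` as the backing store:

```python
import io

# Toy skip-aware buffered reader: skip(0) is a no-op, and skipping a whole
# buffer's worth of bytes lands exactly on the next buffer (or EOF).
class SkippingReader:
    def __init__(self, data: bytes, buffer_size: int):
        self._stream = io.BytesIO(data)
        self.buffer_size = buffer_size

    def read_buffer(self) -> bytes:
        # Returns the next buffer, or b"" at end of stream.
        return self._stream.read(self.buffer_size)

    def skip(self, n: int) -> None:
        # Advance the read position by n bytes without returning data.
        self._stream.seek(n, io.SEEK_CUR)
```

With `data = b"aaaabbbb"` and `buffer_size = 4`, `skip(0)` followed by two reads yields both buffers, while `skip(4)` drops the first buffer entirely, mirroring the test's expectations.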

View File

@@ -2142,6 +2142,30 @@ SEASTAR_THREAD_TEST_CASE(test_per_shard_count_respected_with_rack_list) {
}, cfg).get();
}
// Reproduces https://github.com/scylladb/scylladb/issues/26768
SEASTAR_THREAD_TEST_CASE(test_replacing_last_node_in_rack_with_rack_list_rf) {
cql_test_config cfg{};
cfg.db_config->tablets_initial_scale_factor.set(10);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto dc = topo.dc();
auto host1 = topo.add_node(node_state::normal, 1, rack1);
auto host2 = topo.add_node(node_state::normal, 1, rack2);
auto ks_name = add_keyspace_racks(e, {{dc, {rack1.rack, rack2.rack}}});
auto table = add_table(e, ks_name).get();
topo.set_node_state(host2, node_state::left);
rebalance_tablets(e);
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_shrinks_respecting_rack_allocation) {
cql_test_config cfg{};
cfg.db_config->tablets_per_shard_goal.set(10);
@@ -3681,6 +3705,143 @@ SEASTAR_THREAD_TEST_CASE(test_creating_lots_of_tables_doesnt_overflow_metadata)
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_stats_tablet_reconcile) {
auto cfg = tablet_cql_test_config();
// This test checks the correctness of the load_stats reconciliation algorithm.
// We only attempt to reconcile tablet_sizes after a merge or a split.
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto dc = topo.dc();
const size_t tablet_count = 16;
auto host = topo.add_node(node_state::normal, 4);
auto ks_name = add_keyspace(e, {{dc, 1}}, tablet_count);
sstring table_name = "table_1";
e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get();
table_id table = e.local_db().find_schema(ks_name, table_name)->id();
auto& stm = e.shared_token_metadata().local();
token_metadata_ptr old_tmptr = stm.get();
auto& tmap = stm.get()->tablets().get_tablet_map(table);
auto set_tablet_count = [&] (size_t new_tablet_count) {
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map new_tmap(new_tablet_count);
tmeta.set_tablet_map(table, std::move(new_tmap));
return make_ready_future<>();
});
};
// This checks if the tablet sizes have been correctly reconciled after a merge
{
locator::load_stats stats;
locator::tablet_load_stats& tls = stats.tablet_stats[host];
for (size_t i = 0; i < tablet_count; ++i) {
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
tls.tablet_sizes[table][range] = i;
}
size_t tablet_count_after_merge = tablet_count / 2;
set_tablet_count(tablet_count_after_merge);
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tmptr, *stm.get());
BOOST_REQUIRE(reconciled_stats_ptr);
locator::tablet_load_stats& reconciled_tls = reconciled_stats_ptr->tablet_stats[host];
BOOST_REQUIRE_EQUAL(reconciled_tls.tablet_sizes.at(table).size(), tablet_count_after_merge);
locator::tablet_map tmap_after_merge(tablet_count_after_merge);
for (size_t i = 0; i < tablet_count_after_merge; ++i) {
dht::token_range trange {tmap_after_merge.get_token_range(locator::tablet_id{i})};
const uint64_t reconciled_tablet_size = reconciled_tls.tablet_sizes.at(table).at(trange);
uint64_t expected_sum = 0;
for (uint64_t i_sum = 0; i_sum < 2; ++i_sum) {
expected_sum += i * 2 + i_sum;
}
BOOST_REQUIRE_EQUAL(reconciled_tablet_size, expected_sum);
}
}
// This checks if the tablet sizes have been correctly reconciled after a split
{
locator::load_stats stats;
locator::tablet_load_stats& tls = stats.tablet_stats[host];
for (size_t i = 0; i < tablet_count; ++i) {
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
tls.tablet_sizes[table][range] = i * 2;
}
size_t tablet_count_after_split = tablet_count * 2;
set_tablet_count(tablet_count_after_split);
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tmptr, *stm.get());
BOOST_REQUIRE(reconciled_stats_ptr);
locator::tablet_load_stats& reconciled_tls = reconciled_stats_ptr->tablet_stats[host];
BOOST_REQUIRE_EQUAL(reconciled_tls.tablet_sizes.at(table).size(), tablet_count_after_split);
locator::tablet_map tmap_after_split(tablet_count_after_split);
for (size_t i = 0; i < tablet_count_after_split; ++i) {
dht::token_range trange {tmap_after_split.get_token_range(locator::tablet_id{i})};
const uint64_t reconciled_tablet_size = reconciled_tls.tablet_sizes.at(table).at(trange);
BOOST_REQUIRE_EQUAL(reconciled_tablet_size, i / 2);
}
}
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_stats_tablet_reconcile_tablet_not_found) {
auto cfg = tablet_cql_test_config();
// This test checks if the reconcile tablet algorithm returns nullptr when it
// can't find all the tablet sizes in load_stats
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
auto dc = topo.dc();
const size_t tablet_count = 16;
auto host = topo.add_node(node_state::normal, 4);
auto ks_name = add_keyspace(e, {{dc, 1}}, tablet_count);
sstring table_name = "table_1";
e.execute_cql(fmt::format("CREATE TABLE {}.{} (p1 text, r1 int, PRIMARY KEY (p1))", ks_name, table_name)).get();
table_id table = e.local_db().find_schema(ks_name, table_name)->id();
auto& stm = e.shared_token_metadata().local();
auto& tmap = stm.get()->tablets().get_tablet_map(table);
locator::load_stats stats;
locator::tablet_load_stats& tls = stats.tablet_stats[host];
// Add all tablet sizes except the last one. This will cause reconcile to return a nullptr
for (size_t i = 0; i < tablet_count - 1; ++i) {
const dht::token_range range {tmap.get_token_range(tablet_id(i))};
tls.tablet_sizes[table][range] = i;
}
token_metadata_ptr old_tm { stm.get() };
auto set_tablet_count = [&] (size_t new_tablet_count) {
mutate_tablets(e, [&] (tablet_metadata& tmeta) -> future<> {
tablet_map new_tmap(new_tablet_count);
tmeta.set_tablet_map(table, std::move(new_tmap));
return make_ready_future<>();
});
};
// Test if merge reconcile detects a missing sibling tablet in load_stats
set_tablet_count(tablet_count / 2);
auto reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tm, *stm.get());
BOOST_REQUIRE_EQUAL(reconciled_stats_ptr.get(), nullptr);
// Test if split reconcile detects a missing tablet in load_stats
set_tablet_count(tablet_count * 2);
reconciled_stats_ptr = stats.reconcile_tablets_resize({ table }, *old_tm, *stm.get());
BOOST_REQUIRE_EQUAL(reconciled_stats_ptr.get(), nullptr);
}, cfg).get();
}
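The reconciliation rule the two tests above exercise can be sketched in a few lines. This is a simplified model, not Scylla's `reconcile_tablets_resize`: it assumes tablet `i`'s pre-merge siblings are `2i` and `2i+1`, and that a split divides a tablet's size evenly between its two children:

```python
def reconcile_merge(sizes):
    # After a merge, new tablet i covers what tablets 2i and 2i+1 covered,
    # so its size is the sum of the two siblings' sizes.
    if len(sizes) % 2:
        return None  # a sibling is missing; reconciliation fails
    return [sizes[2 * i] + sizes[2 * i + 1] for i in range(len(sizes) // 2)]

def reconcile_split(sizes):
    # After a split, old tablet i becomes tablets 2i and 2i+1; lacking
    # better information, attribute half of the old size to each child.
    out = []
    for s in sizes:
        out.extend([s // 2, s // 2])
    return out
```

This matches the assertions above: after a merge, tablet `i` holds `(2i) + (2i+1)`; after a split, tablet `i` holds half of tablet `i/2`'s old size.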
SEASTAR_TEST_CASE(test_tablet_id_and_range_side) {
static constexpr size_t tablet_count = 128;
locator::tablet_map tmap(tablet_count);

View File

@@ -11,6 +11,7 @@ import logging
import random
import re
import time
+from collections import defaultdict
from functools import cached_property
from functools import wraps
from typing import List, Dict, Callable
@@ -147,13 +148,24 @@ class Worker:
        update.serial_consistency_level = ConsistencyLevel.LOCAL_SERIAL
        try:
            res = await self.cql.run_async(update)
-           applied = bool(res and res[0].applied)
-           assert applied, f"LWT not applied: pk={pk} s{self.worker_id} new={new_val} guard={guard_vals} prev={prev_val}"
        except (WriteTimeout, OperationTimedOut, ReadTimeout) as e:
            if not is_uncertainty_timeout(e):
                raise
            applied = await self.verify_update_through_select(pk, new_val, prev_val)
+       else:
+           applied = bool(res and res[0].applied)
+           if not applied:
+               logger.error(
+                   "LWT_NOT_APPLIED pk=%r worker=s%d new=%r guard=%r prev=%r ts_ms=%d",
+                   pk,
+                   self.worker_id,
+                   new_val,
+                   guard_vals,
+                   prev_val,
+               )
+               raise AssertionError(
+                   f"LWT not applied: pk={pk} s{self.worker_id} new={new_val} guard={guard_vals} prev={prev_val}"
+               )
        if applied:
            self.on_applied(pk, self.worker_id, new_val)
            self.success_counts[pk] += 1
@@ -189,7 +201,7 @@ class BaseLWTTester:
        self.pk_to_token: Dict[int, int] = {}
        self.migrations = 0
        self.phase = "warmup"  # "warmup" -> "migrating" -> "post"
-       self.phase_ops = {"warmup": 0, "migrating": 0, "post": 0}
+       self.phase_ops = defaultdict(int)

    def _get_lower_bound(self, pk: int, col_idx: int) -> int:
        return self.lb_counts[pk][col_idx]
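The try/except/else rework above implements a common CAS pattern: on an uncertain timeout the write may or may not have committed, so the worker falls back to a read to decide. A standalone sketch of the pattern, with hypothetical `cas_update` and `read_current_value` callables standing in for the real driver calls:

```python
def apply_with_uncertainty(cas_update, read_current_value, new_val):
    """Run a CAS; on an uncertain timeout, read the row back to decide
    whether the update actually took effect instead of failing outright."""
    try:
        applied = cas_update(new_val)
    except TimeoutError:
        # The write may or may not have been committed: verify via a read.
        applied = read_current_value() == new_val
    else:
        # A definitive "not applied" response is a real failure.
        if not applied:
            raise AssertionError("LWT not applied")
    return applied
```

Putting the failure check in `else:` rather than inside `try:` matters: otherwise the `AssertionError` raised on a clean "not applied" response could be swallowed by the timeout handler.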

View File

@@ -0,0 +1,224 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import random
import pytest
from test.cluster.conftest import skip_mode
from test.cluster.lwt.lwt_common import (
BaseLWTTester,
wait_for_tablet_count,
DEFAULT_WORKERS,
DEFAULT_NUM_KEYS,
)
from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
from test.pylib.tablets import get_tablet_count
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Test constants
TARGET_RESIZE_COUNT = 20
WARMUP_LWT_CNT = 100
POST_LWT_CNT = 100
PHASE_WARMUP = "warmup"
PHASE_POST = "post"
PHASE_RESIZE = "resize"
MIN_TABLETS = 1
MAX_TABLETS = 20
RESIZE_TIMEOUT = 240
def powers_of_two_in_range(lo: int, hi: int):
if lo > hi or hi < 1:
return []
lo = max(1, lo)
start_e = (lo - 1).bit_length()
end_e = hi.bit_length()
return [1 << e for e in range(start_e, end_e + 1) if (1 << e) <= hi]
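The bit-twiddling in `powers_of_two_in_range` is easy to get off by one, so here is a standalone copy of the same logic with a few sanity checks on its boundary behavior (a verification sketch, not part of the patch):

```python
def powers_of_two_in_range(lo, hi):
    # Same logic as the helper above: all powers of two p with lo <= p <= hi.
    if lo > hi or hi < 1:
        return []
    lo = max(1, lo)
    start_e = (lo - 1).bit_length()   # smallest exponent e with 2**e >= lo
    end_e = hi.bit_length()           # largest candidate exponent
    return [1 << e for e in range(start_e, end_e + 1) if (1 << e) <= hi]
```

For example, `powers_of_two_in_range(1, 20)` yields `[1, 2, 4, 8, 16]`, and a range containing no power of two (such as `(5, 5)`) yields an empty list, which is why the resize loop above filters candidates before calling `random.choice`.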
async def run_random_resizes(
    stop_event_: asyncio.Event,
    manager: ManagerClient,
    servers,
    tester: BaseLWTTester,
    ks: str,
    table: str,
    target_steps: int = TARGET_RESIZE_COUNT,
    pause_range=(0.5, 2.0)
):
    """
    Perform randomized tablet count changes (splits/merges) until the target resize
    count is reached or stop_event_ is set. Returns a dict with simple stats.
    """
    split_count = 0
    merge_count = 0
    current_resize_count = 0
    pow2_targets = powers_of_two_in_range(MIN_TABLETS, MAX_TABLETS)
    while not stop_event_.is_set() and current_resize_count < target_steps:
        current_count = await get_tablet_count(manager, servers[0], ks, table)
        candidates = [t for t in pow2_targets if t != current_count]
        target_cnt = random.choice(candidates)
        direction = "split" if target_cnt > current_count else "merge"
        logger.info(
            "[%s] starting: %s.%s tablet_count %d -> target %d",
            direction.upper(),
            ks,
            table,
            current_count,
            target_cnt,
        )
        # Apply resize
        await tester.cql.run_async(
            f"ALTER TABLE {ks}.{table} WITH tablets = {{'min_tablet_count': {target_cnt}}}"
        )
        count_after_resize = await wait_for_tablet_count(
            manager, servers[0], tester.ks, tester.tbl,
            predicate=(
                (lambda c, tgt=target_cnt: c >= tgt)
                if direction == "split"
                else (lambda c, tgt=target_cnt: c <= tgt)
            ),
            target=target_cnt,
            timeout_s=RESIZE_TIMEOUT
        )
        if direction == "split":
            logger.info(
                "[SPLIT] converged: %s.%s tablet_count %d -> %d (target %d)",
                ks,
                table,
                current_count,
                count_after_resize,
                target_cnt,
            )
            assert count_after_resize >= current_count, (
                f"Tablet count expected to be increased during split (was {current_count}, now {count_after_resize})"
            )
            split_count += 1
        else:
            logger.info(
                "[MERGE] converged: %s.%s tablet_count %d -> %d (target %d)",
                ks,
                table,
                current_count,
                count_after_resize,
                target_cnt,
            )
            assert count_after_resize <= current_count, (
                f"Tablet count expected to be decreased during merge (was {current_count}, now {count_after_resize})"
            )
            merge_count += 1
        current_resize_count += 1
        await asyncio.sleep(random.uniform(*pause_range))
    return {
        "steps_done": current_resize_count,
        "seen_split": split_count,
        "seen_merge": merge_count,
    }
@pytest.mark.asyncio
@skip_mode("release", "error injections are not supported in release mode")
@skip_mode("debug", "debug mode is too slow for this test")
async def test_multi_column_lwt_during_split_merge(manager: ManagerClient):
    """
    Test scenario:
    1. Start N servers with tablets enabled
    2. Create keyspace/table
    3. Insert rows, precompute pk->token
    4. Start LWT workers
    5. Run randomized tablet resizing in parallel
    6. Stop workers and verify consistency
    """
    cfg = {
        "enable_tablets": True,
        "tablet_load_stats_refresh_interval_in_seconds": 1,
        "target-tablet-size-in-bytes": 1024 * 16,
    }
    properties = [
        {"dc": "dc1", "rack": "r1"},
        {"dc": "dc1", "rack": "r2"},
        {"dc": "dc1", "rack": "r3"},
        {"dc": "dc1", "rack": "r1"},
        {"dc": "dc1", "rack": "r2"},
        {"dc": "dc1", "rack": "r3"},
    ]
    cmdline = [
        '--logger-log-level', 'paxos=trace'
    ]
    servers = await manager.servers_add(6, config=cfg, cmdline=cmdline, property_file=properties)
    async with new_test_keyspace(
        manager,
        "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} "
        "AND tablets = {'initial': 1}",
    ) as ks:
        stop_event_ = asyncio.Event()
        table = "lwt_split_merge_table"
        tester = BaseLWTTester(
            manager,
            ks,
            table,
            num_workers=DEFAULT_WORKERS,
            num_keys=DEFAULT_NUM_KEYS,
        )
        await tester.create_schema()
        await tester.initialize_rows()
        await tester.start_workers(stop_event_)
        try:
            # Phase 1: warmup LWT (100 applied CAS)
            tester.set_phase(PHASE_WARMUP)
            logger.info("LWT warmup: waiting for %d applied CAS", WARMUP_LWT_CNT)
            await tester.wait_for_phase_ops(stop_event_, PHASE_WARMUP, WARMUP_LWT_CNT, timeout=180, poll=0.2)
            logger.info("LWT warmup complete: %d ops", tester.get_phase_ops(PHASE_WARMUP))

            # Phase 2: randomized resizes with LWT running
            logger.info("LWT during split/merge phase starting")
            tester.set_phase(PHASE_RESIZE)
            resize_stats = await run_random_resizes(
                stop_event_=stop_event_,
                manager=manager,
                servers=servers,
                tester=tester,
                ks=ks,
                table=table,
                target_steps=TARGET_RESIZE_COUNT,
            )
            logger.info("LWT resize complete: %d ops", tester.get_phase_ops(PHASE_RESIZE))

            # Phase 3: post resize LWT (100 applied CAS)
            tester.set_phase(PHASE_POST)
            logger.info("LWT post resize: waiting for %d applied CAS", POST_LWT_CNT)
            await tester.wait_for_phase_ops(stop_event_, PHASE_POST, POST_LWT_CNT, timeout=180, poll=0.2)
            logger.info("LWT post resize complete: %d ops", tester.get_phase_ops(PHASE_POST))

            logger.info(
                "Randomized resize complete: steps_done=%d, seen_split=%s, seen_merge=%s, ops=%d",
                resize_stats["steps_done"],
                resize_stats["seen_split"],
                resize_stats["seen_merge"],
                sum(tester.phase_ops.values()),
            )
        finally:
            await tester.stop_workers()

        await tester.verify_consistency()
        logger.info("Multi-column LWT during randomized split/merge test completed successfully")

View File

@@ -6,6 +6,7 @@
import asyncio
import pytest
import logging
+import random
import time
from test.pylib.manager_client import ManagerClient, wait_for_cql_and_get_hosts
from test.pylib.tablets import get_tablet_replica
@@ -193,3 +194,45 @@ async def test_interrupt_view_build_shard_registration(manager: ManagerClient):
    assert len(res) == n_partitions
    res = await cql.run_async(f"SELECT * FROM {ks}.mv")
    assert len(res) == n_partitions


# The test verifies that when a reshard happens when building multiple views,
# which have different progress, we won't mistakenly decide that a view is built
# even if a build step is empty due to resharding.
# Reproduces https://github.com/scylladb/scylladb/issues/26523
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_empty_build_step_after_reshard(manager: ManagerClient):
    server = await manager.server_add(cmdline=['--smp', '1', '--logger-log-level', 'view=debug'])
    partitions = random.sample(range(1000), 129)  # need more than 128 to allow the first build step to finish and save the progress
    logger.info(f"Using partitions: {partitions}")
    cql = manager.get_cql()
    await cql.run_async(f"CREATE KEYSPACE ks WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets={{'enabled':false}}")
    await cql.run_async(f"CREATE TABLE ks.test (p int, c int, PRIMARY KEY(p,c));")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO ks.test (p, c) VALUES ({k}, {k+1});") for k in partitions])

    # Create first materialized view and wait until building starts. The base table has enough partitions for 2 build steps.
    # Allow the first build step to finish and save progress. In the second step there's only one partition left to build, which will land only on one
    # of the shards after resharding.
    await manager.api.enable_injection(server.ip_addr, "delay_finishing_build_step", one_shot=False)
    await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv AS SELECT p, c FROM ks.test WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")

    async def progress_saved():
        rows = await cql.run_async(f"SELECT * FROM system.scylla_views_builds_in_progress WHERE keyspace_name = 'ks' AND view_name = 'mv'")
        return len(rows) > 0 or None
    await wait_for(progress_saved, time.time() + 60)

    await manager.api.enable_injection(server.ip_addr, "dont_start_build_step", one_shot=False)
    await manager.api.message_injection(server.ip_addr, "delay_finishing_build_step")

    # Create second materialized view and immediately restart the server to cause resharding. The new view will effectively start building after the restart.
    await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv2 AS SELECT p, c FROM ks.test WHERE p IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, p)")
    await manager.server_stop_gracefully(server.server_id)
    await manager.server_start(server.server_id, cmdline_options_override=['--smp', '2', '--logger-log-level', 'view=debug'])
    cql = await reconnect_driver(manager)
    await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)
    await wait_for_view(cql, 'mv', 1)
    await wait_for_view(cql, 'mv2', 1)

    # Verify that no rows are missing
    base_rows = await cql.run_async(f"SELECT * FROM ks.test")
    mv_rows = await cql.run_async(f"SELECT * FROM ks.mv")
    mv2_rows = await cql.run_async(f"SELECT * FROM ks.mv2")
    assert len(base_rows) == len(mv_rows) == len(mv2_rows) == 129

View File

@@ -0,0 +1,94 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.cluster.conftest import skip_mode
from test.pylib.manager_client import ManagerClient
from test.cluster.util import new_test_keyspace
from test.pylib.tablets import get_tablet_replica
import asyncio
import logging
import pytest

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
@pytest.mark.parametrize("migration_type", ["internode", "intranode"])
async def test_counter_updates_during_tablet_migration(manager: ManagerClient, migration_type: str):
    """
    Test that counter updates remain consistent during tablet migrations.

    This test performs concurrent counter increment operations on a single partition
    while simultaneously triggering a tablet migration, either between nodes or
    between shards on the same node.

    The test verifies that counter consistency is maintained throughout the migration
    process: no counter updates are lost, and the final counter value matches the total
    number of increments performed.
    """
    if migration_type == "intranode":
        node_count = 1
    else:
        node_count = 3
    cmdline = ['--smp', '2', '--logger-log-level', 'raft_topology=debug', '--logger-log-level', 'storage_service=debug']
    servers = await manager.servers_add(node_count, cmdline=cmdline)
    cql = manager.get_cql()
    await manager.api.disable_tablet_balancing(servers[0].ip_addr)
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets={'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.counters (pk int PRIMARY KEY, c counter)")

        stop_event = asyncio.Event()
        pk = 1  # Single partition key for all updates

        async def do_counter_updates():
            """Continuously update a single counter during migration"""
            update_count = 0
            while not stop_event.is_set():
                await asyncio.gather(*[cql.run_async(f"UPDATE {ks}.counters SET c = c + 1 WHERE pk = {pk}") for _ in range(100)])
                update_count += 100
            return update_count

        async def do_migration():
            """Perform the specified type of tablet migration"""
            try:
                tablet_token = 0  # the single tablet
                replica = await get_tablet_replica(manager, servers[0], ks, 'counters', tablet_token)
                src_host = replica[0]
                src_shard = replica[1]
                if migration_type == "internode":
                    # Find a different node to migrate to
                    all_host_ids = [await manager.get_host_id(server.server_id) for server in servers]
                    dst_host = next(host_id for host_id in all_host_ids if host_id != src_host)
                    dst_shard = 0
                else:  # migration_type == "intranode"
                    # Move to a different shard on the same node
                    dst_host = src_host
                    dst_shard = 1 - src_shard  # Switch between shard 0 and 1
                await manager.api.move_tablet(servers[0].ip_addr, ks, "counters", src_host, src_shard, dst_host, dst_shard, tablet_token)
            finally:
                stop_event.set()

        # Run counter updates and migration concurrently
        update_task = asyncio.create_task(do_counter_updates())
        await asyncio.sleep(0.5)
        await do_migration()
        total_updates = await update_task
        logger.info("Completed %d counter updates during migration", total_updates)

        # Verify no increments were lost - counter value should equal number of updates
        result = await cql.run_async(f"SELECT c FROM {ks}.counters WHERE pk = {pk}")
        actual_count = result[0].c
        assert actual_count == total_updates, f"Counter value mismatch: expected {total_updates}, got {actual_count}"

View File

@@ -92,11 +92,10 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
        Column("pk", IntType),
        Column('int_c', IntType)
    ])
-   if not tablets_enabled: # issue #18180
-       await random_tables.add_table(name='t2', pks=1, columns=[
-           Column("pk", IntType),
-           Column('counter_c', CounterType)
-       ])
+   await random_tables.add_table(name='t2', pks=1, columns=[
+       Column("pk", IntType),
+       Column('counter_c', CounterType)
+   ])
    cql = manager.get_cql()
    await cql.run_async(f"USE {random_tables.keyspace}")
@@ -126,10 +125,9 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
    with pytest.raises(WriteFailure, match="stale topology exception"):
        await cql.run_async("insert into t1(pk, int_c) values (1, 1)", host=host2)
-   if not tablets_enabled: # issue #18180
-       logger.info(f"trying to write through host2 to counter column [{host2}]")
-       with pytest.raises(WriteFailure, match="stale topology exception"):
-           await cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2)
+   logger.info(f"trying to write through host2 to counter column [{host2}]")
+   with pytest.raises(WriteFailure, match="stale topology exception"):
+       await cql.run_async("update t2 set counter_c=counter_c+1 where pk=1", host=host2)
    random_tables.drop_all()

View File

@@ -157,6 +157,9 @@ def get_incremental_repair_sst_skipped_bytes(server):
def get_incremental_repair_sst_read_bytes(server):
    return get_metrics(server, "scylla_repair_inc_sst_read_bytes")


def get_repair_tablet_time_ms(server):
    return get_metrics(server, "scylla_repair_tablet_time_ms")


async def get_sstables_for_server(manager, server, ks):
    node_workdir = await manager.server_get_workdir(server.server_id)
    sstables = get_sstables(node_workdir, ks, 'test')
@@ -671,3 +674,18 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
    assert skip1 == skip2
    assert read1 < read2
    await do_repair_and_check('full', 1, rf'Starting tablet repair by API .* incremental_mode=full.*', check4)


@pytest.mark.asyncio
async def test_tablet_repair_tablet_time_metrics(manager: ManagerClient):
    servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
    time1 = 0
    time2 = 0
    for s in servers:
        time1 += get_repair_tablet_time_ms(s)
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
    for s in servers:
        time2 += get_repair_tablet_time_ms(s)
    assert time1 == 0
    assert time2 > 0

View File

@@ -24,7 +24,7 @@ async def validate_status_operation(result: str, live_eps: list, down_eps: list,
    assert lines[i] == "=" * dc_line_len
    i += 1
-   assert lines[i] == "Status=Up/Down"
+   assert lines[i].startswith("Status=Up/Down")
    i += 1
    assert lines[i] == "|/ State=Normal/Leaving/Joining/Moving"
@@ -47,7 +47,10 @@ async def validate_status_operation(result: str, live_eps: list, down_eps: list,
        assert ep in (live_eps + down_eps)
-       assert status_state[0] == ('U' if ep in live_eps else 'D')
+       if ep in live_eps:
+           assert status_state[0] == 'U'
+       else:
+           assert status_state[0] in ['D', 'X']
        if ep in joining:
            assert status_state[1] == 'J'

View File

@@ -22,7 +22,7 @@ class Measurement(NamedTuple):
    metric_error_threshold: int = 1000


class DB(NamedTuple):
-   ks_opts: str = "WITH REPLICATION = { 'replication_factor' : '1' } AND tablets = { 'enabled' : false }"
+   ks_opts: str = "WITH REPLICATION = { 'replication_factor' : '1' }"
    tbl_schema: str = "p int, c int, PRIMARY KEY (p)"
    tbl_opts: str = ""
    prep_stmt_gen: Callable[[str], str] | None = None

View File

@@ -4,10 +4,14 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import os
+import time
import pytest
+import asyncio
import logging
-from test.pylib.manager_client import ManagerClient
+from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
+from test.pylib.manager_client import ManagerClient, ScyllaVersionDescription

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@@ -23,25 +27,6 @@ def yaml_to_cmdline(config):
    return cmdline

-@pytest.mark.asyncio
-@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
-async def test_dict_compression_not_allowed(manager: ManagerClient, cfg_source: str):
-    config = {
-        'sstable_compression_dictionaries_allow_in_ddl': False,
-        'sstable_compression_user_table_options': {
-            'sstable_compression': 'ZstdWithDictsCompressor',
-            'chunk_length_in_kb': 4,
-            'compression_level': 10
-        }
-    }
-    expected_error = 'Invalid sstable_compression_user_table_options: sstable_compression ZstdWithDictsCompressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`'
-    if cfg_source == 'yaml':
-        await manager.server_add(config=config, expected_error=expected_error)
-    else:
-        await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)

@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_negative(
@@ -105,4 +90,59 @@ async def test_crc_check_chance_out_of_bounds(manager: ManagerClient, cfg_source
if cfg_source == 'yaml':
await manager.server_add(config=config, expected_error=expected_error)
else:
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
@pytest.mark.asyncio
async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription):
"""
Check that the default SSTable compression algorithm is:
* LZ4Compressor if SSTABLE_COMPRESSION_DICTS is disabled.
* LZ4WithDictsCompressor if SSTABLE_COMPRESSION_DICTS is enabled.
- Start a 2-node cluster running a version where dictionary compression is not supported (2025.1).
- Create a table. Ensure that it uses the LZ4Compressor.
- Upgrade one node.
- Create a second table. Ensure that it still uses the LZ4Compressor.
- Upgrade the second node.
- Wait for SSTABLE_COMPRESSION_DICTS to be enabled.
- Create a third table. Ensure that it uses the new LZ4WithDictsCompressor.
"""
async def create_table_and_check_compression(cql, keyspace, table_name, expected_compression, context):
"""Helper to create a table and verify its compression algorithm."""
logger.info(f"Creating table {table_name} ({context})")
await cql.run_async(f"CREATE TABLE {keyspace}.{table_name} (pk int PRIMARY KEY, v int)")
logger.info(f"Verifying that the default compression algorithm is {expected_compression}")
result = await cql.run_async(f"SELECT compression FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table_name}'")
actual_compression = result[0].compression.get("sstable_compression")
logger.info(f"Actual compression for {table_name}: {actual_compression}")
assert actual_compression == expected_compression, \
f"Expected {expected_compression} for {table_name} ({context}), got: {actual_compression}"
new_exe = os.getenv("SCYLLA")
assert new_exe
logger.info("Starting servers with version 2025.1")
servers = await manager.servers_add(2, version=scylla_2025_1)
logger.info("Creating a test keyspace")
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
await create_table_and_check_compression(cql, "test_ks", "table_before_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "before upgrade")
logger.info("Upgrading server 0")
await manager.server_change_version(servers[0].server_id, new_exe)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await create_table_and_check_compression(cql, "test_ks", "table_during_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "during upgrade")
logger.info("Upgrading server 1")
await manager.server_change_version(servers[1].server_id, new_exe)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature to be enabled on all nodes")
await asyncio.gather(*(wait_for_feature("SSTABLE_COMPRESSION_DICTS", cql, host, time.time() + 60) for host in hosts))
await create_table_and_check_compression(cql, "test_ks", "table_after_upgrade", "LZ4WithDictsCompressor", "after upgrade and feature enabled")
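The upgrade test above asserts a simple feature-gated default, which can be summarized in a small helper. This is a sketch with a hypothetical name; the two compressor strings are exactly the ones the test checks before and after the SSTABLE_COMPRESSION_DICTS feature is enabled:

```python
def default_sstable_compressor(dicts_feature_enabled: bool) -> str:
    # Mirrors the expectation asserted above: the legacy Java-style
    # compressor name stays the default until SSTABLE_COMPRESSION_DICTS
    # is enabled on every node in the cluster.
    if dicts_feature_enabled:
        return "LZ4WithDictsCompressor"
    return "org.apache.cassandra.io.compress.LZ4Compressor"
```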


@@ -8,14 +8,10 @@ import logging
import pytest
import itertools
import time
import contextlib
import typing
from test.pylib.manager_client import ManagerClient, ServerInfo
from test.pylib.rest_client import read_barrier, ScyllaMetrics, HTTPError
from test.pylib.rest_client import read_barrier, ScyllaMetrics
from cassandra.cluster import ConsistencyLevel, Session as CassandraSession
from cassandra.policies import FallthroughRetryPolicy, ConstantReconnectionPolicy
from cassandra.protocol import ServerError
from cassandra.query import SimpleStatement
logger = logging.getLogger(__name__)
@@ -494,83 +490,3 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
for algo in nondict_algorithms:
assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
assert (await get_compressor_names(no_compression)) == set()
async def test_sstable_compression_dictionaries_allow_in_ddl(manager: ManagerClient):
"""
Tests the sstable_compression_dictionaries_allow_in_ddl option.
When it's disabled, ALTER and CREATE statements should not be allowed
to configure tables to use compression dictionaries for sstables.
"""
# Bootstrap cluster and configure server
logger.info("Bootstrapping cluster")
servers = (await manager.servers_add(1, cmdline=[
*common_debug_cli_options,
"--sstable-compression-dictionaries-allow-in-ddl=false",
], auto_rack_dc="dc1"))
@contextlib.asynccontextmanager
async def with_expect_server_error(msg):
try:
yield
except ServerError as e:
if e.message != msg:
raise
else:
raise Exception('Expected a ServerError, got no exceptions')
cql, hosts = await manager.get_ready_cql(servers)
await cql.run_async("""
CREATE KEYSPACE test
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
""")
for new_algo in ['LZ4WithDicts', 'ZstdWithDicts']:
logger.info(f"Tested algorithm: {new_algo}")
table_name = f"test.{new_algo}"
logger.info("Check that disabled sstable_compression_dictionaries_allow_in_ddl prevents CREATE with dict compression")
async with with_expect_server_error(f"sstable_compression {new_algo}Compressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`"):
await cql.run_async(SimpleStatement(f'''
CREATE TABLE {table_name} (pk int PRIMARY KEY, c blob)
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
logger.info("Enable the config option")
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "true")
logger.info("CREATE the table with dict compression")
await cql.run_async(SimpleStatement(f'''
CREATE TABLE {table_name} (pk int PRIMARY KEY, c blob)
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
logger.info("Disable compression on the table")
await cql.run_async(SimpleStatement(f'''
ALTER TABLE {table_name}
WITH COMPRESSION = {{'sstable_compression': ''}};
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
logger.info("Disable the config option again")
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "false")
logger.info("Check that disabled sstable_compression_dictionaries_allow_in_ddl prevents ALTER with dict compression")
async with with_expect_server_error(f"sstable_compression {new_algo}Compressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`"):
await cql.run_async(SimpleStatement(f'''
ALTER TABLE {table_name}
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
logger.info("Enable the config option again")
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "true")
logger.info("ALTER the table with dict compression")
await cql.run_async(SimpleStatement(f'''
ALTER TABLE {table_name}
WITH COMPRESSION = {{'sstable_compression': '{new_algo}Compressor'}};
''', retry_policy=FallthroughRetryPolicy()), host=hosts[0])
logger.info("Enable the config option again")
logger.info("Disable the config option for the next test")
await live_update_config(manager, servers, 'sstable_compression_dictionaries_allow_in_ddl', "false")
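The DDL gate exercised by the test above can be sketched as a standalone validator. The function name and signature are hypothetical; the error string matches the message the test expects from the server:

```python
def validate_sstable_compression(algo: str, allow_in_ddl: bool) -> None:
    # Hypothetical sketch of the gate the test exercises: while
    # sstable_compression_dictionaries_allow_in_ddl is false, CREATE and
    # ALTER may not select a dictionary-based sstable compressor.
    if algo.endswith("WithDictsCompressor") and not allow_in_ddl:
        raise ValueError(
            f"sstable_compression {algo} has been disabled by "
            "`sstable_compression_dictionaries_allow_in_ddl: false`")
```

Note that the gate only rejects the dictionary variants; plain compressors pass regardless of the option, which is why the test toggles the option between each CREATE/ALTER pair.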


@@ -3,6 +3,8 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import uuid
from cassandra.protocol import ConfigurationException, InvalidRequest, SyntaxException
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.cluster.test_tablets2 import safe_rolling_restart
@@ -921,6 +923,45 @@ async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: Manager
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
@pytest.mark.asyncio
async def test_excludenode(manager: ManagerClient):
"""
Verifies a recovery scenario involving marking a node as excluded using excludenode.
1. Create a cluster with 3 racks, 1 node in rack1, 2 nodes in rack2, and 1 in rack3.
2. The keyspace is initially replicated to rack1 and rack2.
3. We down one node in rack2, which should cause unavailability.
4. We mark the node as excluded using excludenode. This unblocks the next ALTER.
5. We add rack3 to the RF of the keyspace. This wouldn't succeed without marking the node as excluded.
6. We verify that the downed node can be removed successfully while there are still tablets on it. That's
why we need two nodes in rack2.
"""
servers = await manager.servers_add(servers_num=3, auto_rack_dc='dc1')
await manager.server_add(property_file={'dc': 'dc1', 'rack': 'rack2'})
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', "
"'dc1': ['rack1', 'rack2']} AND tablets = { 'initial': 8 }") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
live_node = servers[0]
node_to_remove = servers[1]
with pytest.raises(Exception, match="Cannot mark host .* as excluded because it's alive"):
await manager.api.exclude_node(live_node.ip_addr, hosts=[await manager.get_host_id(node_to_remove.server_id)])
with pytest.raises(Exception, match=".* does not belong to this cluster"):
await manager.api.exclude_node(live_node.ip_addr, hosts=[str(uuid.uuid4())])
await manager.server_stop(node_to_remove.server_id)
await manager.others_not_see_server(node_to_remove.ip_addr)
await manager.api.exclude_node(live_node.ip_addr, hosts=[await manager.get_host_id(node_to_remove.server_id)])
# Check that tablets can be rebuilt in a new rack with rack2 down.
await cql.run_async(f"ALTER KEYSPACE {ks} WITH REPLICATION = {{ 'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2', 'rack3']}}")
# Check that removenode succeeds on the node which is excluded
await manager.remove_node(live_node.server_id, server_id=node_to_remove.server_id)
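The two failure modes the test asserts with pytest.raises (excluding a live node, and excluding a host id that is not in the cluster) can be modeled as a minimal sketch. All names here are hypothetical simplifications; only the error messages mirror the patterns the test matches against:

```python
class ExcludeNodeError(Exception):
    pass

def exclude_node(cluster: dict, excluded: set, host_id: str) -> None:
    # cluster maps host_id -> is_alive. Models the two error cases the
    # test asserts, plus the success path of marking a dead node excluded.
    if host_id not in cluster:
        raise ExcludeNodeError(f"host {host_id} does not belong to this cluster")
    if cluster[host_id]:
        raise ExcludeNodeError(
            f"Cannot mark host {host_id} as excluded because it's alive")
    excluded.add(host_id)
```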
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_token_node: bool):


@@ -1727,7 +1727,6 @@ async def test_tablet_load_and_stream_and_split_synchronization(manager: Manager
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--smp', '1',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1


@@ -534,7 +534,8 @@ async def test_view_building_while_tablet_streaming_fail(manager: ManagerClient)
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'tab', tablet_token)
await manager.api.enable_injection(servers[0].ip_addr, "stream_tablet_fail", one_shot=True)
await manager.api.enable_injection(servers[0].ip_addr, "stream_tablet_fail", one_shot=False)
await manager.api.enable_injection(servers[0].ip_addr, "stream_tablet_move_to_cleanup", one_shot=False)
await asyncio.gather(*(manager.api.disable_injection(s.ip_addr, VIEW_BUILDING_WORKER_PAUSE_BUILD_RANGE_TASK) for s in servers))
await manager.api.move_tablet(servers[0].ip_addr, ks, "tab", replica[0], replica[1], s1_host_id, 0, tablet_token)


@@ -1,7 +1,6 @@
-- Error messages contain a keyspace name. Make the output stable.
CREATE KEYSPACE ks
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
AND TABLETS = {'enabled': false};
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};
USE ks;
create table tb1 (pk int primary key, c1 counter) with default_time_to_live = 100;


@@ -1,7 +1,6 @@
> -- Error messages contain a keyspace name. Make the output stable.
> CREATE KEYSPACE ks
> WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
> AND TABLETS = {'enabled': false};
> WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};
OK
> USE ks;
OK


@@ -1,7 +1,6 @@
-- Error messages contain a keyspace name. Make the output stable.
CREATE KEYSPACE ks
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
AND TABLETS = {'enabled': false};
WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};
USE ks;
CREATE TABLE ks.tbl_cnt (pk int PRIMARY KEY, c1 counter);


@@ -1,7 +1,6 @@
> -- Error messages contain a keyspace name. Make the output stable.
> CREATE KEYSPACE ks
> WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}
> AND TABLETS = {'enabled': false};
> WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};
OK
> USE ks;
OK


@@ -44,39 +44,21 @@ def testNonCounterInCounterBatch(cql, test_keyspace):
with pytest.raises(InvalidRequest):
sendBatch(cql, test_keyspace, BatchType.COUNTER, False, True, False)
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testNonCounterInLoggedBatch(cql, test_keyspace):
sendBatch(cql, test_keyspace, BatchType.LOGGED, False, True, False)
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testNonCounterInUnLoggedBatch(cql, test_keyspace):
sendBatch(cql, test_keyspace, BatchType.UNLOGGED, False, True, False)
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterInCounterBatch(cql, test_keyspace):
sendBatch(cql, test_keyspace, BatchType.COUNTER, True, False, False)
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterInUnLoggedBatch(cql, test_keyspace):
sendBatch(cql, test_keyspace, BatchType.UNLOGGED, True, False, False)
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testTableWithClusteringInLoggedBatch(cql, test_keyspace):
sendBatch(cql, test_keyspace, BatchType.LOGGED, False, False, True)
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testTableWithClusteringInUnLoggedBatch(cql, test_keyspace):
sendBatch(cql, test_keyspace, BatchType.UNLOGGED, False, False, True)


@@ -242,9 +242,6 @@ def testCastsWithReverseOrder(cql, test_keyspace):
# row("2.0"))
# Reproduces #14501:
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterCastsInSelectionClause(cql, test_keyspace):
with create_table(cql, test_keyspace, "(a int primary key, b counter)") as table:
execute(cql, table, "UPDATE %s SET b = b + 2 WHERE a = 1")


@@ -12,9 +12,6 @@ from cassandra.query import UNSET_VALUE
# Test for the validation bug of CASSANDRA-4706,
# migrated from cql_tests.py:TestCQL.validate_counter_regular_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testRegularCounters(cql, test_keyspace):
# The Cassandra and Scylla error messages are different: Cassandra says
# "Cannot mix counter and non counter columns in the same table", Scylla
@@ -31,9 +28,6 @@ def testRegularCounters(cql, test_keyspace):
"CREATE TABLE %s (id bigint PRIMARY KEY, count counter, things set<text>)")
# Migrated from cql_tests.py:TestCQL.collection_counter_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCountersOnCollections(cql, test_keyspace):
assert_invalid_throw(cql, test_keyspace + "." + unique_name(),
InvalidRequest,
@@ -47,9 +41,6 @@ def testCountersOnCollections(cql, test_keyspace):
InvalidRequest,
"CREATE TABLE %s (k int PRIMARY KEY, m map<text, counter>)")
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterUpdatesWithUnset(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, c counter)") as table:
# set up
@@ -67,9 +58,6 @@ def testCounterUpdatesWithUnset(cql, test_keyspace):
assert_rows(execute(cql, table, "SELECT c FROM %s WHERE k = 10"), [1]) # no change to the counter value
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterFiltering(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, a counter)") as table:
for i in range(10):
@@ -97,9 +85,6 @@ def testCounterFiltering(cql, test_keyspace):
assert_rows_ignoring_order(execute(cql, table, "SELECT * FROM %s WHERE a = ? ALLOW FILTERING", 6),
[6, 6], [10, 6])
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterFilteringWithNull(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, a counter, b counter)") as table:
execute(cql, table, "UPDATE %s SET a = a + ? WHERE k = ?", 1, 1)
@@ -120,9 +105,6 @@ def testCounterFilteringWithNull(cql, test_keyspace):
"SELECT * FROM %s WHERE b = null ALLOW FILTERING")
# Test for the validation bug of CASSANDRA-9395.
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testProhibitReversedCounterAsPartOfPrimaryKey(cql, test_keyspace):
# The Cassandra message and Scylla message differ slightly -
# counter type is not supported for PRIMARY KEY column 'a'"
@@ -134,9 +116,6 @@ def testProhibitReversedCounterAsPartOfPrimaryKey(cql, test_keyspace):
"CREATE TABLE %s (a counter, b int, PRIMARY KEY (b, a)) WITH CLUSTERING ORDER BY (a desc);")
# Check that a counter batch works as intended
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterBatch(cql, test_keyspace):
with create_table(cql, test_keyspace, "(userid int, url text, total counter, PRIMARY KEY (userid, url))") as table:
# Ensure we handle updates to the same CQL row in the same partition properly


@@ -282,9 +282,6 @@ def testIndexOnCompoundRowKey(cql, test_keyspace):
["t", 1, 4, 3])
# Migrated from cql_tests.py:TestCQL.secondary_index_counters()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testIndexOnCountersInvalid(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, c counter)") as table:
assert_invalid(cql, table, "CREATE INDEX ON %s(c)")


@@ -126,9 +126,6 @@ def testCounters(cql, test_keyspace):
assert_rows(execute(cql, table, "SELECT total FROM %s WHERE userid = 1 AND url = 'http://foo.com'"),
[1])
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterFiltering(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, a COUNTER) WITH COMPACT STORAGE") as table:
for i in range(10):
@@ -171,9 +168,6 @@ def testCounterFiltering(cql, test_keyspace):
[10, 6])
# Test for the bug of CASSANDRA-11726.
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterAndColumnSelection(cql, test_keyspace):
for compactStorageClause in ["", " WITH COMPACT STORAGE"]:
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, c counter) " + compactStorageClause) as table:
@@ -188,9 +182,6 @@ def testCounterAndColumnSelection(cql, test_keyspace):
assert_rows(execute(cql, table, "SELECT k FROM %s"), [0])
# Check that a counter batch works as intended
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testCounterBatch(cql, test_keyspace):
with create_table(cql, test_keyspace, "(userid int, url text, total counter, PRIMARY KEY (userid, url)) WITH COMPACT STORAGE") as table:
# Ensure we handle updates to the same CQL row in the same partition properly


@@ -1649,9 +1649,6 @@ def executeFilteringOnly(cql, table, statement):
assert_invalid(cql, table, statement)
return execute_without_paging(cql, table, statement + " ALLOW FILTERING")
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testAllowFilteringOnPartitionKeyWithCounters(cql, test_keyspace):
with create_table(cql, test_keyspace, "(a int, b int, c int, cnt counter, PRIMARY KEY ((a, b), c))") as table:
execute(cql, table, "UPDATE %s SET cnt = cnt + ? WHERE a = ? AND b = ? AND c = ?", 14, 11, 12, 13)
@@ -1975,9 +1972,6 @@ def testCustomIndexWithFiltering(cql, test_keyspace):
assert_rows(executeFilteringOnly(cql, table, "SELECT * FROM %s WHERE c = 'b' AND d = 4"),
["c", 3, "b", 4])
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def testFilteringWithCounters(cql, test_keyspace):
with create_table(cql, test_keyspace, "(a int, b int, c int, cnt counter, PRIMARY KEY (a, b, c))") as table:
execute(cql, table, "UPDATE %s SET cnt = cnt + ? WHERE a = ? AND b = ? AND c = ?", 14, 11, 12, 13)


@@ -119,9 +119,6 @@ def test_ignore_cast_to_the_same_type(cql, table1):
assert cql.execute(f"SELECT CAST(p as int) FROM {table1} WHERE p={p}").one()._fields[0] == "p"
# Test casting a counter to various other types. Reproduces #14501.
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def test_cast_from_counter(cql, table2):
p = unique_key_int()
# Set the counter to 1000 in two increments, to make it less trivial to
@@ -156,9 +153,6 @@ def test_cast_from_counter(cql, table2):
# "text" type. Since "varchar" is just an alias for "text", casting
# to varchar should work too, but in Cassandra it doesn't so this test
# is marked a Cassandra bug.
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def test_cast_from_counter_to_varchar(cql, table2, cassandra_bug):
p = unique_key_int()
cql.execute(f'UPDATE {table2} SET c = c + 1000 WHERE p = {p}')


@@ -25,9 +25,6 @@ def table2(cql, test_keyspace):
# Test that the function counterasblob() exists and works as expected -
# same as bigintasblob on the same number (a counter is a 64-bit number).
# Reproduces #14742
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def test_counter_to_blob(cql, table1, table2):
p = unique_key_int()
cql.execute(f'UPDATE {table1} SET i = 1000 WHERE p = {p}')
@@ -72,9 +69,6 @@ def test_blobascounter_wrong_size(cql, table1):
# Drop a table while there is a counter update operation in progress.
# Verify the table waits for the operation to complete before it's destroyed.
# Reproduces scylladb/scylla-enterprise#4475
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18180")]), "vnodes"],
indirect=True)
def test_counter_update_while_table_dropped(cql, test_keyspace):
with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, c counter") as table, \
scylla_inject_error(cql, "apply_counter_update_delay_5s", one_shot=True):


@@ -190,10 +190,7 @@ def test_desc_scylla_keyspace(scylla_only, cql, random_seed):
# Test that `DESC TABLE {tbl}` contains appropriate create statement for table
# This test compares the content of `system_schema.tables` and `system_schema.columns` tables.
def test_desc_table(cql, test_keyspace, random_seed, has_tablets):
if has_tablets: # issue #18180
global counter_table_chance
counter_table_chance = 0
def test_desc_table(cql, test_keyspace, random_seed):
with new_random_table(cql, test_keyspace) as tbl:
desc = cql.execute(f"DESC TABLE {tbl}")
desc_stmt = desc.one().create_statement
@@ -243,10 +240,7 @@ def test_desc_table(cql, test_keyspace, random_seed, has_tablets):
# This test compares the content of `system_schema.tables` and `system_schema.columns` tables
# when providing tablet options to CREATE TABLE.
def test_desc_table_with_tablet_options(cql, test_keyspace, random_seed, has_tablets):
if has_tablets: # issue #18180
global counter_table_chance
counter_table_chance = 0
def test_desc_table_with_tablet_options(cql, test_keyspace, random_seed):
tablet_options = {
'min_tablet_count': '100',
'min_per_shard_tablet_count': '0.8', # Verify that a floating point value works for this hint
@@ -269,10 +263,7 @@ def test_desc_table_with_tablet_options(cql, test_keyspace, random_seed, has_tab
# Test that `DESC TABLE {tbl}` contains appropriate create statement for table
# This test compares the content of `system_schema.scylla_tables` tables, thus the test
# is `scylla_only`.
def test_desc_scylla_table(scylla_only, cql, test_keyspace, random_seed, has_tablets):
if has_tablets: # issue #18180
global counter_table_chance
counter_table_chance = 0
def test_desc_scylla_table(scylla_only, cql, test_keyspace, random_seed):
with new_random_table(cql, test_keyspace) as tbl:
desc = cql.execute(f"DESC TABLE {tbl}")
desc_stmt = desc.one().create_statement
@@ -426,10 +417,7 @@ def test_desc_table_internals(cql, test_keyspace):
assert f"ALTER TABLE {tbl} ADD b int" in desc_internals
# Test that `DESC KEYSPACE {ks}` contains not only keyspace create statement but also for its elements
def test_desc_keyspace_elements(cql, random_seed, has_tablets):
if has_tablets: # issue #18180
global counter_table_chance
counter_table_chance = 0
def test_desc_keyspace_elements(cql, random_seed):
with new_random_keyspace(cql) as ks:
with new_random_type(cql, ks) as udt:
with new_random_table(cql, ks, [udt]) as tbl:
@@ -449,10 +437,7 @@ def test_desc_keyspace_elements(cql, random_seed, has_tablets):
# Test that `DESC SCHEMA` contains all information for user created keyspaces
# and `DESC FULL SCHEMA` contains also information for system keyspaces
def test_desc_schema(cql, test_keyspace, random_seed, has_tablets):
if has_tablets: # issue #18180
global counter_table_chance
counter_table_chance = 0
def test_desc_schema(cql, test_keyspace, random_seed):
with new_random_keyspace(cql) as ks:
with new_random_table(cql, test_keyspace) as tbl1, new_random_table(cql, ks) as tbl2:
desc = cql.execute("DESC SCHEMA")
@@ -690,10 +675,7 @@ def test_view_desc_in_table_desc(cql, test_keyspace, cassandra_bug):
# keyspace, table, view, index, UDT, UDF, UDA
# Cassandra compatibility require us to be able generic describe: keyspace, table, view, index.
def test_generic_desc(cql, random_seed, has_tablets):
if has_tablets: # issue #18180
global counter_table_chance
counter_table_chance = 0
def test_generic_desc(cql, random_seed):
with new_random_keyspace(cql) as ks:
with new_random_table(cql, ks) as t1, new_test_table(cql, ks, "a int primary key, b int, c int") as tbl:
cql.execute(f"CREATE INDEX idx ON {tbl}(b)")
