Compare commits

...

121 Commits

Author SHA1 Message Date
Piotr Dulikowski
9536949911 Merge '[Backport 2025.2] tablets: deallocate storage state on end_migration' from Scylladb[bot]
When a tablet is migrated and cleaned up, deallocate the tablet storage
group state on `end_migration` stage, instead of `cleanup` stage:

* When the stage is updated from `cleanup` to `end_migration`, the
  storage group is removed on the leaving replica.
* When the table is initialized, if the tablet stage is `end_migration`
  then we don't allocate a storage group for it. This happens for
  example if the leaving replica is restarted during tablet migration.
  If it's initialized in `cleanup` stage then we allocate a storage
  group, and it will be deallocated when transitioning to
  `end_migration`.

This guarantees that the storage group is always deallocated on the
leaving replica by `end_migration`, and that it is always allocated if
the tablet wasn't cleaned up fully yet.

A similar case applies to the pending replica when the migration is
aborted: we deallocate the state on `revert_migration`, the stage
following `cleanup_target`.

Previously, the storage group was allocated when the tablet was
initialized on any of the tablet replicas - including the leaving
replica, even when the tablet stage was `cleanup` or `end_migration` -
and it was deallocated during `cleanup`.

This fixes the following issue:

1. A migrating tablet enters the `cleanup` stage.
2. The tablet is cleaned up successfully.
3. The leaving replica is restarted and allocates a storage group.
4. Tablet cleanup is not called because it's already cleaned up.
5. The storage group remains allocated on the leaving replica after the
   migration is completed - it's not cleaned up properly.

Fixes https://github.com/scylladb/scylladb/issues/23481

Backport to all relevant releases, since this is a bug that results in a crash.

- (cherry picked from commit 34f15ca871)

- (cherry picked from commit fb18fc0505)

- (cherry picked from commit bd88ca92c8)

Parent PR: #24393

Closes scylladb/scylladb#24488

* github.com:scylladb/scylladb:
  test/cluster/test_tablets: test restart during tablet cleanup
  test: tablets: add get_tablet_info helper
  tablets: deallocate storage state on end_migration
2025-06-18 10:25:32 +02:00
Anna Stuchlik
01d3b504d1 doc: add support for z3 GCP
This commit adds support for z3-highmem-highlssd instance types to
Cloud Instance Recommendations for GCP.

Fixes https://github.com/scylladb/scylladb/issues/24511

Closes scylladb/scylladb#24533

(cherry picked from commit 648d8caf27)

Closes scylladb/scylladb#24545
2025-06-17 23:40:47 +03:00
Michael Litvak
305f827888 test/cluster/test_tablets: test restart during tablet cleanup
Add a test that reproduces issue scylladb/scylladb#23481.

The test migrates a tablet from one node to another, and while the
tablet is in some stage of cleanup - either before or right after,
depending on the parameter - the leaving replica, on which the tablet is
cleaned, is restarted.

This is interesting because when the leaving replica starts and loads
its state, the tablet could be in different stages of cleanup - the
SSTables may still exist or they may have been cleaned up already, and
we want to make sure the state is loaded correctly.

(cherry picked from commit bd88ca92c8)
2025-06-17 13:59:10 +00:00
Michael Litvak
d094bc6fc9 test: tablets: add get_tablet_info helper
Add a helper for tests to get the tablet info from system.tablets for a
tablet owning a given token.

(cherry picked from commit fb18fc0505)
2025-06-17 13:59:10 +00:00
Michael Litvak
c11a2e2aaf tablets: deallocate storage state on end_migration
When a tablet is migrated and cleaned up, deallocate the tablet storage
group state on `end_migration` stage, instead of `cleanup` stage:

* When the stage is updated from `cleanup` to `end_migration`, the
  storage group is removed on the leaving replica.
* When the table is initialized, if the tablet stage is `end_migration`
  then we don't allocate a storage group for it. This happens for
  example if the leaving replica is restarted during tablet migration.
  If it's initialized in `cleanup` stage then we allocate a storage
  group, and it will be deallocated when transitioning to
  `end_migration`.

This guarantees that the storage group is always deallocated on the
leaving replica by `end_migration`, and that it is always allocated if
the tablet wasn't cleaned up fully yet.

A similar case applies to the pending replica when the migration is
aborted: we deallocate the state on `revert_migration`, the stage
following `cleanup_target`.

Previously, the storage group was allocated when the tablet was
initialized on any of the tablet replicas - including the leaving
replica, even when the tablet stage was `cleanup` or `end_migration` -
and it was deallocated during `cleanup`.

This fixes the following issue:

1. A migrating tablet enters the `cleanup` stage.
2. The tablet is cleaned up successfully.
3. The leaving replica is restarted and allocates a storage group.
4. Tablet cleanup is not called because it was already cleaned up.
5. The storage group remains allocated on the leaving replica after the
   migration is completed - it's not cleaned up properly.

Fixes scylladb/scylladb#23481

(cherry picked from commit 34f15ca871)
2025-06-17 13:59:10 +00:00
Botond Dénes
a63b22eec6 Merge '[Backport 2025.2] tablets: fix missing data after tablet merge ' from Scylladb[bot]
Consider the following scenario:

1) let's assume tablet 0 has range [1, 5] (pre merge)
2) tablet merge happens, tablet 0 has now range [1, 10]
3) tablet_sstable_set isn't refreshed, so holds a stale state, thinks tablet 0 still has range [1, 5]
4) during a full scan, forward service will intersect the full range with tablet ranges and consume one tablet at a time
5) replica service is asked to consume range [1, 10] of tablet 0 (post merge)

We have two possible outcomes:

With cache bypass:

1) cache reader is bypassed
2) sstable reader is created on range [1, 10]
3) unrefreshed tablet_sstable_set holds stale state, but correctly selects all sstables intersecting with range [1, 10]

With cache:

1) cache reader is created
2) finds partition with token 5 is cached
3) sstable reader is created on range [1, 4] (later it would fast forward to range [6, 10], which also belongs to tablet 0)
4) incremental selector consumes the pre-merge sstable spanning range [1, 5]
4.1) since the partitioned_sstable_set pre-merge contains only that sstable, EOS is reached
4.2) since EOS is reached, the fast forward to range [6, 10] is not allowed.
So with the set refreshed, the sstable set is aligned with tablet ranges and no premature EOS is signalled; previously, the premature EOS prevented the fast forward from happening and kept some of the data from being captured in the read.

This change fixes the bug by triggering a mutation source refresh whenever the number of tablets for the table has changed, not only when we have incoming tablets.

Additionally, it includes a fix for range reads that span more than one tablet, which can happen during split execution.

Fixes: https://github.com/scylladb/scylladb/issues/23313

This change needs to be backported to all supported versions which implement tablet merge.

- (cherry picked from commit d0329ca370)

- (cherry picked from commit 1f9f724441)

- (cherry picked from commit 53df911145)

Parent PR: #24287

Closes scylladb/scylladb#24339

* github.com:scylladb/scylladb:
  replica: Fix range reads spanning sibling tablets
  test: add reproducer and test for mutation source refresh after merge
  tablets: trigger mutation source refresh on tablet count change
2025-06-17 08:35:14 +03:00
Jenkins Promoter
0adf905112 Update ScyllaDB version to: 2025.2.0-rc5 2025-06-16 16:21:22 +03:00
Pavel Emelyanov
c2a9f2d9c6 Update seastar submodule
* seastar d7ff58f2...9f0034a0 (1):
  > http_client: Add ECONNRESET to retryable errors

And switch to 2025.2 branch from scylla-seastar for backports

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#24446
2025-06-15 17:33:16 +03:00
Raphael S. Carvalho
79958472bc replica: Fix range reads spanning sibling tablets
We don't guarantee that coordinators emit range reads that span only
one tablet.

Consider this scenario:

1) split is about to be finalized, barrier is executed, completes.
2) coordinator starts a read, uses pre-split erm (split not committed to group0 yet)
3) split is committed to group0, all replicas switch storage.
4) replica-side read is executed, uses a range which spans tablets.

We could fix it with two-phase split execution. Rather than pushing the
complexity to higher levels, let's fix the incremental selector, which
should be able to serve all the tokens owned by a given shard. During
split execution, neither of the sibling tablets is going anywhere, since
the split runs with the state machine locked, so a single read spanning
both sibling tablets works as long as the selector works across tablet
boundaries.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 53df911145)
2025-06-15 09:14:38 -03:00
Ferenc Szili
ba192c1a29 test: add reproducer and test for mutation source refresh after merge
This change adds a reproducer and test for the fix where the local mutation
source is not always refreshed after a tablet merge.

(cherry picked from commit 1f9f724441)
2025-06-15 09:14:37 -03:00
Jenkins Promoter
89f5374435 Update pgo profiles - aarch64 2025-06-15 04:46:00 +03:00
Jenkins Promoter
184e0716b3 Update pgo profiles - x86_64 2025-06-15 04:08:36 +03:00
Anna Stuchlik
baa2592299 doc: remove the limitation for disabling CDC
This commit removes the instruction to stop all writes before disabling CDC with ALTER.

Fixes https://github.com/scylladb/scylla-docs/issues/4020

Closes scylladb/scylladb#24406

(cherry picked from commit b0ced64c88)

Closes scylladb/scylladb#24476
2025-06-13 14:07:38 +03:00
Robert Bindar
a926cba476 Add support for nodetool refresh --skip-reshape
This patch adds the new option in nodetool, patches the
load_new_ss_tables REST request with a new parameter and
skips the reshape step in refresh if this flag is passed.

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>

Closes scylladb/scylladb#24409
Fixes: #24365

(cherry picked from commit ca1a9c8d01)

Closes scylladb/scylladb#24472
2025-06-13 14:06:19 +03:00
Michał Chojnowski
9c28b812ca db/config: add an option that disables dict-aware sstable compressors in DDL statements
For reasons, we want to be able to disallow dictionary-aware compressors
in chosen deployments.

This patch adds a knob for that. When the knob is disabled,
dictionary-aware compressors will be rejected in the validation
stage of CREATE and ALTER statements.

Closes scylladb/scylladb#24355

(cherry picked from commit 7d26d3c7cb)

Closes scylladb/scylladb#24454
2025-06-13 14:03:32 +03:00
Michael Litvak
d792916e8e test_cdc_generation_clearing: wait for generations to propagate
In test_cdc_generation_clearing we trigger events that update CDC
generations, verify the generations are updated as expected, and verify
the system topology and CDC generations are consistent on all nodes.

Before checking that all nodes are consistent and have the same CDC
generations, we need to consider that the changes are propagated through
raft and take some time to propagate to all nodes.

Currently, we wait for the change to be applied only on the first server
which runs the CDC generation publisher fiber and read the CDC
generations from this single node. The consistency check that follows
could fail if the change was not propagated to some other node yet.

To fix that, before checking consistency with all nodes, we execute a
read barrier on all nodes so they all see the same state as the leader.

Fixes scylladb/scylladb#24407

Closes scylladb/scylladb#24433

(cherry picked from commit 8aeb404893)

Closes scylladb/scylladb#24450
2025-06-10 15:50:40 +03:00
Michał Chojnowski
a539ff6419 utils/lsa/chunked_managed_vector: fix the calculation of max_chunk_capacity()
`chunked_managed_vector` is a vector-like container which splits
its contents into multiple contiguous allocations if necessary,
in order to fit within LSA's max preferred contiguous allocation
limits.

Each limited-size chunk is stored in a `managed_vector`.
`managed_vector` is unaware of LSA's size limits.
It's up to the user of `managed_vector` to pick a size which
is small enough.

This happens in `chunked_managed_vector::max_chunk_capacity()`.
But the calculation is wrong, because it doesn't account for
the fact that `managed_vector` has to place some metadata
(the backreference pointer) inside the allocation.
In effect, the chunks allocated by `chunked_managed_vector`
are just a tiny bit larger than the limit, and the limit is violated.

Fix this by accounting for the metadata.

Also, before this patch `chunked_managed_vector::max_contiguous_allocation`
repeated the definition of `logalloc::max_managed_object_size`.
This is begging for a bug if `logalloc::max_managed_object_size`
changes one day. Adjust it so that `chunked_managed_vector` looks
directly at `logalloc::max_managed_object_size`, as it means to.

Fixes scylladb/scylladb#23854

(cherry picked from commit 7f9152babc)

Closes scylladb/scylladb#24371
2025-06-10 11:25:52 +03:00
Jenkins Promoter
b295ce38ae Update ScyllaDB version to: 2025.2.0-rc4 2025-06-06 17:03:11 +03:00
Nikos Dragazis
2e50d1a357 sstables: Fix race when loading checksum component
`read_checksum()` loads the checksum component from disk and stores a
non-owning reference in the shareable components. To avoid loading the
same component twice, the function has an early return statement.
However, this does not guarantee atomicity - two fibers or threads may
load the component and update the shareable components concurrently.
This can lead to use-after-free situations when accessing the component
through the shareable components, since the reference stored there is
non-owning. This can happen when multiple compaction tasks run on the
same SSTable (e.g., regular compaction and scrub-validate).

Fix this by not updating the reference in shareable components, if a
reference is already in place. Instead, create an owning reference to
the existing component for the current fiber. This is less efficient
than using a mutex, since the component may be loaded multiple times
from disk before noticing the race, but no locks are used for any other
SSTable component either. Also, this affects uncompressed SSTables,
which are not that common.

Fixes #23728.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>

Closes scylladb/scylladb#23872

(cherry picked from commit eaa2ce1bb5)

Closes scylladb/scylladb#24358
2025-06-06 08:49:56 +03:00
Szymon Malewski
d65b390780 mapreduce_service: Prevent race condition
In parallelized aggregation functions, the super-coordinator (the node performing
the final merging step) receives and merges each partial result in parallel
coroutines (`parallel_for_each`). Usually the responses are spread over time and
the actual merging is atomic. However, sometimes partial results arrive at
around the same time, and if an aggregate function (e.g. a Lua script) yields,
two coroutines can overwrite the same accumulator one after another, which
leads to losing some of the results.
To prevent this, with this patch each coroutine stores its merging result in its
own context and overwrites the accumulator atomically, only after the result has
been fully merged. Compared to the previous implementation, the order of
operands in the merging function is swapped, but the order of aggregation is
not guaranteed anyway.

Fixes #20662

Closes scylladb/scylladb#24106

(cherry picked from commit 5969809607)

Closes scylladb/scylladb#24389
2025-06-06 08:49:15 +03:00
Anna Stuchlik
4ebae7ae62 doc: add the upgrade guide from 2025.1 to 2025.2
This commit adds the upgrade guide from version 2025.1 to 2025.2.
Also, it removes the upgrade guides existing for the previous version
that are irrelevant in 2025.2 (upgrade from OSS 6.2 and Enterprise 2024.x).

Note that the new guide does not include the "Enable Consistent Topology Updates" page,
as users upgrading to 2025.2 have consistent topology updates already enabled.

Fixes https://github.com/scylladb/scylladb/issues/24133

Fixes https://github.com/scylladb/scylladb/issues/24265

Closes scylladb/scylladb#24266

(cherry picked from commit 8b989d7fb1)

Closes scylladb/scylladb#24391
2025-06-06 08:48:31 +03:00
Ernest Zaslavsky
4fed3a5a5a encryption_test: Catch exact exception
Apparently `test_kms_network_error` would succeed under any circumstances, since most of our exceptions derive from `std::exception`: whatever happens to the test, for whatever reason it throws, the test will be marked as passed.

Start catching the exact exception that we expect to be thrown.

Maybe somewhat related to https://github.com/scylladb/scylladb/issues/22628

Fixes: https://github.com/scylladb/scylladb/issues/24145

reapplies reverted: https://github.com/scylladb/scylladb/pull/24065

Should be backported to 2025.2.

Closes scylladb/scylladb#24242

(cherry picked from commit a39b773d36)

Closes scylladb/scylladb#24402
2025-06-06 08:48:02 +03:00
Pavel Emelyanov
5b86b6393a Merge '[Backport 2025.2] Add ability to skip SSTables cleanup when loading them' from Scylladb[bot]
The non-streaming loading of sstables has recently started performing cleanup [1]. For vnodes, unfortunately, cleanup is almost unavoidable due to the nature of vnode sharding, even if the sstable is already clean. This wastes IO and CPU for nothing. Skipping the cleanup in a smart way is possible, but it requires too many changes in the code and in the on-disk data. Moreover, the effort would not help existing SSTables, and it's going to be obsoleted by tablets some time soon.

That said, the easiest way to skip cleanup is an explicit --skip-cleanup option for nodetool and a respective skip_cleanup parameter for the API handler.

New feature, no backport

fixes #24136
refs #12422 [1]

- (cherry picked from commit 4ab049ac8d)

- (cherry picked from commit ed3ce0f6af)

- (cherry picked from commit 1b1f653699)

- (cherry picked from commit c0796244bb)

Parent PR: #24139

Closes scylladb/scylladb#24398

* github.com:scylladb/scylladb:
  nodetool: Add refresh --skip-cleanup option
  api: Introduce skip_cleanup query parameter
  distributed_loader: Don't create owned ranges if skip-cleanup is true
  code: Push bool skip_cleanup flag around
2025-06-06 08:47:22 +03:00
Pavel Emelyanov
024af57bd5 nodetool: Add refresh --skip-cleanup option
The option "conflicts" with load-and-stream. Tests and doc included.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit c0796244bb)
2025-06-05 17:52:13 +03:00
Pavel Emelyanov
c59327950b api: Introduce skip_cleanup query parameter
Just copy the load_and_stream and primary_replica_only logic; this new
option is the same in this sense.

Throw if it's specified with the load_and_stream one.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit 1b1f653699)
2025-06-05 17:48:35 +03:00
Pavel Emelyanov
a2b2e46482 distributed_loader: Don't create owned ranges if skip-cleanup is true
In order to make the reshard compaction task run cleanup, the owner-ranges
pointer is passed to it. If it's nullptr, the cleanup is not performed.
So to skip the cleanup, the easiest (but not the most apparent) way
is to not initialize the pointer and keep it nullptr.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit ed3ce0f6af)
2025-06-05 17:44:45 +03:00
Pavel Emelyanov
4a7ddbfe07 code: Push bool skip_cleanup flag around
Just put the boolean into the callstack between the API and the distributed
loader to reduce churn in the next patches. No functional changes; the
flag is false and unused.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit 4ab049ac8d)
2025-06-05 17:44:40 +03:00
Michał Chojnowski
484fc374c1 compress: fix a use-after-free in dictionary_holder::get_recommended_dict()
The function calls copy() on a foreign_ptr
(stored in a map) which can be destroyed
(erased from the map) before the copy() completes.
This is illegal.

One way to fix this would be to apply an rwlock
to the map. Another way is to wrap the `foreign_ptr`
in a `lw_shared_ptr` and extend its lifetime over
the `copy()` call. This patch does the latter.

Fixes scylladb/scylladb#24165
Fixes scylladb/scylladb#24174

Closes scylladb/scylladb#24175

(cherry picked from commit ea4d251ad2)

Closes scylladb/scylladb#24374
2025-06-05 12:11:22 +03:00
Botond Dénes
a5251b4d44 Merge '[Backport 2025.2] Add --scope arg to nodetool refresh' from Scylladb[bot]
This PR adds the `--scope` option to `nodetool refresh`.
Like in the case of `nodetool restore`, you can pass either of:
* `node` - On the local node.
* `rack` - On the local rack.
* `dc` - In the datacenter (DC) where the local node lives.
* `all` (default) - Everywhere across the cluster.

as scope.

The feature is based on the existing load_and_stream paths, so it requires passing `--load-and-stream` to the `refresh` command, although this might change in the near future.

Fixes https://github.com/scylladb/scylladb/issues/23564

- (cherry picked from commit c570941692)

Parent PR: #23861

Closes scylladb/scylladb#24379

* github.com:scylladb/scylladb:
  Add nodetool refresh --scope option
  Refactor out code from test_restore_with_streaming_scopes
  Refactor out code from test_restore_with_streaming_scopes
  Refactor out code from test_restore_with_streaming_scopes
  Refactor out code from test_restore_with_streaming_scopes
  Refactor out code from test_restore_with_streaming_scopes
2025-06-05 11:54:17 +03:00
Avi Kivity
2afe0695cf Revert "config: decrease default large allocation warning threshold to 128k"
This reverts commit 04fb2c026d. 2025.2 got
the reduced threshold, but won't get most of the fixes the warning will
generate, leaving it very noisy. Better to avoid the noise for this release.

Fixes #24384.
2025-06-04 14:18:35 +03:00
Robert Bindar
b62264e1d9 Add nodetool refresh --scope option
This change adds the --scope option to nodetool refresh.
Like in the case of nodetool restore, you can pass either of:
* node - On the local node.
* rack - On the local rack.
* dc - In the datacenter (DC) where the local node lives.
* all (default) - Everywhere across the cluster.
as scope.

The feature is based on the existing load_and_stream paths, so it
requires passing --load-and-stream to the refresh command.
Also, it is not compatible with the --primary-replica-only option.

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>

Closes scylladb/scylladb#23861

(cherry picked from commit c570941692)
2025-06-04 11:59:17 +03:00
Robert Bindar
36cc0f8e7e Refactor out code from test_restore_with_streaming_scopes
part 5: check_data_is_back

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
(cherry picked from commit 548a1ec20a)
2025-06-04 11:54:07 +03:00
Robert Bindar
a885c87547 Refactor out code from test_restore_with_streaming_scopes
part 4: compute_scope

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
(cherry picked from commit 29309ae533)
2025-06-04 11:54:01 +03:00
Robert Bindar
371fc05943 Refactor out code from test_restore_with_streaming_scopes
part 3: create_dataset

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
(cherry picked from commit a0f0580a9c)
2025-06-04 11:53:51 +03:00
Robert Bindar
4366cd5a81 Refactor out code from test_restore_with_streaming_scopes
part 2: take_snapshot

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
(cherry picked from commit 5171ca385a)
2025-06-04 11:53:43 +03:00
Robert Bindar
38ee119112 Refactor out code from test_restore_with_streaming_scopes
part 1: create_cluster

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
(cherry picked from commit f09bb20ac4)
2025-06-04 11:53:32 +03:00
Piotr Dulikowski
6edf92a9e3 Merge '[Backport 2025.2] test/boost: Adjust tests to RF-rack-valid keyspaces' from Scylladb[bot]
This PR adjusts existing Boost tests so they respect the invariant
introduced by enabling `rf_rack_valid_keyspaces` configuration option.
We disable it explicitly in more problematic tests. After that, we
enable the option by default in the whole test suite.

Fixes scylladb/scylladb#23958

Backport: backporting to 2025.1 to be able to test the implementation there too.

- (cherry picked from commit 6e2fb79152)

- (cherry picked from commit e4e3b9c3a1)

- (cherry picked from commit 1199c68bac)

- (cherry picked from commit cd615c3ef7)

- (cherry picked from commit fa62f68a57)

- (cherry picked from commit 22d6c7e702)

- (cherry picked from commit 237638f4d3)

- (cherry picked from commit c60035cbf6)

Parent PR: scylladb/scylladb#23802

Closes scylladb/scylladb#24368

* github.com:scylladb/scylladb:
  test/lib/cql_test_env.cc: Enable rf_rack_valid_keyspaces by default
  test/boost/tablets_test.cc: Explicitly disable rf_rack_valid_keyspaces in problematic tests
  test/boost/tablets_test.cc: Fix indentation in test_load_balancing_with_random_load
  test/boost/tablets_test.cc: Adjust test_load_balancing_with_random_load to RF-rack-validity
  test/boost/tablets_test.cc: Adjust test_load_balancing_works_with_in_progress_transitions to RF-rack-validity
  test/boost/tablets_test.cc: Adjust test_load_balancing_resize_requests to RF-rack-validity
  test/boost/tablets_test.cc: Adjust test_load_balancing_with_two_empty_nodes to RF-rack-validity
  test/boost/tablets_test.cc: Adjust test_load_balancer_shuffle_mode to RF-rack-validity
2025-06-04 10:24:35 +02:00
Nadav Har'El
609ad01bbc alternator: hide internal tags from users
The "tags" mechanism in Alternator is a convenient way to attach metadata
to Alternator tables. Recently we have started using it more and more for
internal metadata storage:

  * UpdateTimeToLive stores the attribute in a tag system:ttl_attribute
  * CreateTable stores provisioned throughput in tags
    system:provisioned_rcu and system:provisioned_wcu
  * CreateTable stores the table's creation time in a tag called
    system:table_creation_time.

We do not want any of these internal tags to be visible to a
ListTagsOfResource request, because if they are visible (as before this
patch), systems such as Terraform can get confused when they suddenly
see a tag which they didn't set - and may even attempt to delete it
(as reported in issue #24098).

Moreover, we don't want any of these internal tags to be writable
with TagResource or UntagResource: If a user wants to change the TTL
setting they should do it via UpdateTimeToLive - not by writing
directly to tags.

So in this patch we forbid read or write to *any* tag that begins
with the "system:" prefix, except one: "system:write_isolation".
That tag is deliberately intended to be writable by the user, as
a configuration mechanism, and is never created internally by
Scylla. We should have perhaps chosen a different prefix for
configurable vs. internal tags, or chosen more unique prefixes -
but let's not change these historic names now.

This patch also adds regression tests for the internal tags features,
failing before this patch and passing after:
1. internal tags, specifically system:ttl_attribute, are not visible
   in ListTagsOfResource, and cannot be modified by TagResource or
   UntagResource.
2. system:write_isolation is not internal, and can be written by either
   TagResource or UntagResource, and read with ListTagsOfResource.

This patch also fixes a bug in the test where we added more checks
for system:write_isolation - test_tag_resource_write_isolation_values.
This test forgot to remove the system:write_isolation tags from
test_table when it ended, which would lead to other tests that run
later to run with a non-default write isolation - something which we
never intended.

Fixes #24098.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes scylladb/scylladb#24299

(cherry picked from commit 6cbcabd100)

Closes scylladb/scylladb#24377
2025-06-04 09:56:33 +03:00
Avi Kivity
10b7f2d924 pgo: drop Java configuration
Since 5e1cf90a51
("build: replace tools/java submodule with packaged cassandra-stress")
we run pre-packaged cassandra-stress. As such, we don't need to look for
a Java runtime (which is missing on the frozen toolchain) and can
rely on the cassandra-stress package finding its own Java runtime.

Fix by just dropping all the Java-finding stuff.

Note: Java 11 is in fact present on the frozen toolchain, just
not in a way that pgo.py can find it.

Fixes #24176.

Closes scylladb/scylladb#24178

(cherry picked from commit 29932a5af1)

Closes scylladb/scylladb#24254
2025-06-03 17:54:28 +03:00
Dawid Mędrek
5130ec84de test/lib/cql_test_env.cc: Enable rf_rack_valid_keyspaces by default
We've adjusted all of the Boost tests so they respect the invariant
enforced by the `rf_rack_valid_keyspaces` configuration option, or
explicitly disabled the option in those that turned out to be more
problematic and will require more attention. Thanks to that, we can
now enable it by default in the test suite.

(cherry picked from commit c60035cbf6)
2025-06-03 11:10:16 +00:00
Dawid Mędrek
9938183ace test/boost/tablets_test.cc: Explicitly disable rf_rack_valid_keyspaces in problematic tests
Some of the tests in the file verify more subtle parts of the behavior
of tablets and rely on topology layouts or using keyspaces that violate
the invariant the `rf_rack_valid_keyspaces` configuration option is
trying to enforce. Because of that, we explicitly disable the option
to be able to enable it by default in the rest of the test suite in
the following commit.

(cherry picked from commit 237638f4d3)
2025-06-03 11:10:16 +00:00
Dawid Mędrek
1271b42848 test/boost/tablets_test.cc: Fix indentation in test_load_balancing_with_random_load
(cherry picked from commit 22d6c7e702)
2025-06-03 11:10:16 +00:00
Dawid Mędrek
012e248792 test/boost/tablets_test.cc: Adjust test_load_balancing_with_random_load to RF-rack-validity
We make sure that the keyspaces created in the test are always RF-rack-valid.
To achieve that, we change how the test is performed.

Before this commit, we first created a cluster and then ran the actual test
logic multiple times. Each of those test cases created a keyspace with a random
replication factor.

That cannot work with `rf_rack_valid_keyspaces` set to true. We cannot modify
the property file of a node (see commit: eb5b52f598),
so once we set up the cluster, we cannot adjust its layout to work with another
replication factor.

To solve that issue, we also recreate the cluster in each test case. Now we choose
the replication factor at random, create a cluster distributing nodes across as many
racks as RF, and perform the rest of the logic. We perform it multiple times in
a loop so that the test behaves as before these changes.

(cherry picked from commit fa62f68a57)
2025-06-03 11:10:16 +00:00
Dawid Mędrek
1364eec694 test/boost/tablets_test.cc: Adjust test_load_balancing_works_with_in_progress_transitions to RF-rack-validity
We distribute the nodes used in the test across two racks so we can
run the test with `rf_rack_valid_keyspaces` set to true.

We want to avoid cross-rack migrations and keep the test as realistic
as possible. Since host3 is supposed to function as a new node in the
cluster, we change the layout of it: now, host1 has 2 shards and resides
in a separate rack. Most of the remaining test logic is preserved and behaves
as before this commit.

There is a slight difference in the tablet migrations. Before the commit,
we were migrating a tablet between nodes of different shard counts. Now
it's impossible because it would force us to migrate tablets between racks.
However, since the test wants to simply verify that an ongoing migration
doesn't interfere with load balancing and still leads to a perfect balance,
that still happens: we explicitly migrate ONLY 1 tablet from host2 to host3,
so to achieve the goal, one more tablet needs to be migrated, and we test
that.

(cherry picked from commit cd615c3ef7)
2025-06-03 11:10:16 +00:00
Dawid Mędrek
85fe37a8e4 test/boost/tablets_test.cc: Adjust test_load_balancing_resize_requests to RF-rack-validity
We assign the nodes created by the test to separate racks. This has no impact
on the test: the keyspace it creates uses RF=2, so the tablet
replicas will still be the same.

(cherry picked from commit 1199c68bac)
2025-06-03 11:10:16 +00:00
Dawid Mędrek
e21bdbb9ef test/boost/tablets_test.cc: Adjust test_load_balancing_with_two_empty_nodes to RF-rack-validity
We distribute the nodes used in the test between two racks. Although
that may affect how tablets behave in general, this change will not
have any real impact on the test. The test verifies that load balancing
eventually balances tablets in the cluster, which will still happen.
Because of that, the changes in this commit are safe to apply.

(cherry picked from commit e4e3b9c3a1)
2025-06-03 11:10:16 +00:00
Dawid Mędrek
ca8762885b test/boost/tablets_test.cc: Adjust test_load_balancer_shuffle_mode to RF-rack-validity
We distribute the nodes used in the test between two racks. Although that
may have an impact on how tablets behave, it's orthogonal to what the test
verifies -- whether the topology coordinator is continuously in the tablet
migration track. Because of that, it's safe to make this change without
influencing the test.

(cherry picked from commit 6e2fb79152)
2025-06-03 11:10:15 +00:00
Michał Chojnowski
3a7a1dc4a9 test/boost/sstable_compressor_factory_test: define a test suite name
It seems that tests in test/boost/combined_tests have to define a test
suite name, otherwise they aren't picked up by test.py.

Fixes #24199

Closes scylladb/scylladb#24200

(cherry picked from commit ff8a119f26)

Closes scylladb/scylladb#24255
2025-06-03 12:01:35 +03:00
Anna Stuchlik
12596a8eca doc: add OS support for ScyllaDB 2025.2
This commit adds the information about support for platforms
in ScyllaDB version 2025.2.

Fixes https://github.com/scylladb/scylladb/issues/24180

Closes scylladb/scylladb#24263

(cherry picked from commit 28cb5a1e02)

Closes scylladb/scylladb#24335
2025-06-03 10:07:28 +03:00
Anna Stuchlik
be3f50b658 doc: update migration tools overview
This commit updates the migration overview page:

- It removes the info about migration from SSTable to CQL.
- It updates the link to the migrator docs.

Fixes https://github.com/scylladb/scylladb/issues/24247

Refs https://github.com/scylladb/scylladb/pull/21775

Closes scylladb/scylladb#24258

(cherry picked from commit b197d1a617)

Closes scylladb/scylladb#24282
2025-06-03 10:06:42 +03:00
Michał Chojnowski
6cd954de8d utils/stream_compressor: allocate memory for zstd compressors externally
The default and recommended way to use zstd compressors is to let
zstd allocate and free memory for compressors on its own.

That's what we did for zstd compressors used in RPC compression.
But it turns out that it generates allocation patterns we dislike.

We expected zstd not to generate allocations after the context object
is initialized, but it turns out that it tries to downsize the context
sometimes (by reallocation). We don't want that because the allocations
generated by zstd are large (1 MiB with the parameters we use),
so repeating them periodically stresses the reclaimer.

We can avoid this by using the "static context" API of zstd,
in which the memory for context is allocated manually by the user
of the library. In this mode, zstd doesn't allocate anything
on its own.

The implementation of this patch adds a forward-compatibility
consideration: later versions of Scylla can't use a
window size greater than the one we hardcoded in this patch
when talking to the old version of the decompressor.

(This is not a problem, since those compressors are only used
for RPC compression at the moment, where cross-version communication
can be prevented by bumping COMPRESSOR_NAME. But it's something
that the developer who changes the window size must _remember_ to do).

Fixes #24160
Fixes #24183

Closes scylladb/scylladb#24161

(cherry picked from commit 185a032044)

Closes scylladb/scylladb#24281
2025-06-03 10:02:34 +03:00
Botond Dénes
9a7ea917eb mutation/mutation_compactor: cache regular/shadowable max-purgable in separate members
Max purgeable has two possible values for each partition: one for
regular tombstones and one for shadowable ones. Yet currently a single
member is used to cache the max-purgeable value for the partition, so
whichever kind of tombstone is checked first, its max-purgeable will
become sticky and apply to the other kind of tombstones too. E.g. if the
first can_gc() check is for a regular tombstone, its max-purgeable will
apply to shadowable tombstones in the partition too, meaning they might
not be purged, even though they are purgeable, as the shadowable
max-purgeable is expected to be more lenient. The other way around is
worse, as it will result in regular tombstone being incorrectly purged,
permitted by the more lenient shadowable tombstone max-purgeable.
Fix this by caching the two possible values in two separate members.
A reproducer unit test is also added.
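
A minimal Python model of the fix (the numeric bounds and `compute` callback are illustrative only, not the mutation_compactor code):

```python
REGULAR, SHADOWABLE = "regular", "shadowable"

class MaxPurgeableCache:
    """Sketch of the fix: cache max-purgeable per tombstone kind, so the
    first kind checked does not make its value sticky for the other kind."""
    def __init__(self, compute):
        self._compute = compute   # expensive computation, e.g. scanning memtables
        self._cached = {}         # one slot per tombstone kind (the fix)
        self.computations = 0

    def max_purgeable(self, kind):
        if kind not in self._cached:
            self.computations += 1
            self._cached[kind] = self._compute(kind)
        return self._cached[kind]

# the shadowable bound is typically more lenient (higher) than the regular one
cache = MaxPurgeableCache(lambda kind: 100 if kind == REGULAR else 200)
```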

Fixes: scylladb/scylladb#23272

Closes scylladb/scylladb#24171

(cherry picked from commit 7db956965e)

Closes scylladb/scylladb#24329
2025-06-03 09:51:52 +03:00
Ran Regev
c5cff9e14f changed the string literals into the correct ones
Fixes: #23970

use correct string literals:
KMIP_TAG_CRYPTOGRAPHIC_LENGTH_STR --> KMIP_TAGSTR_CRYPTOGRAPHIC_LENGTH
KMIP_TAG_CRYPTOGRAPHIC_USAGE_MASK_STR --> KMIP_TAGSTR_CRYPTOGRAPHIC_USAGE_MASK

From the https://github.com/scylladb/scylladb/issues/23970 description of the
problem (emphasis mine):

When transparent data encryption at rest is enabled with KMIP as a key
provider, the observation is that before creating a new key, Scylla tries
to locate an existing key with provided specifications (key algorithm &
length), with the intention to re-use existing key, **but the attributes
sent in the request have minor spelling mistakes** which are rejected by
the KMIP server key provider, and hence scylla assumes that a key with
these specifications doesn't exist, and creates a new key in the KMIP
server. The issue here is that for every new table, ScyllaDB will create
a key in the KMIP server, which could clutter the KMS, and make key
lifecycle management difficult for DBAs.

Closes scylladb/scylladb#24057

(cherry picked from commit 37854acc92)

Closes scylladb/scylladb#24303
2025-06-02 15:11:53 +03:00
Michael Litvak
5aca2c134d test_cdc_generation_publishing: fix to read monotonically
The test test_multiple_unpublished_cdc_generations reads the CDC
generation timestamps to verify they are published in the correct order.
To do so it issues reads in a loop with a short sleep period and checks
the differences between consecutive reads, assuming they are monotonic.

However, the assumption that the reads are monotonic is not valid,
because the reads are issued with consistency_level=ONE: we may read
timestamps {A,B} from some node, then read timestamps {A} from another
node that didn't apply the write of the new timestamp B yet. This will
trigger the assert in the test and fail it.

To ensure the reads are monotonic we change the test to use consistency
level ALL for the reads.
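
The hazard and the fix can be modelled in a few lines of Python (the replica contents and `read` helper are illustrative, not driver code):

```python
def read(replicas, consistency):
    """Toy model of read consistency: ONE answers from a single replica,
    ALL reconciles the contents of every replica."""
    if consistency == "ONE":
        return set(replicas[0])   # whichever replica happens to answer
    merged = set()
    for r in replicas:            # ALL: every replica must answer
        merged |= set(r)
    return merged

up_to_date = {"A", "B"}   # already applied the write of timestamp B
lagging = {"A"}           # has not seen B yet
```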

Fixes scylladb/scylladb#24262

Closes scylladb/scylladb#24272

(cherry picked from commit 3a1be33143)

Closes scylladb/scylladb#24336
2025-06-02 14:42:57 +03:00
Anna Stuchlik
cc299e335d doc: remove copyright from Cassandra Stress
This commit removes the Apache copyright note from the Cassandra Stress page.

It's a follow up to https://github.com/scylladb/scylladb/pull/21723, which missed
that update (see https://github.com/scylladb/scylladb/pull/21723#discussion_r1944357143).

Cassandra Stress is a separate tool with separate repo with the docs, so the copyright
information on the page is incorrect.

Fixes https://github.com/scylladb/scylladb/issues/23240

Closes scylladb/scylladb#24219

(cherry picked from commit d303edbc39)

Closes scylladb/scylladb#24256
2025-06-02 14:41:34 +03:00
David Garcia
a7b34a54bc docs: fix \t (tab) is not rendered correctly
Closes scylladb/scylladb#24096

(cherry picked from commit bf9534e2b5)

Closes scylladb/scylladb#24257
2025-06-02 14:40:54 +03:00
Pavel Emelyanov
eb78d3aefb test/result_utils: Do not assume map_reduce reducing order
When map_reduce is called on a collection, one shouldn't expect that it
processes the elements of the collection in any specific order.

The current test of map-reduce over boost outcome assumes that if the
reduce function is string concatenation, it will concatenate the
given vector of strings in the order they are listed. That requirement
should be relaxed: the result may be the reversed concatenation.
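
A small Python sketch of the order-insensitive check (the `map_reduce` model is hypothetical, with the reduction order deliberately reversed):

```python
from functools import reduce

def map_reduce(items, mapper, reducer, initial):
    # a scheduler is free to process collection elements in any order;
    # model the worst case by reversing the input
    return reduce(reducer, (mapper(x) for x in reversed(items)), initial)

result = map_reduce(["a", "b", "c"], str.upper, lambda acc, x: acc + x, "")
# an order-sensitive assertion (result == "ABC") would be brittle;
# compare the contents instead
```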

Fixes scylladb/scylladb#24321

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#24325

(cherry picked from commit a65ffdd0df)

Closes scylladb/scylladb#24337
2025-06-02 14:00:07 +03:00
Jenkins Promoter
304f47f6ec Update ScyllaDB version to: 2025.2.0-rc3 2025-06-01 15:29:44 +03:00
Ferenc Szili
8ba5a1be70 tablets: trigger mutation source refresh on tablet count change
Consider the following scenario:

- let's assume tablet 0 has range [1, 5] (pre merge)
- tablet merge happens, tablet 0 has now range [1, 10]
- tablet_sstable_set isn't refreshed, so holds a stale state, thinks tablet
  0 still has range [1, 5]
- during a full scan, forward service will intersect the full range with
  tablet ranges and consume one tablet at a time
- replica service is asked to consume range [1, 10] of tablet 0 (post merge)

We have two possible outcomes:

With cache bypass:
1) cache reader is bypassed
2) sstable reader is created on range [1, 10]
3) unrefreshed tablet_sstable_set holds stale state, but correctly selects
   all sstables intersecting with range [1, 10]

With cache:
1) cache reader is created
2) finds partition with token 5 is cached
3) sstable reader is created on range [1, 4] (later would fast forward to
   range [6, 10]; also belongs to tablet 0)
4) incremental selector consumes the pre-merge sstable spanning range [1, 5]
4.1) since the partitioned_sstable_set pre-merge contains only that sstable,
     EOS is reached
4.2) since EOS is reached, the fast forward to range [6, 10] is not allowed.

So with the set refreshed, the sstable set is aligned with tablet ranges, and
no premature EOS is signalled that would otherwise prevent the fast forward
from happening and all data from being properly captured in the read.

This change fixes the bug by triggering a mutation source refresh whenever
the number of tablets for the table has changed, not only when we have
incoming tablets.
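
A toy Python model of why cached per-tablet ranges go stale on merge (token counts are made up; the real ranges in the scenario above are [1, 5] and [1, 10]):

```python
def tablet_ranges(tablet_count, tokens=16):
    """Toy model: split a token space of `tokens` evenly into tablets."""
    width = tokens // tablet_count
    return [(i * width, (i + 1) * width - 1) for i in range(tablet_count)]

def needs_refresh(old_count, new_count, has_incoming):
    # before the fix: refresh only when the table has incoming tablets
    # after the fix: also refresh whenever the tablet count changed (merge/split)
    return has_incoming or old_count != new_count
```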

Fixes: #23313
(cherry picked from commit d0329ca370)
2025-05-30 17:08:45 +00:00
Anna Stuchlik
20602b6a8b doc: clarify RF increase issues for tablets vs. vnodes
This commit updates the guidelines for increasing the Replication Factor
depending on whether tablets are enabled or disabled.

To present it in a clear way, I've reorganized the page.

Fixes https://github.com/scylladb/scylladb/issues/23667

Closes scylladb/scylladb#24221

(cherry picked from commit efce03ef43)

Closes scylladb/scylladb#24284
2025-05-30 15:16:17 +03:00
Botond Dénes
19513fa47e Merge '[Backport 2025.2] raft_sys_table_storage: avoid temp buffer when deserializing log_entry' from Scylladb[bot]
The get_blob method linearizes data by copying it into a single buffer, which can cause 'oversized allocation' warnings.

In this commit we avoid copying by creating an input stream on top of the original fragmented managed bytes returned by untyped_result_set_row::get_view.

fixes scylladb/scylladb#23903

backport: no need, not a critical issue.

- (cherry picked from commit 6496ae6573)

- (cherry picked from commit f245b05022)

Parent PR: #24123

Closes scylladb/scylladb#24317

* github.com:scylladb/scylladb:
  raft_sys_table_storage: avoid temporary buffer when deserializing log_entry
  serializer_impl.hh:  add as_input_stream(managed_bytes_view) overload
2025-05-30 09:14:43 +03:00
Wojciech Mitros
dec10d348e test: actually wait for tablets to distribute across nodes
In test_tablet_mv_replica_pairing_during_replace, after we create
the tables, we want to wait for their tablets to distribute evenly
across nodes and we have a wait_for for that.
But we don't await this wait_for, so it's a no-op. This patch fixes
it by adding the missing await.
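
The bug class is easy to reproduce with plain asyncio (illustrative code, not the test itself):

```python
import asyncio

events = []

async def wait_for(what):
    # hypothetical helper that would poll until a condition holds;
    # here it just records that the wait actually ran
    events.append(what)

async def buggy():
    coro = wait_for("tablets balanced")   # coroutine object created...
    coro.close()                          # ...but never awaited: a no-op

async def fixed():
    await wait_for("tablets balanced")    # the missing await

asyncio.run(buggy())
after_buggy = list(events)
asyncio.run(fixed())
```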

Refs scylladb/scylladb#23982
Refs scylladb/scylladb#23997

Closes scylladb/scylladb#24250

(cherry picked from commit 5074daf1b7)

Closes scylladb/scylladb#24311
2025-05-29 16:44:51 +02:00
Petr Gusev
ffea5e67c1 raft_sys_table_storage: avoid temporary buffer when deserializing log_entry
The get_blob() method linearizes data by copying it into a
single buffer, which can trigger "oversized allocation" warnings.
This commit avoids that extra copy by creating an input stream
directly over the original fragmented managed bytes returned by
untyped_result_set_row::get_view().

Fixes scylladb/scylladb#23903

(cherry picked from commit f245b05022)
2025-05-29 08:42:09 +00:00
Petr Gusev
bcbbc40026 serializer_impl.hh: add as_input_stream(managed_bytes_view) overload
It's useful to have it here so that people can find it easily.

(cherry picked from commit 6496ae6573)
2025-05-29 08:42:09 +00:00
Anna Stuchlik
70d9352cec doc: remove the redundant pages
This commit removes two redundant pages and adds the related redirections.

- The Tutorials page is a duplicate and is not maintained anymore.
  Having it in the docs hurts the SEO of the up-to-date Tutorials page.
- The Contributing page is not helpful. Contributions-related information
  should be maintained in the project README file.

Fixes https://github.com/scylladb/scylladb/issues/17279
Fixes https://github.com/scylladb/scylladb/issues/24060

Closes scylladb/scylladb#24090

(cherry picked from commit eed8373b77)

Closes scylladb/scylladb#24220
2025-05-26 10:30:03 +03:00
Pavel Emelyanov
e215350c61 Revert "encryption_test: Catch exact exception"
This reverts commit 59bf300e83.

KMS tests became flaky after it: #24218
Need to revisit.
2025-05-20 13:51:07 +03:00
Ernest Zaslavsky
59bf300e83 encryption_test: Catch exact exception
Apparently `test_kms_network_error` will succeed under any circumstances, since most of our exceptions derive from `std::exception`: whatever happens in the test, and whatever it throws for whatever reason, the test will be marked as passed.

Start catching the exact exception that we expect to be thrown.
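
The same pitfall in Python terms (all names here are hypothetical):

```python
class KmsNetworkError(Exception):
    """Hypothetical stand-in for the specific error the test expects."""

def operation_under_test():
    raise ValueError("an unrelated bug")   # not the network error we expect

def overly_broad():
    try:
        operation_under_test()
    except Exception:          # catches anything, so the test always "passes"
        return "passed"

def exact():
    try:
        operation_under_test()
    except KmsNetworkError:    # only the expected failure counts as a pass
        return "passed"
    return "no exception"
```

With the broad catch the unrelated bug is silently swallowed; with the exact catch it propagates and surfaces.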

Closes scylladb/scylladb#24065

(cherry picked from commit 2d5c0f0cfd)

Closes scylladb/scylladb#24147
2025-05-20 08:27:56 +03:00
Aleksandra Martyniuk
6d733051de cql_test_env: main: move stream_manager initialization
Currently, stream_manager is initialized after storage_service and
so it is stopped before the storage_service is. In its stop method,
storage_service accesses stream_manager, which is already uninitialized
by that time.

Move stream_manager initialization before the storage_service initialization.
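
The underlying rule, that services are stopped in reverse initialization order, can be sketched like this (illustrative Python, not the actual cql_test_env code):

```python
order = []

class Service:
    def __init__(self, name):
        self.name = name
        order.append(("start", name))

    def stop(self):
        # a service must not be stopped before anything that still uses it
        order.append(("stop", self.name))

def stop_all(started):
    for svc in reversed(started):   # teardown runs in reverse start order
        svc.stop()

# the fix: start stream_manager before storage_service, so it is stopped
# *after* storage_service's stop method is done using it
stream_manager = Service("stream_manager")
storage_service = Service("storage_service")
stop_all([stream_manager, storage_service])
```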

Fixes: #23207.

Closes scylladb/scylladb#24008

(cherry picked from commit 9c03255fd2)

Closes scylladb/scylladb#24190
2025-05-20 08:27:26 +03:00
Ernest Zaslavsky
24c134992b database_test: Wait for the index to be created
Just call `wait_until_built` for the index in question

fix: https://github.com/scylladb/scylladb/issues/24059

Closes scylladb/scylladb#24117

(cherry picked from commit 4a7c847cba)

Closes scylladb/scylladb#24132
2025-05-19 12:08:41 +03:00
Wojciech Mitros
9247c9472a mv: remove queue length limit from the view update read concurrency semaphore
Each view update is correlated to a write that generates it (aside from view
building, which is throttled separately). These writes are limited by a throttling
mechanism, which effectively works by performing the writes with CL=ALL if
ongoing writes exceed some memory usage limit.

When writes generate view updates, they usually also need to perform a read. This read
goes through a read concurrency semaphore where it can get delayed or killed. The
semaphore allows up to 100 concurrent reads and puts all remaining reads in a queue.
If the number of queued reads exceeds a specific limit, the view update will fail on
the replica, causing inconsistencies.

This limit is not necessary. When a read gets queued on the semaphore, the write that's
causing the view update is paused, so the write takes part in the regular write throttling.
If too many writes get stuck on view update reads, they will get throttled, so their
number is limited and the number of queued reads is also limited to the same amount.

In this patch we remove the specified queue length limit for the view update read concurrency
semaphore. Instead, the queue will now be limited indirectly, by the base write
throttling mechanism. This may allow the queue to grow longer than with the previous limit, but
it shouldn't ever cause issues - we only perform up to 100 actual reads at once, and the
remaining ones that get queued use a tiny amount of memory, less than the writes that generated
them, which are limited directly.
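
A toy admission model of the change (the constants and names are illustrative, not the semaphore's real API):

```python
class ReadSemaphore:
    """Toy model of the view-update read concurrency semaphore: up to
    `max_concurrent` reads run at once, the rest queue. With
    max_queue=None (the fix), queued reads are never rejected."""
    def __init__(self, max_concurrent, max_queue=None):
        self.max_concurrent = max_concurrent
        self.max_queue = max_queue
        self.active = 0
        self.queued = 0

    def admit(self):
        if self.active < self.max_concurrent:
            self.active += 1
            return "running"
        if self.max_queue is not None and self.queued >= self.max_queue:
            return "rejected"   # old behaviour: the view update fails
        self.queued += 1
        return "queued"         # new behaviour: the base write stalls instead

old = ReadSemaphore(max_concurrent=1, max_queue=1)     # pre-patch behaviour
old_results = [old.admit() for _ in range(3)]
new = ReadSemaphore(max_concurrent=1, max_queue=None)  # post-patch behaviour
new_results = [new.admit() for _ in range(3)]
```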

Fixes https://github.com/scylladb/scylladb/issues/23319

Closes scylladb/scylladb#24112

(cherry picked from commit 5920647617)

Closes scylladb/scylladb#24170
2025-05-19 12:05:48 +03:00
Anna Stuchlik
ab8d50b5e7 doc: fix the product name for version 2025.1
Starting with 2025.1, ScyllaDB versions are no longer called "Enterprise",
but the OS support page still uses that label.
This commit fixes that by replacing "Enterprise" with "ScyllaDB".

This update is required since we've removed "Enterprise" from everywhere else,
including the commands, so having it here is confusing.

Fixes https://github.com/scylladb/scylladb/issues/24179

Closes scylladb/scylladb#24181

(cherry picked from commit 2d7db0867c)

Closes scylladb/scylladb#24204
2025-05-19 12:03:35 +03:00
Dawid Mędrek
7986ef73da locator/production_snitch_base: Reduce log level when property file incomplete
We're reducing the log level in case the provided property file is incomplete.
The rationale behind this change is related to how CCM interacts with Scylla:

* The `GossipingPropertyFileSnitch` reloads the `cassandra-rackdc.properties`
  configuration every 60 seconds.
* When a new node is added to the cluster, CCM recreates the
  `cassandra-rackdc.properties` file for EVERY node.

If those two processes start happening at about the same time, it may lead
to Scylla trying to read a not-completely-recreated file, and an error will
be produced.

Although we would normally fix this issue and try to avoid the race, that
behavior will no longer be relevant as we're making the rack and DC values
immutable (cf. scylladb/scylladb#23278). What's more, trying to fix the problem
in the older versions of Scylla could bring a more serious regression. With
that in mind, this commit is a compromise between making CI less flaky and
having minimal impact when backported.

We do the same for when the format of the file is invalid: the rationale
is the same.

We also do that for when there is a double declaration. Although it seems
impossible that this can stem from the same scenario the other two errors
can (since if the format of the file is valid, the error is justified;
if the format is invalid, it should be detected sooner than a doubled
declaration), let's stay consistent with the logging level.

Fixes scylladb/scylladb#20092

Closes scylladb/scylladb#23956

(cherry picked from commit 9ebd6df43a)

Closes scylladb/scylladb#24143
2025-05-16 11:51:23 +03:00
Wojciech Mitros
847504ad25 test_mv_tablets_replace: wait for tablet replicas to balance before working on them
In the test test_tablet_mv_replica_pairing_during_replace we stop 2 out of 4 servers while using RF=2.
Even though in the test we use exactly 4 tablets (1 for each replica of a base table and view), initially,
the tablets may not be split evenly between all nodes. Because of this, even when we chose a server that
hosts the view and a different server that hosts the base table, we sometimes stopped all replicas of the
base or the view table, because the node with the base table replica may also be a view replica.

After some time, the tablets should be distributed across all nodes. When that happens, there will be
no common nodes with a base and view replica, so the test scenario will continue as planned.

In this patch, we add this waiting period after creating the base and view, and continue the test only
when all 4 tablets are on distinct nodes.

Fixes https://github.com/scylladb/scylladb/issues/23982
Fixes https://github.com/scylladb/scylladb/issues/23997

Closes scylladb/scylladb#24111

(cherry picked from commit bceb64fb5a)

Closes scylladb/scylladb#24126
2025-05-16 11:51:07 +03:00
Pavel Emelyanov
854587c10c Merge '[Backport 2025.2] test/cluster: Adjust tests to RF-rack-valid keyspaces' from Scylladb[bot]
In this PR, we're adjusting most of the cluster tests so that they pass
with the `rf_rack_valid_keyspaces` configuration option enabled. In most
cases, the changes are straightforward and require little to no additional
insight into what the tests are doing or verifying. In some, however, doing
that does require a deeper understanding of the tests we're modifying.
The justification for those changes and their correctness is included in
the commit messages corresponding to them.

Note that this PR does not cover all of the cluster tests. There are few
remaining ones, but they require a bit more effort, so we delegate that
work to a separate PR.

I tested all of the modified tests locally with `rf_rack_valid_keyspaces`
set to true, and they all passed.

Fixes scylladb/scylladb#23959

Backport: we want to backport these changes to 2025.1 since that's the version in which we introduced RF-rack-valid keyspaces. Although the tests are not, by default, run with `rf_rack_valid_keyspaces` enabled yet, that will most likely change in the near future, and we'll want to backport those changes too. The reason for this is that we want to verify that Scylla works correctly even with that constraint.

- (cherry picked from commit dbb8835fdf)

- (cherry picked from commit 9281bff0e3)

- (cherry picked from commit 5b83304b38)

- (cherry picked from commit 73b22d4f6b)

- (cherry picked from commit 2882b7e48a)

- (cherry picked from commit 4c46551c6b)

- (cherry picked from commit 92f7d5bf10)

- (cherry picked from commit 5d1bb8ebc5)

- (cherry picked from commit d3c0cd6d9d)

- (cherry picked from commit 04567c28a3)

- (cherry picked from commit c8c28dae92)

- (cherry picked from commit c4b32c38a3)

- (cherry picked from commit ee96f8dcfc)

Parent PR: #23661

Closes scylladb/scylladb#24121

* github.com:scylladb/scylladb:
  test/cluster/suite.yaml: Enable rf_rack_valid_keyspaces in suite
  test/cluster: Disable rf_rack_valid_keyspaces in problematic tests
  test/cluster/test_tablets: Divide rack into two to adjust tests to RF-rack-validity
  test/cluster/test_tablets: Adjust test_tablet_rf_change to RF-rack-validity
  test/cluster/test_tablet_repair_scheduler.py: Adjust to RF-rack-validity
  test/pylib/repair.py: Assign nodes to multiple racks in create_table_insert_data_for_repair
  test/cluster/test_zero_token_nodes_topology_ops: Adjust to RF-rack-validity
  test/cluster/test_zero_token_nodes_no_replication.py: Adjust to RF-rack-validity
  test/cluster/test_zero_token_nodes_multidc.py: Adjust to RF-rack-validity
  test/cluster/test_not_enough_token_owners.py: Adjust to RF-rack-validity
  test/cluster/test_multidc.py: Adjust to RF-rack-validity
  test/cluster/object_store/test_backup.py: Adjust to RF-rack-validity
  test/cluster: Adjust simple tests to RF-rack-validity
2025-05-16 11:50:45 +03:00
Pavel Emelyanov
9058d5658b Merge '[Backport 2025.2] logalloc_test: don't test performance in test background_reclaim' from Scylladb[bot]
The test is failing in CI sometimes due to performance reasons.

There are at least two problems:
1. The initial 500ms (wall time) sleep might be too short. If the reclaimer
   doesn't manage to evict enough memory during this time, the test will fail.
2. During the 100ms (thread CPU time) window given by the test to background
   reclaim, the `background_reclaim` scheduling group isn't actually
   guaranteed to get any CPU, regardless of shares. If the process is
   switched out inside the `background_reclaim` group, it might
   accumulate so much vruntime that it won't get any more CPU again
   for a long time.

We have seen both.

This kind of timing test can't be run reliably on overcommitted machines
without modifying the Seastar scheduler to support that (by e.g. using
thread clock instead of wall time clock in the scheduler), and that would
require an amount of effort disproportionate to the value of the test.

So for now, to unflake the test, this patch removes the performance test
part. (And the tradeoff is a weakening of the test). After the patch,
we only check that the background reclaim happens *eventually*.

Fixes https://github.com/scylladb/scylladb/issues/15677

Backporting this is optional. The test is flaky even in stable branches, but the failure is rare.

- (cherry picked from commit c47f438db3)

- (cherry picked from commit 1c1741cfbc)

Parent PR: #24030

Closes scylladb/scylladb#24094

* github.com:scylladb/scylladb:
  logalloc_test: don't test performance in test `background_reclaim`
  logalloc: make background_reclaimer::free_memory_threshold publicly visible
2025-05-16 11:50:17 +03:00
Gleb Natapov
dd9ec03323 topology coordinator: make decommissioning node non voter before completing the operation
A decommissioned node is removed from the raft config after the operation is
marked as completed. This is required since otherwise the decommissioned
node will not see that decommission has completed (the status is
propagated through raft). But right after the decommission is marked as
completed, the decommissioned node may terminate, so in the case of a two-node
cluster, the configuration change that removes it from the raft will fail,
because there will be no quorum.

The solution is to mark the decommissioning node as non voter before
reporting the operation as completed.
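
The quorum arithmetic behind the fix, as a quick sketch:

```python
def has_quorum(alive_voters, total_voters):
    # a raft config change needs a majority of the *voting* members
    return alive_voters > total_voters // 2

# two-node cluster, both voters; the decommissioned node terminates right
# after the operation is marked complete
before_fix = has_quorum(alive_voters=1, total_voters=2)   # removal would fail

# the fix: demote the leaving node to non-voter first, shrinking the voter set
after_fix = has_quorum(alive_voters=1, total_voters=1)    # removal can proceed
```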

Fixes: #24026

Backport to 2025.2 because it fixes a potential hang. Don't backport to
branches older than 2025.2 because they don't have
8b186ab0ff, which caused this issue.

Closes scylladb/scylladb#24027

(cherry picked from commit c6e1758457)

Closes scylladb/scylladb#24093
2025-05-16 11:49:46 +03:00
Pavel Emelyanov
eef6a95e26 Merge '[Backport 2025.2] replica: Fix use-after-free with concurrent schema change and sstable set update' from Scylladb[bot]
When the schema is changed, the sstable set is updated according to the compaction strategy of the new schema (no changes to the set are actually made; just the underlying set type is updated), but the problem is that this happens without a lock, causing a use-after-free when running concurrently with another set update.

Example:

1) A: sstable set is being updated on compaction completion
2) B: schema change updates the set (it's non deferring, so it happens in one go) and frees the set used by A.
3) when A resumes, system will likely crash since the set is freed already.

ASAN screams about it:
SUMMARY: AddressSanitizer: heap-use-after-free sstables/sstable_set.cc ...

The fix defers the update of the set on schema change to compaction, which is triggered after the new schema is set. Only the strategy state and backlog tracker are updated immediately, which is fine since the strategy doesn't depend on any particular implementation of the sstable set.

Fixes #22040.

- (cherry picked from commit 628bec4dbd)

- (cherry picked from commit 434c2c4649)

Parent PR: #23680

Closes scylladb/scylladb#24085

* github.com:scylladb/scylladb:
  replica: Fix use-after-free with concurrent schema change and sstable set update
  sstables: Implement sstable_set_impl::all_sstable_runs()
2025-05-16 11:49:21 +03:00
Piotr Smaron
9f2a13c8c2 cql: fix CREATE tablets KS warning msg
Materialized Views and Secondary Indexes are yet more features that
keyspaces with tablets do not support, but they were not listed in the
warning message returned to the user on a CREATE KEYSPACE statement. This
commit adds the 2 missing features.

Fixes: #24006

Closes scylladb/scylladb#23902

(cherry picked from commit f740f9f0e1)

Closes scylladb/scylladb#24084
2025-05-16 11:49:00 +03:00
Piotr Dulikowski
4792a27396 topology_coordinator: silence ERROR messages on abort
When the topology coordinator is shut down while doing a long-running
operation, the current operation might throw a raft::request_aborted
exception. This is not a critical issue and should not be logged with
ERROR verbosity level.

Make sure that all the try..catch blocks in the topology coordinator
which:

- May try to acquire a new group0 guard in the `try` part
- Have a `catch (...)` block that print an ERROR-level message

...have a pass-through `catch (raft::request_aborted&)` block which does
not log the exception.

Fixes: scylladb/scylladb#22649

Closes scylladb/scylladb#23962

(cherry picked from commit 156ff8798b)

Closes scylladb/scylladb#24082
2025-05-16 11:48:43 +03:00
Aleksandra Martyniuk
f26c2b22dc test_tablet_repair_hosts_filter: change injected error
test_tablet_repair_hosts_filter checks whether the host filter
specified for tablet repair is correctly persisted. To check this,
we need to ensure that the repair is still ongoing and its data
is kept. The test achieves that by failing the repair on the replica
side, as the failed repair is going to be retried.

However, if the filter does not contain any host (included_host_count = 0),
the repair is started on no replica, so the request succeeds
and its data is deleted. The test fails if it checks the filter
after repair request data is removed.

Fail repair on topology coordinator side, so the request is ongoing
regardless of the specified hosts.

Fixes: #23986.

Closes scylladb/scylladb#24003

(cherry picked from commit 2549f5e16b)

Closes scylladb/scylladb#24080
2025-05-16 11:48:27 +03:00
Botond Dénes
163b65cec4 tools/scylla-nodetool: status: handle negative load sizes
Negative load sizes don't make sense, but we've seen a case in
production where a negative number was returned by the ScyllaDB REST API,
so be prepared to handle these too.

Fixes: scylladb/scylladb#24134

Closes scylladb/scylladb#24135

(cherry picked from commit 700a5f86ed)

Closes scylladb/scylladb#24169
2025-05-15 17:36:48 +03:00
Aleksandra Martyniuk
fcde30d2b0 streaming: use host_id in file streaming
Use host ids instead of ips in file-streaming.

Fixes: #22421.

Closes scylladb/scylladb#24055

(cherry picked from commit 2dcea5a27d)

Closes scylladb/scylladb#24119
2025-05-14 22:13:48 +02:00
Jenkins Promoter
26bd28dac9 Update ScyllaDB version to: 2025.2.0-rc2 2025-05-14 20:59:54 +03:00
Jenkins Promoter
6f1efcff31 Update ScyllaDB version to: 2025.2.0-rc1 2025-05-13 22:48:32 +03:00
Dawid Mędrek
204f9e2cc8 test/cluster/suite.yaml: Enable rf_rack_valid_keyspaces in suite
Almost all of the tests have been adjusted to be able to be run with
the `rf_rack_valid_keyspaces` configuration option enabled, while
the rest, a minority, create nodes with it disabled. Thanks to that,
we can enable it by default, so let's do that.

(cherry picked from commit ee96f8dcfc)
2025-05-12 23:11:34 +02:00
Dawid Mędrek
0c6a449a30 test/cluster: Disable rf_rack_valid_keyspaces in problematic tests
Some of the tests in the test suite have proven to be more problematic
in adjusting to RF-rack-validity. Since we'd like to run as many tests
as possible with the `rf_rack_valid_keyspaces` configuration option
enabled, let's disable it in those. In the following commit, we'll enable
it by default.

(cherry picked from commit c4b32c38a3)
2025-05-12 23:11:30 +02:00
Botond Dénes
7673a17365 Merge 'compress: fix an internal error when a specific debug log is enabled' from Michał Chojnowski
compress: fix an internal error when a specific debug log is enabled
While iterating on the recent series 69684e16d8,
I shot myself in the foot by defining `algorithm_to_name(algorithm::none)`
to be an internal error, and later calling it anyway in a debug log.

(Tests didn't catch it because there's no test which simultaneously
enables the debug log and configures some table to have no compression).

This proves that `algorithm_to_name` is too much of a footgun.
Fix it so that calling `algorithm_to_name(algorithm::none)` is legal.
In hindsight, I should have done that immediately.
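The shape of the fix is a total enum-to-name mapping, roughly as sketched below. The enumerator set and names here are illustrative, not ScyllaDB's exact ones:

```cpp
#include <cassert>
#include <string_view>

// Minimal sketch of a total enum-to-name mapping: every enumerator,
// including `none`, returns a name instead of raising an internal error.
enum class algorithm { none, lz4, snappy, zstd };

std::string_view algorithm_to_name(algorithm alg) {
    switch (alg) {
    case algorithm::none:   return "none";  // legal now, no footgun
    case algorithm::lz4:    return "LZ4Compressor";
    case algorithm::snappy: return "SnappyCompressor";
    case algorithm::zstd:   return "ZstdCompressor";
    }
    return "unknown";
}
```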

Fixes #23624

Fix for recently-added code, no backporting needed.

Closes scylladb/scylladb#23625

* github.com:scylladb/scylladb:
  test_sstable_compression_dictionaries: reproduce an internal error in debug logging
  compress: fix an internal error when a specific debug log is enabled

(cherry picked from commit 746382257c)
2025-05-12 23:13:59 +03:00
Avi Kivity
ae05d62b97 Merge '[Backport 2025.2] compress: make sstable compression dictionaries NUMA-aware ' from Scylladb[bot]
compress: distribute compression dictionaries over shards
We don't want each shard to have its own copy of each dictionary.
It would put unnecessary pressure on cache and memory.
Instead, we want to share dictionaries between shards.

Before this commit, all dictionaries live on shard 0.
All other shards borrow foreign shared pointers from shard 0.

There's a problem with this setup: dictionary blobs receive many random
accesses. If shard 0 is on a remote NUMA node, this could pose
a performance problem.

Therefore, for each dictionary, we would like to have one copy per NUMA node,
not one copy for the entire machine. And each shard should use the copy
belonging to its own NUMA node. This is the main goal of this patch.

There is another issue with putting all dicts on shard 0: it eats
an asymmetric amount of memory from shard 0.
This commit spreads the ownership of dicts over all shards within
the NUMA group, to make the situation more symmetric.
(Dict owner is decided based on the hash of dict contents).

It should be noted that the last part isn't necessarily a good thing,
though.
While it makes the situation more symmetric within each node,
it makes it less symmetric across the cluster, if different node
sizes are present.

If dicts occupy 1% of memory on each shard of a 100-shard node,
then the same dicts would occupy 100% of memory on a 1-shard node.

So for the sake of cluster-wide symmetry, we might later want to consider
e.g. making the memory limit for dictionaries inversely proportional
to the number of shards.
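The ownership rule described above can be sketched like this; `smp_count` stands in for seastar::smp::count and the function is an illustration of the hash-of-contents idea, not the exact ScyllaDB implementation:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string_view>
#include <vector>

// Illustrative sketch: the shard that owns a dictionary copy is derived
// from a hash of the dictionary contents, so ownership spreads roughly
// evenly across shards instead of piling up on shard 0.
unsigned dict_owner_shard(const std::vector<std::byte>& dict, unsigned smp_count) {
    std::string_view sv(reinterpret_cast<const char*>(dict.data()), dict.size());
    return static_cast<unsigned>(std::hash<std::string_view>{}(sv) % smp_count);
}
```

The choice is deterministic: every node picks the same owner shard for the same dictionary blob, with no coordination needed.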

New functionality, added to a feature which isn't in any stable branch yet. No backporting.

Edit: no backporting to <=2025.1, but it needs backporting to 2025.2, where the feature was introduced.

Fixes #24108

- (cherry picked from commit 0e4d0ded8d)

- (cherry picked from commit 8649adafa8)

- (cherry picked from commit 1bcf77951c)

- (cherry picked from commit 6b831aaf1b)

- (cherry picked from commit e952992560)

- (cherry picked from commit 66a454f61d)

- (cherry picked from commit 518f04f1c4)

- (cherry picked from commit f075674ebe)

Parent PR: #23590

Closes scylladb/scylladb#24109

* github.com:scylladb/scylladb:
  test: add test/boost/sstable_compressor_factory_test
  compress: add some test-only APIs
  compress: rename sstable_compressor_factory_impl to dictionary_holder
  compress: fix indentation
  compress: remove sstable_compressor_factory_impl::_owner_shard
  compress: distribute compression dictionaries over shards
  test: switch uses of make_sstable_compressor_factory() to a seastar::thread-dependent version
  test: remove sstables::test_env::do_with()
2025-05-12 23:11:12 +03:00
Dawid Mędrek
5c5911d874 test/cluster/test_tablets: Divide rack into two to adjust tests to RF-rack-validity
Three tests in the file use a multi-DC cluster. Unfortunately, they put
all of the nodes in a DC in the same rack and because of that, they fail
when run with the `rf_rack_valid_keyspaces` configuration option enabled.
Since the tests revolve mostly around zero-token nodes and how they
affect replication in a keyspace, this change should have zero impact on
them.

(cherry picked from commit c8c28dae92)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
6a2e52d250 test/cluster/test_tablets: Adjust test_tablet_rf_change to RF-rack-validity
We reduce the number of nodes and the RF values used in the test
to make sure that the test can be run with the `rf_rack_valid_keyspaces`
configuration option. The test doesn't seem to be reliant on the
exact number of nodes, so the reduction should not make any difference.

(cherry picked from commit 04567c28a3)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
f98c83b92f test/cluster/test_tablet_repair_scheduler.py: Adjust to RF-rack-validity
The change boils down to matching the number of created racks to the number
of created nodes in each DC in the auxiliary function `prepare_multi_dc_repair`.
This way, we ensure that the created keyspace will be RF-rack-valid and so
we can run the test file even with the `rf_rack_valid_keyspaces` configuration
option enabled.

The change has no impact on the tests that use the function; the distribution
of nodes across racks does not affect how repair is performed or what the
tests do and verify. Because of that, the change is correct.

(cherry picked from commit d3c0cd6d9d)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
f5cf4a3893 test/pylib/repair.py: Assign nodes to multiple racks in create_table_insert_data_for_repair
We assign the newly created nodes to multiple racks. If RF <= 3,
we create as many racks as the provided RF. We disallow the case
of RF > 3 to avoid trying to create an RF-rack-invalid keyspace;
note that no existing test calls `create_table_insert_data_for_repair`
providing a higher RF. The rationale for doing this is that we want to ensure
that the tests calling the function can be run with the
`rf_rack_valid_keyspaces` configuration option enabled.

(cherry picked from commit 5d1bb8ebc5)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
12f0136b26 test/cluster/test_zero_token_nodes_topology_ops: Adjust to RF-rack-validity
We assign the nodes to the same DC, but multiple racks to ensure that
the created keyspace is RF-rack-valid and we can run the test with
the `rf_rack_valid_keyspaces` configuration option enabled. The changes
do not affect what the test does and verifies.

(cherry picked from commit 92f7d5bf10)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
4e45ceda21 test/cluster/test_zero_token_nodes_no_replication.py: Adjust to RF-rack-validity
We simply assign the nodes used in the test to separate racks to
ensure that the created keyspace is RF-rack-valid to be able
to run the test with the `rf_rack_valid_keyspaces` configuration
option set to true. The change does not affect what the test
does and verifies -- it only depends on the type of nodes,
whether they are normal token owners or not -- and so the changes
are correct in that sense.

(cherry picked from commit 4c46551c6b)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
2c8b5143ba test/cluster/test_zero_token_nodes_multidc.py: Adjust to RF-rack-validity
We parameterize the test so it's run with and without enforced
RF-rack-valid keyspaces. In the test itself, we introduce a branch
to make sure that we won't run into a situation where we're
attempting to create an RF-rack-invalid keyspace.

Since the `rf_rack_valid_keyspaces` option is not commonly used yet
and because its semantics will most likely change in the future, we
decide to parameterize the test rather than try to get rid of some
of the test cases that are problematic with the option enabled.

(cherry picked from commit 2882b7e48a)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
474de0f048 test/cluster/test_not_enough_token_owners.py: Adjust to RF-rack-validity
We simply assign DC/rack properties to every node used in the test.
We put all of them in the same DC to make sure that the cluster behaves
as closely as possible to how it did before these changes. However, we distribute
them over multiple racks to ensure that the keyspace used in the test
is RF-rack-valid, so we can also run it with the `rf_rack_valid_keyspaces`
configuration option set to true. The distribution of nodes between racks
has no effect on what the test does and verifies, so the changes are
correct in that sense.

(cherry picked from commit 73b22d4f6b)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
5ac07a6c72 test/cluster/test_multidc.py: Adjust to RF-rack-validity
Instead of putting all of the nodes in a DC in the same rack
in `test_putget_2dc_with_rf`, we assign them to different racks.
The distribution of nodes in racks is orthogonal to what the test
is doing and verifying, so the change is correct in that sense.
At the same time, it ensures that the test never violates the
invariant of RF-rack-valid keyspaces, so we can also run it
with `rf_rack_valid_keyspaces` set to true.

(cherry picked from commit 5b83304b38)
2025-05-12 13:10:11 +00:00
Dawid Mędrek
f88d8edcaf test/cluster/object_store/test_backup.py: Adjust to RF-rack-validity
We modify the parameters of `test_restore_with_streaming_scopes`
so that it now represents a pair of values: topology layout and
the value `rf_rack_valid_keyspaces` should be set to.

Two of the already existing parameters violate RF-rack-validity
and so the test would fail when run with `rf_rack_valid_keyspaces: true`.
However, since the option isn't commonly used yet and since the
semantics of RF-rack-valid keyspaces will most likely change in
the future, let's keep those cases and just run them with the
option disabled. This way, we still test everything we can
without running into undesired failures that don't indicate anything.

(cherry picked from commit 9281bff0e3)
2025-05-12 13:10:11 +00:00
Dawid Mędrek
05c70b0820 test/cluster: Adjust simple tests to RF-rack-validity
We adjust all of the simple cases of cluster tests so they work
with `rf_rack_valid_keyspaces: true`. It boils down to assigning
nodes to multiple racks. For most of the changes, we do that by:

* Using `pytest.mark.prepare_3_racks_cluster` instead of
  `pytest.mark.prepare_3_nodes_cluster`.
* Using an additional argument -- `auto_rack_dc` -- when calling
  `ManagerClient::servers_add()`.

In some cases, we need to assign the racks manually, which may be
less obvious, but in every such situation, the tests didn't rely
on that assignment, so that doesn't affect them or what they verify.

(cherry picked from commit dbb8835fdf)
2025-05-12 13:10:11 +00:00
Michał Chojnowski
732321e3b8 test: add test/boost/sstable_compressor_factory_test
Add a basic test for NUMA awareness of `default_sstable_compressor_factory`.

(cherry picked from commit f075674ebe)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
a2622e1919 compress: add some test-only APIs
Will be needed by the test added in the next patch.

(cherry picked from commit 518f04f1c4)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
270bf34846 compress: rename sstable_compressor_factory_impl to dictionary_holder
Since sstable_compressor_factory_impl no longer
implements sstable_compressor_factory, the name can be
misleading. Rename it to something closer to its new role.

(cherry picked from commit 66a454f61d)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
168f694c5d compress: fix indentation
Purely cosmetic.

(cherry picked from commit e952992560)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
b5579be915 compress: remove sstable_compressor_factory_impl::_owner_shard
Before the series, sstable_compressor_factory_impl was directly
accessed by multiple shards. Now, it's a part of a `sharded`
data structure and is never accessed directly from other shards,
so there's no need to check for that. Remove the leftover logic.

(cherry picked from commit 6b831aaf1b)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
ad60d765f9 compress: distribute compression dictionaries over shards
We don't want each shard to have its own copy of each dictionary.
It would put unnecessary pressure on cache and memory.
Instead, we want to share dictionaries between shards.

Before this commit, all dictionaries live on shard 0.
All other shards borrow foreign shared pointers from shard 0.

There's a problem with this setup: dictionary blobs receive many random
accesses. If shard 0 is on a remote NUMA node, this could pose
a performance problem.

Therefore, for each dictionary, we would like to have one copy per NUMA node,
not one copy for the entire machine. And each shard should use the copy
belonging to its own NUMA node. This is the main goal of this patch.

There is another issue with putting all dicts on shard 0: it eats
an asymmetric amount of memory from shard 0.
This commit spreads the ownership of dicts over all shards within
the NUMA group, to make the situation more symmetric.
(Dict owner is decided based on the hash of dict contents).

It should be noted that the last part isn't necessarily a good thing,
though.
While it makes the situation more symmetric within each node,
it makes it less symmetric across the cluster, if different node
sizes are present.

If dicts occupy 1% of memory on each shard of a 100-shard node,
then the same dicts would occupy 100% of memory on a 1-shard node.

So for the sake of cluster-wide symmetry, we might later want to consider
e.g. making the memory limit for dictionaries inversely proportional
to the number of shards.

(cherry picked from commit 1bcf77951c)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
68d2086fa5 test: switch uses of make_sstable_compressor_factory() to a seastar::thread-dependent version
In next patches, make_sstable_compressor_factory() will have to
disappear.
In preparation for that, we switch to a seastar::thread-dependent
replacement.

(cherry picked from commit 8649adafa8)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
403d43093f test: remove sstables::test_env::do_with()
`sstable_manager` depends on `sstable_compressor_factory&`.
Currently, `test_env` obtains an implementation of this
interface with the synchronous `make_sstable_compressor_factory()`.

But after this patch, the only implementation of the
`sstable_compressor_factory` interface will use `sharded<...>`,
so its construction will become asynchronous,
and the synchronous `make_sstable_compressor_factory()` must disappear.

There are several possible ways to deal with this, but I think the
easiest one is to write an asynchronous replacement for
`make_sstable_compressor_factory()`
that will keep the same signature but will be only usable
in a `seastar::thread`.

All other uses of `make_sstable_compressor_factory()` outside of
`test_env::do_with()` are already in seastar threads,
so if we just get rid of `test_env::do_with()`, then we will
be able to use that thread-dependent replacement. This is the
purpose of this commit.

We shouldn't be losing much.

(cherry picked from commit 0e4d0ded8d)
2025-05-12 09:12:04 +00:00
Patryk Jędrzejczak
2b1b4d1dfc Merge '[Backport 2025.2] Correctly skip updating node's own ip address due to outdated gossiper data ' from Scylladb[bot]
Use the host id to check whether the update is for the node itself. Using the IP is unreliable, since if a node is restarted with a different IP, a gossiper message with the previous IP can be misinterpreted as belonging to a different node.

Fixes: #22777

Backport to 2025.1 since this fixes a crash. Older versions do not have the code.
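The id-based check at the heart of this series can be sketched as follows; the types are illustrative stand-ins, not ScyllaDB's actual gossiper structures:

```cpp
#include <cassert>
#include <cstdint>

// Sketch: whether a gossiper update refers to the local node is decided by
// the immutable host id, never by the IP, which can change across restarts.
struct host_id {
    uint64_t v;
    bool operator==(const host_id& o) const { return v == o.v; }
};
struct endpoint_state {
    host_id id;
    uint32_t ip;  // may be stale after a restart with a new address
};

bool is_local_node(const endpoint_state& e, host_id my_id) {
    return e.id == my_id;  // IP deliberately ignored
}
```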

- (cherry picked from commit a2178b7c31)

- (cherry picked from commit ecd14753c0)

- (cherry picked from commit 7403de241c)

Parent PR: #24000

Closes scylladb/scylladb#24089

* https://github.com/scylladb/scylladb:
  test: add reproducer for #22777
  storage_service: Do not remove gossiper entry on address change
  storage_service: use id to check for local node
2025-05-12 09:31:20 +02:00
Michał Chojnowski
a5b513dde7 logalloc_test: don't test performance in test background_reclaim
The test is failing in CI sometimes due to performance reasons.

There are at least two problems:
1. The initial 500ms (wall time) sleep might be too short. If the reclaimer
   doesn't manage to evict enough memory during this time, the test will fail.
2. During the 100ms (thread CPU time) window given by the test to background
   reclaim, the `background_reclaim` scheduling group isn't actually
   guaranteed to get any CPU, regardless of shares. If the process is
   switched out inside the `background_reclaim` group, it might
   accumulate so much vruntime that it won't get any CPU again
   for a long time.

We have seen both.

This kind of timing test can't be run reliably on overcommitted machines
without modifying the Seastar scheduler to support that (by e.g. using
thread clock instead of wall time clock in the scheduler), and that would
require an amount of effort disproportionate to the value of the test.

So for now, to unflake the test, this patch removes the performance test
part. (And the tradeoff is a weakening of the test).

(cherry picked from commit 1c1741cfbc)
2025-05-09 16:12:22 +00:00
Michał Chojnowski
2c431c1ea2 logalloc: make background_reclaimer::free_memory_threshold publicly visible
Wanted by the change to the background_reclaim test in the next patch.

(cherry picked from commit c47f438db3)
2025-05-09 16:12:22 +00:00
Gleb Natapov
827563902c test: add reproducer for #22777
Add a sleep before starting the gossiper to increase the chance of getting
an old gossiper entry about the node itself before updating the local
gossiper info with the new IP address.

(cherry picked from commit 7403de241c)
2025-05-09 12:56:15 +00:00
Gleb Natapov
ccf194bd89 storage_service: Do not remove gossiper entry on address change
When the gossiper indexed entries by IP, an old entry had to be removed on
an address change. Now the index is id-based, so even if the IP changed,
the entry should stay; the gossiper simply updates the IP address there.

(cherry picked from commit ecd14753c0)
2025-05-09 12:56:15 +00:00
Gleb Natapov
9b735bb4dc storage_service: use id to check for local node
The IP may change, and an old gossiper message with the previous IP may be
processed when it shouldn't be.

Fixes: #22777
(cherry picked from commit a2178b7c31)
2025-05-09 12:56:15 +00:00
Michał Chojnowski
f29b87970a test/boost/mvcc_test: fix an overly-strong assertion in test_snapshot_cursor_is_consistent_with_merging
The test checks that merging the partition versions on-the-fly using the
cursor gives the same results as merging them destructively with apply_monotonically.

In particular, it tests that the continuity of both results is equal.
However, there's a subtlety which makes this not true.
The cursor puts empty dummy rows (i.e. dummies shadowed by the partition
tombstone) in the output.
But the destructive merge is allowed (as an exception to the general
rule, for optimization reasons), to remove those dummies and thus reduce
the continuity.

So after this patch we instead check that the output of the cursor
has continuity equal to the merged continuities of the versions.
(Rather than to the continuity of merged versions, which can be
smaller as described above).

Refs https://github.com/scylladb/scylladb/pull/21459, a patch which did
the same in a different test.
Fixes https://github.com/scylladb/scylladb/issues/13642

Closes scylladb/scylladb#24044

(cherry picked from commit 746ec1d4e4)

Closes scylladb/scylladb#24083
2025-05-09 13:00:34 +02:00
Raphael S. Carvalho
82ca17e70d replica: Fix use-after-free with concurrent schema change and sstable set update
When the schema is changed, the sstable set is updated according to the
compaction strategy of the new schema (no changes to the set's contents are
actually made; just the underlying set type is updated). The problem is that
this happens without a lock, causing a use-after-free when running
concurrently with another set update.

Example:

1) A: sstable set is being updated on compaction completion
2) B: a schema change updates the set (it's non-deferring, so it
happens in one go) and frees the set used by A.
3) When A resumes, the system will likely crash since the set is already
freed.

ASAN screams about it:
SUMMARY: AddressSanitizer: heap-use-after-free sstables/sstable_set.cc ...

The fix defers the sstable set update on schema change to compaction,
which is triggered after the new schema is set. Only the strategy state and
the backlog tracker are updated immediately, which is fine since the
strategy no longer depends on any particular sstable set implementation,
as of the patch "sstables: Implement sstable_set_impl::all_sstable_runs()".
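The deferral pattern can be sketched like this; the struct and counters are illustrative stand-ins for the table/compaction machinery, not ScyllaDB's actual code:

```cpp
#include <cassert>
#include <utility>

// Sketch: a schema change only marks the sstable set as needing a rebuild;
// the rebuild itself runs from compaction, after the new schema is set, so
// no concurrent reader can be left holding a freed set.
struct table {
    bool set_update_pending = false;
    int rebuilds = 0;

    void set_compaction_strategy() {
        // Strategy state would be updated immediately here; the sstable
        // set rebuild is deferred instead of being done in place.
        set_update_pending = true;
    }
    void on_compaction() {
        if (std::exchange(set_update_pending, false)) {
            ++rebuilds;  // stands in for rebuilding the sstable set
        }
    }
};
```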

Fixes #22040.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 434c2c4649)
2025-05-09 05:57:40 +00:00
Raphael S. Carvalho
ddf9d047db sstables: Implement sstable_set_impl::all_sstable_runs()
With the upcoming change where table::set_compaction_strategy() might delay
the update of the sstable set, ICS might temporarily work with sstable set
implementations other than partitioned_sstable_set. ICS relies on
all_sstable_runs() during regular compaction, and today it triggers a
bad_function_call exception if not overridden by the set implementation.
To remove this strong dependency between the compaction strategy and
a particular set implementation, let's provide a default implementation
of all_sstable_runs(), such that ICS will still work until the set
is eventually updated through a process that adds or removes an
sstable.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 628bec4dbd)
2025-05-09 05:57:40 +00:00
Botond Dénes
17a76b6264 Merge '[Backport 2025.2] test/cluster/test_read_repair.py: improve trace logging test (again)' from Scylladb[bot]
The test test_read_repair_with_trace_logging wants to test read repair with trace logging. Turns out that node restart + trace-level logging + debug mode is too much, and even with a 1 minute timeout, the read repair sometimes times out. Refactor the test to use an injection point instead of a restart. To make sure the test still tests what it's supposed to test, use tracing to assert that read repair did indeed happen.

Fixes: scylladb/scylladb#23968

Needs backport to 2025.1 and 6.2, both have the flaky test

- (cherry picked from commit 51025de755)

- (cherry picked from commit 29eedaa0e5)

Parent PR: #23989

Closes scylladb/scylladb#24051

* github.com:scylladb/scylladb:
  test/cluster/test_read_repair.py: improve trace logging test (again)
  test/cluster: extract execute_with_tracing() into pylib/util.py
2025-05-08 11:01:18 +03:00
Aleksandra Martyniuk
ab45df1aa1 streaming: skip dropped tables
Currently, stream_session::prepare throws when a table in requests
or summaries is dropped. However, we do not want to fail streaming
if the table is dropped.

Remove the table checks from stream_session::prepare. Further streaming
steps can handle the dropped table and finish the streaming successfully.

Fixes: #15257.

Closes scylladb/scylladb#23915

(cherry picked from commit 20c2d6210e)

Closes scylladb/scylladb#24053
2025-05-08 11:00:27 +03:00
Botond Dénes
97f0f312e0 test/cluster/test_read_repair.py: improve trace logging test (again)
The test test_read_repair_with_trace_logging wants to test read repair
with trace logging. Turns out that node restart + trace-level logging
+ debug mode is too much, and even with a 1 minute timeout, the read repair
sometimes times out.
Refactor the test to use an injection point instead of a restart. To make
sure the test still tests what it's supposed to test, use tracing to
assert that read repair did indeed happen.

(cherry picked from commit 29eedaa0e5)
2025-05-07 13:26:08 +00:00
Botond Dénes
4df6a17d30 test/cluster: extract execute_with_tracing() into pylib/util.py
To allow reuse in other tests.

(cherry picked from commit 51025de755)
2025-05-07 13:26:08 +00:00
Anna Mikhlin
b3dbfaf27a Update ScyllaDB version to: 2025.2.0-rc0 2025-05-07 11:41:33 +03:00
149 changed files with 2624 additions and 1499 deletions

.gitmodules vendored

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
-url = ../seastar
+url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui


@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
-VERSION=2025.2.0-dev
+VERSION=2025.2.0-rc5
if test -f version
then


@@ -1006,6 +1006,17 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
default_write_isolation = parse_write_isolation(value);
}
// Alternator uses tags whose keys start with the "system:" prefix for
// internal purposes. Those should not be readable by ListTagsOfResource,
// nor writable with TagResource or UntagResource (see #24098).
// Only a few specific system tags, currently only system:write_isolation,
// are deliberately intended to be set and read by the user, so are not
// considered "internal".
static bool tag_key_is_internal(std::string_view tag_key) {
return tag_key.starts_with("system:") &&
tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY;
}
enum class update_tags_action { add_tags, delete_tags };
static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>& tags_map, update_tags_action action) {
if (action == update_tags_action::add_tags) {
@@ -1030,6 +1041,9 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
if (!validate_legal_tag_chars(tag_key)) {
throw api_error::validation("A tag Key can only contain letters, spaces, and [+-=._:/]");
}
if (tag_key_is_internal(tag_key)) {
throw api_error::validation(fmt::format("Tag key '{}' is reserved for internal use", tag_key));
}
// Note tag values are limited similarly to tag keys, but have a
// longer length limit, and *can* be empty.
if (tag_value.size() > 256) {
@@ -1042,7 +1056,11 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
}
} else if (action == update_tags_action::delete_tags) {
for (auto it = tags.Begin(); it != tags.End(); ++it) {
tags_map.erase(sstring(it->GetString(), it->GetStringLength()));
auto tag_key = rjson::to_string_view(*it);
if (tag_key_is_internal(tag_key)) {
throw api_error::validation(fmt::format("Tag key '{}' is reserved for internal use", tag_key));
}
tags_map.erase(sstring(tag_key));
}
}
@@ -1117,6 +1135,9 @@ future<executor::request_return_type> executor::list_tags_of_resource(client_sta
rjson::value& tags = ret["Tags"];
for (auto& tag_entry : tags_map) {
if (tag_key_is_internal(tag_entry.first)) {
continue;
}
rjson::value new_entry = rjson::empty_object();
rjson::add(new_entry, "Key", rjson::from_string(tag_entry.first));
rjson::add(new_entry, "Value", rjson::from_string(tag_entry.second));


@@ -2144,6 +2144,31 @@
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"skip_cleanup",
"description":"Don't cleanup keys from loaded sstables. Invalid if load_and_stream is true",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"skip_reshape",
"description":"Don't reshape the loaded sstables. Invalid if load_and_stream is true",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"scope",
"description":"Defines the set of nodes to which mutations can be streamed",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query",
"enum": ["all", "dc", "rack", "node"]
}
]
}


@@ -453,17 +453,26 @@ void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>&
auto cf = req->get_query_param("cf");
auto stream = req->get_query_param("load_and_stream");
auto primary_replica = req->get_query_param("primary_replica_only");
auto skip_cleanup_p = req->get_query_param("skip_cleanup");
boost::algorithm::to_lower(stream);
boost::algorithm::to_lower(primary_replica);
bool load_and_stream = stream == "true" || stream == "1";
bool primary_replica_only = primary_replica == "true" || primary_replica == "1";
bool skip_cleanup = skip_cleanup_p == "true" || skip_cleanup_p == "1";
auto scope = parse_stream_scope(req->get_query_param("scope"));
auto skip_reshape_p = req->get_query_param("skip_reshape");
auto skip_reshape = skip_reshape_p == "true" || skip_reshape_p == "1";
if (scope != sstables_loader::stream_scope::all && !load_and_stream) {
throw httpd::bad_param_exception("scope takes no effect without load-and-stream");
}
// No need to add the keyspace, since all we want is to avoid always sending this to the same
// CPU. Even then I am being overzealous here. This is not something that happens all the time.
auto coordinator = std::hash<sstring>()(cf) % smp::count;
return sst_loader.invoke_on(coordinator,
[ks = std::move(ks), cf = std::move(cf),
load_and_stream, primary_replica_only] (sstables_loader& loader) {
return loader.load_new_sstables(ks, cf, load_and_stream, primary_replica_only, sstables_loader::stream_scope::all);
load_and_stream, primary_replica_only, skip_cleanup, skip_reshape, scope] (sstables_loader& loader) {
return loader.load_new_sstables(ks, cf, load_and_stream, primary_replica_only, skip_cleanup, skip_reshape, scope);
}).then_wrapped([] (auto&& f) {
if (f.failed()) {
auto msg = fmt::format("Failed to load new sstables: {}", f.get_exception());


@@ -15,6 +15,8 @@
#include <seastar/core/metrics.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/weak_ptr.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/reactor.hh>
#include "utils/reusable_buffer.hh"
#include "sstables/compress.hh"
#include "sstables/exceptions.hh"
@@ -27,7 +29,7 @@
// SHA256
using dict_id = std::array<std::byte, 32>;
class sstable_compressor_factory_impl;
class dictionary_holder;
static seastar::logger compressor_factory_logger("sstable_compressor_factory");
@@ -41,11 +43,11 @@ template <> struct fmt::formatter<compression_parameters::algorithm> : fmt::form
// raw dicts might be used (and kept alive) directly by compressors (in particular, lz4 decompressor)
// or referenced by algorithm-specific dicts.
class raw_dict : public enable_lw_shared_from_this<raw_dict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
dict_id _id;
std::vector<std::byte> _dict;
public:
raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict);
raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict);
~raw_dict();
const std::span<const std::byte> raw() const { return _dict; }
dict_id id() const { return _id; }
@@ -79,13 +81,13 @@ struct zstd_callback_allocator {
// (which internally holds a pointer to the raw dictionary blob
// and parsed entropy tables).
class zstd_ddict : public enable_lw_shared_from_this<zstd_ddict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
size_t _used_memory = 0;
zstd_callback_allocator _alloc;
std::unique_ptr<ZSTD_DDict, decltype(&ZSTD_freeDDict)> _dict;
public:
zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
~zstd_ddict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -100,14 +102,14 @@ public:
// so the level of compression is decided at the time of construction
// of this dict.
class zstd_cdict : public enable_lw_shared_from_this<zstd_cdict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
int _level;
size_t _used_memory = 0;
zstd_callback_allocator _alloc;
std::unique_ptr<ZSTD_CDict, decltype(&ZSTD_freeCDict)> _dict;
public:
zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level);
zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level);
~zstd_cdict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -119,11 +121,11 @@ public:
// and a hash index over the substrings of the blob).
//
class lz4_cdict : public enable_lw_shared_from_this<lz4_cdict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
std::unique_ptr<LZ4_stream_t, decltype(&LZ4_freeStream)> _dict;
public:
lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
~lz4_cdict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -164,6 +166,7 @@ public:
size_t compress_max_size(size_t input_len) const override;
std::map<sstring, sstring> options() const override;
algorithm get_algorithm() const override;
std::optional<unsigned> get_dict_owner_for_test() const override;
};
class snappy_processor: public compressor {
@@ -266,6 +269,7 @@ public:
size_t compress_max_size(size_t input_len) const override;
algorithm get_algorithm() const override;
std::map<sstring, sstring> options() const override;
std::optional<unsigned> get_dict_owner_for_test() const override;
};
zstd_processor::zstd_processor(const compression_parameters& opts, cdict_ptr cdict, ddict_ptr ddict) {
@@ -323,6 +327,16 @@ auto zstd_processor::get_algorithm() const -> algorithm {
return (_cdict || _ddict) ? algorithm::zstd_with_dicts : algorithm::zstd;
}
std::optional<unsigned> zstd_processor::get_dict_owner_for_test() const {
if (_cdict) {
return _cdict.get_owner_shard();
} else if (_ddict) {
return _ddict.get_owner_shard();
} else {
return std::nullopt;
}
}
const std::string_view DICTIONARY_OPTION = ".dictionary.";
static std::map<sstring, sstring> dict_as_options(std::span<const std::byte> d) {
@@ -384,6 +398,10 @@ std::map<sstring, sstring> compressor::options() const {
return {};
}
std::optional<unsigned> compressor::get_dict_owner_for_test() const {
return std::nullopt;
}
std::string compressor::name() const {
return compression_parameters::algorithm_to_qualified_name(get_algorithm());
}
@@ -434,7 +452,7 @@ std::string_view compression_parameters::algorithm_to_name(algorithm alg) {
case algorithm::snappy: return "SnappyCompressor";
case algorithm::zstd: return "ZstdCompressor";
case algorithm::zstd_with_dicts: return "ZstdWithDictsCompressor";
case algorithm::none: on_internal_error(compressor_factory_logger, "algorithm_to_name(): called with algorithm::none");
case algorithm::none: return "none"; // Name used only for logging purposes, can't be chosen by the user.
}
abort();
}
@@ -518,13 +536,17 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
}
}
void compression_parameters::validate(const gms::feature_service& fs) {
if (!fs.sstable_compression_dicts) {
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) {
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
if (!dicts_enabled) {
throw std::runtime_error(std::format("sstable_compression {} can't be used before "
"all nodes are upgraded to a versions which supports it",
algorithm_to_name(_algorithm)));
}
if (!dicts_allowed) {
throw std::runtime_error(std::format("sstable_compression {} has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`",
algorithm_to_name(_algorithm)));
}
}
if (_chunk_length) {
auto chunk_length = _chunk_length.value();
@@ -660,6 +682,16 @@ std::map<sstring, sstring> lz4_processor::options() const {
}
}
std::optional<unsigned> lz4_processor::get_dict_owner_for_test() const {
if (_cdict) {
return _cdict.get_owner_shard();
} else if (_ddict) {
return _ddict.get_owner_shard();
} else {
return std::nullopt;
}
}
compressor_ptr make_lz4_sstable_compressor_for_tests() {
return std::make_unique<lz4_processor>();
}
@@ -751,21 +783,12 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
return snappy_max_compressed_length(input_len);
}
// Constructs compressors and decompressors for SSTables,
// making sure that the expensive identical parts (dictionaries) are shared
// across nodes.
//
// Holds weak pointers to all live dictionaries
// (so that they can be cheaply shared with new SSTables if an identical dict is requested),
// and shared (lifetime-extending) pointers to the current writer ("recommended")
// dict for each table (so that they can be shared with new SSTables without consulting
// `system.dicts`).
//
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
// (and are borrowed by foreign shared pointers) and all requests for a given dict ID go through its owner.
// (Note: this shouldn't pose a performance problem because a dict is only requested once per opening of an SSTable).
// (Note: at the moment of this writing, one shard owns all. Later we can spread the ownership. (E.g. shard it by dict hash)).
//
// Whenever a dictionary dies (because its refcount reaches 0), its weak pointer
// is removed from the factory.
//
@@ -774,10 +797,10 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
// Has a configurable memory budget for live dicts. If the budget is exceeded,
// will return null dicts to new writers (to avoid making the memory usage even worse)
// and print warnings.
class sstable_compressor_factory_impl : public sstable_compressor_factory, public weakly_referencable<sstable_compressor_factory_impl> {
class dictionary_holder : public weakly_referencable<dictionary_holder> {
mutable logger::rate_limit budget_warning_rate_limit{std::chrono::minutes(10)};
shard_id _owner_shard;
config _cfg;
using config = default_sstable_compressor_factory::config;
const config& _cfg;
uint64_t _total_live_dict_memory = 0;
metrics::metric_groups _metrics;
struct zstd_cdict_id {
@@ -789,7 +812,7 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
std::map<zstd_cdict_id, const zstd_cdict*> _zstd_cdicts;
std::map<dict_id, const zstd_ddict*> _zstd_ddicts;
std::map<dict_id, const lz4_cdict*> _lz4_cdicts;
std::map<table_id, lw_shared_ptr<const raw_dict>> _recommended;
std::map<table_id, lw_shared_ptr<foreign_ptr<lw_shared_ptr<const raw_dict>>>> _recommended;
size_t memory_budget() const {
return _cfg.memory_fraction_starting_at_which_we_stop_writing_dicts() * seastar::memory::stats().total_memory();
@@ -806,8 +829,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
memory_budget()
);
}
public:
lw_shared_ptr<const raw_dict> get_canonical_ptr(std::span<const std::byte> dict) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (dict.empty()) {
return nullptr;
}
auto id = get_sha256(dict);
if (auto it = _raw_dicts.find(id); it != _raw_dicts.end()) {
return it->second->shared_from_this();
@@ -819,7 +845,9 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
using foreign_zstd_ddict = foreign_ptr<lw_shared_ptr<const zstd_ddict>>;
foreign_zstd_ddict get_zstd_dict_for_reading(lw_shared_ptr<const raw_dict> raw, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!raw) {
return nullptr;
}
lw_shared_ptr<const zstd_ddict> ddict;
// For reading, we must allocate a new dict, even if the memory budget is exceeded. We have no other choice.
// In any case, if the budget is exceeded, we print a rate-limited warning about it.
@@ -835,15 +863,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(ddict));
}
future<foreign_zstd_ddict> get_zstd_dict_for_reading(std::span<const std::byte> dict, int level) {
return smp::submit_to(_owner_shard, [this, dict, level] -> foreign_zstd_ddict {
auto raw = get_canonical_ptr(dict);
return get_zstd_dict_for_reading(raw, level);
});
}
using foreign_zstd_cdict = foreign_ptr<lw_shared_ptr<const zstd_cdict>>;
foreign_zstd_cdict get_zstd_dict_for_writing(lw_shared_ptr<const raw_dict> raw, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!_cfg.enable_writing_dictionaries() || !raw) {
return nullptr;
}
lw_shared_ptr<const zstd_cdict> cdict;
// If we can share an already-allocated dict, we do that regardless of memory budget.
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
@@ -859,19 +883,6 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(cdict));
}
future<foreign_zstd_cdict> get_zstd_dict_for_writing(table_id t, int level) {
return smp::submit_to(_owner_shard, [this, t, level] -> foreign_zstd_cdict {
if (!_cfg.enable_writing_dictionaries()) {
return {};
}
auto rec_it = _recommended.find(t);
if (rec_it != _recommended.end()) {
return get_zstd_dict_for_writing(rec_it->second, level);
} else {
return {};
}
});
}
using lz4_dicts = std::pair<
foreign_ptr<lw_shared_ptr<const raw_dict>>,
foreign_ptr<lw_shared_ptr<const lz4_cdict>>
@@ -879,18 +890,12 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
using foreign_lz4_ddict = foreign_ptr<lw_shared_ptr<const raw_dict>>;
using foreign_lz4_cdict = foreign_ptr<lw_shared_ptr<const lz4_cdict>>;
foreign_lz4_ddict get_lz4_dict_for_reading(lw_shared_ptr<const raw_dict> raw) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
lw_shared_ptr<const raw_dict> ddict;
return make_foreign(std::move(raw));
}
future<foreign_lz4_ddict> get_lz4_dicts_for_reading(std::span<const std::byte> dict) {
return smp::submit_to(_owner_shard, [this, dict] -> foreign_lz4_ddict {
auto raw = get_canonical_ptr(dict);
return get_lz4_dict_for_reading(raw);
});
}
foreign_lz4_cdict get_lz4_dict_for_writing(lw_shared_ptr<const raw_dict> raw) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!_cfg.enable_writing_dictionaries() || !raw) {
return nullptr;
}
lw_shared_ptr<const lz4_cdict> cdict;
// If we can share an already-allocated dict, we do that regardless of memory budget.
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
@@ -905,24 +910,10 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(cdict));
}
future<foreign_lz4_cdict> get_lz4_dicts_for_writing(table_id t) {
return smp::submit_to(_owner_shard, [this, t] -> foreign_lz4_cdict {
if (!_cfg.enable_writing_dictionaries()) {
return {};
}
auto rec_it = _recommended.find(t);
if (rec_it != _recommended.end()) {
return get_lz4_dict_for_writing(rec_it->second);
} else {
return {};
}
});
}
public:
sstable_compressor_factory_impl(config cfg)
: _owner_shard(this_shard_id())
, _cfg(std::move(cfg))
dictionary_holder(const config& cfg)
: _cfg(cfg)
{
if (_cfg.register_metrics) {
namespace sm = seastar::metrics;
@@ -931,8 +922,8 @@ public:
});
}
}
sstable_compressor_factory_impl(sstable_compressor_factory_impl&&) = delete;
~sstable_compressor_factory_impl() {
dictionary_holder(dictionary_holder&&) = delete;
~dictionary_holder() {
// Note: `_recommended` might be the only thing keeping some dicts alive,
// so clearing it will destroy them.
//
@@ -948,39 +939,39 @@ public:
_recommended.clear();
}
void forget_raw_dict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_raw_dicts.erase(id);
}
void forget_zstd_cdict(dict_id id, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_zstd_cdicts.erase({id, level});
}
void forget_zstd_ddict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_zstd_ddicts.erase(id);
}
void forget_lz4_cdict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_lz4_cdicts.erase(id);
}
future<> set_recommended_dict(table_id t, std::span<const std::byte> dict) override {
return smp::submit_to(_owner_shard, [this, t, dict] {
_recommended.erase(t);
if (dict.size()) {
auto canonical_ptr = get_canonical_ptr(dict);
_recommended.emplace(t, canonical_ptr);
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
t, dict.size(), fmt_hex(canonical_ptr->id()));
} else {
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
}
});
void set_recommended_dict(table_id t, foreign_ptr<lw_shared_ptr<const raw_dict>> dict) {
_recommended.erase(t);
if (dict) {
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
t, dict->raw().size(), fmt_hex(dict->id()));
_recommended.emplace(t, make_lw_shared(std::move(dict)));
} else {
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
}
}
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t) {
auto rec_it = _recommended.find(t);
if (rec_it == _recommended.end()) {
co_return nullptr;
}
// Note that rec_it might be invalidated while we are doing the copy(),
// so we have to make a copy of the outer shared ptr first.
lw_shared_ptr<foreign_ptr<lw_shared_ptr<const raw_dict>>> ptr = rec_it->second;
co_return co_await ptr->copy();
}
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
void account_memory_delta(ssize_t n) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (static_cast<ssize_t>(_total_live_dict_memory) + n < 0) {
compressor_factory_logger.error(
"Error in dictionary memory accounting: delta {} brings live memory {} below 0",
@@ -990,19 +981,85 @@ public:
}
};
default_sstable_compressor_factory::default_sstable_compressor_factory(config cfg)
: _cfg(std::move(cfg))
, _holder(std::make_unique<dictionary_holder>(_cfg))
{
for (shard_id i = 0; i < smp::count; ++i) {
auto numa_id = _cfg.numa_config[i];
_numa_groups.resize(std::max<size_t>(_numa_groups.size(), numa_id + 1));
_numa_groups[numa_id].push_back(i);
}
}
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writing(schema_ptr s) {
const auto params = s->get_compressor_params();
default_sstable_compressor_factory::~default_sstable_compressor_factory() {
}
std::vector<unsigned> default_sstable_compressor_factory_config::get_default_shard_to_numa_node_mapping() {
auto sp = local_engine->smp().shard_to_numa_node_mapping();
return std::vector<unsigned>(sp.begin(), sp.end());
}
unsigned default_sstable_compressor_factory::local_numa_id() {
return _cfg.numa_config[this_shard_id()];
}
shard_id default_sstable_compressor_factory::get_dict_owner(unsigned numa_id, const sha256_type& sha) {
auto hash = read_unaligned<uint64_t>(sha.data());
const auto& group = _numa_groups[numa_id];
if (group.empty()) {
on_internal_error(compressor_factory_logger, "get_dict_owner called on an empty NUMA group");
}
return group[hash % group.size()];
}
future<> default_sstable_compressor_factory::set_recommended_dict_local(table_id t, std::span<const std::byte> dict) {
if (_leader_shard != this_shard_id()) {
on_internal_error(compressor_factory_logger, fmt::format("set_recommended_dict_local called on wrong shard. Expected: {}, got {}", _leader_shard, this_shard_id()));
}
auto units = co_await get_units(_recommendation_setting_sem, 1);
auto sha = get_sha256(dict);
for (unsigned numa_id = 0; numa_id < _numa_groups.size(); ++numa_id) {
const auto& group = _numa_groups[numa_id];
if (group.empty()) {
continue;
}
auto r = get_dict_owner(numa_id, sha);
auto d = co_await container().invoke_on(r, [dict](self& local) {
return make_foreign(local._holder->get_canonical_ptr(dict));
});
auto local_coordinator = group[0];
co_await container().invoke_on(local_coordinator, coroutine::lambda([t, d = std::move(d)](self& local) mutable {
local._holder->set_recommended_dict(t, std::move(d));
}));
}
}
future<> default_sstable_compressor_factory::set_recommended_dict(table_id t, std::span<const std::byte> dict) {
return container().invoke_on(_leader_shard, &self::set_recommended_dict_local, t, dict);
}
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> default_sstable_compressor_factory::get_recommended_dict(table_id t) {
const auto local_coordinator = _numa_groups[local_numa_id()][0];
return container().invoke_on(local_coordinator, [t](self& local) {
return local._holder->get_recommended_dict(t);
});
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_impl(const compression_parameters& params, table_id id) {
using algorithm = compression_parameters::algorithm;
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", s->id(), algo);
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", id, algo);
switch (algo) {
case algorithm::lz4:
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
case algorithm::lz4_with_dicts: {
auto cdict = _cfg.enable_writing_dictionaries()
? co_await get_lz4_dicts_for_writing(s->id())
: nullptr;
holder::foreign_lz4_cdict cdict;
if (auto recommended = co_await get_recommended_dict(id)) {
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [recommended = std::move(recommended)] (self& local) mutable {
return local._holder->get_lz4_dict_for_writing(recommended.release());
});
}
if (cdict) {
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
}
@@ -1015,9 +1072,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
case algorithm::zstd:
co_return std::make_unique<zstd_processor>(params, nullptr, nullptr);
case algorithm::zstd_with_dicts: {
auto cdict = _cfg.enable_writing_dictionaries()
? co_await get_zstd_dict_for_writing(s->id(), params.zstd_compression_level().value_or(ZSTD_defaultCLevel()))
: nullptr;
holder::foreign_zstd_cdict cdict;
if (auto recommended = co_await get_recommended_dict(id)) {
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [level, recommended = std::move(recommended)] (self& local) mutable {
return local._holder->get_zstd_dict_for_writing(recommended.release(), level);
});
}
if (cdict) {
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
}
@@ -1029,17 +1090,28 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
abort();
}
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_reading(sstables::compression& c) {
const auto params = compression_parameters(sstables::options_from_compression(c));
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing(schema_ptr s) {
return make_compressor_for_writing_impl(s->get_compressor_params(), s->id());
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_for_tests(const compression_parameters& params, table_id id) {
return make_compressor_for_writing_impl(params, id);
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_impl(const compression_parameters& params, std::span<const std::byte> dict) {
using algorithm = compression_parameters::algorithm;
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
switch (algo) {
case algorithm::lz4:
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
case algorithm::lz4_with_dicts: {
auto dict = dict_from_options(c);
auto ddict = co_await get_lz4_dicts_for_reading(std::as_bytes(std::span(*dict)));
auto dict_span = dict;
auto sha = get_sha256(dict_span);
auto dict_owner = get_dict_owner(local_numa_id(), sha);
auto ddict = co_await container().invoke_on(dict_owner, [dict_span] (self& local) mutable {
auto d = local._holder->get_canonical_ptr(dict_span);
return local._holder->get_lz4_dict_for_reading(std::move(d));
});
if (ddict) {
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
}
@@ -1054,8 +1126,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
}
case algorithm::zstd_with_dicts: {
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
auto dict = dict_from_options(c);
auto ddict = co_await get_zstd_dict_for_reading(std::as_bytes(std::span(*dict)), level);
auto dict_span = dict;
auto sha = get_sha256(dict_span);
auto dict_owner = get_dict_owner(local_numa_id(), sha);
auto ddict = co_await container().invoke_on(dict_owner, [level, dict_span] (self& local) mutable {
auto d = local._holder->get_canonical_ptr(dict_span);
return local._holder->get_zstd_dict_for_reading(std::move(d), level);
});
if (ddict) {
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
}
@@ -1067,7 +1144,19 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
abort();
}
raw_dict::raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict)
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading(sstables::compression& c) {
const auto params = compression_parameters(sstables::options_from_compression(c));
auto dict = dict_from_options(c);
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
co_return co_await make_compressor_for_reading_impl(params, std::as_bytes(std::span(*dict)));
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_for_tests(const compression_parameters& params, std::span<const std::byte> dict) {
return make_compressor_for_reading_impl(params, dict);
}
raw_dict::raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict)
: _owner(owner.weak_from_this())
, _id(key)
, _dict(dict.begin(), dict.end())
@@ -1082,7 +1171,7 @@ raw_dict::~raw_dict() {
}
}
zstd_cdict::zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level)
zstd_cdict::zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level)
: _owner(owner.weak_from_this())
, _raw(raw)
, _level(level)
@@ -1114,7 +1203,7 @@ zstd_cdict::~zstd_cdict() {
}
}
zstd_ddict::zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
zstd_ddict::zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
: _owner(owner.weak_from_this())
, _raw(raw)
, _alloc([this] (ssize_t n) {
@@ -1143,7 +1232,7 @@ zstd_ddict::~zstd_ddict() {
}
}
lz4_cdict::lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
lz4_cdict::lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
: _owner(owner.weak_from_this())
, _raw(raw)
, _dict(LZ4_createStream(), LZ4_freeStream)
@@ -1162,6 +1251,28 @@ lz4_cdict::~lz4_cdict() {
}
}
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg) {
return std::make_unique<sstable_compressor_factory_impl>(std::move(cfg));
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread() {
SCYLLA_ASSERT(thread::running_in_thread());
struct wrapper : sstable_compressor_factory {
using impl = default_sstable_compressor_factory;
sharded<impl> _impl;
future<compressor_ptr> make_compressor_for_writing(schema_ptr s) override {
return _impl.local().make_compressor_for_writing(s);
}
future<compressor_ptr> make_compressor_for_reading(sstables::compression& c) override {
return _impl.local().make_compressor_for_reading(c);
}
future<> set_recommended_dict(table_id t, std::span<const std::byte> d) override {
return _impl.local().set_recommended_dict(t, d);
};
wrapper(wrapper&&) = delete;
wrapper() {
_impl.start().get();
}
~wrapper() {
_impl.stop().get();
}
};
return std::make_unique<wrapper>();
}
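The `get_dict_owner` logic above routes all requests for a given dictionary to one deterministic shard within a NUMA group, by reading the first 8 bytes of the dict's SHA-256 as an integer and reducing it modulo the group size. A standalone sketch of that selection (the `pick_owner` helper and `sha256_type` alias here are simplified stand-ins for illustration, not the patch's actual types):

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

using sha256_type = std::array<std::byte, 32>;

// Deterministically pick an owner shard for a dict within one NUMA group:
// the same SHA always maps to the same shard, so all lookups for a given
// dict ID funnel through a single owner without needing locks.
unsigned pick_owner(const sha256_type& sha, const std::vector<unsigned>& group) {
    std::uint64_t hash;
    std::memcpy(&hash, sha.data(), sizeof(hash)); // unaligned-safe read of first 8 bytes
    return group[hash % group.size()];
}
```

Because the mapping depends only on the hash and the group, a restarted node recomputes the same owner shard for every cached dictionary.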

View File

@@ -13,12 +13,9 @@
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/bool_class.hh>
#include "seastarx.hh"
namespace gms {
class feature_service;
} // namespace gms
class compression_parameters;
class compressor {
@@ -64,6 +61,8 @@ public:
virtual algorithm get_algorithm() const = 0;
virtual std::optional<unsigned> get_dict_owner_for_test() const;
using ptr_type = std::unique_ptr<compressor>;
};
@@ -106,7 +105,10 @@ public:
algorithm get_algorithm() const { return _algorithm; }
std::optional<int> zstd_compression_level() const { return _zstd_compression_level; }
void validate(const gms::feature_service&);
using dicts_feature_enabled = bool_class<struct dicts_feature_enabled_tag>;
using dicts_usage_allowed = bool_class<struct dicts_usage_allowed_tag>;
void validate(dicts_feature_enabled, dicts_usage_allowed);
std::map<sstring, sstring> get_options() const;
bool compression_enabled() const {

View File

@@ -1538,6 +1538,7 @@ deps['test/boost/combined_tests'] += [
'test/boost/secondary_index_test.cc',
'test/boost/sessions_test.cc',
'test/boost/sstable_compaction_test.cc',
'test/boost/sstable_compressor_factory_test.cc',
'test/boost/sstable_directory_test.cc',
'test/boost/sstable_set_test.cc',
'test/boost/statement_restrictions_test.cc',

View File

@@ -23,6 +23,7 @@
#include "db/per_partition_rate_limit_options.hh"
#include "db/tablet_options.hh"
#include "utils/bloom_calculations.hh"
#include "db/config.hh"
#include <boost/algorithm/string/predicate.hpp>
@@ -135,7 +136,9 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
throw exceptions::configuration_exception(sstring("Missing sub-option '") + compression_parameters::SSTABLE_COMPRESSION + "' for the '" + KW_COMPRESSION + "' option.");
}
compression_parameters cp(*compression_options);
cp.validate(db.features());
cp.validate(
compression_parameters::dicts_feature_enabled(bool(db.features().sstable_compression_dicts)),
compression_parameters::dicts_usage_allowed(db.get_config().sstable_compression_dictionaries_allow_in_ddl()));
}
auto per_partition_rate_limit_options = get_per_partition_rate_limit_options(schema_extensions);
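The call site above wraps each flag in a distinct `bool_class` type before passing it to `validate()`. The point of the pattern is that the two booleans cannot be swapped silently at a call site. A minimal self-contained sketch (the `bool_flag` wrapper and `validate_dict_compression` below are simplified assumptions standing in for Seastar's `bool_class` and the patched `compression_parameters::validate`):

```cpp
#include <stdexcept>

// Simplified stand-in for seastar::bool_class: a strong bool parameterized
// on a tag type, so each flag is its own type.
template <typename Tag>
struct bool_flag {
    bool value;
    explicit bool_flag(bool v) : value(v) {}
    explicit operator bool() const { return value; }
};

using dicts_feature_enabled = bool_flag<struct dicts_feature_enabled_tag>;
using dicts_usage_allowed = bool_flag<struct dicts_usage_allowed_tag>;

// Mirrors the two-step check in the patch: feature gate first, then the
// operator-controlled DDL switch.
void validate_dict_compression(dicts_feature_enabled enabled, dicts_usage_allowed allowed) {
    if (!enabled) {
        throw std::runtime_error("cluster feature not enabled on all nodes yet");
    }
    if (!allowed) {
        throw std::runtime_error("disabled by sstable_compression_dictionaries_allow_in_ddl: false");
    }
}
```

Passing the arguments in the wrong order fails to compile, which is exactly what replacing the old `validate(const gms::feature_service&)` signature with two plain `bool`s would not guarantee.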

View File

@@ -113,10 +113,9 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
if (rs->uses_tablets()) {
warnings.push_back(
"Tables in this keyspace will be replicated using Tablets "
"and will not support CDC, LWT and counters features. "
"To use CDC, LWT or counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} "
"to the CREATE KEYSPACE statement.");
"and will not support Materialized Views, Secondary Indexes, CDC, LWT and counters features. "
"To use Materialized Views, Secondary Indexes, CDC, LWT or counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
if (ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}

View File

@@ -1230,7 +1230,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, sstable_summary_ratio(this, "sstable_summary_ratio", value_status::Used, 0.0005, "Enforces that 1 byte of summary is written for every N (2000 by default) "
"bytes written to data file. Value must be between 0 and 1.")
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .2, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, (size_t(128) << 10) + 1, "Warn about memory allocations above this size; set to zero to disable.")
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, size_t(1) << 20, "Warn about memory allocations above this size; set to zero to disable.")
, enable_deprecated_partitioners(this, "enable_deprecated_partitioners", value_status::Used, false, "Enable the byteordered and random partitioners. These partitioners are deprecated and will be removed in a future version.")
, enable_keyspace_column_family_metrics(this, "enable_keyspace_column_family_metrics", value_status::Used, false, "Enable per keyspace and per column family metrics reporting.")
, enable_node_aggregated_table_metrics(this, "enable_node_aggregated_table_metrics", value_status::Used, true, "Enable aggregated per node, per keyspace and per table metrics reporting, applicable if enable_keyspace_column_family_metrics is false.")
@@ -1243,6 +1243,13 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, sstable_format(this, "sstable_format", value_status::Used, "me", "Default sstable file format", {"md", "me"})
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"
"This is only enforced when this node validates a new DDL statement; disabling the option won't disable dictionary-based compression "
"on tables which already have it configured, and won't do anything to existing sstables.\n"
"To affect existing tables, you can ALTER them to a non-dictionary compressor, or disable dictionary compression "
"for the whole node through `sstable_compression_dictionaries_enable_writing`.")
, sstable_compression_dictionaries_enable_writing(this, "sstable_compression_dictionaries_enable_writing", liveness::LiveUpdate, value_status::Used, true,
"Enables SSTable compression with shared dictionaries (for tables which opt in). If set to false, this node won't write any new SSTables using dictionary compression.\n"
"Option meant not for regular usage, but for unforeseen problems that call for disabling dictionaries without modifying table schema.")

View File

@@ -436,6 +436,7 @@ public:
named_value<bool> enable_sstables_mc_format;
named_value<bool> enable_sstables_md_format;
named_value<sstring> sstable_format;
named_value<bool> sstable_compression_dictionaries_allow_in_ddl;
named_value<bool> sstable_compression_dictionaries_enable_writing;
named_value<float> sstable_compression_dictionaries_memory_budget_fraction;
named_value<float> sstable_compression_dictionaries_retrain_period_in_seconds;

View File

@@ -22,6 +22,8 @@ def readable_desc_rst(description):
cleaned_line = line.replace('\\n', '\n')
cleaned_line = cleaned_line.replace('\\t', '\n' + indent * 2)
if line.endswith('"'):
cleaned_line = cleaned_line[:-1] + ' '

View File

@@ -1,15 +1,24 @@
{
"Linux Distributions": {
"Ubuntu": ["20.04", "22.04", "24.04"],
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04"],
"Debian": ["11"],
"Rocky / CentOS / RHEL": ["8", "9"],
"Amazon Linux": ["2023"]
},
"ScyllaDB Versions": [
{
"version": "Enterprise 2025.1",
"version": "ScyllaDB 2025.2",
"supported_OS": {
"Ubuntu": ["20.04", "22.04", "24.04"],
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04"],
"Debian": ["11"],
"Rocky / CentOS / RHEL": ["8", "9"],
"Amazon Linux": ["2023"]
}
},
{
"version": "ScyllaDB 2025.1",
"supported_OS": {
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04"],
"Debian": ["11"],
"Rocky / CentOS / RHEL": ["8", "9"],
"Amazon Linux": ["2023"]
@@ -18,7 +27,7 @@
{
"version": "Enterprise 2024.2",
"supported_OS": {
"Ubuntu": ["20.04", "22.04", "24.04"],
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04"],
"Debian": ["11"],
"Rocky / CentOS / RHEL": ["8", "9"],
"Amazon Linux": ["2023"]
@@ -27,20 +36,11 @@
{
"version": "Enterprise 2024.1",
"supported_OS": {
"Ubuntu": ["20.04", "22.04", "24.04*"],
"Ubuntu": ["20.04 (deprecated)", "22.04", "24.04*"],
"Debian": ["11"],
"Rocky / CentOS / RHEL": ["8", "9"],
"Amazon Linux": []
}
},
{
"version": "Open Source 6.2",
"supported_OS": {
"Ubuntu": ["20.04", "22.04", "24.04"],
"Debian": ["11"],
"Rocky / CentOS / RHEL": ["8", "9"],
"Amazon Linux": ["2023"]
}
}
]
}

View File

@@ -2,6 +2,11 @@
#old path: new path
# Remove redundant pages
/stable/getting-started/tutorials: https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html
/stable/contribute: https://github.com/scylladb/scylladb/blob/master/CONTRIBUTING.md
# Remove an outdated article
/stable/troubleshooting/nodetool-memory-read-timeout.html: /stable/troubleshooting/index.html

View File

@@ -1,31 +0,0 @@
Contribute to ScyllaDB
=======================
Thank you for your interest in making ScyllaDB better!
We appreciate your help and look forward to welcoming you to the ScyllaDB Community.
There are two ways you can contribute:
* Send a patch to the ScyllaDB source code
* Write documentation for ScyllaDB Docs
Contribute to ScyllaDB's Source Code
------------------------------------
ScyllaDB developers use patches and email to share and discuss changes.
Setting up can take a little time, but once you have done it the first time, it's easy.
The basic steps are:
* Join the ScyllaDB community
* Create a Git branch to work on
* Commit your work with clear commit messages and sign-offs.
* Send a PR or use ``git format-patch`` and ``git send-email`` to send to the list
The entire process is `documented here <https://github.com/scylladb/scylla/blob/master/CONTRIBUTING.md>`_.
Contribute to ScyllaDB Docs
---------------------------
Each ScyllaDB project has accompanying documentation. For information about contributing documentation to a specific ScyllaDB project, refer to the README file for the individual project.
For general information or to contribute to the ScyllaDB Sphinx theme, read the `Contributor's Guide <https://sphinx-theme.scylladb.com/stable/contribute/>`_.

View File

@@ -67,9 +67,6 @@ You can enable CDC when creating or altering a table using the ``cdc`` option, f
CREATE TABLE ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck, v)) WITH cdc = {'enabled':true};
.. note::
If you enabled CDC and later decide to disable it, you need to **stop all writes** to the base table before issuing the ``ALTER TABLE ... WITH cdc = {'enabled':false};`` command.
.. include:: /features/cdc/_common/cdc-params.rst
Using CDC with Applications

View File

@@ -113,7 +113,38 @@ Pick a zone where Haswell CPUs are found. Local SSD performance offers, accordin
Image with NVMe disk interface is recommended.
(`More info <https://cloud.google.com/compute/docs/disks/local-ssd>`_)
Recommended instances types are `n1-highmem <https://cloud.google.com/compute/docs/general-purpose-machines#n1_machines>`_ and `n2-highmem <https://cloud.google.com/compute/docs/general-purpose-machines#n2_machines>`_
Recommended instance types are `z3-highmem-highlssd <https://cloud.google.com/compute/docs/storage-optimized-machines#z3_machine_types>`_,
`n1-highmem <https://cloud.google.com/compute/docs/general-purpose-machines#n1_machines>`_, and `n2-highmem <https://cloud.google.com/compute/docs/general-purpose-machines#n2_machines>`_
.. list-table::
:widths: 30 20 20 30
:header-rows: 1
* - Model
- vCPU
- Mem (GB)
- Storage (GB)
* - z3-highmem-8-highlssd
- 8
- 64
- 3,000
* - z3-highmem-16-highlssd
- 16
- 128
- 6,000
* - z3-highmem-22-highlssd
- 22
- 176
- 9,000
* - z3-highmem-32-highlssd
- 32
- 256
- 12,000
* - z3-highmem-44-highlssd
- 44
- 352
- 18,000
.. list-table::
:widths: 30 20 20 30

View File

@@ -11,7 +11,6 @@ Getting Started
requirements
Migrate to ScyllaDB </using-scylla/migrate-scylla>
Integration Solutions </using-scylla/integrations/index>
tutorials
.. panel-box::
:title: ScyllaDB Requirements

View File

@@ -4,6 +4,9 @@ OS Support by Linux Distributions and Version
The following matrix shows which Linux distributions, containers, and images
are :ref:`supported <os-support-definition>` with which versions of ScyllaDB.
Note that support for Ubuntu 20.04 is deprecated and will be removed in
a future release.
.. datatemplate:json:: /_static/data/os-support.json
:template: platforms.tmpl

View File

@@ -1,21 +0,0 @@
============
Tutorials
============
The tutorials will show you how to use ScyllaDB as a data source for an application.
ScyllaDB Tutorial
===================
`Build an IoT App with sensor simulator and a REST API <https://iot.scylladb.com/stable/>`_
ScyllaDB Cloud Tutorial
=======================
`Implement CRUD operations with a TODO App <https://github.com/scylladb/scylla-cloud-getting-started/>`_
ScyllaDB Cloud Feature Store Tutorial
=====================================
`Build a machine learning (ML) feature store with ScyllaDB <https://feature-store.scylladb.com/stable/>`_

View File

@@ -73,6 +73,5 @@ In addition, you can read our `blog <https://www.scylladb.com/blog/>`_ and atten
kb/index
reference/index
faq
Contribute to ScyllaDB <contribute>
2024.2 and earlier documentation <https://enterprise.docs.scylladb.com/branch-2024.2/>

View File

@@ -2,40 +2,65 @@
How to Safely Increase the Replication Factor
=======================================================
A replication factor (RF) is configured per keyspace. You can change the RF
using the :ref:`ALTER KEYSPACE <alter-keyspace-statement>` command.
**Topic: What can happen when you increase RF**
To increase the RF safely, ensure you follow the guidelines below.
The guidelines differ depending on whether your keyspace is tablets-based
(the default) or has tablets disabled. See :doc:`Data Distribution with Tablets </architecture/tablets>`
for more information about tablets.
Increasing the RF in Tablets-based Keyspaces
-------------------------------------------------
**Audience: ScyllaDB administrators**
If a keyspace has tablets enabled (the default), changing the RF does not
impact data consistency in the cluster.
However, due to limitations in the current protocol used to pass tablet data
to drivers, drivers will not pick up new replicas after the RF is increased.
As a result, drivers will not route requests to new replicas, causing imbalance.
Issues
------
To avoid this issue, restart the client applications after the ALTER statement
that changes the RF completes successfully.
When a Replication Factor (RF) is increased, using the :ref:`ALTER KEYSPACE <alter-keyspace-statement>` command, the data consistency is effectively dropped
by the difference of the RF_new value and the RF_old value for all pre-existing data.
Increasing the RF in Keyspaces with Tablets Disabled
----------------------------------------------------------
If you :ref:`opted out of tablets when creating a keyspace <tablets-enable-tablets>`,
so your keyspace is vnodes-based, increasing the RF will impact data consistency.
Data consistency in your cluster is effectively dropped by the difference
between the RF_new value and the RF_old value for all pre-existing data.
Consistency will only be restored after running a repair.
Another issue occurs in keyspaces with tablets enabled and is driver-related. Due to limitations in the current protocol used to pass tablet data to drivers, drivers will not pick
up new replicas after the replication factor is increased. As a result, they will not route requests to those replicas, causing imbalance.
Resolution
----------
========================
When one increases an RF, one should consider that the pre-existing data will **not be streamed** to new replicas (a common misconception).
When you increase the RF, you should be aware that the pre-existing data will
**not be streamed** to new replicas (a common misconception).
As a result, in order to make sure that you can keep on reading the old data with the same level of consistency, increase the read Consistency Level (CL) according to the following formula:
As a result, in order to make sure that you can keep on reading the old data
with the same level of consistency:
``CL_new = CL_old + RF_new - RF_old``
#. Increase the read Consistency Level (CL) according to the following formula:
After you run a repair, you can decrease the CL. If RF has only been changed in a particular Data Center (DC), only the nodes in that DC have to be repaired.
.. code::
CL_new = CL_old + RF_new - RF_old
#. Run repair.
#. Decrease the CL.
If RF has only been changed in a particular Datacenter (DC), only the nodes in
that DC have to be repaired.
To resolve the driver-related issue, restart the client applications after the ALTER statement that changes the RF completes successfully.
Example
=======
In this example your five node cluster RF is 3 and your CL is TWO. You want to increase your RF from 3 to 5.
In this example, your five-node cluster RF is 3 and your CL is TWO. You want to increase your RF from 3 to 5.
#. Increase the read CL by a RF_new - RF_old value.
Following the example, RF_new is 5 and RF_old is 3, so 5 - 3 = 2. You need to increase the CL by 2.
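The CL adjustment in the example is simple arithmetic; the following shell sketch, with the example's hypothetical values hard-coded, makes it explicit:

```shell
# Values from the example (hypothetical cluster): RF goes from 3 to 5,
# and the read CL starts at TWO (numeric value 2).
RF_OLD=3
RF_NEW=5
CL_OLD=2
# CL_new = CL_old + RF_new - RF_old
CL_NEW=$((CL_OLD + RF_NEW - RF_OLD))
echo "New read CL: $CL_NEW"   # prints "New read CL: 4", i.e. CL FOUR
```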
@@ -45,9 +70,9 @@ In this example your five node cluster RF is 3 and your CL is TWO. You want to i
#. Restore the reads CL to the originally intended value. For this example, QUORUM.
If you do not follow the procedure above you may start reading stale or null data after increasing the RF.
If you do not follow the procedure above, you may start reading stale or null data after increasing the RF.
More Information
References
----------------
* :doc:`Fault Tolerance </architecture/architecture-fault-tolerance/>`

View File

@@ -5,4 +5,3 @@ The cassandra-stress tool is used for benchmarking and load testing both ScyllaD
Cassandra Stress is not part of ScyllaDB and is no longer distributed alongside it. It has its own separate repository and release cycle. More information can be found on `GitHub <https://github.com/scylladb/cassandra-stress>`_ or on `DockerHub <https://hub.docker.com/r/scylladb/cassandra-stress>`_.
.. include:: /rst_include/apache-copyrights.rst

View File

@@ -1,3 +1,5 @@
.. _nodetool-cleanup-cmd:
Nodetool cleanup
================
**cleanup** ``[<keyspace> <tablename ...>]`` - triggers the immediate removal of data from node(s) that "lose" part of their token range due to a range movement operation (node addition or node replacement).

View File

@@ -29,7 +29,7 @@ Load and Stream
.. code::
nodetool refresh <my_keyspace> <my_table> [--load-and-stream | -las]
nodetool refresh <my_keyspace> <my_table> [--load-and-stream | -las] [--scope <scope>]
The Load and Stream feature extends nodetool refresh. The new ``-las`` option loads arbitrary sstables that do not belong to the node into the cluster. It loads the sstables from disk, calculates the nodes that own the data, and streams it to them automatically.
For example, say the old cluster has 6 nodes and the new cluster has 3 nodes. We can copy the sstables from the old cluster to any of the new nodes and trigger the load and stream process.
@@ -39,5 +39,42 @@ Load and Stream make restores and migrations much easier:
* You can place sstable from every node to every node
* No need to run nodetool cleanup to remove unused data
Scope
-----
The `scope` parameter describes the subset of cluster nodes where you want to load data:
* `node` - On the local node.
* `rack` - On the local rack.
* `dc` - In the datacenter (DC) where the local node lives.
* `all` (default) - Everywhere across the cluster.
Scope supports a variety of options for filtering out the destination nodes.
On one extreme, one node is given all SSTables with the scope ``all``; on the other extreme, all
nodes load only their own SSTables with the scope ``node``. In between, you can choose
a subset of nodes to load only SSTables that belong to the rack or DC.
This option is only valid when using the ``--load-and-stream`` option.
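As a sketch of how the scope values map onto command lines, a small helper can assemble the invocation (the keyspace and table names below are hypothetical placeholders, not part of the command reference):

```shell
# Hypothetical helper that assembles a load-and-stream refresh command
# for a given scope; keyspace/table names are placeholders.
build_refresh_cmd() {
  local ks=$1 table=$2 scope=$3
  echo "nodetool refresh $ks $table --load-and-stream --scope $scope"
}

build_refresh_cmd my_keyspace my_table rack
# -> nodetool refresh my_keyspace my_table --load-and-stream --scope rack
```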
Skip cleanup
---------------
.. code::
nodetool refresh <my_keyspace> <my_table> [--skip-cleanup]
When loading an SSTable, Scylla will clean it up, removing keys that the node is not responsible for. To skip this step, use the `--skip-cleanup` option.
See :ref:`nodetool cleanup <nodetool-cleanup-cmd>`.
Skip reshape
---------------
.. code::
nodetool refresh <my_keyspace> <my_table> [--skip-reshape]
When refreshing, the SSTables to load might be out of shape; in that case, Scylla will attempt to reshape them. To skip this step, use the `--skip-reshape` option.
.. include:: nodetool-index.rst

View File

@@ -4,8 +4,7 @@ Upgrade ScyllaDB
.. toctree::
ScyllaDB Open Source 6.2 to ScyllaDB 2025.1 <upgrade-guide-from-6.2-to-2025.1/index>
ScyllaDB Enterprise 2024.x to ScyllaDB 2025.1 <upgrade-guide-from-2024.x-to-2025.1/index>
ScyllaDB 2025.1 to ScyllaDB 2025.2 <upgrade-guide-from-2025.1-to-2025.2/index>
ScyllaDB Image <ami-upgrade>

View File

@@ -1,129 +0,0 @@
=====================================
Enable Consistent Topology Updates
=====================================
.. note::
The following procedure only applies if:
* You're upgrading **from ScyllaDB Enterprise 2024.1** to ScyllaDB 2025.1.
* You previously upgraded from 2024.1 to 2024.2 without enabling consistent
topology updates (see the `2024.2 upgrade guide <https://enterprise.docs.scylladb.com/branch-2024.2/upgrade/upgrade-enterprise/upgrade-guide-from-2024.1-to-2024.2/enable-consistent-topology.html>`_
for reference).
Introduction
============
ScyllaDB 2025.1 has :ref:`consistent topology changes based on Raft <raft-topology-changes>`.
Clusters created with version 2025.1 use consistent topology changes right
from the start. However, consistent topology changes are *not* automatically
enabled in clusters upgraded from version 2024.1. In such clusters, you need to
enable consistent topology changes manually by following the procedure described in this article.
Before you start, you **must** check that the cluster meets the prerequisites
and ensure that some administrative procedures will not be run while
the procedure is in progress.
.. _enable-raft-topology-2025.1-prerequisites:
Prerequisites
=============
* Make sure that all nodes in the cluster are upgraded to ScyllaDB 2025.1.
* Verify that :ref:`schema on raft is enabled <schema-on-raft-enabled>`.
* Make sure that all nodes have enabled the ``SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`` cluster feature.
One way to verify this is to look for the following message in the log:
.. code-block:: none
features - Feature SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES is enabled
Alternatively, it can be verified programmatically by checking whether the ``value``
column under the ``enabled_features`` key contains the name of the feature in
the ``system.scylla_local`` table. One way to do it is with the following bash script:
.. code-block:: bash
until cqlsh -e "select value from system.scylla_local where key = 'enabled_features'" | grep "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"
do
echo "Upgrade didn't finish yet on the local node, waiting 10 seconds before checking again..."
sleep 10
done
echo "Upgrade completed on the local node"
* Make sure that all nodes are alive for the duration of the procedure.
.. _enable-raft-topology-2025.1-forbidden-operations:
Administrative operations that must not be running during the procedure
=========================================================================
Make sure that administrative operations will not be running while
the procedure is in progress. In particular, you must abstain from:
* :doc:`Cluster management procedures </operating-scylla/procedures/cluster-management/index>`
(adding, replacing, removing, decommissioning nodes, etc.).
* Running :doc:`nodetool repair </operating-scylla/nodetool-commands/repair>`.
* Running :doc:`nodetool checkAndRepairCdcStreams </operating-scylla/nodetool-commands/checkandrepaircdcstreams>`.
* Any modifications of :doc:`authentication </operating-scylla/security/authentication>` and :doc:`authorization </operating-scylla/security/enable-authorization>` settings.
* Any change of authorization via :doc:`CQL API </operating-scylla/security/authorization>`.
* Schema changes.
Running the procedure
=====================
.. warning::
Before proceeding, make sure that all the :ref:`prerequisites <enable-raft-topology-2025.1-prerequisites>` are met
and no :ref:`forbidden administrative operations <enable-raft-topology-2025.1-forbidden-operations>` will run
during the procedure. Failing to do so may put the cluster in an inconsistent state.
#. Issue a POST HTTP request to the ``/storage_service/raft_topology/upgrade``
endpoint to any of the nodes in the cluster.
For example, you can do it with ``curl``:
.. code-block:: bash
curl -X POST "http://127.0.0.1:10000/storage_service/raft_topology/upgrade"
#. Wait until all nodes report that the procedure is complete. You can check
whether a node finished the procedure in one of two ways:
* By sending a HTTP ``GET`` request on the ``/storage_service/raft_topology/upgrade``
endpoint. For example, you can do it with ``curl``:
.. code-block:: bash
curl -X GET "http://127.0.0.1:10000/storage_service/raft_topology/upgrade"
It will return a JSON string that will be equal to ``done`` after the procedure is complete on that node.
* By querying the ``upgrade_state`` column in the ``system.topology`` table.
You can use ``cqlsh`` to get the value of the column:
.. code-block:: bash
cqlsh -e "select upgrade_state from system.topology"
The ``upgrade_state`` column should be set to ``done`` after the procedure
is complete on that node.
After the procedure is complete on all nodes, wait at least one minute before
issuing any topology changes in order to avoid data loss from writes that were
started before the procedure.
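The per-node check can be looped over every node in the cluster. The sketch below stubs the REST call so it is self-contained; in a real run, replace the stub body with the ``curl`` command shown earlier, and the node list with your cluster's addresses:

```shell
# Stand-in for the real status call; in production this would be:
#   curl -s -X GET "http://$1:10000/storage_service/raft_topology/upgrade"
get_upgrade_state() {
  echo '"done"'   # stubbed so the sketch runs without a live cluster
}

# Hypothetical node addresses.
for node in 10.0.0.1 10.0.0.2 10.0.0.3; do
  until [ "$(get_upgrade_state "$node")" = '"done"' ]; do
    sleep 10
  done
  echo "node $node: upgrade complete"
done
```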
What if the procedure gets stuck?
===================================
If the procedure gets stuck at some point, first check the status of your cluster:
- If there are some nodes that are not alive, try to restart them.
- If all nodes are alive, ensure that the network is healthy and every node can reach all other nodes.
- If all nodes are alive and the network is healthy, perform
a :doc:`rolling restart </operating-scylla/procedures/config-change/rolling-restart/>` of the cluster.
If none of the above solves the issue, perform :ref:`the Raft recovery procedure <recovery-procedure>`.
During recovery, the cluster will switch back to the gossip-based topology management mechanism.
After exiting recovery, you should retry enabling consistent topology updates using
the procedure described in this document.

View File

@@ -1,17 +0,0 @@
==========================================================
Upgrade - ScyllaDB Enterprise 2024.x to ScyllaDB 2025.1
==========================================================
.. toctree::
:maxdepth: 2
:hidden:
ScyllaDB <upgrade-guide-from-2024.x-to-2025.1>
Enable Consistent Topology Updates <enable-consistent-topology>
Metrics <metric-update-2024.x-to-2025.1>
* :doc:`Upgrade from ScyllaDB Enterprise 2024.x.y to ScyllaDB 2025.1.y <upgrade-guide-from-2024.x-to-2025.1>`
* :doc:`Enable Consistent Topology Updates <enable-consistent-topology>`
* :doc:`Metrics Update Between 2024.x and 2025.1 <metric-update-2024.x-to-2025.1>`

View File

@@ -1,74 +0,0 @@
.. |SRC_VERSION| replace:: 2024.x
.. |NEW_VERSION| replace:: 2025.1
=======================================================================================
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
=======================================================================================
ScyllaDB Enterprise |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
New Metrics
------------
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |SRC_VERSION|:
.. list-table::
:widths: 25 150
:header-rows: 1
* - Metric
- Description
* - scylla_alternator_batch_item_count
- The total number of items processed across all batches.
* - scylla_hints_for_views_manager_sent_bytes_total
- The total size of the sent hints (in bytes).
* - scylla_hints_manager_sent_bytes_total
- The total size of the sent hints (in bytes).
* - scylla_io_queue_activations
- The number of times the class was woken up from idle.
* - scylla_raft_apply_index
- The applied index.
* - scylla_raft_commit_index
- The commit index.
* - scylla_raft_log_last_index
- The index of the last log entry.
* - scylla_raft_log_last_term
- The term of the last log entry.
* - scylla_raft_snapshot_last_index
- The index of the snapshot.
* - scylla_raft_snapshot_last_term
- The term of the snapshot.
* - scylla_raft_state
- The current state: 0 - follower, 1 - candidate, 2 - leader
* - scylla_rpc_client_delay_samples
- The total number of delay samples.
* - scylla_rpc_client_delay_total
- The total delay in seconds.
* - scylla_storage_proxy_replica_received_hints_bytes_total
- The total size of hints and MV hints received by this node.
* - scylla_storage_proxy_replica_received_hints_total
- The number of hints and MV hints received by this node.
Renamed Metrics
------------------
The following metrics are renamed in ScyllaDB |NEW_VERSION| compared to |SRC_VERSION|:
.. list-table::
:widths: 25 150
:header-rows: 1
* - 2024.2
- 2025.1
* - scylla_hints_for_views_manager_sent
- scylla_hints_for_views_manager_sent_total
* - scylla_hints_manager_sent
- scylla_hints_manager_sent_total
* - scylla_forward_service_requests_dispatched_to_other_nodes
- scylla_mapreduce_service_requests_dispatched_to_other_nodes
* - scylla_forward_service_requests_dispatched_to_own_shards
- scylla_mapreduce_service_requests_dispatched_to_own_shards
* - scylla_forward_service_requests_executed
- scylla_mapreduce_service_requests_executed

View File

@@ -1,395 +0,0 @@
.. |SCYLLA_NAME| replace:: ScyllaDB
.. |SRC_VERSION| replace:: 2024.x
.. |NEW_VERSION| replace:: 2025.1
.. |ROLLBACK| replace:: rollback
.. _ROLLBACK: ./#rollback-procedure
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2024.x to 2025.1
.. _SCYLLA_METRICS: ../metric-update-2024.x-to-2025.1
=======================================================================================
Upgrade from |SCYLLA_NAME| Enterprise |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
=======================================================================================
This document is a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION|
to |NEW_VERSION|, and rollback to version |SRC_VERSION| if required.
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS, Debian,
and Ubuntu. See :doc:`OS Support by Platform and Version </getting-started/os-support>`
for information about supported versions.
This guide also applies when you're upgrading the ScyllaDB official image on EC2,
GCP, or Azure.
Before You Upgrade ScyllaDB
================================
**Upgrade Your Driver**
If you're using a :doc:`ScyllaDB driver </using-scylla/drivers/cql-drivers/index>`,
upgrade the driver before you upgrade ScyllaDB. The latest two versions of each driver
are supported.
**Upgrade ScyllaDB Monitoring Stack**
If you're using the ScyllaDB Monitoring Stack, verify that your Monitoring Stack
version supports the ScyllaDB version to which you want to upgrade. See
`ScyllaDB Monitoring Stack Support Matrix <https://monitoring.docs.scylladb.com/stable/reference/matrix.html>`_.
We recommend upgrading the Monitoring Stack to the latest version.
**Check Feature Updates**
See the ScyllaDB Release Notes for the latest updates. The Release Notes are published
at the `ScyllaDB Community Forum <https://forum.scylladb.com/>`_.
Upgrade Procedure
=================
A ScyllaDB upgrade is a rolling procedure that does **not** require full cluster shutdown.
For each of the nodes in the cluster, you will:
* Check that the cluster's schema is synchronized
* Drain the node and backup the data
* Backup the configuration file
* Stop ScyllaDB
* Download and install new ScyllaDB packages
* Start ScyllaDB
* Validate that the upgrade was successful
.. caution::
Apply the procedure **serially** on each node. Do not move to the next node before
validating that the node you upgraded is up and running the new version.
**During** the rolling upgrade, it is highly recommended:
* Not to use the new |NEW_VERSION| features.
* Not to run administration functions, like repairs, refresh, rebuild, or add or remove
nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
ScyllaDB Manager's scheduled or running repairs.
* Not to apply schema changes.
**After** the upgrade, you may need to enable consistent topology updates.
See :ref:`After Upgrading Every Node <upgrade-2024.x-2025.1-after-upgrading-nodes>` for details.
Upgrade Steps
=============
Check the cluster schema
-------------------------
Make sure that all nodes have the schema synchronized before upgrade. The upgrade
procedure will fail if there is a schema disagreement between nodes.
.. code:: sh
nodetool describecluster
Backup the data
-----------------------------------
Before any major procedure, like an upgrade, it is recommended to backup all the data
to an external device.
We recommend using `ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
to create backups.
Alternatively, you can use the ``nodetool snapshot`` command. For **each** node in the cluster, run
the following command:
.. code:: sh
nodetool drain
nodetool snapshot
Take note of the directory name that nodetool gives you, and copy all the directories
having that name under ``/var/lib/scylla`` to a backup device.
When the upgrade is completed on all nodes, remove the snapshot with the
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of space.
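Copying the snapshot directories can be scripted. The helper below is a sketch, not an official tool; the paths are parameterized so you can point it at your data directory and backup mount (the example tag is hypothetical):

```shell
# Sketch: copy every snapshot directory matching a tag to a backup destination,
# preserving the full directory path. The tag is the directory name reported
# by `nodetool snapshot`.
backup_snapshots() {
  local tag=$1 data_dir=$2 dest=$3
  find "$data_dir" -type d -path "*/snapshots/$tag" \
    -exec cp -a --parents {} "$dest" \;
}

# Example (hypothetical tag and backup mount):
# backup_snapshots 1690000000000 /var/lib/scylla /mnt/backup
```

Note that ``cp --parents`` is a GNU coreutils option; it recreates the source path under the destination, which keeps sstables from different tables separated.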
Backup the configuration file
------------------------------
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
in case you need to rollback the upgrade.
.. tabs::
.. group-tab:: Debian/Ubuntu
.. code:: sh
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
.. group-tab:: RHEL/CentOS
.. code:: sh
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
Gracefully stop the node
------------------------
.. code:: sh
sudo service scylla-server stop
Download and install the new release
------------------------------------
Before upgrading, check which version you are running with ``scylla --version``.
You will need to reinstall this version if you decide to |ROLLBACK|_
the upgrade.
.. tabs::
.. group-tab:: Debian/Ubuntu
#. Update the ScyllaDB deb repo to |NEW_VERSION|.
.. code-block:: console
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/scylla-2025.1.list
#. Install the new ScyllaDB version:
.. code-block:: console
sudo apt-get clean all
sudo apt-get update
sudo apt-get dist-upgrade scylla
Answer y to the first two questions.
.. group-tab:: RHEL/CentOS
#. Update the ScyllaDB rpm repo to |NEW_VERSION|.
.. code-block:: console
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/scylla-2025.1.repo
#. Install the new ScyllaDB version:
.. code:: sh
sudo yum clean all
sudo yum update scylla\* -y
.. group-tab:: EC2/GCP/Azure Ubuntu Image
If you're using the ScyllaDB official image (recommended), see
the **Debian/Ubuntu** tab for upgrade instructions. If you're using your
own image and have installed ScyllaDB packages for Ubuntu or Debian,
you need to apply an extended upgrade procedure:
#. Update the ScyllaDB deb repo (see the **Debian/Ubuntu** tab).
#. Install the new ScyllaDB version with the additional
``scylla-machine-image`` package:
.. code::
sudo apt-get clean all
sudo apt-get update
sudo apt-get dist-upgrade scylla
sudo apt-get dist-upgrade scylla-machine-image
#. Run ``scylla_setup`` without running ``io_setup``.
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
If you need the JMX server, see
:doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
and install the new version.
Start the node
--------------
.. code:: sh
sudo service scylla-server start
Validate
--------
#. Check cluster status with ``nodetool status`` and make sure **all** nodes, including
the one you just upgraded, are in ``UN`` status.
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
to check the ScyllaDB version. Validate that the version matches the one you upgraded to.
#. Check scylla-server log (using ``journalctl _COMM=scylla``) and ``/var/log/syslog``
to validate there are no new errors in the log.
#. Check again after two minutes to validate that no new issues are introduced.
Once you are sure the node upgrade was successful, move to the next node in the cluster.
.. _upgrade-2024.x-2025.1-after-upgrading-nodes:
After Upgrading Every Node
===============================
This step applies if:
* You're upgrading from ScyllaDB Enterprise **2024.1** to ScyllaDB 2025.1.
* You previously upgraded from 2024.1 to 2024.2 without enabling consistent
topology updates (see the `2024.2 upgrade guide <https://enterprise.docs.scylladb.com/branch-2024.2/upgrade/upgrade-enterprise/upgrade-guide-from-2024.1-to-2024.2/enable-consistent-topology.html>`_
for reference).
After you have upgraded every node, you must enable the Raft-based consistent
topology updates feature. See
:doc:`Enable Consistent Topology Updates </upgrade/upgrade-guides/upgrade-guide-from-2024.x-to-2025.1/enable-consistent-topology>`
for instructions.
Rollback Procedure
==================
.. warning::
The rollback procedure can only be applied if some nodes have **not** been upgraded
to |NEW_VERSION| yet. As soon as the last node in the rolling upgrade procedure is
started with |NEW_VERSION|, rollback becomes impossible. At that point, the only way
to restore a cluster to |SRC_VERSION| is by restoring it from backup.
The following procedure describes a rollback from |SCYLLA_NAME| |NEW_VERSION|.x to
|SRC_VERSION|.y. Apply this procedure if an upgrade from |SRC_VERSION| to |NEW_VERSION|
failed before completing on all nodes.
* Use this procedure only for nodes you upgraded to |NEW_VERSION|.
* Execute the commands one node at a time, moving to the next node
only after the rollback procedure is completed successfully.
ScyllaDB rollback is a rolling procedure that does **not** require a full cluster shutdown.
For each of the nodes you rollback to |SRC_VERSION|, you will:
* Drain the node and stop ScyllaDB
* Retrieve the old ScyllaDB packages
* Restore the configuration file
* Reload systemd configuration
* Restart ScyllaDB
* Validate the rollback success
Apply the procedure **serially** on each node. Do not move to the next node
before validating that the rollback was successful and the node is up and
running the old version.
Rollback Steps
==============
Drain and gracefully stop the node
----------------------------------
.. code:: sh
nodetool drain
sudo service scylla-server stop
Download and install the old release
------------------------------------
.. tabs::
.. group-tab:: Debian/Ubuntu
#. Remove the old repo file.
.. code:: sh
sudo rm -rf /etc/apt/sources.list.d/scylla.list
#. Restore the |SRC_VERSION| packages backed up during the upgrade.
.. code:: sh
sudo cp ~/scylla.list-backup /etc/apt/sources.list.d/scylla.list
sudo chown root.root /etc/apt/sources.list.d/scylla.list
sudo chmod 644 /etc/apt/sources.list.d/scylla.list
#. Install:
.. code-block::
sudo apt-get update
sudo apt-get remove scylla\* -y
sudo apt-get install scylla-enterprise
Answer y to the first two questions.
.. group-tab:: RHEL/CentOS
#. Remove the old repo file.
.. code:: sh
sudo rm -rf /etc/yum.repos.d/scylla.repo
#. Restore the |SRC_VERSION| packages backed up during the upgrade procedure.
.. code:: sh
sudo cp ~/scylla.repo-backup /etc/yum.repos.d/scylla.repo
sudo chown root.root /etc/yum.repos.d/scylla.repo
sudo chmod 644 /etc/yum.repos.d/scylla.repo
#. Install:
.. code:: console
sudo yum clean all
sudo yum remove scylla\*
sudo yum install scylla-enterprise
.. group-tab:: EC2/GCP/Azure Ubuntu Image
If youre using the ScyllaDB official image (recommended), see the **Debian/Ubuntu**
tab for upgrade instructions.
If you're using your own image and installed ScyllaDB packages for Ubuntu or Debian,
you need to additionally restore the ``scylla-machine-image`` package.
#. Restore the |SRC_VERSION| packages backed up during the upgrade
(see the **Debian/Ubuntu** tab).
#. Install:
.. code-block::
sudo apt-get update
sudo apt-get remove scylla\* -y
sudo apt-get install scylla-enterprise
sudo apt-get install scylla-enterprise-machine-image
Answer y to the first two questions.
Restore the configuration file
------------------------------
.. code:: sh
sudo rm -rf /etc/scylla/scylla.yaml
sudo cp /etc/scylla/scylla.yaml-backup /etc/scylla/scylla.yaml
Reload systemd configuration
----------------------------
You must reload the systemd configuration if the unit file has changed.
.. code:: sh
sudo systemctl daemon-reload
Start the node
--------------
.. code:: sh
sudo service scylla-server start
Validate
--------
Validate the rollback as described in the upgrade instructions above. Once you are sure
the node rollback was successful, move on to the next node in the cluster.

View File

@@ -0,0 +1,13 @@
==========================================================
Upgrade - ScyllaDB 2025.1 to ScyllaDB 2025.2
==========================================================
.. toctree::
:maxdepth: 2
:hidden:
Upgrade ScyllaDB <upgrade-guide-from-2025.1-to-2025.2>
Metrics Update <metric-update-2025.1-to-2025.2>
* :doc:`Upgrade from ScyllaDB 2025.1.x to ScyllaDB 2025.2.y <upgrade-guide-from-2025.1-to-2025.2>`
* :doc:`Metrics Update Between 2025.1 and 2025.2 <metric-update-2025.1-to-2025.2>`

View File

@@ -0,0 +1,61 @@
.. |SRC_VERSION| replace:: 2025.1
.. |NEW_VERSION| replace:: 2025.2
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
================================================================
.. toctree::
:maxdepth: 2
:hidden:
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
New Metrics
------------
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |SRC_VERSION|:
.. list-table::
:widths: 25 150
:header-rows: 1
* - Metric
- Description
* - scylla_alternator_batch_item_count_histogram
- A histogram of the number of items in a batch request.
* - scylla_database_total_view_updates_failed_pairing
- Total number of view updates for which we failed base/view pairing.
* - scylla_group_name_cross_rack_collocations
- The number of co-locating migrations that move a replica across racks.
* - scylla_network_bytes_received
- The number of bytes received from network sockets.
* - scylla_network_bytes_sent
- The number of bytes written to network sockets.
* - scylla_reactor_awake_time_ms_total
- Total reactor awake time (wall_clock).
* - scylla_reactor_cpu_used_time_ms
- Total reactor thread CPU time (from CLOCK_THREAD_CPUTIME).
* - scylla_reactor_sleep_time_ms_total
- Total reactor sleep time (wall clock).
* - scylla_sstable_compression_dicts_total_live_memory_bytes
- Total amount of memory consumed by SSTable compression dictionaries in RAM.
* - scylla_transport_connections_blocked
- Holds an incrementing counter with the CQL connections that were blocked
before being processed due to the threshold configured via
uninitialized_connections_semaphore_cpu_concurrency. Blocks are normal
when multiple connections are initialized at once. If connections are
timing out and this value is high, it indicates either a connection storm
or unusually slow processing.
* - scylla_transport_connections_shed
- Holds an incrementing counter with the CQL connections that were shed
due to concurrency semaphore timeout (threshold configured via
uninitialized_connections_semaphore_cpu_concurrency). This can typically
happen during a connection storm.

View File

@@ -1,16 +1,16 @@
.. |SCYLLA_NAME| replace:: ScyllaDB
.. |SRC_VERSION| replace:: 6.2
.. |NEW_VERSION| replace:: 2025.1
.. |SRC_VERSION| replace:: 2025.1
.. |NEW_VERSION| replace:: 2025.2
.. |ROLLBACK| replace:: rollback
.. _ROLLBACK: ./#rollback-procedure
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 6.2 to 2025.1
.. _SCYLLA_METRICS: ../metric-update-6.2-to-2025.1
.. |SCYLLA_METRICS| replace:: ScyllaDB Metrics Update - ScyllaDB 2025.1 to 2025.2
.. _SCYLLA_METRICS: ../metric-update-2025.1-to-2025.2
=======================================================================================
Upgrade from |SCYLLA_NAME| Open Source |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
Upgrade from |SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION|
=======================================================================================
This document describes a step-by-step procedure for upgrading from |SCYLLA_NAME| |SRC_VERSION|
@@ -20,7 +20,7 @@ This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS,
and Ubuntu. See :doc:`OS Support by Platform and Version </getting-started/os-support>`
for information about supported versions.
It also applies when using ScyllaDB official image on EC2, GCP, or Azure.
It also applies when using the ScyllaDB official image on EC2, GCP, or Azure.
Before You Upgrade ScyllaDB
==============================
@@ -28,7 +28,7 @@ Before You Upgrade ScyllaDB
**Upgrade Your Driver**
If you're using a :doc:`ScyllaDB driver </using-scylla/drivers/cql-drivers/index>`,
upgrade the driver before you upgrade ScyllaDB. The latest two versions of each driver
upgrade the driver before upgrading ScyllaDB. The latest two versions of each driver
are supported.
**Upgrade ScyllaDB Monitoring Stack**
@@ -150,7 +150,7 @@ You should take note of the current version in case you want to |ROLLBACK|_ the
.. code-block:: console
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/scylla-2025.1.list
sudo wget -O /etc/apt/sources.list.d/scylla.list https://downloads.scylladb.com/deb/debian/scylla-2025.2.list
#. Install the new ScyllaDB version:
@@ -168,7 +168,7 @@ You should take note of the current version in case you want to |ROLLBACK|_ the
.. code-block:: console
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/scylla-2025.1.repo
sudo curl -o /etc/yum.repos.d/scylla.repo -L https://downloads.scylladb.com/rpm/centos/scylla-2025.2.repo
#. Install the new ScyllaDB version:

View File

@@ -1,13 +0,0 @@
==========================================================
Upgrade - ScyllaDB Open Source 6.2 to ScyllaDB 2025.1
==========================================================
.. toctree::
:maxdepth: 2
:hidden:
Upgrade ScyllaDB <upgrade-guide-from-6.2-to-2025.1>
Metrics Update <metric-update-6.2-to-2025.1>
* :doc:`Upgrade from ScyllaDB Open Source 6.2 .x to ScyllaDB 2025.1.y <upgrade-guide-from-6.2-to-2025.1>`
* :doc:`Metrics Update Between 6.2 and 2025.1 <metric-update-6.2-to-2025.1>`

View File

@@ -1,54 +0,0 @@
.. |SRC_VERSION| replace:: 6.2
.. |NEW_VERSION| replace:: 2025.1
Metrics Update Between |SRC_VERSION| and |NEW_VERSION|
================================================================
.. toctree::
:maxdepth: 2
:hidden:
ScyllaDB |NEW_VERSION| Dashboards are available as part of the latest |mon_root|.
New Metrics
------------
The following metrics are new in ScyllaDB |NEW_VERSION| compared to |SRC_VERSION|:
.. list-table::
:widths: 25 150
:header-rows: 1
* - Metric
- Description
* - scylla_alternator_rcu_total
- The total number of consumed read units, counted as half units.
* - scylla_alternator_wcu_total
- The total number of consumed write units, counted as half units.
* - scylla_rpc_compression_bytes_received
- The bytes read from RPC connections after decompression.
* - scylla_rpc_compression_bytes_sent
- The bytes written to RPC connections before compression.
* - scylla_rpc_compression_compressed_bytes_received
- The bytes read from RPC connections before decompression.
* - scylla_rpc_compression_compressed_bytes_sent
- The bytes written to RPC connections after compression.
* - scylla_rpc_compression_compression_cpu_nanos
- The nanoseconds spent on compression.
* - scylla_rpc_compression_decompression_cpu_nanos
- The nanoseconds spent on decompression.
* - scylla_rpc_compression_messages_received
- The RPC messages received.
* - scylla_rpc_compression_messages_sent
- The RPC messages sent.

View File

@@ -8,10 +8,7 @@ such as Apache Cassandra, or from other ScyllaDB clusters:
* From SSTable to SSTable
- Using nodetool refresh, :ref:`Load and Stream <nodetool-refresh-load-and-stream>` option.
- On a large scale, it requires tooling to upload or transfer files between locations.
* From SSTable to CQL.
- :doc:`sstableloader</operating-scylla/admin-tools/sstableloader/>`
* From CQL to CQL
- `Spark Migrator <https://github.com/scylladb/scylla-migrator>`_. The Spark migrator allows you to easily transform the data before pushing it to the destination DB.
- `Spark Migrator <https://migrator.docs.scylladb.com/>`_. The Spark migrator allows you to easily transform the data before pushing it to the destination DB.
* From DynamoDB to ScyllaDB Alternator
- `Spark Migrator <https://github.com/scylladb/scylla-migrator>`_. The Spark migrator allows you to easily transform the data before pushing it to the destination DB.
- `Spark Migrator <https://migrator.docs.scylladb.com/>`_. The Spark migrator allows you to easily transform the data before pushing it to the destination DB.

View File

@@ -867,8 +867,8 @@ future<std::vector<kmip_host::id_type>> kmip_host::impl::find_matching_keys(cons
auto [kdl_attrs, crypt_alg] = make_attributes(info, false);
static const char kmip_tag_cryptographic_length[] = KMIP_TAG_CRYPTOGRAPHIC_LENGTH_STR;
static const char kmip_tag_cryptographic_usage_mask[] = KMIP_TAG_CRYPTOGRAPHIC_USAGE_MASK_STR;
static const char kmip_tag_cryptographic_length[] = KMIP_TAGSTR_CRYPTOGRAPHIC_LENGTH;
static const char kmip_tag_cryptographic_usage_mask[] = KMIP_TAGSTR_CRYPTOGRAPHIC_USAGE_MASK;
// #1079. Query mask apparently ignores things like cryptographic
// attribute set of options, instead we must specify the query

init.cc
View File

@@ -13,6 +13,7 @@
#include <boost/algorithm/string/trim.hpp>
#include <seastar/core/coroutine.hh>
#include "sstables/sstable_compressor_factory.hh"
logging::logger startlog("init");
@@ -129,3 +130,17 @@ void service_set::add(std::any value) {
std::any service_set::find(const std::type_info& type) const {
return _impl->find(type);
}
// Placed here to avoid dependency on db::config in compress.cc,
// where the rest of default_sstable_compressor_factory_config is.
auto default_sstable_compressor_factory_config::from_db_config(
const db::config& cfg,
std::span<const unsigned> numa_config) -> self
{
return self {
.register_metrics = true,
.enable_writing_dictionaries = cfg.sstable_compression_dictionaries_enable_writing,
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg.sstable_compression_dictionaries_memory_budget_fraction,
.numa_config{numa_config.begin(), numa_config.end()},
};
}

View File

@@ -112,19 +112,19 @@ void production_snitch_base::parse_property_file(std::string contents) {
[[noreturn]]
void production_snitch_base::throw_double_declaration(const sstring& key) const {
logger().error("double \"{}\" declaration in {}", key, _prop_file_name);
logger().warn("double \"{}\" declaration in {}", key, _prop_file_name);
throw bad_property_file_error();
}
[[noreturn]]
void production_snitch_base::throw_bad_format(const sstring& line) const {
logger().error("Bad format in properties file {}: {}", _prop_file_name, line);
logger().warn("Bad format in properties file {}: {}", _prop_file_name, line);
throw bad_property_file_error();
}
[[noreturn]]
void production_snitch_base::throw_incomplete_file() const {
logger().error("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
logger().warn("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
throw bad_property_file_error();
}

View File

@@ -221,6 +221,18 @@ std::optional<tablet_replica> get_leaving_replica(const tablet_info& tinfo, cons
return *leaving.begin();
}
bool is_post_cleanup(tablet_replica replica, const tablet_info& tinfo, const tablet_transition_info& trinfo) {
if (replica == locator::get_leaving_replica(tinfo, trinfo)) {
// we do tablet cleanup on the leaving replica in the `cleanup` stage, after which there is only the `end_migration` stage.
return trinfo.stage == locator::tablet_transition_stage::end_migration;
}
if (replica == trinfo.pending_replica) {
// we do tablet cleanup on the pending replica in the `cleanup_target` stage, after which there is only the `revert_migration` stage.
return trinfo.stage == locator::tablet_transition_stage::revert_migration;
}
return false;
}
tablet_replica_set get_new_replicas(const tablet_info& tinfo, const tablet_migration_info& mig) {
return replace_replica(tinfo.replicas, mig.src, mig.dst);
}
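The stage check above can be condensed into a toy Python model (names are illustrative stand-ins for the real `locator::tablet_replica` and `tablet_transition_stage` types):

```python
from enum import Enum, auto

class Stage(Enum):
    """Subset of tablet transition stages relevant to cleanup (toy model)."""
    cleanup = auto()
    end_migration = auto()
    cleanup_target = auto()
    revert_migration = auto()

def is_post_cleanup(replica: str, leaving: str, pending: str, stage: Stage) -> bool:
    # The leaving replica is cleaned up in the `cleanup` stage; the only
    # stage after that is `end_migration`, so seeing it means cleanup ran.
    if replica == leaving:
        return stage == Stage.end_migration
    # On an aborted migration the pending replica is cleaned up in
    # `cleanup_target`, followed only by `revert_migration`.
    if replica == pending:
        return stage == Stage.revert_migration
    return False
```

This is the predicate that lets both table initialization and topology updates agree on whether the storage group should exist on a given replica.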

View File

@@ -291,6 +291,10 @@ struct tablet_transition_info {
// Returns the leaving replica for a given transition.
std::optional<tablet_replica> get_leaving_replica(const tablet_info&, const tablet_transition_info&);
// True if the tablet is transitioning and it's in a stage that follows the stage
// where we clean up the tablet on the given replica.
bool is_post_cleanup(tablet_replica replica, const tablet_info& tinfo, const tablet_transition_info& trinfo);
/// Represents intention to move a single tablet replica from src to dst.
struct tablet_migration_info {
locator::tablet_transition_kind kind;

main.cc
View File

@@ -1236,17 +1236,19 @@ sharded<locator::shared_token_metadata> token_metadata;
auto stop_lang_man = defer_verbose_shutdown("lang manager", [] { langman.invoke_on_all(&lang::manager::stop).get(); });
langman.invoke_on_all(&lang::manager::start).get();
auto sstable_compressor_factory = make_sstable_compressor_factory(sstable_compressor_factory::config{
.register_metrics = true,
.enable_writing_dictionaries = cfg->sstable_compression_dictionaries_enable_writing,
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg->sstable_compression_dictionaries_memory_budget_fraction,
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
sstable_compressor_factory.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config,
std::cref(*cfg), std::cref(numa_groups))).get();
auto stop_compressor_factory = defer_verbose_shutdown("sstable_compressor_factory", [&sstable_compressor_factory] {
sstable_compressor_factory.stop().get();
});
checkpoint(stop_signal, "starting database");
debug::the_database = &db;
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(*sstable_compressor_factory),
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(sstable_compressor_factory),
std::ref(stop_signal.as_sharded_abort_source()), utils::cross_shard_barrier()).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
@@ -1717,7 +1719,7 @@ sharded<locator::shared_token_metadata> token_metadata;
auto sstables_prefix = std::string_view("sstables/");
if (name.starts_with(sstables_prefix)) {
auto table = table_id(utils::UUID(name.substr(sstables_prefix.size())));
co_await sstable_compressor_factory->set_recommended_dict(table, std::move(dict.data));
co_await sstable_compressor_factory.local().set_recommended_dict(table, std::move(dict.data));
} else if (name == dictionary_service::rpc_compression_dict_name) {
co_await utils::announce_dict_to_shards(compressor_tracker, std::move(dict));
}
@@ -1755,6 +1757,24 @@ sharded<locator::shared_token_metadata> token_metadata;
utils::get_local_injector().inject("stop_after_starting_repair", [] { std::raise(SIGSTOP); });
debug::the_stream_manager = &stream_manager;
checkpoint(stop_signal, "starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
});
checkpoint(stop_signal, "starting streaming manager");
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
return sm.start(stop_signal.as_local_abort_source());
}).get();
api::set_server_stream_manager(ctx, stream_manager).get();
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
api::unset_server_stream_manager(ctx).get();
});
checkpoint(stop_signal, "initializing storage service");
debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
@@ -1921,24 +1941,6 @@ sharded<locator::shared_token_metadata> token_metadata;
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
});
debug::the_stream_manager = &stream_manager;
checkpoint(stop_signal, "starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
});
checkpoint(stop_signal, "starting streaming manager");
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
return sm.start(stop_signal.as_local_abort_source());
}).get();
api::set_server_stream_manager(ctx, stream_manager).get();
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
api::unset_server_stream_manager(ctx).get();
});
checkpoint(stop_signal, "starting hinted handoff manager");
if (!hinted_handoff_enabled.is_disabled_for_all()) {
hints_dir_initializer.ensure_rebalanced().get();

View File

@@ -149,7 +149,8 @@ class compact_mutation_state {
gc_clock::time_point _query_time;
max_purgeable_fn _get_max_purgeable;
can_gc_fn _can_gc;
api::timestamp_type _max_purgeable = api::missing_timestamp;
api::timestamp_type _max_purgeable_regular = api::missing_timestamp;
api::timestamp_type _max_purgeable_shadowable = api::missing_timestamp;
std::optional<gc_clock::time_point> _gc_before;
const query::partition_slice& _slice;
uint64_t _row_limit{};
@@ -288,11 +289,12 @@ private:
if (!t) {
return false;
}
if (_max_purgeable == api::missing_timestamp) {
_max_purgeable = _get_max_purgeable(*_dk, is_shadowable);
auto& max_purgeable = is_shadowable ? _max_purgeable_shadowable : _max_purgeable_regular;
if (max_purgeable == api::missing_timestamp) {
max_purgeable = _get_max_purgeable(*_dk, is_shadowable);
}
auto ret = t.timestamp < _max_purgeable;
mclog.debug("can_gc: t={} is_shadowable={} max_purgeable={}: ret={}", t, is_shadowable, _max_purgeable, ret);
auto ret = t.timestamp < max_purgeable;
mclog.debug("can_gc: t={} is_shadowable={} max_purgeable={}: ret={}", t, is_shadowable, max_purgeable, ret);
return ret;
};
@@ -347,7 +349,8 @@ public:
_static_row_live = false;
_partition_tombstone = {};
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
_max_purgeable = api::missing_timestamp;
_max_purgeable_regular = api::missing_timestamp;
_max_purgeable_shadowable = api::missing_timestamp;
_gc_before = std::nullopt;
_last_static_row.reset();
_last_pos = position_in_partition::for_partition_start();

View File

@@ -20,7 +20,6 @@ import logging
import os
import pathlib
import random
import re
import shlex
import shutil
import signal
@@ -102,32 +101,6 @@ def configure_cpusets():
config_logger.info(f"Choosing cpusets for nodes: {NODE_CPUSETS.get()}")
config_logger.info(f"Choosing cpuset for load generators: {CS_CPUSET.get()}")
JAVA_HOME: ContextVar[Optional[str]] = ContextVar('JAVA_HOME')
async def configure_java() -> None:
"""
cassandra-stress can only deal with Java 11
"""
version_output = (await bash("java -version", stderr=asyncio.subprocess.PIPE))[2]
assert isinstance(version_output, bytes)
version_first_line = version_output.decode().split(sep='\n')[0]
config_logger.info(f"First line of java -version: {version_first_line}")
version = 11
if re.search(rf'version.*{version}\.[0-9]+\.[0-9]+', version_first_line):
config_logger.info(f"Default Java version recognized as Java {version}. Proceeding with the default.")
JAVA_HOME.set(None)
return
config_logger.info(f"Default Java version is not recognized as Java {version}.")
if os.path.exists(java_path := f'/usr/lib/jvm/java-{version}'):
config_logger.warning(f"{java_path} found. Choosing it as JAVA_HOME.")
JAVA_HOME.set(java_path)
return
error = f"Failed to find a suitable Java version. Java {version} is required."
config_logger.error(error)
raise RuntimeError(error)
################################################################################
# Child process utilities
@@ -534,7 +507,7 @@ async def with_cluster(executable: PathLike, workdir: PathLike, cpusets: Optiona
def cs_command(cmd: list[str], n: int, node: str, cl: str, pop: Optional[str] = None, warmup: bool = False, rate: str = "threads=200", schema: Optional[str] = None) -> list[str]:
"""Strings together a cassandra-stress command from given options."""
return (["env", f"JAVA_HOME={JAVA_HOME.get()}"] if JAVA_HOME.get() else []) + [
return [
"cassandra-stress",
*cmd,
f"n={n}",
@@ -817,7 +790,6 @@ async def train_full(executable: PathLike, output_profile_file: PathLike, datase
training_logger.info(f"Starting training of executable {executable}. Exhaustive logs can be found in {LOGDIR.get()}/")
configure_cpusets()
await configure_java()
assert executable_exists(executable)

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:5c807f5ffe7f1bf9acb6a461887f31157e3f84886a7749e61396d9153c0863b2
size 6011808
oid sha256:6bdf1d3c9ba4866abe2b9f28542b0912086c43b2737135e911dfdeb70cba3d8c
size 5951620

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:19ad6fbf38bcc7db54150988c4bb394caa487f5e2f2c132bb9f27ad7cae43666
size 6015468
oid sha256:9121eac637fb95a096caec3b2f03d46decaceef3eb30ffe02adcff52aa21cb5c
size 5964108

View File

@@ -397,7 +397,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _view_update_read_concurrency_semaphores_group(
max_memory_concurrent_view_update_reads(),
utils::updateable_value<int>(max_count_concurrent_view_update_reads),
max_inactive_view_update_queue_length(),
std::numeric_limits<size_t>::max(),
_cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_cpu_concurrency,

View File

@@ -1488,7 +1488,6 @@ private:
size_t max_memory_concurrent_view_update_reads() { return _dbcfg.available_memory * 0.01; }
// Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
size_t max_inactive_queue_length() { return _dbcfg.available_memory * 0.02 / 1000; }
size_t max_inactive_view_update_queue_length() { return _dbcfg.available_memory * 0.01 / 1000; }
// They're rather heavyweight, so limit more
static constexpr size_t max_count_streaming_concurrent_reads{10};
size_t max_memory_streaming_concurrent_reads() { return _dbcfg.available_memory * 0.02; }

View File

@@ -167,13 +167,13 @@ distributed_loader::make_sstables_available(sstables::sstable_directory& dir, sh
}
future<>
distributed_loader::process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks, sstring cf) {
distributed_loader::process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks, sstring cf, bool skip_cleanup, bool skip_reshape) {
const auto& rs = db.local().find_keyspace(ks).get_replication_strategy();
if (rs.is_per_table()) {
on_internal_error(dblog, "process_upload_dir is not supported with tablets");
}
return seastar::async([&db, &vb, ks = std::move(ks), cf = std::move(cf)] {
return seastar::async([&db, &vb, ks = std::move(ks), cf = std::move(cf), skip_cleanup, skip_reshape] {
auto global_table = get_table_on_all_shards(db, ks, cf).get();
sharded<sstables::sstable_directory> directory;
@@ -217,10 +217,12 @@ distributed_loader::process_upload_dir(distributed<replica::database>& db, shard
// - split the keyspace local ranges per compaction_group as done in table::perform_cleanup_compaction
// so that cleanup can be considered per compaction group
const auto& erm = db.local().find_keyspace(ks).get_vnode_effective_replication_map();
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(erm).get());
auto owned_ranges_ptr = skip_cleanup ? lw_shared_ptr<dht::token_range_vector>(nullptr) : compaction::make_owned_ranges_ptr(db.local().get_keyspace_local_ranges(erm).get());
reshard(directory, db, ks, cf, make_sstable, owned_ranges_ptr).get();
reshape(directory, db, sstables::reshape_mode::strict, ks, cf, make_sstable,
[] (const sstables::shared_sstable&) { return true; }).get();
if (!skip_reshape) {
reshape(directory, db, sstables::reshape_mode::strict, ks, cf, make_sstable,
[] (const sstables::shared_sstable&) { return true; }).get();
}
// Move to staging directory to avoid clashes with future uploads. Unique generation number ensures no collisions.
const bool use_view_update_path = db::view::check_needs_view_update_path(vb.local(), erm->get_token_metadata_ptr(), *global_table, streaming::stream_reason::repair).get();

View File

@@ -91,7 +91,7 @@ public:
get_sstables_from_upload_dir(distributed<replica::database>& db, sstring ks, sstring cf, sstables::sstable_open_config cfg);
static future<std::tuple<table_id, std::vector<std::vector<sstables::shared_sstable>>>>
get_sstables_from_object_store(distributed<replica::database>& db, sstring ks, sstring cf, std::vector<sstring> sstables, sstring endpoint, sstring bucket, sstring prefix, sstables::sstable_open_config cfg, std::function<seastar::abort_source*()> = {});
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name);
static future<> process_upload_dir(distributed<replica::database>& db, sharded<db::view::view_builder>& vb, sstring ks_name, sstring cf_name, bool skip_cleanup, bool skip_reshape);
};
future<sstables::generation_type> highest_generation_seen(sharded<sstables::sstable_directory>& directory);

View File

@@ -830,12 +830,19 @@ public:
auto local_replica = locator::tablet_replica{_my_host_id, this_shard_id()};
for (auto tid : tmap.tablet_ids()) {
auto range = tmap.get_token_range(tid);
if (tmap.has_replica(tid, local_replica)) {
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
ret[tid.value()] = allocate_storage_group(tmap, tid, std::move(range));
if (!tmap.has_replica(tid, local_replica)) {
continue;
}
// if the tablet was cleaned up already on this replica, don't allocate a storage group for it.
auto trinfo = tmap.get_tablet_transition_info(tid);
if (trinfo && locator::is_post_cleanup(local_replica, tmap.get_tablet_info(tid), *trinfo)) {
continue;
}
auto range = tmap.get_token_range(tid);
tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name());
ret[tid.value()] = allocate_storage_group(tmap, tid, std::move(range));
}
_storage_groups = std::move(ret);
}
@@ -1892,6 +1899,8 @@ table::sstable_list_builder::build_new_list(const sstables::sstable_set& current
const std::vector<sstables::shared_sstable>& old_sstables) {
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
co_await utils::get_local_injector().inject("sstable_list_builder_delay", std::chrono::milliseconds(100));
// add sstables from the current list into the new list except the ones that are in the old list
std::vector<sstables::shared_sstable> removed_sstables;
co_await current_sstables.for_each_sstable_gently([&s, &removed_sstables, &new_sstable_list] (const sstables::shared_sstable& tab) {
@@ -2178,14 +2187,13 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), sstables::compaction_strategy::name(strategy));
auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());
struct compaction_group_sstable_set_updater {
struct compaction_group_strategy_updater {
table& t;
compaction_group& cg;
compaction_backlog_tracker new_bt;
compaction::compaction_strategy_state new_cs_state;
lw_shared_ptr<sstables::sstable_set> new_sstables;
compaction_group_sstable_set_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
compaction_group_strategy_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
: t(t)
, cg(cg)
, new_bt(new_cs.make_backlog_tracker())
@@ -2196,26 +2204,26 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
auto move_read_charges = new_cs.type() == t._compaction_strategy.type();
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);
new_sstables = make_lw_shared<sstables::sstable_set>(new_cs.make_sstable_set(cg.as_table_state()));
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->size());
cg.main_sstables()->for_each_sstable([this, &new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
new_sstables->insert(s);
cg.main_sstables()->for_each_sstable([&new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
new_sstables_for_backlog_tracker.push_back(s);
});
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
}
void execute() noexcept {
// Update strategy state and backlog tracker according to new strategy. SSTable set update
// is delayed until new compaction, which is triggered on strategy change. SSTable set
// cannot be updated here since it must happen under the set update lock.
t._compaction_manager.register_backlog_tracker(cg.as_table_state(), std::move(new_bt));
cg.set_main_sstables(std::move(new_sstables));
cg.set_compaction_strategy_state(std::move(new_cs_state));
}
};
std::vector<compaction_group_sstable_set_updater> cg_sstable_set_updaters;
std::vector<compaction_group_strategy_updater> cg_sstable_set_updaters;
for_each_compaction_group([&] (compaction_group& cg) {
compaction_group_sstable_set_updater updater(*this, cg, new_cs);
compaction_group_strategy_updater updater(*this, cg, new_cs);
updater.prepare(new_cs);
cg_sstable_set_updaters.push_back(std::move(updater));
});
@@ -2224,7 +2232,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
for (auto& updater : cg_sstable_set_updaters) {
updater.execute();
}
refresh_compound_sstable_set();
}
size_t table::sstables_count() const {
@@ -2707,7 +2714,7 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
handle_tablet_merge_completion(*old_tablet_map, *new_tablet_map);
}
// Allocate storage group if tablet is migrating in.
// Allocate storage group if tablet is migrating in, or deallocate if it's migrating out.
auto this_replica = locator::tablet_replica{
.host = erm.get_token_metadata().get_my_id(),
.shard = this_shard_id()
@@ -2723,6 +2730,8 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
auto range = new_tablet_map->get_token_range(tid);
_storage_groups[tid.value()] = allocate_storage_group(*new_tablet_map, tid, std::move(range));
tablet_migrating_in = true;
+} else if (_storage_groups.contains(tid.value()) && locator::is_post_cleanup(this_replica, new_tablet_map->get_tablet_info(tid), transition_info)) {
+remove_storage_group(tid.value());
}
}
@@ -2742,7 +2751,7 @@ void tablet_storage_group_manager::update_effective_replication_map(const locato
// Also serves as a protection for clearing the cache on the new range, although it shouldn't be a
// problem as fresh node won't have any data in new range and migration cleanup invalidates the
// range being moved away.
-if (tablet_migrating_in) {
+if (tablet_migrating_in || old_tablet_count != new_tablet_count) {
refresh_mutation_source();
}
}
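The allocate/deallocate rule in the hunks above (the core of the backported fix) can be sketched standalone; `tablet_id`, the stage strings, and the map type here are simplified stand-ins for Scylla's types:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Simplified model: a replica keeps one storage group per tablet. On a
// replication-map update, it allocates a group for a tablet migrating in,
// and removes the group once the tablet is past cleanup on this (leaving)
// replica, so a restart after cleanup can no longer leave a stale group.
using tablet_id = int;

void update_storage_groups(std::unordered_map<tablet_id, bool>& groups,
                           tablet_id tid,
                           bool migrating_in_here,
                           const std::string& stage) {
    bool post_cleanup = stage == "end_migration" || stage == "revert_migration";
    if (migrating_in_here && !groups.count(tid)) {
        groups[tid] = true;  // pending replica: allocate
    } else if (groups.count(tid) && post_cleanup) {
        groups.erase(tid);   // leaving replica: deallocate
    }
}
```

This mirrors the invariant in the commit message: by `end_migration` the leaving replica's storage group is always gone, and it is only allocated while cleanup has not fully happened yet.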
@@ -4123,7 +4132,6 @@ future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locato
co_await stop_compaction_groups(sg);
co_await utils::get_local_injector().inject("delay_tablet_compaction_groups_cleanup", std::chrono::seconds(5));
co_await cleanup_compaction_groups(db, sys_ks, tid, sg);
-_sg_manager->remove_storage_group(tid.value());
}
future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {

View File

@@ -954,12 +954,15 @@ public:
// but caller can skip to a position outside the current set
const dht::ring_position_view& pos = s.pos;
auto token = pos.token();
-if (!_cur_set || pos.token() >= _lowest_next_token) {
+auto pr_end = s.range ? dht::ring_position_view::for_range_end(*s.range) : dht::ring_position_view::max();
+// End of stream is reached when pos is past the end of the read range (i.e. exclude tablets
+// that doesn't intersect with the range).
+// We don't want to advance next position when EOS has been reached, such that a fast forward
+// to the next tablet range will work.
+bool eos_reached = dht::ring_position_tri_compare(*_tset.schema(), pos, pr_end) > 0;
+if ((!_cur_set || pos.token() >= _lowest_next_token) && !eos_reached) {
auto idx = _tset.group_of(token);
-auto pr_end = s.range ? dht::ring_position_view::for_range_end(*s.range) : dht::ring_position_view::max();
-// End of stream is reached when pos is past the end of the read range (i.e. exclude tablets
-// that doesn't intersect with the range).
-if (dht::ring_position_tri_compare(*_tset.schema(), pos, pr_end) <= 0 && _tset._sstable_set_ids.contains(idx)) {
+if (_tset._sstable_set_ids.contains(idx)) {
_cur_set = _tset.find_sstable_set(idx);
}
// Set the next token to point to the next engaged storage group.
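The end-of-stream check in this hunk reduces to a tri-comparison of the reader's position against the read range's end; a minimal integer model (positions as ints, `std::nullopt` standing in for an unbounded range) behaves the same way, with all names here illustrative:

```cpp
#include <cassert>
#include <optional>

// Stand-in for dht::ring_position_tri_compare: negative/zero/positive.
int tri_compare(int a, int b) { return a < b ? -1 : (a > b ? 1 : 0); }

// End of stream is reached once the position is strictly past the range end.
// An unbounded range behaves like "max", so EOS never triggers, and a
// position exactly at the end is still inside the range.
bool eos_reached(int pos, std::optional<int> range_end) {
    constexpr int max_position = 1'000'000; // stand-in for ring_position_view::max()
    return tri_compare(pos, range_end.value_or(max_position)) > 0;
}
```

Not advancing the next position once `eos_reached` holds is what keeps a later fast-forward to the next tablet range working.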

Submodule seastar updated: d7ff58f2b2...9f0034a099

View File

@@ -769,6 +769,12 @@ utils::input_stream as_input_stream(const bytes_ostream& b) {
return utils::input_stream::fragmented(b.fragments().begin(), b.size());
}
+template<FragmentedView View>
+inline
+auto as_input_stream(View v) {
+return fragmented_memory_input_stream(fragment_range(v).begin(), v.size_bytes());
+}
template<typename Output, typename ...T>
void serialize(Output& out, const boost::variant<T...>& v) {}

View File

@@ -579,7 +579,7 @@ future<query::mapreduce_result> mapreduce_service::dispatch(query::mapreduce_req
co_await coroutine::parallel_for_each(vnodes_per_addr,
[&] (std::pair<const locator::host_id, dht::partition_range_vector>& vnodes_with_addr) -> future<> {
locator::host_id addr = vnodes_with_addr.first;
-query::mapreduce_result& result_ = result;
+query::mapreduce_result& shared_accumulator = result;
tracing::trace_state_ptr& tr_state_ = tr_state;
retrying_dispatcher& dispatcher_ = dispatcher;
@@ -600,9 +600,21 @@ future<query::mapreduce_result> mapreduce_service::dispatch(query::mapreduce_req
flogger.debug("received mapreduce_result={} from {}", partial_printer, addr);
auto aggrs = mapreduce_aggregates(req);
-co_return co_await aggrs.with_thread_if_needed([&result_, &aggrs, partial_result = std::move(partial_result)] () mutable {
-aggrs.merge(result_, std::move(partial_result));
-});
+// Anytime this coroutine yields, other coroutines may want to write to `shared_accumulator`.
+// As merging can yield internally, merging directly to `shared_accumulator` would result in race condition.
+// We can safely write to `shared_accumulator` only when it is empty.
+while (!shared_accumulator.query_results.empty()) {
+// Move `shared_accumulator` content to local variable. Leave `shared_accumulator` empty - now other coroutines can safely write to it.
+query::mapreduce_result previous_results = std::exchange(shared_accumulator, {});
+// Merge two local variables - it can yield.
+co_await aggrs.with_thread_if_needed([&previous_results, &aggrs, &partial_result] () mutable {
+aggrs.merge(partial_result, std::move(previous_results));
+});
+// `partial_result` now contains results merged by this coroutine, but `shared_accumulator` might have been updated by others.
+}
+// `shared_accumulator` is empty, we can atomically write results merged by this coroutine.
+shared_accumulator = std::move(partial_result);
});
mapreduce_aggregates aggrs(req);
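The exchange-and-publish loop added in this hunk can be modelled without seastar; here `result` is a plain vector standing in for `query::mapreduce_result` and the (possibly yielding) merge is a simple concatenation — names are illustrative:

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical stand-in for query::mapreduce_result.
using result = std::vector<int>;

// Merging may suspend, so merging directly into the shared accumulator would
// race with other fibers. Instead: repeatedly claim (exchange) the shared
// value, merge into a local one, and only publish once the shared slot is
// empty, since publishing an empty slot is a single non-yielding assignment.
void merge_into_shared(result& shared_accumulator, result partial) {
    while (!shared_accumulator.empty()) {
        // Claim the shared contents; the slot is now empty for other fibers.
        result previous = std::exchange(shared_accumulator, {});
        // Merge locally; in Scylla this step may yield.
        partial.insert(partial.end(), previous.begin(), previous.end());
        // Loop again: another fiber may have refilled the slot meanwhile.
    }
    shared_accumulator = std::move(partial);
}
```

The loop terminates because each iteration ends with the slot empty from this fiber's point of view, and a refill by another fiber just costs one more merge round.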

View File

@@ -89,7 +89,6 @@ future<raft::index_t> raft_sys_table_storage::load_commit_idx() {
co_return raft::index_t(static_row.get_or<int64_t>("commit_idx", raft::index_t{}.value()));
}
future<raft::log_entries> raft_sys_table_storage::load_log() {
static const auto load_cql = format("SELECT term, \"index\", data FROM system.{} WHERE group_id = ?", db::system_keyspace::RAFT);
::shared_ptr<cql3::untyped_result_set> rs = co_await _qp.execute_internal(load_cql, {_group_id.id}, cql3::query_processor::cache_internal::yes);
@@ -103,7 +102,7 @@ future<raft::log_entries> raft_sys_table_storage::load_log() {
}
raft::term_t term = raft::term_t(row.get_as<int64_t>("term"));
raft::index_t idx = raft::index_t(row.get_as<int64_t>("index"));
-auto raw_data = row.get_blob("data");
+auto raw_data = row.get_view("data");
auto in = ser::as_input_stream(raw_data);
using data_variant_type = decltype(raft::log_entry::data);
data_variant_type data = ser::deserialize(in, std::type_identity<data_variant_type>());

View File

@@ -457,7 +457,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
switch (rs.state) {
case node_state::normal: {
-if (is_me(ip)) {
+if (is_me(id)) {
co_return;
}
// In replace-with-same-ip scenario the replaced node IP will be the same
@@ -490,8 +490,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
auto old_ip = it->second;
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
co_await _gossiper.force_remove_endpoint(id, gms::null_permit_id);
}
}
break;
@@ -945,22 +943,13 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
if (prev_ip == endpoint) {
co_return;
}
if (_address_map.find(id) != endpoint) {
// Address map refused to update IP for the host_id,
// this means prev_ip has higher generation than endpoint.
// We can immediately remove endpoint from gossiper
// since it represents an old IP (before an IP change)
// for the given host_id. This is not strictly
// necessary, but it reduces the noise circulated
// in gossiper messages and allows for clearer
// expectations of the gossiper state in tests.
co_await _ss._gossiper.force_remove_endpoint(id, permit_id);
// Do not update address.
co_return;
}
// If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm.
if (_ss.raft_topology_change_enabled()) {
rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
@@ -1691,6 +1680,8 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
slogger.info("Starting up server gossip");
co_await utils::get_local_injector().inject("sleep_before_start_gossiping", std::chrono::milliseconds{500});
co_await _gossiper.start_gossiping(new_generation, app_states);
+utils::get_local_injector().inject("stop_after_starting_gossiping",
@@ -6010,14 +6001,6 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver
});
}
-inet_address storage_service::host2ip(locator::host_id host) const {
-auto ip = _address_map.find(host);
-if (!ip) {
-throw std::runtime_error(::format("Cannot map host {} to ip", host));
-}
-return *ip;
-}
// Performs a replica-side operation for a given tablet.
// What operation is performed is determined by "op" based on the
// current state of tablet metadata. The coordinator is supposed to prepare tablet
@@ -7281,11 +7264,7 @@ void storage_service::init_messaging_service() {
[this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future<streaming::stream_files_response> {
streaming::stream_files_response resp;
resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future<size_t> {
-auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req, [&ss] (locator::host_id host) -> future<gms::inet_address> {
-return ss.container().invoke_on(0, [host] (storage_service& ss) {
-return ss.host2ip(host);
-});
-});
+auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req);
co_return res.stream_bytes;
},
size_t(0),

View File

@@ -210,7 +210,6 @@ private:
// when both of which sit on the same node. So all the movement is local.
future<> clone_locally_tablet_storage(locator::global_tablet_id, locator::tablet_replica leaving, locator::tablet_replica pending);
future<> cleanup_tablet(locator::global_tablet_id);
-inet_address host2ip(locator::host_id) const;
// Handler for table load stats RPC.
future<locator::load_stats> load_stats_for_tablet_based_tables();
future<> process_tablet_split_candidate(table_id) noexcept;

View File

@@ -1471,8 +1471,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case locator::tablet_transition_stage::use_new:
transition_to_with_barrier(locator::tablet_transition_stage::cleanup);
break;
-case locator::tablet_transition_stage::cleanup:
-if (advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] {
+case locator::tablet_transition_stage::cleanup: {
+bool wait = utils::get_local_injector().enter("cleanup_tablet_wait");
+if (!wait && advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] {
auto maybe_dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
if (!maybe_dst) {
rtlogger.info("Tablet cleanup of {} skipped because no replicas leaving", gid);
@@ -1489,6 +1490,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
})) {
transition_to(locator::tablet_transition_stage::end_migration);
}
+}
break;
case locator::tablet_transition_stage::cleanup_target:
if (advance_in_background(gid, tablet_state.cleanup, "cleanup_target", [&] {
@@ -1534,7 +1536,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
break;
case locator::tablet_transition_stage::repair: {
-if (action_failed(tablet_state.repair)) {
+bool fail_repair = utils::get_local_injector().enter("handle_tablet_migration_repair_fail");
+if (fail_repair || action_failed(tablet_state.repair)) {
if (do_barrier()) {
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
@@ -2021,6 +2024,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
+} catch (raft::request_aborted&) {
+throw;
} catch (...) {
rtlogger.error("transition_state::join_group0, "
"global_token_metadata_barrier failed, error {}",
@@ -2157,6 +2162,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
+} catch (raft::request_aborted&) {
+throw;
} catch (...) {
rtlogger.error("transition_state::commit_cdc_generation, "
"raft_topology_cmd::command::barrier failed, error {}", std::current_exception());
@@ -2236,6 +2243,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
+} catch (raft::request_aborted&) {
+throw;
} catch (...) {
rtlogger.error("tablets draining failed with {}. Aborting the topology operation", std::current_exception());
_rollback = fmt::format("Failed to drain tablets: {}", std::current_exception());
@@ -2251,6 +2260,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
+} catch (raft::request_aborted&) {
+throw;
} catch (...) {
rtlogger.error("transition_state::write_both_read_old, "
"global_token_metadata_barrier failed, error {}",
@@ -2297,6 +2308,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
} catch (term_changed_error&) {
throw;
+} catch (raft::request_aborted&) {
+throw;
} catch (...) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is {}): {}", state, std::current_exception());
@@ -2327,6 +2340,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
+} catch (raft::request_aborted&) {
+throw;
} catch (...) {
rtlogger.error("transition_state::write_both_read_new, "
"global_token_metadata_barrier failed, error {}",
@@ -2466,6 +2481,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
+} catch (raft::request_aborted&) {
+throw;
} catch (...) {
rtlogger.error("transition_state::left_token_ring, "
"raft_topology_cmd::command::barrier failed, error {}",
@@ -2481,6 +2498,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node = retake_node(co_await start_operation(), node.id);
}
+// Make decommissioning node a non voter before reporting operation completion below.
+// Otherwise the decommissioned node may see the completion and exit before it is removed from
+// the config at which point the removal from the config will hang if the cluster had only two
+// nodes before the decommission.
+co_await _voter_handler.on_node_removed(node.id, _as);
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();

View File

@@ -189,7 +189,19 @@ future<float> try_one_compression_config(
const compression_parameters& params,
const utils::chunked_vector<temporary_buffer<char>>& validation_samples
) {
-auto factory = make_sstable_compressor_factory();
-co_await factory->set_recommended_dict(initial_schema->id(), dict);
-co_return co_await try_one_compression_config(*factory, initial_schema, params, validation_samples);
+sharded<default_sstable_compressor_factory> factory;
+co_await factory.start();
+std::exception_ptr ex;
+float result;
+try {
+co_await factory.local().set_recommended_dict(initial_schema->id(), dict);
+result = co_await try_one_compression_config(factory.local(), initial_schema, params, validation_samples);
+} catch (...) {
+ex = std::current_exception();
+}
+co_await factory.stop();
+if (ex) {
+co_return coroutine::exception(ex);
+}
+co_return result;
}
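The try/capture/stop/rethrow shape used above is needed because `factory.stop()` is itself asynchronous and must run on failure too, so plain RAII cannot express it. A synchronous model of that shape, with `run_then_stop` and `Service` as illustrative names:

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>

// Run the body, capture any exception, always stop the service, then rethrow.
template <typename Service, typename Body>
auto run_then_stop(Service& svc, Body body) {
    std::exception_ptr ex;
    decltype(body(svc)) result{};
    try {
        result = body(svc);
    } catch (...) {
        ex = std::current_exception();
    }
    svc.stop(); // always runs, even on failure
    if (ex) {
        std::rethrow_exception(ex);
    }
    return result;
}
```

In the coroutine version the same ordering holds: `co_await factory.stop()` runs unconditionally before the captured exception is re-raised via `coroutine::exception`.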

View File

@@ -10,20 +10,89 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "compress.hh"
#include "schema/schema_fwd.hh"
#include "utils/updateable_value.hh"
#include <span>
namespace db {
class config;
} // namespace db
struct dictionary_holder;
class raw_dict;
struct sstable_compressor_factory {
virtual ~sstable_compressor_factory() {}
virtual future<compressor_ptr> make_compressor_for_writing(schema_ptr) = 0;
virtual future<compressor_ptr> make_compressor_for_reading(sstables::compression&) = 0;
virtual future<> set_recommended_dict(table_id, std::span<const std::byte> dict) = 0;
struct config {
bool register_metrics = false;
utils::updateable_value<bool> enable_writing_dictionaries{true};
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
};
};
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg = {});
// Note: I couldn't make this an inner class of default_sstable_compressor_factory,
// because then the compiler gives weird complains about default member initializers in line
// ```
// default_sstable_compressor_factory(config = config{});
// ```
// apparently due to some compiler bug related default initializers.
struct default_sstable_compressor_factory_config {
using self = default_sstable_compressor_factory_config;
static std::vector<unsigned> get_default_shard_to_numa_node_mapping();
bool register_metrics = false;
utils::updateable_value<bool> enable_writing_dictionaries{true};
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
std::vector<unsigned> numa_config{get_default_shard_to_numa_node_mapping()};
static default_sstable_compressor_factory_config from_db_config(
const db::config&,
std::span<const unsigned> numa_config = get_default_shard_to_numa_node_mapping());
};
// Constructs compressors and decompressors for SSTables,
// making sure that the expensive identical parts (dictionaries) are shared
// between all shards within the same NUMA group.
//
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
// decided by a content hash of the dictionary.
// All requests for a given dict ID go through the owner of this ID and return a foreign shared pointer
// to that dict.
//
// (Note: this centralization shouldn't pose a performance problem because a dict is only requested once
// per an opening of an SSTable).
struct default_sstable_compressor_factory : peering_sharded_service<default_sstable_compressor_factory>, sstable_compressor_factory {
using holder = dictionary_holder;
public:
using self = default_sstable_compressor_factory;
using config = default_sstable_compressor_factory_config;
private:
config _cfg;
// Maps NUMA node ID to the array of shards on that node.
std::vector<std::vector<shard_id>> _numa_groups;
// Holds dictionaries owned by this shard.
std::unique_ptr<dictionary_holder> _holder;
// All recommended dictionary updates are serialized by a single "leader shard".
// We do this to avoid dealing with concurrent updates altogether.
semaphore _recommendation_setting_sem{1};
constexpr static shard_id _leader_shard = 0;
private:
using sha256_type = std::array<std::byte, 32>;
unsigned local_numa_id();
shard_id get_dict_owner(unsigned numa_id, const sha256_type& sha);
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t);
future<> set_recommended_dict_local(table_id, std::span<const std::byte> dict);
future<compressor_ptr> make_compressor_for_writing_impl(const compression_parameters&, table_id);
future<compressor_ptr> make_compressor_for_reading_impl(const compression_parameters&, std::span<const std::byte> dict);
public:
default_sstable_compressor_factory(config = config{});
~default_sstable_compressor_factory();
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
future<compressor_ptr> make_compressor_for_writing_for_tests(const compression_parameters&, table_id);
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
future<compressor_ptr> make_compressor_for_reading_for_tests(const compression_parameters&, std::span<const std::byte> dict);
future<> set_recommended_dict(table_id, std::span<const std::byte> dict) override;
};
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread();
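The owner-shard scheme described in the class comment above can be sketched as follows; the hash fold is illustrative (the real factory derives the owner from a SHA-256 of the dictionary contents), and every shard in a NUMA group computes the same owner deterministically, so no mutex is needed to agree on it:

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

using sha256_type = std::array<std::byte, 32>;

// Fold the content hash into an index into the NUMA group's shard list.
unsigned dict_owner(const std::vector<unsigned>& numa_group_shards,
                    const sha256_type& sha) {
    std::size_t h = 0;
    for (std::byte b : sha) {
        h = h * 31 + static_cast<std::size_t>(b);
    }
    return numa_group_shards[h % numa_group_shards.size()];
}
```

All requests for a given dict then go through its owner shard, which hands back a foreign shared pointer; as the comment notes, this centralization is cheap because a dict is fetched roughly once per SSTable open.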

View File

@@ -805,6 +805,14 @@ class incremental_reader_selector : public reader_selector {
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
return _fn(sst, *_pr);
}
+dht::ring_position_view pr_end() const {
+return dht::ring_position_view::for_range_end(*_pr);
+}
+bool end_of_stream() const {
+return _selector_position.is_max() || dht::ring_position_tri_compare(*_s, _selector_position, pr_end()) > 0;
+}
public:
explicit incremental_reader_selector(schema_ptr s,
lw_shared_ptr<const sstable_set> sstables,
@@ -839,8 +847,8 @@ public:
auto selection = _selector->select({_selector_position, _pr});
_selector_position = selection.next_position;
-irclogger.trace("{}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(),
-_selector_position);
+irclogger.trace("{}: {} sstables to consider, advancing selector to {}, eos={}", fmt::ptr(this), selection.sstables.size(),
+_selector_position, end_of_stream());
readers.clear();
for (auto& sst : selection.sstables) {
@@ -848,7 +856,7 @@ public:
readers.push_back(create_reader(sst));
}
}
-} while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
+} while (!end_of_stream() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size());
@@ -865,9 +873,15 @@ public:
_pr = &pr;
auto pos = dht::ring_position_view::for_range_start(*_pr);
if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) {
return create_new_readers(pos);
}
+// If selector position Y is contained in new range [X, Z], then we should try selecting new
+// sstables since it might have sstables that overlap with that range.
+if (!_selector_position.is_max() && dht::ring_position_tri_compare(*_s, _selector_position, pr_end()) <= 0) {
+return create_new_readers(std::nullopt);
+}
return {};
}
@@ -945,7 +959,21 @@ filter_sstable_for_reader_by_ck(std::vector<shared_sstable>&& sstables, replica:
std::vector<frozen_sstable_run>
sstable_set_impl::all_sstable_runs() const {
-throw_with_backtrace<std::bad_function_call>();
+auto all_sstables = all();
+std::unordered_map<sstables::run_id, sstable_run> runs_m;
+std::vector<frozen_sstable_run> all_runs;
+for (auto&& sst : *all_sstables) {
+// When a run cannot accept sstable due to overlapping, treat the rejected sstable
+// as a single-fragment run.
+if (!runs_m[sst->run_identifier()].insert(sst)) {
+all_runs.push_back(make_lw_shared<const sstable_run>(sst));
+}
+}
+for (auto&& r : runs_m | std::views::values) {
+all_runs.push_back(make_lw_shared<const sstable_run>(std::move(r)));
+}
+return all_runs;
}
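The grouping logic in the new `all_sstable_runs()` can be modelled with plain types; `sstable`, the interval overlap test, and `run::insert` here are simplified stand-ins for Scylla's run/fragment invariants:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Model sstable: identified by a run id plus a token interval [start, end].
struct sstable { std::string run_id; int start, end; };

struct run {
    std::vector<sstable> fragments;
    // A run only accepts non-overlapping fragments.
    bool insert(const sstable& s) {
        for (const auto& f : fragments) {
            if (s.start <= f.end && f.start <= s.end) {
                return false; // overlaps an existing fragment: reject
            }
        }
        fragments.push_back(s);
        return true;
    }
};

std::vector<run> group_into_runs(const std::vector<sstable>& ssts) {
    std::unordered_map<std::string, run> runs_m;
    std::vector<run> all;
    for (const auto& s : ssts) {
        if (!runs_m[s.run_id].insert(s)) {
            all.push_back(run{{s}}); // rejected: becomes a single-fragment run
        }
    }
    for (auto& [id, r] : runs_m) {
        all.push_back(std::move(r));
    }
    return all;
}
```

As in the hunk, a rejected sstable is not dropped: it is demoted to its own single-fragment run so every sstable still appears in exactly one returned run.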
mutation_reader

View File

@@ -2602,7 +2602,7 @@ future<lw_shared_ptr<checksum>> sstable::read_checksum() {
co_return nullptr;
}
auto checksum = make_lw_shared<sstables::checksum>();
-co_await do_read_simple(component_type::CRC, [checksum, this] (version_types v, file crc_file) -> future<> {
+co_await do_read_simple(component_type::CRC, [&checksum, this] (version_types v, file crc_file) -> future<> {
file_input_stream_options options;
options.buffer_size = 4096;
@@ -2629,7 +2629,14 @@ future<lw_shared_ptr<checksum>> sstable::read_checksum() {
co_await crc_stream.close();
maybe_rethrow_exception(std::move(ex));
-_components->checksum = checksum->weak_from_this();
+if (!_components->checksum) {
+_components->checksum = checksum->weak_from_this();
+} else {
+// Race condition: Another fiber/thread has called `read_checksum()`
+// while we were loading the component from disk. Discard our local
+// copy and use theirs.
+checksum = _components->checksum->shared_from_this();
+}
});
co_return std::move(checksum);
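The publish-or-adopt step added above is a general pattern for caching a component that several fibers may load concurrently: after the (yielding) load, re-check the cache slot, and if someone else published first, discard your copy and adopt theirs. A minimal model with `std::weak_ptr` standing in for the weak checksum slot:

```cpp
#include <cassert>
#include <memory>

// Illustrative cache: a weak reference to the shared component.
struct cache {
    std::weak_ptr<int> slot;
};

std::shared_ptr<int> load_checksum(cache& c, int value_from_disk) {
    auto checksum = std::make_shared<int>(value_from_disk);
    // ...suppose the disk read yielded here, letting another fiber run...
    if (auto published = c.slot.lock()) {
        // Lost the race: another fiber published while we were loading.
        checksum = published;
    } else {
        c.slot = checksum;
    }
    return checksum;
}
```

The weak reference means the cache never keeps the component alive by itself; it only lets late loaders converge on the first published copy.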

View File

@@ -528,7 +528,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
// All the global operations are going to happen here, and just the reloading happens
// in there.
future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
-bool load_and_stream, bool primary_replica_only, stream_scope scope) {
+bool load_and_stream, bool primary_replica_only, bool skip_cleanup, bool skip_reshape, stream_scope scope) {
if (_loading_new_sstables) {
throw std::runtime_error("Already loading SSTables. Try again later");
} else {
@@ -544,8 +544,16 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
load_and_stream_desc = "auto-enabled-for-tablets";
}
-llog.info("Loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}",
-ks_name, cf_name, load_and_stream_desc, primary_replica_only);
+if (load_and_stream && skip_reshape) {
+throw std::runtime_error("Skipping reshape is not possible when doing load-and-stream");
+}
+if (!load_and_stream && skip_cleanup) {
+throw std::runtime_error("Skipping cleanup is not possible when doing load-and-stream");
+}
+llog.info("Loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, skip_cleanup={}",
+ks_name, cf_name, load_and_stream_desc, primary_replica_only, skip_cleanup);
try {
if (load_and_stream) {
::table_id table_id;
@@ -560,7 +568,7 @@ future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only, true, scope, {});
});
} else {
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name);
co_await replica::distributed_loader::process_upload_dir(_db, _view_builder, ks_name, cf_name, skip_cleanup, skip_reshape);
}
} catch (...) {
llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",

View File

@@ -104,10 +104,14 @@ public:
*
* @param ks_name the keyspace in which to search for new SSTables.
* @param cf_name the column family in which to search for new SSTables.
* @param load_and_stream load SSTables that do not belong to this node and stream them to the appropriate nodes.
* @param primary_replica_only whether to stream only to the primary replica that owns the data.
+* @param skip_cleanup whether to skip the cleanup step when loading SSTables.
+* @param skip_reshape whether to skip the reshape step when loading SSTables.
* @return a future<> when the operation finishes.
*/
future<> load_new_sstables(sstring ks_name, sstring cf_name,
-bool load_and_stream, bool primary_replica_only, stream_scope scope);
+bool load_and_stream, bool primary_replica_only, bool skip_cleanup, bool skip_reshape, stream_scope scope);
/**
* Download new SSTables not currently tracked by the system from object store

View File

@@ -9,7 +9,6 @@
#include "message/messaging_service.hh"
#include "streaming/stream_blob.hh"
#include "streaming/stream_plan.hh"
-#include "gms/inet_address.hh"
#include "utils/pretty_printers.hh"
#include "utils/error_injection.hh"
#include "locator/host_id.hh"
@@ -120,7 +119,7 @@ static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_i
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
-gms::inet_address from,
+locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
@@ -310,7 +309,7 @@ future<> stream_blob_handler(replica::database& db,
future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms,
-gms::inet_address from,
+locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source) {
@@ -374,7 +373,7 @@ namespace streaming {
// Send files in the files list to the nodes in targets list over network
// Returns number of bytes sent over network
future<size_t>
-tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool inject_errors) {
+tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, service::frozen_topology_guard topo_guard, bool inject_errors) {
size_t ops_total_size = 0;
if (targets.empty()) {
co_return ops_total_size;
@@ -387,7 +386,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
ops_id, sources.size(), sources, targets);
struct sink_and_source {
-gms::inet_address node;
+locator::host_id node;
rpc::sink<streaming::stream_blob_cmd_data> sink;
rpc::source<streaming::stream_blob_cmd_data> source;
bool sink_closed = false;
@@ -428,10 +427,9 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
for (auto& x : targets) {
const auto& node = x.node;
meta.dst_shard_id = x.shard;
-auto ip = co_await host2ip(node);
-blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, ip, filename, targets);
+blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, node, filename, targets);
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
-ss.push_back(sink_and_source{ip, std::move(sink), std::move(source)});
+ss.push_back(sink_and_source{node, std::move(sink), std::move(source)});
}
// This fiber sends data to peer node
@@ -600,7 +598,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
}
-future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip) {
+future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
stream_files_response resp;
auto& table = db.find_column_family(req.table);
auto sstables = co_await table.take_storage_snapshot(req.range);
@@ -653,7 +651,7 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
req.ops_id, sstables.size(), files.size(), files, req.range);
auto ops_start_time = std::chrono::steady_clock::now();
-size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, std::move(host2ip), req.topo_guard);
+size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard);
resp.stream_bytes = stream_bytes;
auto duration = std::chrono::steady_clock::now() - ops_start_time;
blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} files={} range={} stream_bytes={} stream_time={} stream_bw={}",

View File

@@ -116,13 +116,13 @@ struct stream_blob_info {
};
// The handler for the STREAM_BLOB verb.
-seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, gms::inet_address from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
+seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, locator::host_id from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
// Exposed mainly for testing
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
-gms::inet_address from,
+locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
@@ -163,11 +163,9 @@ public:
size_t stream_bytes = 0;
};
-using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
// The handler for the TABLET_STREAM_FILES verb. The receiver of this verb will
// stream sstables files specified by the stream_files_request req.
-future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip);
+future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req);
// Ask the src node to stream sstables to dst node for table in the given token range using TABLET_STREAM_FILES verb.
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id, replica::table& table, const dht::token_range& range, const locator::host_id& src, const locator::host_id& dst, seastar::shard_id dst_shard_id, netw::messaging_service& ms, abort_source& as, service::frozen_topology_guard topo_guard);
@@ -178,7 +176,6 @@ future<size_t> tablet_stream_files(netw::messaging_service& ms,
std::vector<node_and_shard> targets,
table_id table,
file_stream_id ops_id,
host2ip_t host2ip,
service::frozen_topology_guard topo_guard,
bool may_inject_errors = false
);


@@ -294,7 +294,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
}
});
ms.register_stream_blob([this] (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
auto from = netw::messaging_service::get_source(cinfo).addr;
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = _ms.local().make_sink_for_stream_blob(source);
(void)stream_blob_handler(_db.local(), _ms.local(), from, meta, sink, source).handle_exception([ms = _ms.local().shared_from_this()] (std::exception_ptr eptr) {
sslog.warn("Failed to run stream blob handler: {}", eptr);
@@ -392,35 +392,14 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
sslog.debug("[Stream #{}] prepare requests nr={}, summaries nr={}", plan_id, nr_requests, summaries.size());
// prepare tasks
set_state(stream_session_state::PREPARING);
auto& db = manager().db();
for (auto& request : requests) {
// always flush on stream request
sslog.debug("[Stream #{}] prepare stream_request={}", plan_id, request);
const auto& ks = request.keyspace;
// Make sure cf requested by peer node exists
for (auto& cf : request.column_families) {
try {
db.find_column_family(ks, cf);
} catch (replica::no_such_column_family&) {
auto err = format("[Stream #{}] prepare requested ks={} cf={} does not exist", plan_id, ks, cf);
sslog.warn("{}", err.c_str());
throw std::runtime_error(err);
}
}
add_transfer_ranges(std::move(request.keyspace), std::move(request.ranges), std::move(request.column_families));
co_await coroutine::maybe_yield();
}
for (auto& summary : summaries) {
sslog.debug("[Stream #{}] prepare stream_summary={}", plan_id, summary);
auto cf_id = summary.cf_id;
// Make sure cf the peer node will send to us exists
try {
db.find_column_family(cf_id);
} catch (replica::no_such_column_family&) {
auto err = format("[Stream #{}] prepare cf_id={} does not exist", plan_id, cf_id);
sslog.warn("{}", err.c_str());
throw std::runtime_error(err);
}
prepare_receiving(summary);
}


@@ -144,6 +144,13 @@ def test_tag_resource_write_isolation_values(scylla_only, test_table):
test_table.meta.client.tag_resource(ResourceArn=arn, Tags=[{'Key':'system:write_isolation', 'Value':i}])
with pytest.raises(ClientError, match='ValidationException'):
test_table.meta.client.tag_resource(ResourceArn=arn, Tags=[{'Key':'system:write_isolation', 'Value':'bah'}])
# Verify that reading system:write_isolation is possible (we didn't
# accidentally prevent it while fixing #24098)
keys = [tag['Key'] for tag in test_table.meta.client.list_tags_of_resource(ResourceArn=arn)['Tags']]
assert 'system:write_isolation' in keys
# Finally remove the system:write_isolation tag so as not to modify the
# default behavior of test_table.
test_table.meta.client.untag_resource(ResourceArn=arn, TagKeys=['system:write_isolation'])
# Test that if trying to create a table with forbidden tags (in this test,
# a list of tags longer than the maximum allowed of 50 tags), the table
@@ -168,9 +175,9 @@ def test_too_long_tags_from_creation(dynamodb):
dynamodb.meta.client.describe_table(TableName=name)
# This test is similar to the above, but uses another case of forbidden tags -
# here an illegal value for the system::write_isolation tag. This is a
# here an illegal value for the system:write_isolation tag. This is a
# scylla_only test because only Alternator checks the validity of the
# system::write_isolation tag.
# system:write_isolation tag.
# Reproduces issue #6809, where the table creation appeared to fail, but it
# was actually created (without the tag).
def test_forbidden_tags_from_creation(scylla_only, dynamodb):


@@ -13,8 +13,7 @@ from decimal import Decimal
import pytest
from botocore.exceptions import ClientError
from test.alternator.util import new_test_table, random_string, full_query, unique_table_name, is_aws, \
client_no_transform
from .util import new_test_table, random_string, full_query, unique_table_name, is_aws, client_no_transform, multiset
# All tests in this file are expected to fail with tablets due to #16567.
# To ensure that Alternator TTL is still being tested, instead of
@@ -809,3 +808,42 @@ def test_ttl_expiration_long(dynamodb, waits_for_expiration):
break
time.sleep(max_duration/100.0)
assert count == 99*N
# Alternator uses a tag "system:ttl_attribute" to store the TTL attribute
# chosen by UpdateTimeToLive. However, this tag is not supposed to be
# readable or writable by the user directly - it should be read or written
# only with the usual UpdateTimeToLive and DescribeTimeToLive operations.
# The following two tests confirm that this is the case. The first test
# checks that the internal tag is invisible, i.e., not returned by
# ListTagsOfResource. Basically we check that enabling TTL does not add
# any tags to the list of tags.
# Reproduces issue #24098.
def test_ttl_tag_is_invisible(dynamodb):
with new_test_table(dynamodb,
Tags=TAGS,
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]
) as table:
client = table.meta.client
client.update_time_to_live(TableName=table.name,
TimeToLiveSpecification={'AttributeName': 'x', 'Enabled': True})
# Verify that TTL is set for this table, but no extra tags
# like "system:ttl_attribute" (or anything else) are visible
# in ListTagsOfResource:
assert client.describe_time_to_live(TableName=table.name)['TimeToLiveDescription'] == {'TimeToLiveStatus': 'ENABLED', 'AttributeName': 'x'}
arn = client.describe_table(TableName=table.name)['Table']['TableArn']
assert multiset(TAGS) == multiset(client.list_tags_of_resource(ResourceArn=arn)['Tags'])
# Now check that the internal tag system:ttl_attribute cannot be written with
# TagResource or UntagResource (it can only be modified by UpdateTimeToLive).
# This is a Scylla-only test because in DynamoDB, there is nothing
# special about the tag name "system:ttl_attribute", and it can be written.
# Reproduces issue #24098.
def test_ttl_tag_is_unwritable(test_table, scylla_only):
tag_name = 'system:ttl_attribute'
client = test_table.meta.client
arn = client.describe_table(TableName=test_table.name)['Table']['TableArn']
with pytest.raises(ClientError, match='ValidationException.*internal'):
client.tag_resource(ResourceArn=arn, Tags=[{'Key': tag_name, 'Value': 'x'}])
with pytest.raises(ClientError, match='ValidationException.*internal'):
client.untag_resource(ResourceArn=arn, TagKeys=[tag_name])


@@ -335,6 +335,7 @@ add_scylla_test(combined_tests
secondary_index_test.cc
sessions_test.cc
sstable_compaction_test.cc
sstable_compressor_factory_test.cc
sstable_directory_test.cc
sstable_set_test.cc
statement_restrictions_test.cc


@@ -596,9 +596,10 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
cf_name))
.get();
f1.get();
e.get_system_keyspace().local().load_built_views().get();
auto f2 = e.local_view_builder().wait_until_built("ks", "index_cf_index");
e.execute_cql(seastar::format("CREATE INDEX index_{0} ON {0} (r1);", cf_name)).get();
f2.get();
}
}


@@ -25,6 +25,7 @@
#include "ent/encryption/encryption.hh"
#include "ent/encryption/symmetric_key.hh"
#include "ent/encryption/local_file_provider.hh"
#include "ent/encryption/encryption_exceptions.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/cql_test_env.hh"
@@ -649,10 +650,20 @@ SEASTAR_TEST_CASE(test_kms_provider_with_master_key_in_cf, *check_run_test_decor
);
// should fail
BOOST_REQUIRE_THROW(
co_await test_provider("'key_provider': 'KmsKeyProviderFactory', 'kms_host': 'kms_test', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128", tmp, yaml)
, std::exception
);
try {
try {
co_await test_provider("'key_provider': 'KmsKeyProviderFactory', 'kms_host': 'kms_test', 'cipher_algorithm':'AES/CBC/PKCS5Padding', "
"'secret_key_strength': 128",
tmp, yaml);
} catch (std::nested_exception& ex) {
std::rethrow_if_nested(ex);
}
BOOST_FAIL("Required an exception to be re-thrown");
} catch (encryption::configuration_error&) {
// EXPECTED
} catch (...) {
BOOST_FAIL(format("Unexpected exception: {}", std::current_exception()));
}
// should be ok
co_await test_provider(fmt::format("'key_provider': 'KmsKeyProviderFactory', 'kms_host': 'kms_test', 'master_key': '{}', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128", kms_key_alias)
@@ -949,10 +960,21 @@ SEASTAR_TEST_CASE(test_gcp_provider_with_master_key_in_cf, *check_run_test_decor
);
// should fail
BOOST_REQUIRE_THROW(
co_await test_provider("'key_provider': 'GcpKeyProviderFactory', 'gcp_host': 'gcp_test', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128", tmp, yaml)
, std::exception
);
try {
try {
co_await test_provider(
"'key_provider': 'GcpKeyProviderFactory', 'gcp_host': 'gcp_test', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128",
tmp,
yaml);
} catch (std::nested_exception& ex) {
std::rethrow_if_nested(ex);
}
BOOST_FAIL("Required an exception to be re-thrown");
} catch (encryption::configuration_error&) {
// EXPECTED
} catch (...) {
BOOST_FAIL(format("Unexpected exception: {}", std::current_exception()));
}
// should be ok
co_await test_provider(fmt::format("'key_provider': 'GcpKeyProviderFactory', 'gcp_host': 'gcp_test', 'master_key': '{}', 'cipher_algorithm':'AES/CBC/PKCS5Padding', 'secret_key_strength': 128", gcp.key_name)
@@ -1072,7 +1094,7 @@ static future<> network_error_test_helper(const tmpdir& tmp, const std::string&
BOOST_REQUIRE_THROW(
co_await test_broken_encrypted_commitlog(args, scopts);
, std::exception
, exceptions::mutation_write_timeout_exception
);
co_await proxy.stop();
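The KMS and GCP hunks above replace a blanket `BOOST_REQUIRE_THROW(..., std::exception)` with a try/`std::rethrow_if_nested` ladder, so the test can assert the precise root cause instead of accepting any exception. A minimal self-contained sketch of that standard-library pattern, with `configuration_error` and `make_kms_provider` as hypothetical stand-ins for `encryption::configuration_error` and the provider setup:

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>

// Stand-in for encryption::configuration_error.
struct configuration_error : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Stand-in for the failing provider setup: the root cause is wrapped
// in an outer exception via std::throw_with_nested.
void make_kms_provider() {
    try {
        throw configuration_error("missing master_key");
    } catch (...) {
        std::throw_with_nested(std::runtime_error("failed to create KMS provider"));
    }
}

std::string root_cause() {
    try {
        try {
            make_kms_provider();
        } catch (const std::nested_exception& ex) {
            std::rethrow_if_nested(ex);  // unwraps and rethrows the configuration_error
        }
    } catch (const configuration_error& e) {
        return e.what();  // the expected path, mirroring the test's "// EXPECTED" branch
    } catch (...) {
        return "unexpected";
    }
    return "no exception";
}
```

The inner catch rethrows the wrapped exception; the outer handlers then distinguish the expected type from everything else, exactly the shape of the rewritten test.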


@@ -76,7 +76,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
if (!verb_register) {
co_await smp::invoke_on_all([&] {
return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
auto from = netw::messaging_service::get_source(cinfo).addr;
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = global_ms.local().make_sink_for_stream_blob(source);
(void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
auto path = meta.filename + suffix;
@@ -115,10 +115,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
co_return make_file_input_stream(std::move(file), foptions);
};
}
auto host2ip = [&global_db] (locator::host_id id) -> future<gms::inet_address> {
co_return global_db.local().get_token_metadata().get_topology().my_address();
};
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, host2ip, service::null_topology_guard, inject_error);
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error);
co_await mark_tablet_stream_done(ops_id);
testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes);
ret = true;


@@ -812,27 +812,20 @@ SEASTAR_THREAD_TEST_CASE(background_reclaim) {
logalloc::shard_tracker().stop().get();
});
sleep(500ms).get(); // sleep a little, to give the reclaimer a head start
std::vector<managed_bytes> std_allocs;
size_t std_alloc_size = 1000000; // note that managed_bytes fragments these, even in std
for (int i = 0; i < 50; ++i) {
// Background reclaim is supposed to eventually ensure a certain amount of free memory.
while (memory::free_memory() < background_reclaim_free_memory_threshold) {
thread::maybe_yield();
}
auto compacted_pre = logalloc::shard_tracker().statistics().memory_compacted;
fmt::print("compacted {} items {} (pre)\n", compacted_pre, evictable_allocs.size());
std_allocs.emplace_back(managed_bytes::initialized_later(), std_alloc_size);
auto compacted_post = logalloc::shard_tracker().statistics().memory_compacted;
fmt::print("compacted {} items {} (post)\n", compacted_post, evictable_allocs.size());
BOOST_REQUIRE_EQUAL(compacted_pre, compacted_post);
// Pretend to do some work. Sleeping would be too easy, as the background reclaim group would use
// all that time.
//
// Use thread_cputime_clock to prevent overcommitted test machines from stealing CPU time
// and causing test failures.
auto deadline = thread_cputime_clock::now() + 100ms;
while (thread_cputime_clock::now() < deadline) {
thread::maybe_yield();
}
}
}
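The busy-wait added above deliberately uses a CPU-time clock rather than wall time, so that CPU stolen on an overcommitted test machine cannot shorten the simulated work. A minimal sketch of the same idea using `std::clock` (process CPU time; the Seastar test uses a per-thread clock and calls `thread::maybe_yield()` inside the loop):

```cpp
#include <cassert>
#include <ctime>

// Burn approximately `seconds` of CPU time (not wall time). If the
// process is descheduled, the loop simply runs longer in wall time,
// so the amount of "work" stays constant.
void burn_cpu_for(double seconds) {
    const std::clock_t deadline =
        std::clock() + static_cast<std::clock_t>(seconds * CLOCKS_PER_SEC);
    while (std::clock() < deadline) {
        // spin; in the Seastar test, thread::maybe_yield() goes here
    }
}
```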


@@ -566,7 +566,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
}
SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
return sstables::test_env::do_with([] (sstables::test_env& env) {
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("v", bytes_type)
@@ -581,7 +581,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
cfg.enable_incremental_backups = false;
cfg.cf_stats = &*cf_stats;
return with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
return seastar::async([&env, s, &cf] {
// populate
auto new_key = [&] {
@@ -645,7 +645,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
flushed.get();
});
}).then([cf_stats] {});
}).get();
});
}
@@ -4023,3 +4023,108 @@ SEASTAR_THREAD_TEST_CASE(test_to_data_query_results_with_distinct_and_per_partit
BOOST_REQUIRE_EQUAL(result.row_count(), pkeys.size() * 2);
}
}
// Max-purgeable has two values: one for regular and one for shadowable
// tombstones. Check that the value is not sticky: if the shadowable one is requested
// first, it must not be reused for regular tombstones, and vice versa.
SEASTAR_THREAD_TEST_CASE(test_mutation_compactor_sticky_max_purgeable) {
simple_schema ss;
auto s = ss.schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
auto dk = ss.make_pkey(1);
const auto& v_def = *s->get_column_definition(to_bytes("v"));
const auto value = serialized("v");
const auto deletion_time = gc_clock::now() - std::chrono::hours(1) - s->gc_grace_seconds();
const auto compaction_time = gc_clock::now();
const api::timestamp_type shadowable_max_purgeable = 110;
const api::timestamp_type regular_max_purgeable = 50;
const api::timestamp_type timestamp = 100;
class mutation_rebuilding_consumer {
mutation_rebuilder_v2 _mr;
public:
explicit mutation_rebuilding_consumer(schema_ptr s) : _mr(std::move(s)) { }
void consume_new_partition(dht::decorated_key dk) { _mr.consume_new_partition(std::move(dk)); }
void consume(tombstone t) { _mr.consume(t); }
stop_iteration consume(static_row&& sr, tombstone, bool) { return _mr.consume(std::move(sr)); }
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return _mr.consume(std::move(cr)); }
stop_iteration consume(range_tombstone_change&& rtc) { return _mr.consume(std::move(rtc)); }
stop_iteration consume_end_of_partition() { return _mr.consume_end_of_partition(); }
mutation_opt consume_end_of_stream() { return _mr.consume_end_of_stream(); }
};
auto get_max_purgeable = [] (const dht::decorated_key&, is_shadowable is) {
return is == is_shadowable::yes ? shadowable_max_purgeable : regular_max_purgeable;
};
auto compact_and_expire = [&] (mutation mut) {
auto reader = make_mutation_reader_from_mutations(s, permit, std::move(mut));
auto close_reader = deferred_close(reader);
auto compactor = compact_for_compaction_v2<mutation_rebuilding_consumer>(
*s,
compaction_time,
get_max_purgeable,
tombstone_gc_state(nullptr),
mutation_rebuilding_consumer(s));
auto mut_opt = reader.consume(std::move(compactor)).get();
BOOST_REQUIRE(mut_opt);
return *mut_opt;
};
// max-purgeable returned for shadowable tombstone becomes sticky and applies to row tombstone after it
{
mutation mut(s, dk);
mutation mut_compacted(s, dk);
auto row1 = clustering_row(ss.make_ckey(1));
row1.apply(shadowable_tombstone(timestamp, deletion_time));
auto row2 = clustering_row(ss.make_ckey(2));
row2.apply(tombstone(timestamp, deletion_time));
auto row3 = clustering_row(ss.make_ckey(3));
row3.cells().apply(v_def, atomic_cell::make_live(*v_def.type, timestamp, value));
mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row2)));
mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row3)));
mut.apply(mutation_fragment(*s, permit, std::move(row1)));
mut.apply(mutation_fragment(*s, permit, std::move(row2)));
mut.apply(mutation_fragment(*s, permit, std::move(row3)));
assert_that(compact_and_expire(std::move(mut))).is_equal_to(mut_compacted);
}
// max-purgeable returned for regular tombstone becomes sticky and applies to shadowable tombstone after it
{
mutation mut(s, dk);
mutation mut_compacted(s, dk);
auto row1 = clustering_row(ss.make_ckey(1));
row1.apply(tombstone(timestamp, deletion_time));
auto row2 = clustering_row(ss.make_ckey(2));
row2.apply(shadowable_tombstone(timestamp, deletion_time));
auto row3 = clustering_row(ss.make_ckey(3));
row3.cells().apply(v_def, atomic_cell::make_live(*v_def.type, timestamp, value));
mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row1)));
mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row3)));
mut.apply(mutation_fragment(*s, permit, std::move(row1)));
mut.apply(mutation_fragment(*s, permit, std::move(row2)));
mut.apply(mutation_fragment(*s, permit, std::move(row3)));
assert_that(compact_and_expire(std::move(mut))).is_equal_to(mut_compacted);
}
}
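The test above drives `compact_for_compaction_v2` with a callback that returns 110 for shadowable tombstones and 50 for regular ones; since both tombstones are written at timestamp 100, only the shadowable one is purgeable, and the compacted mutation must reflect that regardless of query order. A sketch of that per-kind purge decision (the timestamps are the test's values, not a general rule):

```cpp
#include <cassert>
#include <cstdint>

enum class is_shadowable { no, yes };

// Values taken from the test: 110 for shadowable, 50 for regular tombstones.
int64_t max_purgeable_for(is_shadowable kind) {
    return kind == is_shadowable::yes ? 110 : 50;
}

// A tombstone is purgeable only if its timestamp is below the max-purgeable
// value for its own kind. The lookup must happen per tombstone; caching the
// answer for one kind and reusing it for the other is exactly the "sticky"
// bug the test guards against.
bool can_purge(int64_t tombstone_timestamp, is_shadowable kind) {
    return tombstone_timestamp < max_purgeable_for(kind);
}
```

With both tombstones at timestamp 100, the shadowable one (limit 110) is dropped from the compacted output while the regular one (limit 50) survives, matching the expected mutations in both branches of the test.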


@@ -530,6 +530,22 @@ static void reverse(schema_ptr s, mutation_partition& m) {
m = std::move(reverse(mutation(s, std::move(dk), std::move(m))).partition());
}
void assert_has_same_squashed_continuity(const mutation_partition& actual, mvcc_partition& expected) {
const schema& s = *expected.schema();
auto expected_cont = expected.entry().squashed_continuity(s);
auto actual_cont = actual.get_continuity(s);
bool actual_static_cont = actual.static_row_continuous();
bool expected_static_cont = expected.squashed().static_row_continuous();
if (actual_static_cont != expected_static_cont) {
BOOST_FAIL(format("Static row continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
expected_static_cont, actual_static_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
}
if (!expected_cont.equals(s, actual_cont)) {
BOOST_FAIL(format("Continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
expected_cont, actual_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
}
}
SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
// Tests that reading many versions using a cursor gives the logical mutation back.
return seastar::async([] {
@@ -558,7 +574,21 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
auto snap = e.read();
auto actual = read_using_cursor(*snap);
assert_that(s, actual).has_same_continuity(expected);
// Checks that the squashed continuity of `e` is equal to the continuity of `actual`.
// Note: squashed continuity of an entry is slightly different than the continuity
// of a squashed entry.
//
// Squashed continuity is the union of continuities of all versions in the entry,
// and in particular it includes empty dummy rows resulting from the logical merge
// of versions.
// The process of actually squashing an entry is allowed to
// remove those empty dummies, so the squashed entry can have slightly
// smaller continuity.
//
// Since a cursor isn't allowed to remove dummy rows, the strongest test
// we can do here is to compare the continuity of the cursor-read mutation
// with the squashed continuity of the entry.
assert_has_same_squashed_continuity(actual, e);
assert_that(s, actual).is_equal_to_compacted(expected);
// Reversed iteration


@@ -242,7 +242,8 @@ SEASTAR_THREAD_TEST_CASE(test_result_map_reduce) {
auto bar_exc = [] () { return result<sstring>(bo::failure(bar_exception())); };
auto foo_throw = [] () { return make_exception_future<result<sstring>>(foo_exception()); };
BOOST_REQUIRE_EQUAL(reduce(sstring("brown"), sstring("fox")).value(), "the brown fox");
auto res = reduce(sstring("brown"), sstring("fox")).value();
BOOST_REQUIRE(res == "the brown fox" || res == "the fox brown");
BOOST_REQUIRE_EQUAL(reduce(foo_exc(), sstring("fox")).error(), exc_container(foo_exception()));
BOOST_REQUIRE_EQUAL(reduce(sstring("brown"), foo_exc()).error(), exc_container(foo_exception()));
BOOST_REQUIRE_EQUAL(reduce(foo_exc(), bar_exc()).error(), exc_container(foo_exception()));


@@ -1666,7 +1666,7 @@ SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
}
SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
return test_env::do_with([] (test_env& env) {
return test_env::do_with_async([] (test_env& env) {
auto ts = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
auto ts_in_ms = std::chrono::milliseconds(ts);
auto ts_in_us = std::chrono::duration_cast<std::chrono::microseconds>(ts_in_ms);
@@ -1702,7 +1702,6 @@ SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
BOOST_REQUIRE(ret.second == expected);
}
return make_ready_future<>();
});
}
@@ -2319,6 +2318,7 @@ public:
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
private:
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<test_env> _env;
uint32_t _seed;
std::unique_ptr<tests::random_schema_specification> _random_schema_spec;
@@ -2336,7 +2336,7 @@ public:
compress))
, _random_schema(_seed, *_random_schema_spec)
{
_env.start().get();
_env.start(test_env_config(), std::ref(*scf)).get();
testlog.info("random_schema: {}", _random_schema.cql());
}
@@ -2402,12 +2402,14 @@ public:
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
private:
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<test_env> _env;
public:
scrub_test_framework()
{
_env.start().get();
_env.start(test_env_config(), std::ref(*scf)).get();
}
~scrub_test_framework() {


@@ -0,0 +1,137 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#undef SEASTAR_TESTING_MAIN
#include <fmt/ranges.h>
#include <seastar/util/defer.hh>
#include <seastar/testing/thread_test_case.hh>
#include "sstables/sstable_compressor_factory.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
BOOST_AUTO_TEST_SUITE(sstable_compressor_factory_test)
// 1. Create a random message.
// 2. Set this random message as the recommended dict.
// 3. On all shards, create compressors.
// 4. Check that they are using the recommended dict (i.e. that the original message compresses perfectly).
// 5. Check that the used dictionaries are owned by shards on the same NUMA node.
// 6. Check that the number of dictionary copies is equal to number of NUMA nodes.
// 7. Repeat this a few times for both lz4 and zstd.
void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);
// Create a compressor factory.
SCYLLA_ASSERT(shard_to_numa_mapping.size() == smp::count);
auto config = default_sstable_compressor_factory::config{
.numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
};
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
sstable_compressor_factory.start(std::cref(config)).get();
auto stop_compressor_factory = defer([&sstable_compressor_factory] { sstable_compressor_factory.stop().get(); });
// The factory keeps recommended dicts (i.e. dicts for writing) per table ID.
auto table = table_id::create_random_id();
// Retry a few times just to check that it works more than once.
for (int retry = 0; retry < 3; ++retry) {
// Generate a random (and hence incompressible without a dict) message.
auto message = tests::random::get_sstring(4096);
auto dict_view = std::as_bytes(std::span(message));
// Set the message as the dict to make the message perfectly compressible.
sstable_compressor_factory.local().set_recommended_dict(table, dict_view).get();
// We'll put the owners here to check that the number of owners matches the number of NUMA nodes.
std::vector<unsigned> compressor_numa_nodes(smp::count);
std::vector<unsigned> decompressor_numa_nodes(smp::count);
// Try for both algorithms, just in case there are some differences in how dictionary
// distribution over shards is implemented between them.
for (const auto algo : {compressor::algorithm::lz4_with_dicts, compressor::algorithm::zstd_with_dicts}) {
sstable_compressor_factory.invoke_on_all(coroutine::lambda([&] (default_sstable_compressor_factory& local) -> seastar::future<> {
// Validate that the dictionaries work as intended,
// and check that their owner is as expected.
auto params = compression_parameters(algo);
auto compressor = co_await local.make_compressor_for_writing_for_tests(params, table);
auto decompressor = co_await local.make_compressor_for_reading_for_tests(params, dict_view);
auto our_numa_node = shard_to_numa_mapping[this_shard_id()];
auto compressor_numa_node = shard_to_numa_mapping[compressor->get_dict_owner_for_test().value()];
auto decompressor_numa_node = shard_to_numa_mapping[decompressor->get_dict_owner_for_test().value()];
// Check that the dictionary used by this shard lies on the same NUMA node.
// This is important to avoid cross-node memory accesses on the hot path.
BOOST_CHECK_EQUAL(our_numa_node, compressor_numa_node);
BOOST_CHECK_EQUAL(our_numa_node, decompressor_numa_node);
compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
decompressor_numa_nodes[this_shard_id()] = compressor_numa_node;
auto output_max_size = compressor->compress_max_size(message.size());
auto compressed = std::vector<char>(output_max_size);
auto compressed_size = compressor->compress(
reinterpret_cast<const char*>(message.data()), message.size(),
reinterpret_cast<char*>(compressed.data()), compressed.size());
BOOST_REQUIRE_GE(compressed_size, 0);
compressed.resize(compressed_size);
// Validate that the recommended dict was actually used.
BOOST_CHECK(compressed.size() < message.size() / 10);
auto decompressed = std::vector<char>(message.size());
auto decompressed_size = decompressor->uncompress(
reinterpret_cast<const char*>(compressed.data()), compressed.size(),
reinterpret_cast<char*>(decompressed.data()), decompressed.size());
BOOST_REQUIRE_GE(decompressed_size, 0);
decompressed.resize(decompressed_size);
// Validate that the roundtrip through compressor and decompressor
// resulted in the original message.
BOOST_CHECK_EQUAL_COLLECTIONS(message.begin(), message.end(), decompressed.begin(), decompressed.end());
})).get();
}
// Check that the number of owners (and hence, copies) is equal to the number
// of NUMA nodes.
// This isn't that important, but we don't want to duplicate dictionaries
// within a NUMA node unnecessarily.
BOOST_CHECK_EQUAL(
std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
);
BOOST_CHECK_EQUAL(
std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
);
}
}
SEASTAR_THREAD_TEST_CASE(test_numa_awareness) {
{
std::vector<unsigned> one_numa_node(smp::count);
test_one_numa_topology(one_numa_node);
}
{
std::vector<unsigned> two_numa_nodes(smp::count);
for (size_t i = 0; i < two_numa_nodes.size(); ++i) {
two_numa_nodes[i] = i % 2;
}
test_one_numa_topology(two_numa_nodes);
}
{
std::vector<unsigned> n_numa_nodes(smp::count);
for (size_t i = 0; i < n_numa_nodes.size(); ++i) {
n_numa_nodes[i] = i;
}
test_one_numa_topology(n_numa_nodes);
}
}
BOOST_AUTO_TEST_SUITE_END()
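The ownership checks in the suite above boil down to a mapping from shards to dictionary owners with exactly one owner per NUMA node, and each shard's owner on its own node. A hypothetical sketch of one such policy (assumed for illustration, not Scylla's actual implementation) that satisfies both assertions:

```cpp
#include <cassert>
#include <map>
#include <set>
#include <vector>

// Given shard -> NUMA node, designate the lowest-numbered shard on each
// node as the dictionary owner for every shard of that node. This yields
// same-node ownership (no cross-node accesses on the hot path) and one
// dictionary copy per NUMA node.
std::vector<unsigned> dict_owner_per_shard(const std::vector<unsigned>& shard_to_numa) {
    std::map<unsigned, unsigned> first_shard_on_node;
    for (unsigned shard = 0; shard < shard_to_numa.size(); ++shard) {
        first_shard_on_node.try_emplace(shard_to_numa[shard], shard);
    }
    std::vector<unsigned> owners(shard_to_numa.size());
    for (unsigned shard = 0; shard < shard_to_numa.size(); ++shard) {
        owners[shard] = first_shard_on_node[shard_to_numa[shard]];
    }
    return owners;
}
```

For the two-node topology `{0, 1, 0, 1}` from `test_numa_awareness`, every shard's owner shares its node, and the set of distinct owners has the same size as the set of nodes, which is what the `BOOST_CHECK_EQUAL` on the two `std::set` sizes verifies.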


@@ -466,8 +466,8 @@ static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_p
}
SEASTAR_TEST_CASE(check_read_indexes) {
return test_env::do_with([] (test_env& env) {
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return test_env::do_with_async([] (test_env& env) {
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return seastar::async([&env, version] {
auto builder = schema_builder("test", "summary_test")
.with_column("a", int32_type, column_kind::partition_key);
@@ -478,7 +478,7 @@ SEASTAR_TEST_CASE(check_read_indexes) {
auto list = sstables::test(sst).read_indexes(env.make_reader_permit()).get();
BOOST_REQUIRE(list.size() == 130);
});
});
}).get();
});
}
@@ -499,8 +499,8 @@ SEASTAR_TEST_CASE(check_multi_schema) {
// d int,
// e blob
//);
return test_env::do_with([] (test_env& env) {
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return test_env::do_with_async([] (test_env& env) {
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return seastar::async([&env, version] {
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
auto builder = schema_builder("test", "test_multi_schema")
@@ -532,7 +532,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
BOOST_REQUIRE(!m);
});
});
});
}).get();
});
}
@@ -2413,7 +2413,7 @@ SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
}
SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
return test_env::do_with([] (test_env& env) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
@@ -2441,8 +2441,6 @@ SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
BOOST_REQUIRE(insert(2, 2) == true);
BOOST_REQUIRE(insert(5, 5) == true);
BOOST_REQUIRE(run.all().size() == 5);
return make_ready_future<>();
});
}


@@ -32,7 +32,9 @@ static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schem
SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
tmpdir tmp;
auto env = test_env();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
auto env = test_env({}, *scf);
auto stop_env = defer([&env] { env.stop().get(); });
sstables::sstable_generation_generator gen_generator{0};
@@ -56,7 +58,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
SEASTAR_THREAD_TEST_CASE(test_sstable_move_idempotent) {
tmpdir tmp;
auto env = test_env();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
auto env = test_env({}, *scf);
auto stop_env = defer([&env] { env.stop().get(); });
sstables::sstable_generation_generator gen_generator{0};
@@ -100,7 +104,8 @@ static bool partial_create_links(sstable_ptr sst, fs::path dst_path, sstables::g
SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
tmpdir tmp;
auto env = test_env();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
auto env = test_env({}, *scf);
auto stop_env = defer([&env] { env.stop().get(); });
sstables::sstable_generation_generator gen_generator{0};
@@ -121,7 +126,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
SEASTAR_THREAD_TEST_CASE(test_sstable_move_exists_failure) {
tmpdir tmp;
auto env = test_env();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
auto env = test_env({}, *scf);
auto stop_env = defer([&env] { env.stop().get(); });
// please note, the SSTables used by this test are stored under


@@ -22,6 +22,7 @@
#include "test/lib/simple_schema.hh"
#include "test/lib/sstable_utils.hh"
#include "readers/from_mutations.hh"
#include "service/storage_service.hh"
BOOST_AUTO_TEST_SUITE(sstable_set_test)
@@ -203,4 +204,224 @@ SEASTAR_TEST_CASE(test_tablet_sstable_set_copy_ctor) {
}, std::move(cfg));
}
SEASTAR_TEST_CASE(test_sstable_set_fast_forward_by_cache_reader_simulation) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
auto pks = tests::generate_partition_keys(6, s);
std::vector<mutation> muts;
for (auto pk : pks) {
auto mut = mutation(s, pk);
ss.add_row(mut, ss.make_ckey(0), "val");
muts.push_back(std::move(mut));
}
sstable_writer_config cfg = env.manager().configure_writer("");
std::vector<sstables::shared_sstable> ssts;
{
auto mr = make_mutation_reader_from_mutations(s, env.make_reader_permit(), {muts[0], muts[1], muts[2]});
auto sst = make_sstable_easy(env, std::move(mr), cfg);
testlog.info("sstable [{}, {}]", sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token());
ssts.push_back(std::move(sst));
}
{
auto mr = make_mutation_reader_from_mutations(s, env.make_reader_permit(), {muts[4], muts[5]});
auto sst = make_sstable_easy(env, std::move(mr), cfg);
testlog.info("sstable [{}, {}]", sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token());
ssts.push_back(std::move(sst));
}
auto token_range = dht::token_range::make(dht::first_token(), dht::last_token());
auto set = make_lw_shared<sstable_set>(std::make_unique<partitioned_sstable_set>(ss.schema(), token_range));
for (auto& sst : ssts) {
set->insert(sst);
}
// simulation of full scan on range [0, 5]
// cache reader fetches [0, 1] -> next [4]
// [2] consumed from cache
// fast forward to [3, 5]
auto first_range = dht::partition_range::make({pks[0]}, {pks[1]});
auto reader = set->make_range_sstable_reader(s, env.make_reader_permit(),
first_range,
s->full_slice(),
nullptr,
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::yes);
auto close_r = deferred_close(reader);
auto mopt = read_mutation_from_mutation_reader(reader).get();
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, pks[0]));
mopt = read_mutation_from_mutation_reader(reader).get();
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, pks[1]));
auto second_range = dht::partition_range::make({pks[3]}, {pks[5]});
reader.fast_forward_to(second_range).get();
mopt = read_mutation_from_mutation_reader(reader).get();
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, pks[4]));
mopt = read_mutation_from_mutation_reader(reader).get();
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, pks[5]));
// EOS
BOOST_REQUIRE(!read_mutation_from_mutation_reader(reader).get());
});
}
static future<> guarantee_all_tablet_replicas_on_shard0(cql_test_env& env) {
auto& ss = env.get_storage_service().local();
auto& stm = env.get_shared_token_metadata().local();
auto my_host_id = ss.get_token_metadata_ptr()->get_topology().my_host_id();
co_await ss.set_tablet_balancing_enabled(false);
co_await stm.mutate_token_metadata([&] (locator::token_metadata& tm) -> future<> {
tm.update_topology(my_host_id, locator::endpoint_dc_rack::default_location, locator::node::state::normal, 1);
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(test_tablet_sstable_set_fast_forward_across_tablet_ranges) {
// enable tablets, to get access to tablet_storage_group_manager
cql_test_config cfg;
cfg.db_config->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled);
return do_with_cql_env_thread([&](cql_test_env& env) {
guarantee_all_tablet_replicas_on_shard0(env).get();
env.execute_cql("CREATE KEYSPACE test_tablet_sstable_set"
" WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1} AND TABLETS = {'enabled': true, 'initial': 2};").get();
env.execute_cql("CREATE TABLE test_tablet_sstable_set.test (pk int PRIMARY KEY)").get();
auto& table = env.local_db().find_column_family("test_tablet_sstable_set", "test");
auto s = table.schema();
auto& sgm = column_family_test::get_storage_group_manager(table);
auto erm = table.get_effective_replication_map();
auto& tmap = erm->get_token_metadata().tablets().get_tablet_map(s->id());
std::unordered_map<locator::tablet_id, std::vector<dht::decorated_key>> keys_per_tablet;
table.disable_auto_compaction().get();
for (int i = 0; i < 10; i++) {
env.execute_cql(fmt::format("INSERT INTO test_tablet_sstable_set.test (pk) VALUES ({})", i)).get();
auto key = dht::decorate_key(*s, partition_key::from_singular(*s, i));
keys_per_tablet[tmap.get_tablet_id(key.token())].push_back(key);
// produces single-partition sstables, to stress incremental selector.
table.flush().get();
}
for (auto& [_, keys] : keys_per_tablet) {
auto cmp = dht::decorated_key::less_comparator(s);
std::ranges::sort(keys, cmp);
}
auto set = replica::make_tablet_sstable_set(s, *sgm.get(), tmap);
utils::get_local_injector().enable("enable_read_debug_log");
testlog.info("first tablet range: {}", tmap.get_token_range(locator::tablet_id(0)));
testlog.info("second tablet range: {}", tmap.get_token_range(locator::tablet_id(1)));
auto& keys_for_first_tablet = keys_per_tablet.at(locator::tablet_id(0));
auto& keys_for_second_tablet = keys_per_tablet.at(locator::tablet_id(1));
auto create_reader = [&] (const dht::partition_range& range) {
return set->make_range_sstable_reader(s, make_reader_permit(env),
range,
s->full_slice(),
nullptr,
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::yes);
};
auto read_and_check = [&] (auto& reader, const dht::decorated_key& expected) {
auto mopt = read_mutation_from_mutation_reader(reader).get();
BOOST_REQUIRE(mopt && mopt->decorated_key().equal(*s, expected));
};
auto end_of_stream_check = [&] (auto& reader) {
BOOST_REQUIRE(!read_mutation_from_mutation_reader(reader).get());
};
// simulation of full scan on tablet ranges
// cache reader fetches range of first tablet
// fast forward to range of second tablet
{
auto first_range = dht::partition_range::make({keys_for_first_tablet.front()}, {keys_for_first_tablet.back()});
auto reader = create_reader(first_range);
auto close_r = deferred_close(reader);
for (auto& k : keys_for_first_tablet) {
read_and_check(reader, k);
}
auto second_range = dht::partition_range::make({keys_for_second_tablet.front()}, {keys_for_second_tablet.back()});
reader.fast_forward_to(second_range).get();
for (auto& k: keys_for_second_tablet) {
read_and_check(reader, k);
}
end_of_stream_check(reader);
}
// verify that fast forward will be able to create reader when the new range goes across tablet boundaries.
{
auto first_range = dht::partition_range::make({keys_for_first_tablet[0]}, {keys_for_first_tablet[0]});
auto reader = create_reader(first_range);
auto close_r = deferred_close(reader);
for (auto& k : std::span{keys_for_first_tablet.begin(), 1}) {
read_and_check(reader, k);
}
auto second_range = dht::partition_range::make({keys_for_first_tablet[1]}, {keys_for_second_tablet.back()});
reader.fast_forward_to(second_range).get();
for (auto& k : std::span{keys_for_first_tablet.begin() + 1, keys_for_first_tablet.size() - 1}) {
read_and_check(reader, k);
}
for (auto& k: keys_for_second_tablet) {
read_and_check(reader, k);
}
end_of_stream_check(reader);
}
// Reproduces a scenario of range scan where fast forward will overlap with next position returned by selector
// full scan: [0, 20]
// 1) cache reader emits [0, 10) (position 10 is cached)
// 2) incremental selector returns 0 sstables, next position of 16 (the start of a sstable)
// 3) fast forward to range [14, 20]
// fast forward might expect the new range to start after the next position (16), but [14, 20] starts before it and overlaps it.
// the incremental selector must be called also when the new range overlaps with the next position; otherwise, there's
// a chance of missing data.
{
auto first_token = tmap.get_first_token(locator::tablet_id(0));
auto first_range = dht::partition_range::make({dht::ring_position::starting_at(first_token)},
{dht::ring_position::ending_at(first_token)});
auto reader = create_reader(first_range);
auto close_r = deferred_close(reader);
end_of_stream_check(reader);
auto& keys_for_second_tablet = keys_per_tablet.at(locator::tablet_id(1));
auto second_range = dht::partition_range::make({dht::ring_position::starting_at(dht::next_token(first_token))},
{keys_for_second_tablet.back()});
reader.fast_forward_to(second_range).get();
for (auto& k : keys_for_first_tablet) {
read_and_check(reader, k);
}
for (auto& k: keys_for_second_tablet) {
read_and_check(reader, k);
}
end_of_stream_check(reader);
}
}, std::move(cfg));
}
BOOST_AUTO_TEST_SUITE_END()
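The overlap scenario exercised by the last block above can be modeled in isolation. Below is a minimal Python sketch (illustrative names, not ScyllaDB's actual incremental-selector API) of why fast-forward must re-invoke the selector even when the new range starts before the cached next position:

```python
# Toy model of an incremental sstable selector over integer token positions.
# An sstable is a (first, last) token interval; select(pos) returns the
# sstables covering pos, plus the next position at which the selection changes.
def select(sstables, pos):
    selected = [s for s in sstables if s[0] <= pos <= s[1]]
    future_starts = [s[0] for s in sstables if s[0] > pos]
    next_pos = min(future_starts, default=None)
    return selected, next_pos

# Scenario from the comment above: one sstable starting at token 16.
sstables = [(16, 20)]

# Full scan starts at 0: nothing selected yet, next position is 16.
selected, next_pos = select(sstables, 0)
assert selected == [] and next_pos == 16

# Fast forward to [14, 20]. Since 14 < next_pos, a reader that assumed the
# new range must lie past next_pos would skip reselection, keep the empty
# selection, and silently drop the sstable at 16. Reselecting at 14 keeps
# next_pos = 16, so the sstable is picked up when the reader reaches it.
selected, next_pos = select(sstables, 14)
assert selected == [] and next_pos == 16

selected, next_pos = select(sstables, 16)
assert selected == [(16, 20)] and next_pos is None
```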


@@ -10,6 +10,7 @@
#include <seastar/core/shard_id.hh>
#include <seastar/coroutine/as_future.hh>
#include <source_location>
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/random_utils.hh"
@@ -1687,6 +1688,12 @@ void check_no_rack_overload(const token_metadata& tm) {
}
SEASTAR_THREAD_TEST_CASE(test_merge_does_not_overload_racks) {
cql_test_config cfg{};
// This test relies on the fact that we use an RF strictly smaller than the number of racks.
// Because of that, we cannot enable `rf_rack_valid_keyspaces` in this test because we won't
// be able to create a keyspace.
cfg.db_config->rf_rack_valid_keyspaces.set(false);
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
@@ -1733,7 +1740,7 @@ SEASTAR_THREAD_TEST_CASE(test_merge_does_not_overload_racks) {
});
BOOST_REQUIRE_EQUAL(1, stm.get()->tablets().get_tablet_map(table1).tablet_count());
}).get();
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_skiplist) {
@@ -2123,9 +2130,10 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions)
// which is a proof that it doesn't stop due to active migrations.
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 1);
auto host1 = topo.add_node(node_state::normal, 2);
topo.start_new_rack();
auto host2 = topo.add_node(node_state::normal, 1);
auto host3 = topo.add_node(node_state::normal, 2);
auto host3 = topo.add_node(node_state::normal, 1);
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 4);
auto table1 = add_table(e, ks_name).get();
@@ -2146,8 +2154,8 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions)
tablet_transition_stage::allow_write_both_read_old,
tablet_transition_kind::migration,
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host3, 0},
tablet_replica {host2, 0},
},
tablet_replica {host3, 0}
});
@@ -2183,6 +2191,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) {
topology_builder topo(e);
auto host1 = topo.add_node(node_state::normal, 1);
topo.start_new_rack();
auto host2 = topo.add_node(node_state::normal, 1);
topo.add_node(node_state::normal, 2);
@@ -2225,10 +2234,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
topology_builder topo(e);
const auto shard_count = 2;
auto host1 = topo.add_node(node_state::normal, shard_count);
auto host2 = topo.add_node(node_state::normal, shard_count);
auto host3 = topo.add_node(node_state::normal, shard_count);
auto host4 = topo.add_node(node_state::normal, shard_count);
auto rack1 = topo.rack();
auto rack2 = topo.start_new_rack();
auto host1 = topo.add_node(node_state::normal, shard_count, rack1);
auto host2 = topo.add_node(node_state::normal, shard_count, rack2);
auto host3 = topo.add_node(node_state::normal, shard_count, rack1);
auto host4 = topo.add_node(node_state::normal, shard_count, rack2);
auto ks_name = add_keyspace(e, {{topo.dc(), 2}}, 16);
auto table1 = add_table(e, ks_name).get();
@@ -2533,35 +2545,39 @@ allocate_replicas_in_racks(const std::vector<endpoint_dc_rack>& racks, int rf,
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
do_with_cql_env_thread([] (auto& e) {
topology_builder topo(e);
const int n_hosts = 6;
auto shard_count = 2;
auto do_test_case = [] (const shard_id rf) {
return do_with_cql_env_thread([rf] (auto& e) {
topology_builder topo(e);
const int n_hosts = 6;
auto shard_count = 2;
std::vector<host_id> hosts;
std::unordered_map<sstring, std::vector<host_id>> hosts_by_rack;
// Sanity check just in case someone modifies the caller of this lambda
// and starts providing RF > n_hosts. In that case, we wouldn't be able
// to create an RF-rack-valid keyspace.
assert(rf <= n_hosts);
std::vector<endpoint_dc_rack> racks {
topo.rack(),
topo.start_new_rack(),
};
std::vector<host_id> hosts;
std::unordered_map<sstring, std::vector<host_id>> hosts_by_rack;
for (int i = 0; i < n_hosts; ++i) {
auto rack = racks[(i + 1) % racks.size()];
auto h = topo.add_node(node_state::normal, shard_count, rack);
if (i) {
// Leave the first host empty by making it invisible to allocation algorithm.
hosts_by_rack[rack.rack].push_back(h);
std::vector<endpoint_dc_rack> racks{topo.rack()};
for (shard_id i = 1; i < rf; ++i) {
racks.push_back(topo.start_new_rack());
}
}
auto& stm = e.shared_token_metadata().local();
for (int i = 0; i < n_hosts; ++i) {
auto rack = racks[(i + 1) % racks.size()];
auto h = topo.add_node(node_state::normal, shard_count, rack);
if (i) {
// Leave the first host empty by making it invisible to allocation algorithm.
hosts_by_rack[rack.rack].push_back(h);
}
}
auto& stm = e.shared_token_metadata().local();
for (int i = 0; i < 13; ++i) {
size_t total_tablet_count = 0;
std::vector<sstring> keyspaces;
size_t tablet_count_bits = 8;
int rf = tests::random::get_int<shard_id>(2, 4);
for (size_t log2_tablets = 0; log2_tablets < tablet_count_bits; ++log2_tablets) {
if (tests::random::get_bool()) {
continue;
@@ -2622,8 +2638,15 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
seastar::parallel_for_each(keyspaces, [&] (const sstring& ks) {
return e.execute_cql(fmt::format("DROP KEYSPACE {}", ks)).discard_result();
}).get();
}
}).get();
});
};
const int test_case_number = 13;
for (int i = 0; i < test_case_number; ++i) {
const shard_id rf = tests::random::get_int<shard_id>(2, 4);
testlog.info("{}: Starting test case {} for RF={}", std::source_location::current().function_name(), i + 1, rf);
do_test_case(rf).get();
}
}
SEASTAR_THREAD_TEST_CASE(test_balancing_heterogeneous_cluster) {
@@ -2820,6 +2843,13 @@ SEASTAR_THREAD_TEST_CASE(test_imbalance_in_hetero_cluster_with_two_tables_imbala
}
SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) {
cql_test_config cfg = tablet_cql_test_config();
// FIXME: This test creates two keyspaces with two different replication factors.
// What's more, we distribute the nodes across only two racks. Because of that,
// we won't be able to enable `rf_rack_valid_keyspaces`. That would require
// increasing the number of racks to three, as well as implementing scylladb/scylladb#23426.
cfg.db_config->rf_rack_valid_keyspaces.set(false);
do_with_cql_env_thread([] (auto& e) {
auto per_shard_goal = e.local_db().get_config().tablets_per_shard_goal();
@@ -2871,7 +2901,7 @@ SEASTAR_THREAD_TEST_CASE(test_per_shard_goal_mixed_dc_rf) {
BOOST_REQUIRE_LE(l.max(), 2 * per_shard_goal);
}
}
}, tablet_cql_test_config()).get();
}, cfg).get();
}
// This test verifies that per-table tablet count is adjusted
@@ -3174,6 +3204,12 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_random_load)
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack) {
cql_test_config cfg{};
// This test purposefully uses just one rack, which means that we cannot enable
// the `rf_rack_valid_keyspaces` configuration option because we won't be able to create
// a keyspace with RF > 1.
cfg.db_config->rf_rack_valid_keyspaces.set(false);
do_with_cql_env_thread([] (auto& e) {
const int rf = 2;
const int n_racks = 1;
@@ -3200,7 +3236,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_single_rack)
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}).get();
}, cfg).get();
}
// Verify merge can proceed with multiple racks and RF=#racks
@@ -3243,6 +3279,20 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_multiple_rack
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission) {
cql_test_config cfg{};
// The scenario this test addresses cannot happen with `rf_rack_valid_keyspaces` set to true.
//
// Among the tablet replicas for a given tablet, there CANNOT be two nodes from the same rack.
// After the decommission of B, both tablets will reside on ALL other nodes, which implies that
// they're on pairwise distinct racks. However, since B was taking part in replication of the
// tablets, it must've been among the replicas of at least one of the tablets and, for the very
// same reason, it must be on a separate rack. Hence, all nodes must reside on pairwise distinct racks.
//
// So, if we want to keep the current number of nodes and RF, we must have 4 racks. But we cannot
// do that until we've implemented scylladb/scylladb#23737. Besides, the test seems to rely on
// using just one rack, which makes it incompatible with `rf_rack_valid_keyspaces: true` anyway.
cfg.db_config->rf_rack_valid_keyspaces.set(false);
do_with_cql_env_thread([] (auto& e) {
const int rf = 3;
const int n_racks = 1;
@@ -3292,7 +3342,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_merge_colocation_with_decomission)
};
do_test_load_balancing_merge_colocation(e, n_racks, rf, n_hosts, shard_count, initial_tablets, set_tablets);
}).get();
}, cfg).get();
}
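The pigeonhole argument in the comment above reduces to a simple invariant check. A minimal sketch with hypothetical node/rack names (not ScyllaDB's actual placement code):

```python
# Illustrative check of the rf-rack-validity invariant discussed above:
# among the replicas of a given tablet, no two nodes may share a rack.
def rf_rack_valid(replicas, rack_of):
    racks = [rack_of[node] for node in replicas]
    return len(racks) == len(set(racks))

# Hypothetical 4-node layout: A and D share rack r1.
rack_of = {'A': 'r1', 'B': 'r2', 'C': 'r3', 'D': 'r1'}
assert rf_rack_valid(['A', 'B', 'C'], rack_of)   # pairwise distinct racks
assert not rf_rack_valid(['A', 'D'], rack_of)    # A and D violate the invariant
```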
SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
@@ -3300,6 +3350,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_resize_requests) {
topology_builder topo(e);
topo.add_node(node_state::normal, 2);
topo.start_new_rack();
topo.add_node(node_state::normal, 2);
const size_t initial_tablets = 2;


@@ -14,11 +14,12 @@ from test.pylib.internal_types import HostID
import pytest
import asyncio
import logging
import time
from test.cluster.conftest import skip_mode
from test.cluster.util import get_topology_coordinator, find_server_by_host_id
from test.cluster.mv.tablets.test_mv_tablets import get_tablet_replicas
from test.cluster.util import new_test_keyspace
from test.cluster.util import new_test_keyspace, wait_for
logger = logging.getLogger(__name__)
@@ -44,6 +45,14 @@ async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient):
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int)")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (c, pk) WITH SYNCHRONOUS_UPDATES = TRUE")
async def replicas_balanced():
base_replicas = [replica[0] for replica in await get_tablet_replicas(manager, servers[0], ks, "test", 0)]
view_replicas = [replica[0] for replica in await get_tablet_replicas(manager, servers[0], ks, "tv", 0)]
return len(set(base_replicas) & set(view_replicas)) == 0 or None
# There are 4 nodes and 4 tablets, so even if the initial placement is not balanced,
# each node should get 1 replica after some time.
await wait_for(replicas_balanced, time.time() + 60)
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
# See https://github.com/scylladb/scylladb/issues/16527
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
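The `replicas_balanced` predicate above returns `None` while the condition is unmet, which tells the polling helper to keep waiting. A minimal sketch of the contract such a `wait_for` helper presumably follows (assumed semantics, not the test harness's exact implementation):

```python
import asyncio
import time

async def wait_for(pred, deadline, period=1.0):
    # Poll an async predicate until it yields a non-None value
    # (the convention behind `... == 0 or None` in the test above)
    # or the deadline passes.
    while True:
        value = await pred()
        if value is not None:
            return value
        if time.time() > deadline:
            raise TimeoutError("wait_for: deadline exceeded")
        await asyncio.sleep(period)

# Usage: a predicate that becomes true on the third poll.
calls = 0
async def balanced():
    global calls
    calls += 1
    return True if calls >= 3 else None

result = asyncio.run(wait_for(balanced, time.time() + 10, period=0.01))
assert result is True and calls == 3
```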


@@ -400,24 +400,18 @@ class topo:
self.racks = racks
self.dcs = dcs
@pytest.mark.asyncio
@pytest.mark.parametrize("topology", [
topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
topo(rf = 3, nodes = 6, racks = 3, dcs = 1),
topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
])
async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server, topology):
'''Check that restoring of a cluster with stream scopes works'''
async def create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, s3_server=None):
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks, rf_rack_valid_keyspaces: {rf_rack_valid_keyspaces}')
cfg = {'task_ttl_in_seconds': 300, 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces}
if s3_server:
objconf = MinioServer.create_conf(s3_server.address, s3_server.port, s3_server.region)
cfg['object_storage_endpoints'] = objconf
logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks')
objconf = MinioServer.create_conf(s3_server.address, s3_server.port, s3_server.region)
cfg = { 'object_storage_endpoints': objconf, 'task_ttl_in_seconds': 300 }
cmd = [ '--logger-log-level', 'sstables_loader=debug:sstable_directory=trace:snapshots=trace:s3=trace:sstable=debug:http=debug' ]
servers = []
host_ids = {}
for s in range(topology.nodes):
dc = f'dc{s % topology.dcs}'
rack = f'rack{s % topology.racks}'
@@ -426,13 +420,12 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
servers.append(s)
host_ids[s.server_id] = await manager.get_host_id(s.server_id)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
return servers,host_ids
def create_dataset(manager, ks, cf, topology, logger):
cql = manager.get_cql()
logger.info(f'Create keyspace, rf={topology.rf}')
keys = range(256)
ks = 'ks'
cf = 'cf'
replication_opts = format_tuples({'class': 'NetworkTopologyStrategy', 'replication_factor': f'{topology.rf}'})
cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};"))
@@ -441,9 +434,11 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
for k in keys:
cql.execute(f"INSERT INTO {ks}.{cf} ( pk, value ) VALUES ({k}, '{k}');")
snap_name = unique_name('backup_')
return schema, keys, replication_opts
async def take_snapshot(ks, servers, manager, logger):
logger.info(f'Take snapshot and collect sstable lists')
snap_name = unique_name('backup_')
sstables = []
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
@@ -454,27 +449,9 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
logger.info(f'Collected sstables from {s.ip_addr}:{cf_dir}/snapshots/{snap_name}: {tocs}')
sstables += tocs
logger.info(f'Backup to {snap_name}')
prefix = f'{cf}/{snap_name}'
async def do_backup(s):
tid = await manager.api.backup(s.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, prefix)
status = await manager.api.wait_task(s.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
await asyncio.gather(*(do_backup(s) for s in servers))
logger.info(f'Re-initialize keyspace')
cql.execute(f'DROP KEYSPACE {ks}')
cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};"))
cql.execute(schema)
logger.info(f'Restore')
async def do_restore(s, toc_names, scope):
logger.info(f'Restore {s.ip_addr} with {toc_names}, scope={scope}')
tid = await manager.api.restore(s.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names, scope)
status = await manager.api.wait_task(s.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
return snap_name,sstables
def compute_scope(topology, servers):
if topology.dcs > 1:
scope = 'dc'
r_servers = servers[:topology.dcs]
@@ -485,10 +462,11 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
scope = 'node'
r_servers = servers
await asyncio.gather(*(do_restore(s, sstables, scope) for s in r_servers))
return scope,r_servers
async def check_data_is_back(manager, logger, cql, ks, cf, keys, servers, topology, r_servers, host_ids, scope):
logger.info(f'Check the data is back')
async def collect_mutations(server, key):
async def collect_mutations(server):
host = await wait_for_cql_and_get_hosts(cql, [server], time.time() + 30)
await read_barrier(manager.api, server.ip_addr) # scylladb/scylladb#18199
ret = {}
@@ -498,7 +476,7 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
ret[frag.pk].append({'mutation_source': frag.mutation_source, 'partition_region': frag.partition_region, 'node': server.ip_addr})
return ret
by_node = await asyncio.gather(*(collect_mutations(s, k) for s in servers))
by_node = await asyncio.gather(*(collect_mutations(s) for s in servers))
mutations = {}
for node_frags in by_node:
for pk in node_frags:
@@ -507,7 +485,6 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
mutations[pk].append(node_frags[pk])
for k in random.sample(keys, 17):
real_rf = 0
if not k in mutations:
logger.info(f'{k} not found in mutations')
logger.info(f'Mutations: {mutations}')
@@ -531,6 +508,58 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server,
logger.info(f'{s.ip_addr} streamed to {streamed_to}, expected {scope_nodes}')
assert streamed_to == scope_nodes
@pytest.mark.asyncio
@pytest.mark.parametrize("topology_rf_validity", [
(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
(topo(rf = 3, nodes = 6, racks = 3, dcs = 1), True),
(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
])
async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server, topology_rf_validity):
'''Check that restoring a cluster with streaming scopes works'''
topology, rf_rack_valid_keyspaces = topology_rf_validity
servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, s3_server)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
ks = 'ks'
cf = 'cf'
schema, keys, replication_opts = create_dataset(manager, ks, cf, topology, logger)
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
logger.info(f'Backup to {snap_name}')
prefix = f'{cf}/{snap_name}'
async def do_backup(s):
tid = await manager.api.backup(s.ip_addr, ks, cf, snap_name, s3_server.address, s3_server.bucket_name, prefix)
status = await manager.api.wait_task(s.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
await asyncio.gather(*(do_backup(s) for s in servers))
logger.info(f'Re-initialize keyspace')
cql.execute(f'DROP KEYSPACE {ks}')
cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};"))
cql.execute(schema)
logger.info(f'Restore')
async def do_restore(s, toc_names, scope):
logger.info(f'Restore {s.ip_addr} with {toc_names}, scope={scope}')
tid = await manager.api.restore(s.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names, scope)
status = await manager.api.wait_task(s.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
scope,r_servers = compute_scope(topology, servers)
await asyncio.gather(*(do_restore(s, sstables, scope) for s in r_servers))
await check_data_is_back(manager, logger, cql, ks, cf, keys, servers, topology, r_servers, host_ids, scope)
@pytest.mark.asyncio
async def test_restore_with_non_existing_sstable(manager: ManagerClient, s3_server):


@@ -474,7 +474,7 @@ async def add_new_node(manager: ManagerClient,
yield
LOGGER.info("Add a new node to the cluster")
await manager.server_add(timeout=TOPOLOGY_TIMEOUT)
await manager.server_add(config={"rf_rack_valid_keyspaces": False}, timeout=TOPOLOGY_TIMEOUT)
yield


@@ -67,7 +67,7 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
async def four_nodes_cluster(manager: ManagerClient) -> None:
LOGGER.info("Booting initial 4-node cluster.")
for _ in range(4):
server = await manager.server_add()
server = await manager.server_add(config={"rf_rack_valid_keyspaces": False})
await manager.api.enable_injection(
node_ip=server.ip_addr,
injection="raft_server_set_snapshot_thresholds",
@@ -93,6 +93,8 @@ async def test_random_failures(manager: ManagerClient,
TESTS_COUNT, TESTS_SHUFFLE_SEED, ERROR_INJECTIONS_COUNT, CLUSTER_EVENTS_COUNT,
)
rf_rack_cfg = {"rf_rack_valid_keyspaces": False}
table = await random_tables.add_table(ncolumns=5)
await table.insert_seq()
@@ -116,7 +118,7 @@ async def test_random_failures(manager: ManagerClient,
)
coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
coordinator_log_mark = await coordinator_log.mark()
s_info = await manager.server_add(expected_server_up_state=ServerUpState.PROCESS_STARTED)
s_info = await manager.server_add(config=rf_rack_cfg, expected_server_up_state=ServerUpState.PROCESS_STARTED)
await coordinator_log.wait_for(
pattern="topology_coordinator_pause_after_updating_cdc_generation: waiting",
from_mark=coordinator_log_mark,
@@ -128,7 +130,7 @@ async def test_random_failures(manager: ManagerClient,
)
else:
s_info = await manager.server_add(
config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]},
config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]} | rf_rack_cfg,
expected_server_up_state=ServerUpState.PROCESS_STARTED,
)


@@ -6,6 +6,7 @@ extra_scylla_config_options:
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
enable_user_defined_functions: False
rf_rack_valid_keyspaces: True
tablets_mode_for_new_keyspaces: enabled
run_first:
- test_raft_recovery_stuck


@@ -3,7 +3,7 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.rest_client import inject_error
from test.pylib.rest_client import inject_error, read_barrier
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.cluster.util import wait_for_cdc_generations_publishing, \
@@ -14,6 +14,7 @@ from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable=
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module
import asyncio
import pytest
import logging
import time
@@ -64,6 +65,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
logger.info(f"Generations after second clearing attempt: {gen_ids}")
assert len(gen_ids) == 2 and first_gen_id in gen_ids
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
second_gen_id = max(gen_ids)
@@ -75,6 +77,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
logger.info(f"Generations after third clearing attempt: {gen_ids}")
assert len(gen_ids) == 1 and first_gen_id not in gen_ids and second_gen_id not in gen_ids
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
third_gen_id = max(gen_ids)
@@ -85,6 +88,7 @@ async def test_cdc_generation_clearing(manager: ManagerClient):
mark, gen_ids, hosts = await wait_for(tried_to_remove_new_gen, time.time() + 60)
logger.info(f"Generations after fourth clearing attempt: {gen_ids}")
assert len(gen_ids) == 1 and third_gen_id not in gen_ids
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
@@ -140,6 +144,7 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient)
mark = await log_file1.mark()
gen_ids = await get_gen_ids()
assert len(gen_ids) == 2 and first_gen_id not in gen_ids
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3])
# Allow the CDC generation publisher to finish its job. One generation should remain.
@@ -147,4 +152,5 @@ async def test_unpublished_cdc_generations_arent_cleared(manager: ManagerClient)
await log_file1.wait_for(f"CDC generation publisher fiber has nothing to do. Sleeping.", mark)
gen_ids = await get_gen_ids()
assert len(gen_ids) == 1
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1, host2, host3])
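Each new assertion in these hunks is preceded by a parallel read barrier, so every node has applied the latest group0 commands before the system tables are inspected. A minimal sketch of that fan-out pattern, with a stubbed async barrier standing in for the real `read_barrier` helper (the stub and the `barrier_all` name are illustrative, not part of the test code):

```python
import asyncio

async def barrier_all(read_barrier, api, addrs):
    # Issue the barrier against every node concurrently, mirroring
    # asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers]).
    await asyncio.gather(*(read_barrier(api, a) for a in addrs))

async def demo():
    seen = []
    async def fake_barrier(api, addr):  # stand-in for the REST read barrier
        seen.append(addr)
    await barrier_all(fake_barrier, None, ["127.0.0.1", "127.0.0.2"])
    return seen
```

Because the barriers run concurrently, the wall-clock cost is roughly one round trip regardless of cluster size.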


@@ -82,7 +82,7 @@ async def test_multiple_unpublished_cdc_generations(request, manager: ManagerCli
"""Test that the CDC generation publisher works correctly when there is more than one unpublished CDC generation."""
query_gen_timestamps = SimpleStatement(
"select time from system_distributed.cdc_generation_timestamps where key = 'timestamps'",
consistency_level = ConsistencyLevel.ONE)
consistency_level = ConsistencyLevel.ALL)
logger.info("Bootstrapping first node")
servers = [await manager.server_add()]


@@ -20,7 +20,7 @@ from test.pylib.util import wait_for_cql_and_get_hosts, wait_for
from test.cluster.util import reconnect_driver
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_nodes_cluster
pytestmark = pytest.mark.prepare_3_racks_cluster
@@ -117,6 +117,9 @@ async def test_change_two(manager, random_tables, build_mode):
# IP-s before they are sent back to servers[1] and servers[2],
# and the mentioned above code is not exercised by this test.
await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
# sleep_before_start_gossiping injections are needed to reproduce #22777
await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_start(servers[1].server_id)
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
if build_mode != 'release':


@@ -32,7 +32,7 @@ N_SERVERS = 2
@pytest.fixture
async def two_nodes_cluster(manager: ManagerClient) -> list[ServerNum]:
logger.info(f"Booting initial 2-nodes cluster")
servers = [srv.server_id for srv in await manager.servers_add(N_SERVERS)]
servers = [srv.server_id for srv in await manager.servers_add(N_SERVERS, auto_rack_dc="dc1")]
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
return servers


@@ -42,7 +42,7 @@ async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureReq
"""
logger.info("Creating a new cluster")
srvs = await manager.servers_add(3)
srvs = await manager.servers_add(3, auto_rack_dc="dc1")
cql, _ = await manager.get_ready_cql(srvs)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};") as ks:


@@ -15,7 +15,7 @@ from cassandra.query import SimpleStatement # type: ignore
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
logger = logging.getLogger(__name__)
@@ -39,7 +39,7 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
"""
cmdline = ["--hinted-handoff-enabled", "0", "--cache-hit-rate-read-balancing", "0", "--logger-log-level", "debug_error_injection=trace"]
nodes = await manager.servers_add(3, cmdline=cmdline)
nodes = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
node1, node2, node3 = nodes
@@ -47,19 +47,6 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
host1, host2, host3 = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 30)
def execute_with_tracing(cql, statement, *args, **kwargs):
kwargs['trace'] = True
query_result = cql.execute(statement, *args, **kwargs)
tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
page_traces = []
for trace in tracing:
trace_events = []
for event in trace.events:
trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
page_traces.append("\n".join(trace_events))
logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = { 'enabled': true }") as ks:
cql.execute(f"CREATE TABLE {ks}.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
" WITH speculative_retry = 'NONE'"
@@ -67,9 +54,9 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
" AND compaction = {'class': 'NullCompactionStrategy'}")
for write_statement, delete_statement in statement_pairs:
execute_with_tracing(cql, write_statement.format(ks=ks))
execute_with_tracing(cql, write_statement.format(ks=ks), log = True)
await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False)
execute_with_tracing(cql, delete_statement.format(ks=ks))
execute_with_tracing(cql, delete_statement.format(ks=ks), log = True)
await manager.api.disable_injection(node3.ip_addr, "database_apply")
def check_data(host, data):
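The inline tracing helper deleted above was consolidated into `test.pylib.util` (see the updated import earlier in this diff) and gained a `log` keyword. Its formatting step can be sketched in isolation; the `TraceEvent` stand-in below is illustrative, not the driver's actual trace class:

```python
from dataclasses import dataclass

@dataclass
class TraceEvent:  # stand-in for a cassandra driver trace event
    source: str
    source_elapsed: int
    description: str

def format_page_traces(traces):
    # One block of "  source elapsed description" lines per page trace,
    # assembled the same way the removed inline helper did before logging.
    page_traces = []
    for events in traces:
        page_traces.append("\n".join(
            f"  {e.source} {e.source_elapsed} {e.description}" for e in events))
    return "\n".join(page_traces)
```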


@@ -64,12 +64,15 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
cfg = {'tablets_mode_for_new_keyspaces' : 'enabled' if tablets_enabled else 'disabled'}
logger.info("Bootstrapping first two nodes")
servers = await manager.servers_add(2, config=cfg)
servers = await manager.servers_add(2, config=cfg, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"}
])
# The third node is started as the last one, so we can be sure that it has
# the latest topology version
logger.info("Bootstrapping the last node")
servers += [await manager.server_add(config=cfg)]
servers += [await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": "r3"})]
# Disable load balancer as it might bump topology version, undoing the decrement below.
# This should be done before adding the last two servers,
@@ -123,9 +126,10 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
@skip_mode('release', 'error injections are not supported in release mode')
async def test_fence_hints(request, manager: ManagerClient):
logger.info("Bootstrapping cluster with three nodes")
s0 = await manager.server_add(config={
'error_injections_at_startup': ['decrease_hints_flush_period']
}, cmdline=['--logger-log-level', 'hints_manager=trace'])
s0 = await manager.server_add(
config={'error_injections_at_startup': ['decrease_hints_flush_period']},
cmdline=['--logger-log-level', 'hints_manager=trace'],
property_file={"dc": "dc1", "rack": "r1"})
# Disable load balancer as it might bump topology version, potentially creating a race condition
# with read modify write below.
@@ -134,7 +138,10 @@ async def test_fence_hints(request, manager: ManagerClient):
# which the test relies on.
await manager.api.disable_tablet_balancing(s0.ip_addr)
[s1, s2] = await manager.servers_add(2)
[s1, s2] = await manager.servers_add(2, property_file=[
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"}
])
logger.info(f'Creating test table')
random_tables = RandomTables(request.node.name, manager, unique_name(), 3)
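Most bootstrap calls in this diff follow the same pattern: one distinct rack per node within a single DC, supplied either via `auto_rack_dc="dc1"` or an explicit `property_file` list. A tiny illustrative helper (not part of the test harness) that builds such a list:

```python
def rack_property_files(n, dc="dc1"):
    # One distinct rack per node, matching the r1/r2/r3 assignments above.
    return [{"dc": dc, "rack": f"r{i + 1}"} for i in range(n)]
```

For example, `rack_property_files(3)` yields the same three placements passed to `servers_add` in `test_fence_writes`.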


@@ -10,7 +10,7 @@ import uuid
import logging
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_nodes_cluster
pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio
@@ -21,8 +21,8 @@ async def test_global_ignored_nodes_list(manager: ManagerClient, random_tables)
since ignore node is permanent now and B is removed from the quorum early so it is enough to
have two live nodes for the quorum.
"""
await manager.servers_add(2)
servers = await manager.running_servers()
servers += await manager.servers_add(2, property_file=[servers[1].property_file(), servers[2].property_file()])
await manager.server_stop_gracefully(servers[3].server_id)
await manager.server_stop_gracefully(servers[4].server_id)
# test that non existing uuid is rejected
@@ -37,6 +37,6 @@ async def test_global_ignored_nodes_list(manager: ManagerClient, random_tables)
# is 2
await manager.server_stop_gracefully(servers[2].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = True)
await manager.server_add(start=False, replace_cfg=replace_cfg)
await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[2].property_file())


@@ -58,10 +58,10 @@ async def test_putget_2dc_with_rf(
table_name = "test_table_name"
columns = [Column("name", TextType), Column("value", TextType)]
logger.info("Create two servers in different DC's")
for i in nodes_list:
for rack_idx, dc_idx in enumerate(nodes_list):
s_info = await manager.server_add(
config=CONFIG,
property_file={"dc": f"dc{i}", "rack": "myrack"},
property_file={"dc": f"dc{dc_idx}", "rack": f"rack{rack_idx}"},
)
logger.info(s_info)
conn = manager.get_cql()
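The `test_putget_2dc_with_rf` change above swaps the single loop variable for `enumerate`: the list element still selects the DC, while the loop index now yields a distinct rack per node. The resulting placement can be checked directly (the `nodes_list` value below is assumed for illustration; the hunk does not show it):

```python
nodes_list = [1, 2]  # assumed contents; the actual fixture value is outside this hunk
placements = [
    # dc comes from the element (as before), rack from the enumerate index
    {"dc": f"dc{dc_idx}", "rack": f"rack{rack_idx}"}
    for rack_idx, dc_idx in enumerate(nodes_list)
]
```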


@@ -23,14 +23,15 @@ async def test_not_enough_token_owners(manager: ManagerClient):
"""
logging.info('Trying to add a zero-token server as the first server in the cluster')
await manager.server_add(config={'join_ring': False},
property_file={"dc": "dc1", "rack": "rz"},
expected_error='Cannot start the first node in the cluster as zero-token')
logging.info('Adding the first server')
server_a = await manager.server_add()
server_a = await manager.server_add(property_file={"dc": "dc1", "rack": "r1"})
logging.info('Adding two zero-token servers')
# The second server is needed only to preserve the Raft majority.
server_b = (await manager.servers_add(2, config={'join_ring': False}))[0]
server_b = (await manager.servers_add(2, config={'join_ring': False}, property_file={"dc": "dc1", "rack": "rz"}))[0]
logging.info(f'Trying to decommission the only token owner {server_a}')
await manager.decommission_node(server_a.server_id,
@@ -47,7 +48,7 @@ async def test_not_enough_token_owners(manager: ManagerClient):
await manager.server_start(server_a.server_id)
logging.info('Adding a normal server')
await manager.server_add()
await manager.server_add(property_file={"dc": "dc1", "rack": "r2"})
cql = manager.get_cql()
