Compare commits

...

61 Commits

Author SHA1 Message Date
Botond Dénes
19513fa47e Merge '[Backport 2025.2] raft_sys_table_storage: avoid temp buffer when deserializing log_entry' from Scylladb[bot]
The get_blob method linearizes data by copying it into a single buffer, which can cause 'oversized allocation' warnings.

In this commit we avoid the copy by creating an input stream on top of the original fragmented managed bytes returned by untyped_result_set_row::get_view.

fixes scylladb/scylladb#23903

backport: no need, not a critical issue.
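A minimal Python sketch of the idea (the real code is C++, built on `managed_bytes_view` and `as_input_stream`; the names below are invented for illustration): a stream that reads across fragments on demand, so no single allocation as large as the whole value is ever made.

```python
class FragmentedStream:
    """Reads across a list of fragments without joining them first."""

    def __init__(self, fragments):
        self._fragments = [memoryview(f) for f in fragments]
        self._idx = 0   # current fragment
        self._pos = 0   # offset within the current fragment

    def read(self, n):
        out = bytearray()
        while n > 0 and self._idx < len(self._fragments):
            frag = self._fragments[self._idx]
            take = frag[self._pos:self._pos + n]
            out += take
            n -= len(take)
            self._pos += len(take)
            if self._pos == len(frag):
                self._idx += 1
                self._pos = 0
        return bytes(out)

# Linearizing (the old behavior) would be b"".join(fragments): one
# allocation as large as the whole value. Streaming reads stay small.
fragments = [b"hel", b"lo ", b"wor", b"ld"]
s = FragmentedStream(fragments)
assert s.read(5) == b"hello"
assert s.read(6) == b" world"
```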

- (cherry picked from commit 6496ae6573)

- (cherry picked from commit f245b05022)

Parent PR: #24123

Closes scylladb/scylladb#24317

* github.com:scylladb/scylladb:
  raft_sys_table_storage: avoid temporary buffer when deserializing log_entry
  serializer_impl.hh:  add as_input_stream(managed_bytes_view) overload
2025-05-30 09:14:43 +03:00
Wojciech Mitros
dec10d348e test: actually wait for tablets to distribute across nodes
In test_tablet_mv_replica_pairing_during_replace, after we create
the tables, we want to wait for their tablets to distribute evenly
across nodes and we have a wait_for for that.
But we don't await this wait_for, so it's a no-op. This patch fixes
it by adding the missing await.
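This bug class is easy to reproduce in Python's asyncio (the helper below is a stand-in, not the actual test code): calling a coroutine function without `await` only creates the coroutine object, so the "wait" does nothing.

```python
import asyncio

async def wait_for(predicate, timeout=1.0):
    """Stand-in for the test helper: poll until predicate() is true."""
    async def poll():
        while not predicate():
            await asyncio.sleep(0.01)
    await asyncio.wait_for(poll(), timeout)

async def demo():
    done = False
    # Bug: this only *creates* the coroutine; it never runs, so the
    # wait is a silent no-op (Python would warn "never awaited").
    coro = wait_for(lambda: done)
    coro.close()  # discard it explicitly to avoid the warning
    # Fix: actually await the wait_for.
    done = True
    await wait_for(lambda: done)
    return "ok"

assert asyncio.run(demo()) == "ok"
```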

Refs scylladb/scylladb#23982
Refs scylladb/scylladb#23997

Closes scylladb/scylladb#24250

(cherry picked from commit 5074daf1b7)

Closes scylladb/scylladb#24311
2025-05-29 16:44:51 +02:00
Petr Gusev
ffea5e67c1 raft_sys_table_storage: avoid temporary buffer when deserializing log_entry
The get_blob() method linearizes data by copying it into a
single buffer, which can trigger "oversized allocation" warnings.
This commit avoids that extra copy by creating an input stream
directly over the original fragmented managed bytes returned by
untyped_result_set_row::get_view().

Fixes scylladb/scylladb#23903

(cherry picked from commit f245b05022)
2025-05-29 08:42:09 +00:00
Petr Gusev
bcbbc40026 serializer_impl.hh: add as_input_stream(managed_bytes_view) overload
It's useful to have it here so that people can find it easily.

(cherry picked from commit 6496ae6573)
2025-05-29 08:42:09 +00:00
Anna Stuchlik
70d9352cec doc: remove the redundant pages
This commit removes two redundant pages and adds the related redirections.

- The Tutorials page is a duplicate and is not maintained anymore.
  Having it in the docs hurts the SEO of the up-to-date Tutorials page.
- The Contributing page is not helpful. Contributions-related information
  should be maintained in the project README file.

Fixes https://github.com/scylladb/scylladb/issues/17279
Fixes https://github.com/scylladb/scylladb/issues/24060

Closes scylladb/scylladb#24090

(cherry picked from commit eed8373b77)

Closes scylladb/scylladb#24220
2025-05-26 10:30:03 +03:00
Pavel Emelyanov
e215350c61 Revert "encryption_test: Catch exact exception"
This reverts commit 59bf300e83.

KMS tests became flaky after it: #24218
Need to revisit.
2025-05-20 13:51:07 +03:00
Ernest Zaslavsky
59bf300e83 encryption_test: Catch exact exception
Apparently `test_kms_network_error` will succeed under any circumstances: since most of our exceptions derive from `std::exception`, the test will be marked as passed whatever it throws, for whatever reason.

Start catching the exact exception that we expect to be thrown.
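The failure mode generalizes to any test framework: asserting that "something is thrown" also passes for the wrong failure. A small Python illustration (names invented; the C++ test deals with `std::exception` subclasses):

```python
class NetworkError(Exception):
    """The failure the test actually wants to see."""

class ConfigError(Exception):
    """An unrelated failure that should make the test fail."""

def raises(fn, exc_type):
    """Return True iff fn() raises an instance of exc_type."""
    try:
        fn()
    except exc_type:
        return True
    except Exception:
        return False
    return False

def broken_code_under_test():
    raise ConfigError("bad key, not a network problem")

# Catching the broad base class masks the wrong failure:
assert raises(broken_code_under_test, Exception) is True
# Catching the exact expected type exposes it:
assert raises(broken_code_under_test, NetworkError) is False
```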

Closes scylladb/scylladb#24065

(cherry picked from commit 2d5c0f0cfd)

Closes scylladb/scylladb#24147
2025-05-20 08:27:56 +03:00
Aleksandra Martyniuk
6d733051de cql_test_env: main: move stream_manager initialization
Currently, stream_manager is initialized after storage_service and
so it is stopped before the storage_service is. In its stop method,
storage_service accesses stream_manager, which has already been
stopped at that point.

Move the stream_manager initialization before the storage_service initialization, so that it is stopped after storage_service.
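Services here are stopped in reverse initialization order, which is why moving the initialization fixes the stop-order problem. A hedged sketch (the real code is C++; this only models the LIFO teardown):

```python
stopped = []

class Service:
    def __init__(self, name, log):
        self.name, self._log, self.running = name, log, True

    def stop(self):
        self.running = False
        self._log.append(self.name)

# Initialize stream_manager *before* storage_service, so the reverse
# (LIFO) teardown stops it *after* storage_service.
stream_manager = Service("stream_manager", stopped)
storage_service = Service("storage_service", stopped)

for svc in reversed([stream_manager, storage_service]):
    if svc is storage_service:
        # storage_service's stop() may still use stream_manager:
        assert stream_manager.running
    svc.stop()

assert stopped == ["storage_service", "stream_manager"]
```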

Fixes: #23207.

Closes scylladb/scylladb#24008

(cherry picked from commit 9c03255fd2)

Closes scylladb/scylladb#24190
2025-05-20 08:27:26 +03:00
Ernest Zaslavsky
24c134992b database_test: Wait for the index to be created
Just call `wait_until_built` for the index in question

fix: https://github.com/scylladb/scylladb/issues/24059

Closes scylladb/scylladb#24117

(cherry picked from commit 4a7c847cba)

Closes scylladb/scylladb#24132
2025-05-19 12:08:41 +03:00
Wojciech Mitros
9247c9472a mv: remove queue length limit from the view update read concurrency semaphore
Each view update is correlated to a write that generates it (aside from view
building which is throttled separately). These writes are limited by a throttling
mechanism, which effectively works by performing the writes with CL=ALL if
ongoing writes exceed some memory usage limit.

When writes generate view updates, they usually also need to perform a read. This read
goes through a read concurrency semaphore where it can get delayed or killed. The
semaphore allows up to 100 concurrent reads and puts all remaining reads in a queue.
If the number of queued reads exceeds a specific limit, the view update will fail on
the replica, causing inconsistencies.

This limit is not necessary. When a read gets queued on the semaphore, the write that's
causing the view update is paused, so the write takes part in the regular write throttling.
If too many writes get stuck on view update reads, they will get throttled, so their
number is limited and the number of queued reads is also limited to the same amount.

In this patch we remove the aforementioned queue length limit for the view update read concurrency
semaphore. Instead of this limit, the queue will now be limited indirectly, by the base write
throttling mechanism. This may allow the queue to grow longer than with the previous limit, but
it shouldn't ever cause issues - we only perform up to 100 actual reads at once, and the
remaining ones that get queued use a tiny amount of memory, less than the writes that generated
them and which are getting limited directly.
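The shape of the change can be sketched with `asyncio.Semaphore` as a stand-in for Scylla's reader concurrency semaphore (constants are illustrative): concurrency stays capped, but excess waiters simply queue instead of being rejected past a count limit.

```python
import asyncio

READ_CONCURRENCY = 100  # active reads are still capped

async def view_update_read(sem, results):
    # No explicit queue-length cap: excess reads just park on the
    # semaphore; in Scylla the base write throttling bounds their number.
    async with sem:
        await asyncio.sleep(0)  # the actual read would happen here
        results.append(1)

async def demo():
    sem = asyncio.Semaphore(READ_CONCURRENCY)
    results = []
    # 500 requested reads against 100 permits: none is rejected.
    await asyncio.gather(*(view_update_read(sem, results) for _ in range(500)))
    return len(results)

assert asyncio.run(demo()) == 500
```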

Fixes https://github.com/scylladb/scylladb/issues/23319

Closes scylladb/scylladb#24112

(cherry picked from commit 5920647617)

Closes scylladb/scylladb#24170
2025-05-19 12:05:48 +03:00
Anna Stuchlik
ab8d50b5e7 doc: fix the product name for version 2025.1
Starting with 2025.1, ScyllaDB versions are no longer called "Enterprise",
but the OS support page still uses that label.
This commit fixes that by replacing "Enterprise" with "ScyllaDB".

This update is required since we've removed "Enterprise" from everywhere else,
including the commands, so having it here is confusing.

Fixes https://github.com/scylladb/scylladb/issues/24179

Closes scylladb/scylladb#24181

(cherry picked from commit 2d7db0867c)

Closes scylladb/scylladb#24204
2025-05-19 12:03:35 +03:00
Dawid Mędrek
7986ef73da locator/production_snitch_base: Reduce log level when property file incomplete
We're reducing the log level in case the provided property file is incomplete.
The rationale behind this change is related to how CCM interacts with Scylla:

* The `GossipingPropertyFileSnitch` reloads the `cassandra-rackdc.properties`
  configuration every 60 seconds.
* When a new node is added to the cluster, CCM recreates the
  `cassandra-rackdc.properties` file for EVERY node.

If those two processes start happening at about the same time, it may lead
to Scylla trying to read a not-completely-recreated file, and an error will
be produced.

Although we would normally fix this issue and try to avoid the race, that
behavior will be no longer relevant as we're making the rack and DC values
immutable (cf. scylladb/scylladb#23278). What's more, trying to fix the problem
in the older versions of Scylla could bring a more serious regression. Having
that in mind, this commit is a compromise between making CI less flaky and
having minimal impact when backported.

We do the same for when the format of the file is invalid: the rationale
is the same.

We also do that for when there is a double declaration. Although it seems
impossible that this can stem from the same scenario the other two errors
can (since if the format of the file is valid, the error is justified;
if the format is invalid, it should be detected sooner than a doubled
declaration), let's stay consistent with the logging level.

Fixes scylladb/scylladb#20092

Closes scylladb/scylladb#23956

(cherry picked from commit 9ebd6df43a)

Closes scylladb/scylladb#24143
2025-05-16 11:51:23 +03:00
Wojciech Mitros
847504ad25 test_mv_tablets_replace: wait for tablet replicas to balance before working on them
In the test test_tablet_mv_replica_pairing_during_replace we stop 2 out of 4 servers while using RF=2.
Even though in the test we use exactly 4 tablets (1 for each replica of a base table and view), initially
the tablets may not be split evenly between all nodes. Because of this, even when we chose a server that
hosts the view and a different server that hosts the base table, we sometimes stopped all replicas of the
base or the view table, because the node with the base table replica may also be a view replica.

After some time, the tablets should be distributed across all nodes. When that happens, there will be
no common nodes with a base and view replica, so the test scenario will continue as planned.

In this patch, we add this waiting period after creating the base and view, and continue the test only
when all 4 tablets are on distinct nodes.
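The waiting period added here amounts to polling placement until all tablets land on distinct nodes. A minimal sketch (the helper and placements are invented; the real test uses the cluster API):

```python
def wait_for_distinct_hosts(get_hosts, n_tablets, attempts=100):
    """Poll until each of the n_tablets lives on a distinct node."""
    for _ in range(attempts):
        hosts = get_hosts()
        if len(set(hosts)) == n_tablets:
            return hosts
    raise TimeoutError("tablets never spread across distinct nodes")

# Simulated placements: two tablets initially share node A, then balance.
placements = iter([
    ["A", "A", "B", "C"],
    ["A", "A", "B", "C"],
    ["A", "D", "B", "C"],
])
assert wait_for_distinct_hosts(lambda: next(placements), 4) == ["A", "D", "B", "C"]
```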

Fixes https://github.com/scylladb/scylladb/issues/23982
Fixes https://github.com/scylladb/scylladb/issues/23997

Closes scylladb/scylladb#24111

(cherry picked from commit bceb64fb5a)

Closes scylladb/scylladb#24126
2025-05-16 11:51:07 +03:00
Pavel Emelyanov
854587c10c Merge '[Backport 2025.2] test/cluster: Adjust tests to RF-rack-valid keyspaces' from Scylladb[bot]
In this PR, we're adjusting most of the cluster tests so that they pass
with the `rf_rack_valid_keyspaces` configuration option enabled. In most
cases, the changes are straightforward and require little to no additional
insight into what the tests are doing or verifying. In some, however, doing
that does require a deeper understanding of the tests we're modifying.
The justification for those changes and their correctness is included in
the commit messages corresponding to them.

Note that this PR does not cover all of the cluster tests. There are few
remaining ones, but they require a bit more effort, so we delegate that
work to a separate PR.

I tested all of the modified tests locally with `rf_rack_valid_keyspaces`
set to true, and they all passed.

Fixes scylladb/scylladb#23959

Backport: we want to backport these changes to 2025.1 since that's the version in which we introduced RF-rack-valid keyspaces. Although the tests are not, by default, run with `rf_rack_valid_keyspaces` enabled yet, that will most likely change in the near future, and we'll then want to backport those changes too. The reason for this is that we want to verify that Scylla works correctly even with that constraint.

- (cherry picked from commit dbb8835fdf)

- (cherry picked from commit 9281bff0e3)

- (cherry picked from commit 5b83304b38)

- (cherry picked from commit 73b22d4f6b)

- (cherry picked from commit 2882b7e48a)

- (cherry picked from commit 4c46551c6b)

- (cherry picked from commit 92f7d5bf10)

- (cherry picked from commit 5d1bb8ebc5)

- (cherry picked from commit d3c0cd6d9d)

- (cherry picked from commit 04567c28a3)

- (cherry picked from commit c8c28dae92)

- (cherry picked from commit c4b32c38a3)

- (cherry picked from commit ee96f8dcfc)

Parent PR: #23661

Closes scylladb/scylladb#24121

* github.com:scylladb/scylladb:
  test/cluster/suite.yaml: Enable rf_rack_valid_keyspaces in suite
  test/cluster: Disable rf_rack_valid_keyspaces in problematic tests
  test/cluster/test_tablets: Divide rack into two to adjust tests to RF-rack-validity
  test/cluster/test_tablets: Adjust test_tablet_rf_change to RF-rack-validity
  test/cluster/test_tablet_repair_scheduler.py: Adjust to RF-rack-validity
  test/pylib/repair.py: Assign nodes to multiple racks in create_table_insert_data_for_repair
  test/cluster/test_zero_token_nodes_topology_ops: Adjust to RF-rack-validity
  test/cluster/test_zero_token_nodes_no_replication.py: Adjust to RF-rack-validity
  test/cluster/test_zero_token_nodes_multidc.py: Adjust to RF-rack-validity
  test/cluster/test_not_enough_token_owners.py: Adjust to RF-rack-validity
  test/cluster/test_multidc.py: Adjust to RF-rack-validity
  test/cluster/object_store/test_backup.py: Adjust to RF-rack-validity
  test/cluster: Adjust simple tests to RF-rack-validity
2025-05-16 11:50:45 +03:00
Pavel Emelyanov
9058d5658b Merge '[Backport 2025.2] logalloc_test: don't test performance in test background_reclaim' from Scylladb[bot]
The test is failing in CI sometimes due to performance reasons.

There are at least two problems:
1. The initial 500ms (wall time) sleep might be too short. If the reclaimer
   doesn't manage to evict enough memory during this time, the test will fail.
2. During the 100ms (thread CPU time) window given by the test to background
   reclaim, the `background_reclaim` scheduling group isn't actually
   guaranteed to get any CPU, regardless of shares. If the process is
   switched out inside the `background_reclaim` group, it might
   accumulate so much vruntime that it won't get any more CPU again
   for a long time.

We have seen both.

This kind of timing test can't be run reliably on overcommitted machines
without modifying the Seastar scheduler to support that (by e.g. using
thread clock instead of wall time clock in the scheduler), and that would
require an amount of effort disproportionate to the value of the test.

So for now, to unflake the test, this patch removes the performance test
part. (And the tradeoff is a weakening of the test). After the patch,
we only check that the background reclaim happens *eventually*.

Fixes https://github.com/scylladb/scylladb/issues/15677

Backporting this is optional. The test is flaky even in stable branches, but the failure is rare.

- (cherry picked from commit c47f438db3)

- (cherry picked from commit 1c1741cfbc)

Parent PR: #24030

Closes scylladb/scylladb#24094

* github.com:scylladb/scylladb:
  logalloc_test: don't test performance in test `background_reclaim`
  logalloc: make background_reclaimer::free_memory_threshold publicly visible
2025-05-16 11:50:17 +03:00
Gleb Natapov
dd9ec03323 topology coordinator: make decommissioning node non voter before completing the operation
A decommissioned node is removed from a raft config after operation is
marked as completed. This is required since otherwise the decommissioned
node will not see that decommission has completed (the status is
propagated through raft). But right after the decommission is marked as
completed, a decommissioned node may terminate, so in the case of a two-node
cluster, the configuration change that removes it from the raft config will fail,
because there will be no quorum.

The solution is to mark the decommissioning node as non voter before
reporting the operation as completed.
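The quorum arithmetic behind the fix, as a worked example:

```python
def quorum(voters):
    """Raft majority over the current voting members."""
    return voters // 2 + 1

# Two-node cluster, both voters: quorum is 2. If the decommissioned node
# terminates right after the operation is marked complete, the config
# change removing it cannot commit with only 1 of 2 voters left.
assert quorum(2) == 2

# Demote the leaving node to non-voter first: quorum is then computed
# over the single remaining voter, so the removal still commits.
assert quorum(1) == 1
```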

Fixes: #24026

Backport to 2025.2 because it fixes a potential hang. Don't backport to
branches older than 2025.2 because they don't have
8b186ab0ff, which caused this issue.

Closes scylladb/scylladb#24027

(cherry picked from commit c6e1758457)

Closes scylladb/scylladb#24093
2025-05-16 11:49:46 +03:00
Pavel Emelyanov
eef6a95e26 Merge '[Backport 2025.2] replica: Fix use-after-free with concurrent schema change and sstable set update' from Scylladb[bot]
When the schema is changed, the sstable set is updated according to the compaction strategy of the new schema (no changes to the set are actually made, just the underlying set type is updated). The problem is that this happens without a lock, causing a use-after-free when running concurrently with another set update.

Example:

1) A: the sstable set is being updated on compaction completion
2) B: a schema change updates the set (it's non-deferring, so it happens in one go) and frees the set used by A.
3) When A resumes, the system will likely crash since the set is already freed.

ASAN screams about it:
SUMMARY: AddressSanitizer: heap-use-after-free sstables/sstable_set.cc ...

The fix defers the update of the set on schema change to compaction, which is triggered after the new schema is set. Only the strategy state and backlog tracker are updated immediately, which is fine since the strategy doesn't depend on any particular implementation of the sstable set.

Fixes #22040.

- (cherry picked from commit 628bec4dbd)

- (cherry picked from commit 434c2c4649)

Parent PR: #23680

Closes scylladb/scylladb#24085

* github.com:scylladb/scylladb:
  replica: Fix use-after-free with concurrent schema change and sstable set update
  sstables: Implement sstable_set_impl::all_sstable_runs()
2025-05-16 11:49:21 +03:00
Piotr Smaron
9f2a13c8c2 cql: fix CREATE tablets KS warning msg
Materialized Views and Secondary Indexes are two more features that
keyspaces with tablets do not support, but they were not listed in the
warning message returned to the user on a CREATE KEYSPACE statement. This
commit adds the two missing features.

Fixes: #24006

Closes scylladb/scylladb#23902

(cherry picked from commit f740f9f0e1)

Closes scylladb/scylladb#24084
2025-05-16 11:49:00 +03:00
Piotr Dulikowski
4792a27396 topology_coordinator: silence ERROR messages on abort
When the topology coordinator is shut down while doing a long-running
operation, the current operation might throw a raft::request_aborted
exception. This is not a critical issue and should not be logged with
ERROR verbosity level.

Make sure that all the try..catch blocks in the topology coordinator
which:

- May try to acquire a new group0 guard in the `try` part
- Have a `catch (...)` block that print an ERROR-level message

...have a pass-through `catch (raft::request_aborted&)` block which does
not log the exception.

Fixes: scylladb/scylladb#22649

Closes scylladb/scylladb#23962

(cherry picked from commit 156ff8798b)

Closes scylladb/scylladb#24082
2025-05-16 11:48:43 +03:00
Aleksandra Martyniuk
f26c2b22dc test_tablet_repair_hosts_filter: change injected error
test_tablet_repair_hosts_filter checks whether the host filter
specified for tablet repair is correctly persisted. To check this,
we need to ensure that the repair is still ongoing and its data
is kept. The test achieves that by failing the repair on the replica
side, as the failed repair is going to be retried.

However, if the filter does not contain any host (included_host_count = 0),
the repair is started on no replica, so the request succeeds
and its data is deleted. The test fails if it checks the filter
after repair request data is removed.

Fail repair on topology coordinator side, so the request is ongoing
regardless of the specified hosts.

Fixes: #23986.

Closes scylladb/scylladb#24003

(cherry picked from commit 2549f5e16b)

Closes scylladb/scylladb#24080
2025-05-16 11:48:27 +03:00
Botond Dénes
163b65cec4 tools/scylla-nodetool: status: handle negative load sizes
Negative load sizes don't make sense, but we've seen a case in
production, where a negative number was returned by ScyllaDB REST API,
so be prepared to handle these too.
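A defensive-rendering sketch (hypothetical Python; the real tool is C++ and its exact output format is not shown here): treat a negative size as unknown rather than printing garbage like "-1.00 KB".

```python
def format_load(size_bytes):
    # A negative load size is nonsensical; render a placeholder.
    if size_bytes < 0:
        return "?"
    size = float(size_bytes)
    for unit in ("bytes", "KB", "MB", "GB", "TB"):
        if size < 1024 or unit == "TB":
            return f"{int(size)} bytes" if unit == "bytes" else f"{size:.2f} {unit}"
        size /= 1024

assert format_load(-5) == "?"
assert format_load(500) == "500 bytes"
assert format_load(2048) == "2.00 KB"
```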

Fixes: scylladb/scylladb#24134

Closes scylladb/scylladb#24135

(cherry picked from commit 700a5f86ed)

Closes scylladb/scylladb#24169
2025-05-15 17:36:48 +03:00
Aleksandra Martyniuk
fcde30d2b0 streaming: use host_id in file streaming
Use host ids instead of ips in file-streaming.

Fixes: #22421.

Closes scylladb/scylladb#24055

(cherry picked from commit 2dcea5a27d)

Closes scylladb/scylladb#24119
2025-05-14 22:13:48 +02:00
Jenkins Promoter
26bd28dac9 Update ScyllaDB version to: 2025.2.0-rc2 2025-05-14 20:59:54 +03:00
Jenkins Promoter
6f1efcff31 Update ScyllaDB version to: 2025.2.0-rc1 2025-05-13 22:48:32 +03:00
Dawid Mędrek
204f9e2cc8 test/cluster/suite.yaml: Enable rf_rack_valid_keyspaces in suite
Almost all of the tests have been adjusted so that they can run with
the `rf_rack_valid_keyspaces` configuration option enabled, while
the rest, a minority, create nodes with it disabled. Thanks to that,
we can enable it by default, so let's do that.

(cherry picked from commit ee96f8dcfc)
2025-05-12 23:11:34 +02:00
Dawid Mędrek
0c6a449a30 test/cluster: Disable rf_rack_valid_keyspaces in problematic tests
Some of the tests in the test suite have proven to be more problematic
in adjusting to RF-rack-validity. Since we'd like to run as many tests
as possible with the `rf_rack_valid_keyspaces` configuration option
enabled, let's disable it in those. In the following commit, we'll enable
it by default.

(cherry picked from commit c4b32c38a3)
2025-05-12 23:11:30 +02:00
Botond Dénes
7673a17365 Merge 'compress: fix an internal error when a specific debug log is enabled' from Michał Chojnowski
compress: fix an internal error when a specific debug log is enabled
While iterating over the recent 69684e16d8 series,
I shot myself in the foot by defining `algorithm_to_name(algorithm::none)`
to be an internal error, and later calling that anyway in a debug log.

(Tests didn't catch it because there's no test which simultaneously
enables the debug log and configures some table to have no compression).

This proves that `algorithm_to_name` is too much of a footgun.
Fix it so that calling `algorithm_to_name(algorithm::none)` is legal.
In hindsight, I should have done that immediately.

Fixes #23624

Fix for recently-added code, no backporting needed.

Closes scylladb/scylladb#23625

* github.com:scylladb/scylladb:
  test_sstable_compression_dictionaries: reproduce an internal error in debug logging
  compress: fix an internal error when a specific debug log is enabled

(cherry picked from commit 746382257c)
2025-05-12 23:13:59 +03:00
Avi Kivity
ae05d62b97 Merge '[Backport 2025.2] compress: make sstable compression dictionaries NUMA-aware ' from Scylladb[bot]
compress: distribute compression dictionaries over shards
We don't want each shard to have its own copy of each dictionary.
It would unnecessary pressure on cache and memory.
Instead, we want to share dictionaries between shards.

Before this commit, all dictionaries live on shard 0.
All other shards borrow foreign shared pointers from shard 0.

There's a problem with this setup: dictionary blobs receive many random
accesses. If shard 0 is on a remote NUMA node, this could pose
a performance problem.

Therefore, for each dictionary, we would like to have one copy per NUMA node,
not one copy per the entire machine. And each shard should use the copy
belonging to its own NUMA node. This is the main goal of this patch.

There is another issue with putting all dicts on shard 0: it eats
an asymmetric amount of memory from shard 0.
This commit spreads the ownership of dicts over all shards within
the NUMA group, to make the situation more symmetric.
(Dict owner is decided based on the hash of dict contents).

It should be noted that the last part isn't necessarily a good thing,
though.
While it makes the situation more symmetric within each node,
it makes it less symmetric across the cluster, if different node
sizes are present.

If dicts occupy 1% of memory on each shard of a 100-shard node,
then the same dicts would occupy 100% of memory on a 1-shard node.

So for the sake of cluster-wide symmetry, we might later want to consider
e.g. making the memory limit for dictionaries inversely proportional
to the number of shards.
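The "owner decided by the hash of dict contents" scheme can be sketched as follows (the hash choice and names are illustrative, not the actual implementation):

```python
import hashlib

def owner_shard(dict_contents, shard_ids):
    """Pick the owning shard within a NUMA group from the content hash."""
    h = int.from_bytes(hashlib.sha256(dict_contents).digest()[:8], "big")
    return shard_ids[h % len(shard_ids)]

numa_group_shards = [0, 1, 2, 3]
blob = b"some dictionary contents"
owner = owner_shard(blob, numa_group_shards)
assert owner in numa_group_shards
# Deterministic: every shard computes the same owner for the same blob,
# spreading different dicts across shards instead of piling on shard 0.
assert owner == owner_shard(blob, numa_group_shards)
```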

New functionality, added to a feature which isn't in any stable branch yet. No backporting.

Edit: no backporting to <=2025.1, but need backporting to 2025.2, where the feature is introduced.

Fixes #24108

- (cherry picked from commit 0e4d0ded8d)

- (cherry picked from commit 8649adafa8)

- (cherry picked from commit 1bcf77951c)

- (cherry picked from commit 6b831aaf1b)

- (cherry picked from commit e952992560)

- (cherry picked from commit 66a454f61d)

- (cherry picked from commit 518f04f1c4)

- (cherry picked from commit f075674ebe)

Parent PR: #23590

Closes scylladb/scylladb#24109

* github.com:scylladb/scylladb:
  test: add test/boost/sstable_compressor_factory_test
  compress: add some test-only APIs
  compress: rename sstable_compressor_factory_impl to dictionary_holder
  compress: fix indentation
  compress: remove sstable_compressor_factory_impl::_owner_shard
  compress: distribute compression dictionaries over shards
  test: switch uses of make_sstable_compressor_factory() to a seastar::thread-dependent version
  test: remove sstables::test_env::do_with()
2025-05-12 23:11:12 +03:00
Dawid Mędrek
5c5911d874 test/cluster/test_tablets: Divide rack into two to adjust tests to RF-rack-validity
Three tests in the file use a multi-DC cluster. Unfortunately, they put
all of the nodes in a DC in the same rack and because of that, they fail
when run with the `rf_rack_valid_keyspaces` configuration option enabled.
Since the tests revolve mostly around zero-token nodes and how they
affect replication in a keyspace, this change should have zero impact on
them.

(cherry picked from commit c8c28dae92)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
6a2e52d250 test/cluster/test_tablets: Adjust test_tablet_rf_change to RF-rack-validity
We reduce the number of nodes and the RF values used in the test
to make sure that the test can be run with the `rf_rack_valid_keyspaces`
configuration option. The test doesn't seem to be reliant on the
exact number of nodes, so the reduction should not make any difference.

(cherry picked from commit 04567c28a3)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
f98c83b92f test/cluster/test_tablet_repair_scheduler.py: Adjust to RF-rack-validity
The change boils down to matching the number of created racks to the number
of created nodes in each DC in the auxiliary function `prepare_multi_dc_repair`.
This way, we ensure that the created keyspace will be RF-rack-valid and so
we can run the test file even with the `rf_rack_valid_keyspaces` configuration
option enabled.

The change has no impact on the tests that use the function; the distribution
of nodes across racks does not affect how repair is performed or what the
tests do and verify. Because of that, the change is correct.

(cherry picked from commit d3c0cd6d9d)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
f5cf4a3893 test/pylib/repair.py: Assign nodes to multiple racks in create_table_insert_data_for_repair
We assign the newly created nodes to multiple racks. If RF <= 3,
we create as many racks as the provided RF. We disallow the case
of RF > 3 to avoid trying to create an RF-rack-invalid keyspace;
note that no existing test calls `create_table_insert_data_for_repair`
providing a higher RF. The rationale for doing this is we want to ensure
that the tests calling the function can be run with the
`rf_rack_valid_keyspaces` configuration option enabled.

(cherry picked from commit 5d1bb8ebc5)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
12f0136b26 test/cluster/test_zero_token_nodes_topology_ops: Adjust to RF-rack-validity
We assign the nodes to the same DC, but multiple racks to ensure that
the created keyspace is RF-rack-valid and we can run the test with
the `rf_rack_valid_keyspaces` configuration option enabled. The changes
do not affect what the test does and verifies.

(cherry picked from commit 92f7d5bf10)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
4e45ceda21 test/cluster/test_zero_token_nodes_no_replication.py: Adjust to RF-rack-validity
We simply assign the nodes used in the test to separate racks to
ensure that the created keyspace is RF-rack-valid to be able
to run the test with the `rf_rack_valid_keyspaces` configuration
option set to true. The change does not affect what the test
does and verifies -- it only depends on the type of nodes,
whether they are normal token owners or not -- and so the changes
are correct in that sense.

(cherry picked from commit 4c46551c6b)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
2c8b5143ba test/cluster/test_zero_token_nodes_multidc.py: Adjust to RF-rack-validity
We parameterize the test so it's run with and without enforced
RF-rack-valid keyspaces. In the test itself, we introduce a branch
to make sure that we won't run into a situation where we're
attempting to create an RF-rack-invalid keyspace.

Since the `rf_rack_valid_keyspaces` option is not commonly used yet
and because its semantics will most likely change in the future, we
decide to parameterize the test rather than try to get rid of some
of the test cases that are problematic with the option enabled.

(cherry picked from commit 2882b7e48a)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
474de0f048 test/cluster/test_not_enough_token_owners.py: Adjust to RF-rack-validity
We simply assign DC/rack properties to every node used in the test.
We put all of them in the same DC to make sure that the cluster behaves
as closely to how it would before these changes. However, we distribute
them over multiple racks to ensure that the keyspace used in the test
is RF-rack-valid, so we can also run it with the `rf_rack_valid_keyspaces`
configuration option set to true. The distribution of nodes between racks
has no effect on what the test does and verifies, so the changes are
correct in that sense.

(cherry picked from commit 73b22d4f6b)
2025-05-12 13:10:12 +00:00
Dawid Mędrek
5ac07a6c72 test/cluster/test_multidc.py: Adjust to RF-rack-validity
Instead of putting all of the nodes in a DC in the same rack
in `test_putget_2dc_with_rf`, we assign them to different racks.
The distribution of nodes in racks is orthogonal to what the test
is doing and verifying, so the change is correct in that sense.
At the same time, it ensures that the test never violates the
invariant of RF-rack-valid keyspaces, so we can also run it
with `rf_rack_valid_keyspaces` set to true.

(cherry picked from commit 5b83304b38)
2025-05-12 13:10:11 +00:00
Dawid Mędrek
f88d8edcaf test/cluster/object_store/test_backup.py: Adjust to RF-rack-validity
We modify the parameters of `test_restore_with_streaming_scopes`
so that it now represents a pair of values: topology layout and
the value `rf_rack_valid_keyspaces` should be set to.

Two of the already existing parameters violate RF-rack-validity
and so the test would fail when run with `rf_rack_valid_keyspaces: true`.
However, since the option isn't commonly used yet and since the
semantics of RF-rack-valid keyspaces will most likely change in
the future, let's keep those cases and just run them with the
option disabled. This way, we still test everything we can
without running into undesired failures that don't indicate anything.

(cherry picked from commit 9281bff0e3)
2025-05-12 13:10:11 +00:00
Dawid Mędrek
05c70b0820 test/cluster: Adjust simple tests to RF-rack-validity
We adjust all of the simple cases of cluster tests so they work
with `rf_rack_valid_keyspaces: true`. It boils down to assigning
nodes to multiple racks. For most of the changes, we do that by:

* Using `pytest.mark.prepare_3_racks_cluster` instead of
  `pytest.mark.prepare_3_nodes_cluster`.
* Using an additional argument -- `auto_rack_dc` -- when calling
  `ManagerClient::servers_add()`.

In some cases, we need to assign the racks manually, which may be
less obvious, but in every such situation, the tests didn't rely
on that assignment, so that doesn't affect them or what they verify.

(cherry picked from commit dbb8835fdf)
2025-05-12 13:10:11 +00:00
Michał Chojnowski
732321e3b8 test: add test/boost/sstable_compressor_factory_test
Add a basic test for NUMA awareness of `default_sstable_compressor_factory`.

(cherry picked from commit f075674ebe)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
a2622e1919 compress: add some test-only APIs
Will be needed by the test added in the next patch.

(cherry picked from commit 518f04f1c4)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
270bf34846 compress: rename sstable_compressor_factory_impl to dictionary_holder
Since sstable_compressor_factory_impl no longer
implements sstable_compressor_factory, the name can be
misleading. Rename it to something closer to its new role.

(cherry picked from commit 66a454f61d)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
168f694c5d compress: fix indentation
Purely cosmetic.

(cherry picked from commit e952992560)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
b5579be915 compress: remove sstable_compressor_factory_impl::_owner_shard
Before this series, sstable_compressor_factory_impl was directly
accessed by multiple shards. Now, it's part of a `sharded`
data structure and is never accessed directly from other shards,
so there's no need to check for that. Remove the leftover logic.

(cherry picked from commit 6b831aaf1b)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
ad60d765f9 compress: distribute compression dictionaries over shards
We don't want each shard to have its own copy of each dictionary.
That would put unnecessary pressure on cache and memory.
Instead, we want to share dictionaries between shards.

Before this commit, all dictionaries live on shard 0.
All other shards borrow foreign shared pointers from shard 0.

There's a problem with this setup: dictionary blobs receive many random
accesses. If shard 0 is on a remote NUMA node, this could pose
a performance problem.

Therefore, for each dictionary, we would like to have one copy per NUMA node
rather than one copy for the entire machine, and each shard should use the
copy belonging to its own NUMA node. This is the main goal of this patch.

There is another issue with putting all dicts on shard 0: it consumes
an asymmetric amount of memory from shard 0.
This commit spreads the ownership of dicts over all shards within
the NUMA group, to make the situation more symmetric.
(The dict owner is decided based on the hash of the dict contents.)

It should be noted that the last part isn't necessarily a good thing,
though.
While it makes the situation more symmetric within each node,
it makes it less symmetric across the cluster, if different node
sizes are present.

If dicts occupy 1% of memory on each shard of a 100-shard node,
then the same dicts would occupy 100% of memory on a 1-shard node.

So for the sake of cluster-wide symmetry, we might later want to consider
e.g. making the memory limit for dictionaries inversely proportional
to the number of shards.

(cherry picked from commit 1bcf77951c)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
68d2086fa5 test: switch uses of make_sstable_compressor_factory() to a seastar::thread-dependent version
In the next patches, make_sstable_compressor_factory() will have to
disappear. In preparation for that, we switch to a seastar::thread-dependent
replacement.

(cherry picked from commit 8649adafa8)
2025-05-12 09:12:05 +00:00
Michał Chojnowski
403d43093f test: remove sstables::test_env::do_with()
`sstable_manager` depends on `sstable_compressor_factory&`.
Currently, `test_env` obtains an implementation of this
interface with the synchronous `make_sstable_compressor_factory()`.

But after this patch, the only implementation of the
`sstable_compressor_factory&` interface will use `sharded<...>`,
so its construction will become asynchronous,
and the synchronous `make_sstable_compressor_factory()` must disappear.

There are several possible ways to deal with this, but I think the
easiest one is to write an asynchronous replacement for
`make_sstable_compressor_factory()`
that will keep the same signature but will be only usable
in a `seastar::thread`.

All other uses of `make_sstable_compressor_factory()` outside of
`test_env::do_with()` are already in seastar threads,
so if we just get rid of `test_env::do_with()`, we will
be able to use that thread-dependent replacement. This is the
purpose of this commit.

We shouldn't be losing much.

(cherry picked from commit 0e4d0ded8d)
2025-05-12 09:12:04 +00:00
Patryk Jędrzejczak
2b1b4d1dfc Merge '[Backport 2025.2] Correctly skip updating node's own ip address due to outdated gossiper data' from Scylladb[bot]
Used the host id to check if the update is for the node itself. Using the IP is unreliable, since if a node is restarted with a different IP, a gossiper message with the previous IP can be misinterpreted as belonging to a different node.

Fixes: #22777

Backport to 2025.1 since this fixes a crash. Older versions do not have the code.

- (cherry picked from commit a2178b7c31)

- (cherry picked from commit ecd14753c0)

- (cherry picked from commit 7403de241c)

Parent PR: #24000

Closes scylladb/scylladb#24089

* https://github.com/scylladb/scylladb:
  test: add reproducer for #22777
  storage_service: Do not remove gossiper entry on address change
  storage_service: use id to check for local node
2025-05-12 09:31:20 +02:00
Michał Chojnowski
a5b513dde7 logalloc_test: don't test performance in test background_reclaim
The test sometimes fails in CI for performance reasons.

There are at least two problems:
1. The initial 500ms (wall time) sleep might be too short. If the reclaimer
   doesn't manage to evict enough memory during this time, the test will fail.
2. During the 100ms (thread CPU time) window given by the test to background
   reclaim, the `background_reclaim` scheduling group isn't actually
   guaranteed to get any CPU, regardless of shares. If the process is
   switched out inside the `background_reclaim` group, it might
   accumulate so much vruntime that it won't get any more CPU again
   for a long time.

We have seen both.

This kind of timing test can't be run reliably on overcommitted machines
without modifying the Seastar scheduler to support it (e.g. by using
the thread clock instead of the wall clock in the scheduler), and that would
require an amount of effort disproportionate to the value of the test.

So for now, to unflake the test, this patch removes the performance test
part. (And the tradeoff is a weakening of the test).

(cherry picked from commit 1c1741cfbc)
2025-05-09 16:12:22 +00:00
Michał Chojnowski
2c431c1ea2 logalloc: make background_reclaimer::free_memory_threshold publicly visible
Wanted by the change to the background_reclaim test in the next patch.

(cherry picked from commit c47f438db3)
2025-05-09 16:12:22 +00:00
Gleb Natapov
827563902c test: add reproducer for #22777
Add a sleep before starting the gossiper to increase the chance of receiving
an old gossiper entry about the node itself before the local gossiper info
is updated with the new IP address.

(cherry picked from commit 7403de241c)
2025-05-09 12:56:15 +00:00
Gleb Natapov
ccf194bd89 storage_service: Do not remove gossiper entry on address change
When the gossiper indexed entries by IP, an old entry had to be removed on an
address change. But the index is now id-based, so even if the IP changes, the
entry should stay; the gossiper simply updates the IP address in place.

(cherry picked from commit ecd14753c0)
2025-05-09 12:56:15 +00:00
Gleb Natapov
9b735bb4dc storage_service: use id to check for local node
The IP may change, and an old gossiper message with the previous IP may be
processed when it shouldn't be.

Fixes: #22777
(cherry picked from commit a2178b7c31)
2025-05-09 12:56:15 +00:00
Michał Chojnowski
f29b87970a test/boost/mvcc_test: fix an overly-strong assertion in test_snapshot_cursor_is_consistent_with_merging
The test checks that merging the partition versions on-the-fly using the
cursor gives the same results as merging them destructively with apply_monotonically.

In particular, it tests that the continuity of both results is equal.
However, there's a subtlety which makes this not true.
The cursor puts empty dummy rows (i.e. dummies shadowed by the partition
tombstone) in the output.
But the destructive merge is allowed (as an exception to the general
rule, for optimization reasons) to remove those dummies and thus reduce
the continuity.

So after this patch, we instead check that the output of the cursor
has continuity equal to the merged continuities of the versions
(rather than to the continuity of the merged versions, which can be
smaller, as described above).

Refs https://github.com/scylladb/scylladb/pull/21459, a patch which did
the same in a different test.
Fixes https://github.com/scylladb/scylladb/issues/13642

Closes scylladb/scylladb#24044

(cherry picked from commit 746ec1d4e4)

Closes scylladb/scylladb#24083
2025-05-09 13:00:34 +02:00
Raphael S. Carvalho
82ca17e70d replica: Fix use-after-free with concurrent schema change and sstable set update
When the schema is changed, the sstable set is updated according to the
compaction strategy of the new schema (no changes to the set's contents are
actually made; just the underlying set type is updated). The problem is that
this happens without a lock, causing a use-after-free when running
concurrently with another set update.

Example:

1) A: the sstable set is being updated on compaction completion.
2) B: a schema change updates the set (it's non-deferring, so it
happens in one go) and frees the set used by A.
3) When A resumes, the system will likely crash since the set has
already been freed.

ASAN screams about it:
SUMMARY: AddressSanitizer: heap-use-after-free sstables/sstable_set.cc ...

The fix defers the update of the set on schema change to compaction,
which is triggered after the new schema is set. Only the strategy state and
backlog tracker are updated immediately, which is fine since the strategy
doesn't depend on any particular implementation of the sstable set, thanks to
the patch "sstables: Implement sstable_set_impl::all_sstable_runs()".

Fixes #22040.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 434c2c4649)
2025-05-09 05:57:40 +00:00
Raphael S. Carvalho
ddf9d047db sstables: Implement sstable_set_impl::all_sstable_runs()
With an upcoming change where table::set_compaction_strategy() might delay
the update of the sstable set, ICS might temporarily work with sstable set
implementations other than partitioned_sstable_set. ICS relies on
all_sstable_runs() during regular compaction, and today this triggers a
bad_function_call exception if the set implementation doesn't override it.
To remove this strong dependency between the compaction strategy and
a particular set implementation, let's provide a default implementation
of all_sstable_runs(), so that ICS still works until the set
is eventually updated through a process that adds or removes an
sstable.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 628bec4dbd)
2025-05-09 05:57:40 +00:00
Botond Dénes
17a76b6264 Merge '[Backport 2025.2] test/cluster/test_read_repair.py: improve trace logging test (again)' from Scylladb[bot]
The test test_read_repair_with_trace_logging wants to test read repair with trace logging. It turns out that node restart + trace-level logging + debug mode is too much, and even with a 1-minute timeout, the read repair sometimes times out. Refactor the test to use an injection point instead of a restart. To make sure the test still tests what it's supposed to test, use tracing to assert that read repair did indeed happen.

Fixes: scylladb/scylladb#23968

Needs backport to 2025.1 and 6.2, both have the flaky test

- (cherry picked from commit 51025de755)

- (cherry picked from commit 29eedaa0e5)

Parent PR: #23989

Closes scylladb/scylladb#24051

* github.com:scylladb/scylladb:
  test/cluster/test_read_repair.py: improve trace logging test (again)
  test/cluster: extract execute_with_tracing() into pylib/util.py
2025-05-08 11:01:18 +03:00
Aleksandra Martyniuk
ab45df1aa1 streaming: skip dropped tables
Currently, stream_session::prepare throws when a table in requests
or summaries is dropped. However, we do not want to fail streaming
if the table is dropped.

Remove the table checks from stream_session::prepare. Further streaming
steps can handle the dropped table and finish the streaming successfully.

Fixes: #15257.

Closes scylladb/scylladb#23915

(cherry picked from commit 20c2d6210e)

Closes scylladb/scylladb#24053
2025-05-08 11:00:27 +03:00
Botond Dénes
97f0f312e0 test/cluster/test_read_repair.py: improve trace logging test (again)
The test test_read_repair_with_trace_logging wants to test read repair
with trace logging. It turns out that node restart + trace-level logging
+ debug mode is too much, and even with a 1-minute timeout, the read repair
sometimes times out.
Refactor the test to use an injection point instead of a restart. To make
sure the test still tests what it's supposed to test, use tracing to
assert that read repair did indeed happen.

(cherry picked from commit 29eedaa0e5)
2025-05-07 13:26:08 +00:00
Botond Dénes
4df6a17d30 test/cluster: extract execute_with_tracing() into pylib/util.py
To allow reuse in other tests.

(cherry picked from commit 51025de755)
2025-05-07 13:26:08 +00:00
Anna Mikhlin
b3dbfaf27a Update ScyllaDB version to: 2025.2.0-rc0 2025-05-07 11:41:33 +03:00
91 changed files with 1086 additions and 570 deletions

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2025.2.0-dev
VERSION=2025.2.0-rc2
if test -f version
then

View File

@@ -15,6 +15,8 @@
#include <seastar/core/metrics.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/weak_ptr.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/reactor.hh>
#include "utils/reusable_buffer.hh"
#include "sstables/compress.hh"
#include "sstables/exceptions.hh"
@@ -27,7 +29,7 @@
// SHA256
using dict_id = std::array<std::byte, 32>;
class sstable_compressor_factory_impl;
class dictionary_holder;
static seastar::logger compressor_factory_logger("sstable_compressor_factory");
@@ -41,11 +43,11 @@ template <> struct fmt::formatter<compression_parameters::algorithm> : fmt::form
// raw dicts might be used (and kept alive) directly by compressors (in particular, lz4 decompressor)
// or referenced by algorithm-specific dicts.
class raw_dict : public enable_lw_shared_from_this<raw_dict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
dict_id _id;
std::vector<std::byte> _dict;
public:
raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict);
raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict);
~raw_dict();
const std::span<const std::byte> raw() const { return _dict; }
dict_id id() const { return _id; }
@@ -79,13 +81,13 @@ struct zstd_callback_allocator {
// (which internally holds a pointer to the raw dictionary blob
// and parsed entropy tables).
class zstd_ddict : public enable_lw_shared_from_this<zstd_ddict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
size_t _used_memory = 0;
zstd_callback_allocator _alloc;
std::unique_ptr<ZSTD_DDict, decltype(&ZSTD_freeDDict)> _dict;
public:
zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
~zstd_ddict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -100,14 +102,14 @@ public:
// so the level of compression is decided at the time of construction
// of this dict.
class zstd_cdict : public enable_lw_shared_from_this<zstd_cdict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
int _level;
size_t _used_memory = 0;
zstd_callback_allocator _alloc;
std::unique_ptr<ZSTD_CDict, decltype(&ZSTD_freeCDict)> _dict;
public:
zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level);
zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level);
~zstd_cdict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -119,11 +121,11 @@ public:
// and a hash index over the substrings of the blob).
//
class lz4_cdict : public enable_lw_shared_from_this<lz4_cdict> {
weak_ptr<sstable_compressor_factory_impl> _owner;
weak_ptr<dictionary_holder> _owner;
lw_shared_ptr<const raw_dict> _raw;
std::unique_ptr<LZ4_stream_t, decltype(&LZ4_freeStream)> _dict;
public:
lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw);
lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw);
~lz4_cdict();
auto dict() const { return _dict.get(); }
auto raw() const { return _raw->raw(); }
@@ -164,6 +166,7 @@ public:
size_t compress_max_size(size_t input_len) const override;
std::map<sstring, sstring> options() const override;
algorithm get_algorithm() const override;
std::optional<unsigned> get_dict_owner_for_test() const override;
};
class snappy_processor: public compressor {
@@ -266,6 +269,7 @@ public:
size_t compress_max_size(size_t input_len) const override;
algorithm get_algorithm() const override;
std::map<sstring, sstring> options() const override;
std::optional<unsigned> get_dict_owner_for_test() const override;
};
zstd_processor::zstd_processor(const compression_parameters& opts, cdict_ptr cdict, ddict_ptr ddict) {
@@ -323,6 +327,16 @@ auto zstd_processor::get_algorithm() const -> algorithm {
return (_cdict || _ddict) ? algorithm::zstd_with_dicts : algorithm::zstd;
}
std::optional<unsigned> zstd_processor::get_dict_owner_for_test() const {
if (_cdict) {
return _cdict.get_owner_shard();
} else if (_ddict) {
return _ddict.get_owner_shard();
} else {
return std::nullopt;
}
}
const std::string_view DICTIONARY_OPTION = ".dictionary.";
static std::map<sstring, sstring> dict_as_options(std::span<const std::byte> d) {
@@ -384,6 +398,10 @@ std::map<sstring, sstring> compressor::options() const {
return {};
}
std::optional<unsigned> compressor::get_dict_owner_for_test() const {
return std::nullopt;
}
std::string compressor::name() const {
return compression_parameters::algorithm_to_qualified_name(get_algorithm());
}
@@ -434,7 +452,7 @@ std::string_view compression_parameters::algorithm_to_name(algorithm alg) {
case algorithm::snappy: return "SnappyCompressor";
case algorithm::zstd: return "ZstdCompressor";
case algorithm::zstd_with_dicts: return "ZstdWithDictsCompressor";
case algorithm::none: on_internal_error(compressor_factory_logger, "algorithm_to_name(): called with algorithm::none");
case algorithm::none: return "none"; // Name used only for logging purposes, can't be chosen by the user.
}
abort();
}
@@ -660,6 +678,16 @@ std::map<sstring, sstring> lz4_processor::options() const {
}
}
std::optional<unsigned> lz4_processor::get_dict_owner_for_test() const {
if (_cdict) {
return _cdict.get_owner_shard();
} else if (_ddict) {
return _ddict.get_owner_shard();
} else {
return std::nullopt;
}
}
compressor_ptr make_lz4_sstable_compressor_for_tests() {
return std::make_unique<lz4_processor>();
}
@@ -751,21 +779,12 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
return snappy_max_compressed_length(input_len);
}
// Constructs compressors and decompressors for SSTables,
// making sure that the expensive identical parts (dictionaries) are shared
// across nodes.
//
// Holds weak pointers to all live dictionaries
// (so that they can be cheaply shared with new SSTables if an identical dict is requested),
// and shared (lifetime-extending) pointers to the current writer ("recommended")
// dict for each table (so that they can be shared with new SSTables without consulting
// `system.dicts`).
//
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
// (and are borrowed by foreign shared pointers) and all requests for a given dict ID go through its owner.
// (Note: this shouldn't pose a performance problem because a dict is only requested once per an opening of an SSTable).
// (Note: at the moment of this writing, one shard owns all. Later we can spread the ownership. (E.g. shard it by dict hash)).
//
// Whenever a dictionary dies (because its refcount reaches 0), its weak pointer
// is removed from the factory.
//
@@ -774,10 +793,10 @@ size_t snappy_processor::compress_max_size(size_t input_len) const {
// Has a configurable memory budget for live dicts. If the budget is exceeded,
// will return null dicts to new writers (to avoid making the memory usage even worse)
// and print warnings.
class sstable_compressor_factory_impl : public sstable_compressor_factory, public weakly_referencable<sstable_compressor_factory_impl> {
class dictionary_holder : public weakly_referencable<dictionary_holder> {
mutable logger::rate_limit budget_warning_rate_limit{std::chrono::minutes(10)};
shard_id _owner_shard;
config _cfg;
using config = default_sstable_compressor_factory::config;
const config& _cfg;
uint64_t _total_live_dict_memory = 0;
metrics::metric_groups _metrics;
struct zstd_cdict_id {
@@ -789,7 +808,7 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
std::map<zstd_cdict_id, const zstd_cdict*> _zstd_cdicts;
std::map<dict_id, const zstd_ddict*> _zstd_ddicts;
std::map<dict_id, const lz4_cdict*> _lz4_cdicts;
std::map<table_id, lw_shared_ptr<const raw_dict>> _recommended;
std::map<table_id, foreign_ptr<lw_shared_ptr<const raw_dict>>> _recommended;
size_t memory_budget() const {
return _cfg.memory_fraction_starting_at_which_we_stop_writing_dicts() * seastar::memory::stats().total_memory();
@@ -806,8 +825,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
memory_budget()
);
}
public:
lw_shared_ptr<const raw_dict> get_canonical_ptr(std::span<const std::byte> dict) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (dict.empty()) {
return nullptr;
}
auto id = get_sha256(dict);
if (auto it = _raw_dicts.find(id); it != _raw_dicts.end()) {
return it->second->shared_from_this();
@@ -819,7 +841,9 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
using foreign_zstd_ddict = foreign_ptr<lw_shared_ptr<const zstd_ddict>>;
foreign_zstd_ddict get_zstd_dict_for_reading(lw_shared_ptr<const raw_dict> raw, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!raw) {
return nullptr;
}
lw_shared_ptr<const zstd_ddict> ddict;
// For reading, we must allocate a new dict, even if the memory budget is exceeded. We have no other choice.
// In any case, if the budget is exceeded, we print a rate-limited warning about it.
@@ -835,15 +859,11 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(ddict));
}
future<foreign_zstd_ddict> get_zstd_dict_for_reading(std::span<const std::byte> dict, int level) {
return smp::submit_to(_owner_shard, [this, dict, level] -> foreign_zstd_ddict {
auto raw = get_canonical_ptr(dict);
return get_zstd_dict_for_reading(raw, level);
});
}
using foreign_zstd_cdict = foreign_ptr<lw_shared_ptr<const zstd_cdict>>;
foreign_zstd_cdict get_zstd_dict_for_writing(lw_shared_ptr<const raw_dict> raw, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!_cfg.enable_writing_dictionaries() || !raw) {
return nullptr;
}
lw_shared_ptr<const zstd_cdict> cdict;
// If we can share an already-allocated dict, we do that regardless of memory budget.
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
@@ -859,19 +879,6 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(cdict));
}
future<foreign_zstd_cdict> get_zstd_dict_for_writing(table_id t, int level) {
return smp::submit_to(_owner_shard, [this, t, level] -> foreign_zstd_cdict {
if (!_cfg.enable_writing_dictionaries()) {
return {};
}
auto rec_it = _recommended.find(t);
if (rec_it != _recommended.end()) {
return get_zstd_dict_for_writing(rec_it->second, level);
} else {
return {};
}
});
}
using lz4_dicts = std::pair<
foreign_ptr<lw_shared_ptr<const raw_dict>>,
foreign_ptr<lw_shared_ptr<const lz4_cdict>>
@@ -879,18 +886,12 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
using foreign_lz4_ddict = foreign_ptr<lw_shared_ptr<const raw_dict>>;
using foreign_lz4_cdict = foreign_ptr<lw_shared_ptr<const lz4_cdict>>;
foreign_lz4_ddict get_lz4_dict_for_reading(lw_shared_ptr<const raw_dict> raw) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
lw_shared_ptr<const raw_dict> ddict;
return make_foreign(std::move(raw));
}
future<foreign_lz4_ddict> get_lz4_dicts_for_reading(std::span<const std::byte> dict) {
return smp::submit_to(_owner_shard, [this, dict] -> foreign_lz4_ddict {
auto raw = get_canonical_ptr(dict);
return get_lz4_dict_for_reading(raw);
});
}
foreign_lz4_cdict get_lz4_dict_for_writing(lw_shared_ptr<const raw_dict> raw) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (!_cfg.enable_writing_dictionaries() || !raw) {
return nullptr;
}
lw_shared_ptr<const lz4_cdict> cdict;
// If we can share an already-allocated dict, we do that regardless of memory budget.
// If we would have to allocate a new dict for writing, we only do that if we haven't exceeded
@@ -905,24 +906,10 @@ class sstable_compressor_factory_impl : public sstable_compressor_factory, publi
}
return make_foreign(std::move(cdict));
}
future<foreign_lz4_cdict> get_lz4_dicts_for_writing(table_id t) {
return smp::submit_to(_owner_shard, [this, t] -> foreign_lz4_cdict {
if (!_cfg.enable_writing_dictionaries()) {
return {};
}
auto rec_it = _recommended.find(t);
if (rec_it != _recommended.end()) {
return get_lz4_dict_for_writing(rec_it->second);
} else {
return {};
}
});
}
public:
sstable_compressor_factory_impl(config cfg)
: _owner_shard(this_shard_id())
, _cfg(std::move(cfg))
dictionary_holder(const config& cfg)
: _cfg(cfg)
{
if (_cfg.register_metrics) {
namespace sm = seastar::metrics;
@@ -931,8 +918,8 @@ public:
});
}
}
sstable_compressor_factory_impl(sstable_compressor_factory_impl&&) = delete;
~sstable_compressor_factory_impl() {
dictionary_holder(dictionary_holder&&) = delete;
~dictionary_holder() {
// Note: `_recommended` might be the only thing keeping some dicts alive,
// so clearing it will destroy them.
//
@@ -948,39 +935,36 @@ public:
_recommended.clear();
}
void forget_raw_dict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_raw_dicts.erase(id);
}
void forget_zstd_cdict(dict_id id, int level) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_zstd_cdicts.erase({id, level});
}
void forget_zstd_ddict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_zstd_ddicts.erase(id);
}
void forget_lz4_cdict(dict_id id) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
_lz4_cdicts.erase(id);
}
future<> set_recommended_dict(table_id t, std::span<const std::byte> dict) override {
return smp::submit_to(_owner_shard, [this, t, dict] {
_recommended.erase(t);
if (dict.size()) {
auto canonical_ptr = get_canonical_ptr(dict);
_recommended.emplace(t, canonical_ptr);
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
t, dict.size(), fmt_hex(canonical_ptr->id()));
} else {
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
}
});
void set_recommended_dict(table_id t, foreign_ptr<lw_shared_ptr<const raw_dict>> dict) {
_recommended.erase(t);
if (dict) {
compressor_factory_logger.debug("set_recommended_dict: table={} size={} id={}",
t, dict->raw().size(), fmt_hex(dict->id()));
_recommended.emplace(t, std::move(dict));
} else {
compressor_factory_logger.debug("set_recommended_dict: table={} size=0", t);
}
}
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t) {
auto rec_it = _recommended.find(t);
if (rec_it == _recommended.end()) {
co_return nullptr;
}
co_return co_await rec_it->second.copy();
}
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
void account_memory_delta(ssize_t n) {
SCYLLA_ASSERT(this_shard_id() == _owner_shard);
if (static_cast<ssize_t>(_total_live_dict_memory) + n < 0) {
compressor_factory_logger.error(
"Error in dictionary memory accounting: delta {} brings live memory {} below 0",
@@ -990,19 +974,85 @@ public:
}
};
default_sstable_compressor_factory::default_sstable_compressor_factory(config cfg)
: _cfg(std::move(cfg))
, _holder(std::make_unique<dictionary_holder>(_cfg))
{
for (shard_id i = 0; i < smp::count; ++i) {
auto numa_id = _cfg.numa_config[i];
_numa_groups.resize(std::max<size_t>(_numa_groups.size(), numa_id + 1));
_numa_groups[numa_id].push_back(i);
}
}
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writing(schema_ptr s) {
const auto params = s->get_compressor_params();
default_sstable_compressor_factory::~default_sstable_compressor_factory() {
}
std::vector<unsigned> default_sstable_compressor_factory_config::get_default_shard_to_numa_node_mapping() {
auto sp = local_engine->smp().shard_to_numa_node_mapping();
return std::vector<unsigned>(sp.begin(), sp.end());
}
unsigned default_sstable_compressor_factory::local_numa_id() {
return _cfg.numa_config[this_shard_id()];
}
shard_id default_sstable_compressor_factory::get_dict_owner(unsigned numa_id, const sha256_type& sha) {
auto hash = read_unaligned<uint64_t>(sha.data());
const auto& group = _numa_groups[numa_id];
if (group.empty()) {
on_internal_error(compressor_factory_logger, "get_dict_owner called on an empty NUMA group");
}
return group[hash % group.size()];
}
future<> default_sstable_compressor_factory::set_recommended_dict_local(table_id t, std::span<const std::byte> dict) {
if (_leader_shard != this_shard_id()) {
on_internal_error(compressor_factory_logger, fmt::format("set_recommended_dict_local called on wrong shard. Expected: {}, got {}", _leader_shard, this_shard_id()));
}
auto units = co_await get_units(_recommendation_setting_sem, 1);
auto sha = get_sha256(dict);
for (unsigned numa_id = 0; numa_id < _numa_groups.size(); ++numa_id) {
const auto& group = _numa_groups[numa_id];
if (group.empty()) {
continue;
}
auto r = get_dict_owner(numa_id, sha);
auto d = co_await container().invoke_on(r, [dict](self& local) {
return make_foreign(local._holder->get_canonical_ptr(dict));
});
auto local_coordinator = group[0];
co_await container().invoke_on(local_coordinator, coroutine::lambda([t, d = std::move(d)](self& local) mutable {
local._holder->set_recommended_dict(t, std::move(d));
}));
}
}
future<> default_sstable_compressor_factory::set_recommended_dict(table_id t, std::span<const std::byte> dict) {
return container().invoke_on(_leader_shard, &self::set_recommended_dict_local, t, dict);
}
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> default_sstable_compressor_factory::get_recommended_dict(table_id t) {
const auto local_coordinator = _numa_groups[local_numa_id()][0];
return container().invoke_on(local_coordinator, [t](self& local) {
return local._holder->get_recommended_dict(t);
});
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_impl(const compression_parameters& params, table_id id) {
using algorithm = compression_parameters::algorithm;
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", s->id(), algo);
compressor_factory_logger.debug("make_compressor_for_writing: table={} algo={}", id, algo);
switch (algo) {
case algorithm::lz4:
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
case algorithm::lz4_with_dicts: {
auto cdict = _cfg.enable_writing_dictionaries()
? co_await get_lz4_dicts_for_writing(s->id())
: nullptr;
holder::foreign_lz4_cdict cdict;
if (auto recommended = co_await get_recommended_dict(id)) {
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [recommended = std::move(recommended)] (self& local) mutable {
return local._holder->get_lz4_dict_for_writing(recommended.release());
});
}
if (cdict) {
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
}
@@ -1015,9 +1065,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
case algorithm::zstd:
co_return std::make_unique<zstd_processor>(params, nullptr, nullptr);
case algorithm::zstd_with_dicts: {
auto cdict = _cfg.enable_writing_dictionaries()
? co_await get_zstd_dict_for_writing(s->id(), params.zstd_compression_level().value_or(ZSTD_defaultCLevel()))
: nullptr;
holder::foreign_zstd_cdict cdict;
if (auto recommended = co_await get_recommended_dict(id)) {
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
cdict = co_await container().invoke_on(recommended.get_owner_shard(), [level, recommended = std::move(recommended)] (self& local) mutable {
return local._holder->get_zstd_dict_for_writing(recommended.release(), level);
});
}
if (cdict) {
compressor_factory_logger.debug("make_compressor_for_writing: using dict id={}", fmt_hex(cdict->id()));
}
@@ -1029,17 +1083,28 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_writ
abort();
}
future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_reading(sstables::compression& c) {
const auto params = compression_parameters(sstables::options_from_compression(c));
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing(schema_ptr s) {
return make_compressor_for_writing_impl(s->get_compressor_params(), s->id());
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_writing_for_tests(const compression_parameters& params, table_id id) {
return make_compressor_for_writing_impl(params, id);
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_impl(const compression_parameters& params, std::span<const std::byte> dict) {
using algorithm = compression_parameters::algorithm;
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
switch (algo) {
case algorithm::lz4:
co_return std::make_unique<lz4_processor>(nullptr, nullptr);
case algorithm::lz4_with_dicts: {
auto dict = dict_from_options(c);
auto ddict = co_await get_lz4_dicts_for_reading(std::as_bytes(std::span(*dict)));
auto dict_span = dict;
auto sha = get_sha256(dict_span);
auto dict_owner = get_dict_owner(local_numa_id(), sha);
auto ddict = co_await container().invoke_on(dict_owner, [dict_span] (self& local) mutable {
auto d = local._holder->get_canonical_ptr(dict_span);
return local._holder->get_lz4_dict_for_reading(std::move(d));
});
if (ddict) {
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
}
@@ -1054,8 +1119,13 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
}
case algorithm::zstd_with_dicts: {
auto level = params.zstd_compression_level().value_or(ZSTD_defaultCLevel());
auto dict = dict_from_options(c);
auto ddict = co_await get_zstd_dict_for_reading(std::as_bytes(std::span(*dict)), level);
auto dict_span = dict;
auto sha = get_sha256(dict_span);
auto dict_owner = get_dict_owner(local_numa_id(), sha);
auto ddict = co_await container().invoke_on(dict_owner, [level, dict_span] (self& local) mutable {
auto d = local._holder->get_canonical_ptr(dict_span);
return local._holder->get_zstd_dict_for_reading(std::move(d), level);
});
if (ddict) {
compressor_factory_logger.debug("make_compressor_for_reading: using dict id={}", fmt_hex(ddict->id()));
}
@@ -1067,7 +1137,19 @@ future<compressor_ptr> sstable_compressor_factory_impl::make_compressor_for_read
abort();
}
raw_dict::raw_dict(sstable_compressor_factory_impl& owner, dict_id key, std::span<const std::byte> dict)
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading(sstables::compression& c) {
const auto params = compression_parameters(sstables::options_from_compression(c));
auto dict = dict_from_options(c);
const auto algo = params.get_algorithm();
compressor_factory_logger.debug("make_compressor_for_reading: compression={} algo={}", fmt::ptr(&c), algo);
co_return co_await make_compressor_for_reading_impl(params, std::as_bytes(std::span(*dict)));
}
future<compressor_ptr> default_sstable_compressor_factory::make_compressor_for_reading_for_tests(const compression_parameters& params, std::span<const std::byte> dict) {
return make_compressor_for_reading_impl(params, dict);
}
raw_dict::raw_dict(dictionary_holder& owner, dict_id key, std::span<const std::byte> dict)
: _owner(owner.weak_from_this())
, _id(key)
, _dict(dict.begin(), dict.end())
@@ -1082,7 +1164,7 @@ raw_dict::~raw_dict() {
}
}
zstd_cdict::zstd_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw, int level)
zstd_cdict::zstd_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw, int level)
: _owner(owner.weak_from_this())
, _raw(raw)
, _level(level)
@@ -1114,7 +1196,7 @@ zstd_cdict::~zstd_cdict() {
}
}
zstd_ddict::zstd_ddict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
zstd_ddict::zstd_ddict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
: _owner(owner.weak_from_this())
, _raw(raw)
, _alloc([this] (ssize_t n) {
@@ -1143,7 +1225,7 @@ zstd_ddict::~zstd_ddict() {
}
}
lz4_cdict::lz4_cdict(sstable_compressor_factory_impl& owner, lw_shared_ptr<const raw_dict> raw)
lz4_cdict::lz4_cdict(dictionary_holder& owner, lw_shared_ptr<const raw_dict> raw)
: _owner(owner.weak_from_this())
, _raw(raw)
, _dict(LZ4_createStream(), LZ4_freeStream)
@@ -1162,6 +1244,28 @@ lz4_cdict::~lz4_cdict() {
}
}
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg) {
return std::make_unique<sstable_compressor_factory_impl>(std::move(cfg));
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread() {
SCYLLA_ASSERT(thread::running_in_thread());
struct wrapper : sstable_compressor_factory {
using impl = default_sstable_compressor_factory;
sharded<impl> _impl;
future<compressor_ptr> make_compressor_for_writing(schema_ptr s) override {
return _impl.local().make_compressor_for_writing(s);
}
future<compressor_ptr> make_compressor_for_reading(sstables::compression& c) override {
return _impl.local().make_compressor_for_reading(c);
}
future<> set_recommended_dict(table_id t, std::span<const std::byte> d) override {
return _impl.local().set_recommended_dict(t, d);
};
wrapper(wrapper&&) = delete;
wrapper() {
_impl.start().get();
}
~wrapper() {
_impl.stop().get();
}
};
return std::make_unique<wrapper>();
}
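The test-only wrapper above ties a sharded service's lifetime to a scope: `start()` in the constructor, `stop()` in the destructor, moves deleted. A minimal single-threaded sketch of that RAII pattern, with the hypothetical `sharded_stub` standing in for Seastar's `sharded<>` (whose real `start()`/`stop()` return futures and must be `.get()`-ed in a Seastar thread):

```cpp
#include <cassert>

// Stand-in for a sharded service; the real type is asynchronous.
struct sharded_stub {
    bool running = false;
    void start() { running = true; }
    void stop() { running = false; }
};

// Scope-bound lifetime: the service is guaranteed stopped when the
// wrapper goes out of scope, even if the test body throws.
struct wrapper {
    sharded_stub& impl;
    wrapper(const wrapper&) = delete;
    wrapper(wrapper&&) = delete;
    explicit wrapper(sharded_stub& s) : impl(s) { impl.start(); }
    ~wrapper() { impl.stop(); }
};
```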

View File

@@ -64,6 +64,8 @@ public:
virtual algorithm get_algorithm() const = 0;
virtual std::optional<unsigned> get_dict_owner_for_test() const;
using ptr_type = std::unique_ptr<compressor>;
};

View File

@@ -1538,6 +1538,7 @@ deps['test/boost/combined_tests'] += [
'test/boost/secondary_index_test.cc',
'test/boost/sessions_test.cc',
'test/boost/sstable_compaction_test.cc',
'test/boost/sstable_compressor_factory_test.cc',
'test/boost/sstable_directory_test.cc',
'test/boost/sstable_set_test.cc',
'test/boost/statement_restrictions_test.cc',

View File

@@ -113,10 +113,9 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
if (rs->uses_tablets()) {
warnings.push_back(
"Tables in this keyspace will be replicated using Tablets "
"and will not support CDC, LWT and counters features. "
"To use CDC, LWT or counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} "
"to the CREATE KEYSPACE statement.");
"and will not support Materialized Views, Secondary Indexes, CDC, LWT and counters features. "
"To use Materialized Views, Secondary Indexes, CDC, LWT or counters, drop this keyspace and re-create it "
"without tablets by adding AND TABLETS = {'enabled': false} to the CREATE KEYSPACE statement.");
if (ksm->initial_tablets().value()) {
warnings.push_back("Keyspace `initial` tablets option is deprecated. Use per-table tablet options instead.");
}

View File

@@ -7,7 +7,7 @@
},
"ScyllaDB Versions": [
{
"version": "Enterprise 2025.1",
"version": "ScyllaDB 2025.1",
"supported_OS": {
"Ubuntu": ["20.04", "22.04", "24.04"],
"Debian": ["11"],

View File

@@ -2,6 +2,11 @@
#old path: new path
# Remove redundant pages
/stable/getting-started/tutorials: https://docs.scylladb.com/stable/get-started/develop-with-scylladb/tutorials-example-projects.html
/stable/contribute: https://github.com/scylladb/scylladb/blob/master/CONTRIBUTING.md
# Remove an outdated article
/stable/troubleshooting/nodetool-memory-read-timeout.html: /stable/troubleshooting/index.html

View File

@@ -1,31 +0,0 @@
Contribute to ScyllaDB
=======================
Thank you for your interest in making ScyllaDB better!
We appreciate your help and look forward to welcoming you to the ScyllaDB Community.
There are two ways you can contribute:
* Send a patch to the ScyllaDB source code
* Write documentation for ScyllaDB Docs
Contribute to ScyllaDB's Source Code
------------------------------------
ScyllaDB developers use patches and email to share and discuss changes.
Setting up can take a little time, but once you have done it the first time, it's easy.
The basic steps are:
* Join the ScyllaDB community
* Create a Git branch to work on
* Commit your work with clear commit messages and sign-offs.
* Send a PR or use ``git format-patch`` and ``git send-email`` to send to the list
The entire process is `documented here <https://github.com/scylladb/scylla/blob/master/CONTRIBUTING.md>`_.
Contribute to ScyllaDB Docs
---------------------------
Each ScyllaDB project has accompanying documentation. For information about contributing documentation to a specific ScyllaDB project, refer to the README file for the individual project.
For general information or to contribute to the ScyllaDB Sphinx theme, read the `Contributor's Guide <https://sphinx-theme.scylladb.com/stable/contribute/>`_.

View File

@@ -11,7 +11,6 @@ Getting Started
requirements
Migrate to ScyllaDB </using-scylla/migrate-scylla>
Integration Solutions </using-scylla/integrations/index>
tutorials
.. panel-box::
:title: ScyllaDB Requirements

View File

@@ -1,21 +0,0 @@
============
Tutorials
============
The tutorials will show you how to use ScyllaDB as a data source for an application.
ScyllaDB Tutorial
===================
`Build an IoT App with sensor simulator and a REST API <https://iot.scylladb.com/stable/>`_
ScyllaDB Cloud Tutorial
=======================
`Implement CRUD operations with a TODO App <https://github.com/scylladb/scylla-cloud-getting-started/>`_
ScyllaDB Cloud Feature Store Tutorial
=====================================
`Build a machine learning (ML) feature store with ScyllaDB <https://feature-store.scylladb.com/stable/>`_

View File

@@ -73,6 +73,5 @@ In addition, you can read our `blog <https://www.scylladb.com/blog/>`_ and atten
kb/index
reference/index
faq
Contribute to ScyllaDB <contribute>
2024.2 and earlier documentation <https://enterprise.docs.scylladb.com/branch-2024.2/>

15
init.cc
View File

@@ -13,6 +13,7 @@
#include <boost/algorithm/string/trim.hpp>
#include <seastar/core/coroutine.hh>
#include "sstables/sstable_compressor_factory.hh"
logging::logger startlog("init");
@@ -129,3 +130,17 @@ void service_set::add(std::any value) {
std::any service_set::find(const std::type_info& type) const {
return _impl->find(type);
}
// Placed here to avoid a dependency on db::config in compress.cc,
// where the rest of default_sstable_compressor_factory_config is defined.
auto default_sstable_compressor_factory_config::from_db_config(
const db::config& cfg,
std::span<const unsigned> numa_config) -> self
{
return self {
.register_metrics = true,
.enable_writing_dictionaries = cfg.sstable_compression_dictionaries_enable_writing,
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg.sstable_compression_dictionaries_memory_budget_fraction,
.numa_config{numa_config.begin(), numa_config.end()},
};
}

View File

@@ -112,19 +112,19 @@ void production_snitch_base::parse_property_file(std::string contents) {
[[noreturn]]
void production_snitch_base::throw_double_declaration(const sstring& key) const {
logger().error("double \"{}\" declaration in {}", key, _prop_file_name);
logger().warn("double \"{}\" declaration in {}", key, _prop_file_name);
throw bad_property_file_error();
}
[[noreturn]]
void production_snitch_base::throw_bad_format(const sstring& line) const {
logger().error("Bad format in properties file {}: {}", _prop_file_name, line);
logger().warn("Bad format in properties file {}: {}", _prop_file_name, line);
throw bad_property_file_error();
}
[[noreturn]]
void production_snitch_base::throw_incomplete_file() const {
logger().error("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
logger().warn("Property file {} is incomplete. Some obligatory fields are missing.", _prop_file_name);
throw bad_property_file_error();
}

50
main.cc
View File

@@ -1236,17 +1236,19 @@ sharded<locator::shared_token_metadata> token_metadata;
auto stop_lang_man = defer_verbose_shutdown("lang manager", [] { langman.invoke_on_all(&lang::manager::stop).get(); });
langman.invoke_on_all(&lang::manager::start).get();
auto sstable_compressor_factory = make_sstable_compressor_factory(sstable_compressor_factory::config{
.register_metrics = true,
.enable_writing_dictionaries = cfg->sstable_compression_dictionaries_enable_writing,
.memory_fraction_starting_at_which_we_stop_writing_dicts = cfg->sstable_compression_dictionaries_memory_budget_fraction,
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
sstable_compressor_factory.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config,
std::cref(*cfg), std::cref(numa_groups))).get();
auto stop_compressor_factory = defer_verbose_shutdown("sstable_compressor_factory", [&sstable_compressor_factory] {
sstable_compressor_factory.stop().get();
});
checkpoint(stop_signal, "starting database");
debug::the_database = &db;
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier), std::ref(feature_service), std::ref(token_metadata),
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(*sstable_compressor_factory),
std::ref(cm), std::ref(sstm), std::ref(langman), std::ref(sst_dir_semaphore), std::ref(sstable_compressor_factory),
std::ref(stop_signal.as_sharded_abort_source()), utils::cross_shard_barrier()).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
@@ -1717,7 +1719,7 @@ sharded<locator::shared_token_metadata> token_metadata;
auto sstables_prefix = std::string_view("sstables/");
if (name.starts_with(sstables_prefix)) {
auto table = table_id(utils::UUID(name.substr(sstables_prefix.size())));
co_await sstable_compressor_factory->set_recommended_dict(table, std::move(dict.data));
co_await sstable_compressor_factory.local().set_recommended_dict(table, std::move(dict.data));
} else if (name == dictionary_service::rpc_compression_dict_name) {
co_await utils::announce_dict_to_shards(compressor_tracker, std::move(dict));
}
@@ -1755,6 +1757,24 @@ sharded<locator::shared_token_metadata> token_metadata;
utils::get_local_injector().inject("stop_after_starting_repair", [] { std::raise(SIGSTOP); });
debug::the_stream_manager = &stream_manager;
checkpoint(stop_signal, "starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
});
checkpoint(stop_signal, "starting streaming manager");
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
return sm.start(stop_signal.as_local_abort_source());
}).get();
api::set_server_stream_manager(ctx, stream_manager).get();
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
api::unset_server_stream_manager(ctx).get();
});
checkpoint(stop_signal, "initializing storage service");
debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
@@ -1921,24 +1941,6 @@ sharded<locator::shared_token_metadata> token_metadata;
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
});
debug::the_stream_manager = &stream_manager;
checkpoint(stop_signal, "starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
});
checkpoint(stop_signal, "starting streaming manager");
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
return sm.start(stop_signal.as_local_abort_source());
}).get();
api::set_server_stream_manager(ctx, stream_manager).get();
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
api::unset_server_stream_manager(ctx).get();
});
checkpoint(stop_signal, "starting hinted handoff manager");
if (!hinted_handoff_enabled.is_disabled_for_all()) {
hints_dir_initializer.ensure_rebalanced().get();

View File

@@ -397,7 +397,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
, _view_update_read_concurrency_semaphores_group(
max_memory_concurrent_view_update_reads(),
utils::updateable_value<int>(max_count_concurrent_view_update_reads),
max_inactive_view_update_queue_length(),
std::numeric_limits<size_t>::max(),
_cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier,
_cfg.view_update_reader_concurrency_semaphore_cpu_concurrency,

View File

@@ -1488,7 +1488,6 @@ private:
size_t max_memory_concurrent_view_update_reads() { return _dbcfg.available_memory * 0.01; }
// Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
size_t max_inactive_queue_length() { return _dbcfg.available_memory * 0.02 / 1000; }
size_t max_inactive_view_update_queue_length() { return _dbcfg.available_memory * 0.01 / 1000; }
// They're rather heavyweight, so limit more
static constexpr size_t max_count_streaming_concurrent_reads{10};
size_t max_memory_streaming_concurrent_reads() { return _dbcfg.available_memory * 0.02; }

View File

@@ -1892,6 +1892,8 @@ table::sstable_list_builder::build_new_list(const sstables::sstable_set& current
const std::vector<sstables::shared_sstable>& old_sstables) {
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
co_await utils::get_local_injector().inject("sstable_list_builder_delay", std::chrono::milliseconds(100));
// add sstables from the current list into the new list except the ones that are in the old list
std::vector<sstables::shared_sstable> removed_sstables;
co_await current_sstables.for_each_sstable_gently([&s, &removed_sstables, &new_sstable_list] (const sstables::shared_sstable& tab) {
@@ -2178,14 +2180,13 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
tlogger.debug("Setting compaction strategy of {}.{} to {}", _schema->ks_name(), _schema->cf_name(), sstables::compaction_strategy::name(strategy));
auto new_cs = make_compaction_strategy(strategy, _schema->compaction_strategy_options());
struct compaction_group_sstable_set_updater {
struct compaction_group_strategy_updater {
table& t;
compaction_group& cg;
compaction_backlog_tracker new_bt;
compaction::compaction_strategy_state new_cs_state;
lw_shared_ptr<sstables::sstable_set> new_sstables;
compaction_group_sstable_set_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
compaction_group_strategy_updater(table& t, compaction_group& cg, sstables::compaction_strategy& new_cs)
: t(t)
, cg(cg)
, new_bt(new_cs.make_backlog_tracker())
@@ -2196,26 +2197,26 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
auto move_read_charges = new_cs.type() == t._compaction_strategy.type();
cg.get_backlog_tracker().copy_ongoing_charges(new_bt, move_read_charges);
new_sstables = make_lw_shared<sstables::sstable_set>(new_cs.make_sstable_set(cg.as_table_state()));
std::vector<sstables::shared_sstable> new_sstables_for_backlog_tracker;
new_sstables_for_backlog_tracker.reserve(cg.main_sstables()->size());
cg.main_sstables()->for_each_sstable([this, &new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
new_sstables->insert(s);
cg.main_sstables()->for_each_sstable([&new_sstables_for_backlog_tracker] (const sstables::shared_sstable& s) {
new_sstables_for_backlog_tracker.push_back(s);
});
new_bt.replace_sstables({}, std::move(new_sstables_for_backlog_tracker));
}
void execute() noexcept {
// Update strategy state and backlog tracker according to new strategy. SSTable set update
// is delayed until new compaction, which is triggered on strategy change. SSTable set
// cannot be updated here since it must happen under the set update lock.
t._compaction_manager.register_backlog_tracker(cg.as_table_state(), std::move(new_bt));
cg.set_main_sstables(std::move(new_sstables));
cg.set_compaction_strategy_state(std::move(new_cs_state));
}
};
std::vector<compaction_group_sstable_set_updater> cg_sstable_set_updaters;
std::vector<compaction_group_strategy_updater> cg_sstable_set_updaters;
for_each_compaction_group([&] (compaction_group& cg) {
compaction_group_sstable_set_updater updater(*this, cg, new_cs);
compaction_group_strategy_updater updater(*this, cg, new_cs);
updater.prepare(new_cs);
cg_sstable_set_updaters.push_back(std::move(updater));
});
@@ -2224,7 +2225,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
for (auto& updater : cg_sstable_set_updaters) {
updater.execute();
}
refresh_compound_sstable_set();
}
size_t table::sstables_count() const {

View File

@@ -769,6 +769,12 @@ utils::input_stream as_input_stream(const bytes_ostream& b) {
return utils::input_stream::fragmented(b.fragments().begin(), b.size());
}
template<FragmentedView View>
inline
auto as_input_stream(View v) {
return fragmented_memory_input_stream(fragment_range(v).begin(), v.size_bytes());
}
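The point of the `as_input_stream(managed_bytes_view)` overload above is that deserialization walks the fragments in place instead of first linearizing them into one contiguous buffer (which is what `get_blob()` did, triggering "oversized allocation" warnings). An illustrative sketch of that fragment-by-fragment read, not the Seastar/ScyllaDB API:

```cpp
#include <algorithm>
#include <cassert>
#include <cstring>
#include <string>
#include <vector>

// Each fragment stays where it is; reads copy only the bytes the caller
// asked for, crossing fragment boundaries as needed. No single large
// allocation holds the whole value.
struct fragmented_stream {
    std::vector<std::string> fragments;
    size_t frag = 0, off = 0;

    void read(char* out, size_t n) {
        while (n > 0) {
            const std::string& f = fragments[frag];
            size_t take = std::min(n, f.size() - off);
            std::memcpy(out, f.data() + off, take);
            out += take; off += take; n -= take;
            if (off == f.size()) { ++frag; off = 0; }
        }
    }
};
```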
template<typename Output, typename ...T>
void serialize(Output& out, const boost::variant<T...>& v) {}

View File

@@ -89,7 +89,6 @@ future<raft::index_t> raft_sys_table_storage::load_commit_idx() {
co_return raft::index_t(static_row.get_or<int64_t>("commit_idx", raft::index_t{}.value()));
}
future<raft::log_entries> raft_sys_table_storage::load_log() {
static const auto load_cql = format("SELECT term, \"index\", data FROM system.{} WHERE group_id = ?", db::system_keyspace::RAFT);
::shared_ptr<cql3::untyped_result_set> rs = co_await _qp.execute_internal(load_cql, {_group_id.id}, cql3::query_processor::cache_internal::yes);
@@ -103,7 +102,7 @@ future<raft::log_entries> raft_sys_table_storage::load_log() {
}
raft::term_t term = raft::term_t(row.get_as<int64_t>("term"));
raft::index_t idx = raft::index_t(row.get_as<int64_t>("index"));
auto raw_data = row.get_blob("data");
auto raw_data = row.get_view("data");
auto in = ser::as_input_stream(raw_data);
using data_variant_type = decltype(raft::log_entry::data);
data_variant_type data = ser::deserialize(in, std::type_identity<data_variant_type>());

View File

@@ -457,7 +457,7 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
switch (rs.state) {
case node_state::normal: {
if (is_me(ip)) {
if (is_me(id)) {
co_return;
}
// In replace-with-same-ip scenario the replaced node IP will be the same
@@ -490,8 +490,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
auto old_ip = it->second;
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
co_await _gossiper.force_remove_endpoint(id, gms::null_permit_id);
}
}
break;
@@ -945,22 +943,13 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
if (prev_ip == endpoint) {
co_return;
}
if (_address_map.find(id) != endpoint) {
// Address map refused to update IP for the host_id,
// this means prev_ip has higher generation than endpoint.
// We can immediately remove endpoint from gossiper
// since it represents an old IP (before an IP change)
// for the given host_id. This is not strictly
// necessary, but it reduces the noise circulated
// in gossiper messages and allows for clearer
// expectations of the gossiper state in tests.
co_await _ss._gossiper.force_remove_endpoint(id, permit_id);
// Do not update address.
co_return;
}
// If the host_id <-> IP mapping has changed, we need to update system tables, token_metadata and erm.
if (_ss.raft_topology_change_enabled()) {
rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
@@ -1691,6 +1680,8 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
slogger.info("Starting up server gossip");
co_await utils::get_local_injector().inject("sleep_before_start_gossiping", std::chrono::milliseconds{500});
co_await _gossiper.start_gossiping(new_generation, app_states);
utils::get_local_injector().inject("stop_after_starting_gossiping",
@@ -6010,14 +6001,6 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver
});
}
inet_address storage_service::host2ip(locator::host_id host) const {
auto ip = _address_map.find(host);
if (!ip) {
throw std::runtime_error(::format("Cannot map host {} to ip", host));
}
return *ip;
}
// Performs a replica-side operation for a given tablet.
// What operation is performed is determined by "op" based on the
// current state of tablet metadata. The coordinator is supposed to prepare tablet
@@ -7281,11 +7264,7 @@ void storage_service::init_messaging_service() {
[this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future<streaming::stream_files_response> {
streaming::stream_files_response resp;
resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future<size_t> {
auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req, [&ss] (locator::host_id host) -> future<gms::inet_address> {
return ss.container().invoke_on(0, [host] (storage_service& ss) {
return ss.host2ip(host);
});
});
auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req);
co_return res.stream_bytes;
},
size_t(0),

View File

@@ -210,7 +210,6 @@ private:
// when both of which sit on the same node. So all the movement is local.
future<> clone_locally_tablet_storage(locator::global_tablet_id, locator::tablet_replica leaving, locator::tablet_replica pending);
future<> cleanup_tablet(locator::global_tablet_id);
inet_address host2ip(locator::host_id) const;
// Handler for table load stats RPC.
future<locator::load_stats> load_stats_for_tablet_based_tables();
future<> process_tablet_split_candidate(table_id) noexcept;

View File

@@ -1534,7 +1534,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
break;
case locator::tablet_transition_stage::repair: {
if (action_failed(tablet_state.repair)) {
bool fail_repair = utils::get_local_injector().enter("handle_tablet_migration_repair_fail");
if (fail_repair || action_failed(tablet_state.repair)) {
if (do_barrier()) {
updates.emplace_back(get_mutation_builder()
.set_stage(last_token, locator::tablet_transition_stage::end_repair)
@@ -2021,6 +2022,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::join_group0, "
"global_token_metadata_barrier failed, error {}",
@@ -2157,6 +2160,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::commit_cdc_generation, "
"raft_topology_cmd::command::barrier failed, error {}", std::current_exception());
@@ -2236,6 +2241,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("tablets draining failed with {}. Aborting the topology operation", std::current_exception());
_rollback = fmt::format("Failed to drain tablets: {}", std::current_exception());
@@ -2251,6 +2258,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::write_both_read_old, "
"global_token_metadata_barrier failed, error {}",
@@ -2297,6 +2306,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
} catch (term_changed_error&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("send_raft_topology_cmd(stream_ranges) failed with exception"
" (node state is {}): {}", state, std::current_exception());
@@ -2327,6 +2338,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::write_both_read_new, "
"global_token_metadata_barrier failed, error {}",
@@ -2466,6 +2479,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (raft::request_aborted&) {
throw;
} catch (...) {
rtlogger.error("transition_state::left_token_ring, "
"raft_topology_cmd::command::barrier failed, error {}",
@@ -2481,6 +2496,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node = retake_node(co_await start_operation(), node.id);
}
// Make the decommissioning node a non-voter before reporting operation completion below.
// Otherwise the decommissioned node may see the completion and exit before it is removed from
// the config, at which point the removal from the config will hang if the cluster had only two
// nodes before the decommission.
co_await _voter_handler.on_node_removed(node.id, _as);
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
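The repeated `catch (raft::request_aborted&) { throw; }` clauses added above all follow one pattern: control-flow exceptions (term change, concurrent modification, request abort) must unwind to the caller untouched, while anything else is handled locally as a logged, recoverable failure. A sketch of that catch ladder; `abort_error` is an illustrative stand-in, not the real ScyllaDB exception hierarchy:

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Stand-in for an exception type that signals "stop the whole operation".
struct abort_error : std::exception {};

// Returns "ok" or "failed", but propagates abort_error to the caller:
// the specific catch must come before catch (...) or it would be swallowed.
std::string run_step(void (*step)()) {
    try {
        step();
        return "ok";
    } catch (abort_error&) {
        throw;            // must unwind: the operation is being aborted
    } catch (...) {
        return "failed";  // any other error is handled locally (e.g. logged)
    }
}
```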

View File

@@ -189,7 +189,19 @@ future<float> try_one_compression_config(
const compression_parameters& params,
const utils::chunked_vector<temporary_buffer<char>>& validation_samples
) {
auto factory = make_sstable_compressor_factory();
co_await factory->set_recommended_dict(initial_schema->id(), dict);
co_return co_await try_one_compression_config(*factory, initial_schema, params, validation_samples);
sharded<default_sstable_compressor_factory> factory;
co_await factory.start();
std::exception_ptr ex;
float result;
try {
co_await factory.local().set_recommended_dict(initial_schema->id(), dict);
result = co_await try_one_compression_config(factory.local(), initial_schema, params, validation_samples);
} catch (...) {
ex = std::current_exception();
}
co_await factory.stop();
if (ex) {
co_return coroutine::exception(ex);
}
co_return result;
}
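The rewritten `try_one_compression_config` above shows the standard coroutine cleanup shape: `co_await factory.stop()` cannot live in a destructor, so the work runs under try/catch, any exception is parked in an `exception_ptr`, cleanup runs unconditionally, and only then is the failure rethrown. A plain (non-coroutine) sketch of the same flow, with illustrative names:

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>

struct service {
    bool stopped = false;
    void stop() { stopped = true; }  // stands in for co_await factory.stop()
};

int run_with_cleanup(service& s, int (*work)()) {
    std::exception_ptr ex;
    int result = 0;
    try {
        result = work();
    } catch (...) {
        ex = std::current_exception();  // park the failure, don't skip cleanup
    }
    s.stop();                           // cleanup runs on both paths
    if (ex) {
        std::rethrow_exception(ex);     // surface the original error afterwards
    }
    return result;
}
```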

View File

@@ -10,20 +10,89 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include "compress.hh"
#include "schema/schema_fwd.hh"
#include "utils/updateable_value.hh"
#include <span>
namespace db {
class config;
} // namespace db
struct dictionary_holder;
class raw_dict;
struct sstable_compressor_factory {
virtual ~sstable_compressor_factory() {}
virtual future<compressor_ptr> make_compressor_for_writing(schema_ptr) = 0;
virtual future<compressor_ptr> make_compressor_for_reading(sstables::compression&) = 0;
virtual future<> set_recommended_dict(table_id, std::span<const std::byte> dict) = 0;
struct config {
bool register_metrics = false;
utils::updateable_value<bool> enable_writing_dictionaries{true};
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
};
};
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory(sstable_compressor_factory::config cfg = {});
// Note: I couldn't make this an inner class of default_sstable_compressor_factory,
// because then the compiler gives weird complaints about the default member initializers in the line
// ```
// default_sstable_compressor_factory(config = config{});
// ```
// apparently due to some compiler bug related to default initializers.
struct default_sstable_compressor_factory_config {
using self = default_sstable_compressor_factory_config;
static std::vector<unsigned> get_default_shard_to_numa_node_mapping();
bool register_metrics = false;
utils::updateable_value<bool> enable_writing_dictionaries{true};
utils::updateable_value<float> memory_fraction_starting_at_which_we_stop_writing_dicts{1};
std::vector<unsigned> numa_config{get_default_shard_to_numa_node_mapping()};
static default_sstable_compressor_factory_config from_db_config(
const db::config&,
std::span<const unsigned> numa_config = get_default_shard_to_numa_node_mapping());
};
// Constructs compressors and decompressors for SSTables,
// making sure that the expensive identical parts (dictionaries) are shared
// between all shards within the same NUMA group.
//
// To make coordination work without resorting to std::mutex and such, dicts have owner shards,
// decided by a content hash of the dictionary.
// All requests for a given dict ID go through the owner of this ID and return a foreign shared pointer
// to that dict.
//
// (Note: this centralization shouldn't pose a performance problem because a dict is only requested once
// per opening of an SSTable.)
struct default_sstable_compressor_factory : peering_sharded_service<default_sstable_compressor_factory>, sstable_compressor_factory {
using holder = dictionary_holder;
public:
using self = default_sstable_compressor_factory;
using config = default_sstable_compressor_factory_config;
private:
config _cfg;
// Maps NUMA node ID to the array of shards on that node.
std::vector<std::vector<shard_id>> _numa_groups;
// Holds dictionaries owned by this shard.
std::unique_ptr<dictionary_holder> _holder;
// All recommended dictionary updates are serialized by a single "leader shard".
// We do this to avoid dealing with concurrent updates altogether.
semaphore _recommendation_setting_sem{1};
constexpr static shard_id _leader_shard = 0;
private:
using sha256_type = std::array<std::byte, 32>;
unsigned local_numa_id();
shard_id get_dict_owner(unsigned numa_id, const sha256_type& sha);
future<foreign_ptr<lw_shared_ptr<const raw_dict>>> get_recommended_dict(table_id t);
future<> set_recommended_dict_local(table_id, std::span<const std::byte> dict);
future<compressor_ptr> make_compressor_for_writing_impl(const compression_parameters&, table_id);
future<compressor_ptr> make_compressor_for_reading_impl(const compression_parameters&, std::span<const std::byte> dict);
public:
default_sstable_compressor_factory(config = config{});
~default_sstable_compressor_factory();
future<compressor_ptr> make_compressor_for_writing(schema_ptr) override;
future<compressor_ptr> make_compressor_for_writing_for_tests(const compression_parameters&, table_id);
future<compressor_ptr> make_compressor_for_reading(sstables::compression&) override;
future<compressor_ptr> make_compressor_for_reading_for_tests(const compression_parameters&, std::span<const std::byte> dict);
future<> set_recommended_dict(table_id, std::span<const std::byte> dict) override;
};
std::unique_ptr<sstable_compressor_factory> make_sstable_compressor_factory_for_tests_in_thread();
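The comment above describes the ownership scheme: a dictionary's owner shard is derived from a content hash, so every shard agrees on the owner without coordination. A standalone sketch of the idea (the `dict_owner` helper is illustrative, and `std::hash` stands in for the SHA-256 used by the real factory):

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string_view>
#include <vector>

// Sketch: deterministically map dictionary contents to an owner shard within
// a NUMA group. Every shard computes the same hash, so all requests for a
// given dict converge on one owner without locks. (The real factory hashes
// with SHA-256; std::hash stands in here for illustration.)
unsigned dict_owner(std::string_view dict_contents,
                    const std::vector<unsigned>& shards_in_numa_group) {
    std::size_t h = std::hash<std::string_view>{}(dict_contents);
    return shards_in_numa_group[h % shards_in_numa_group.size()];
}
```

Since each NUMA group picks an owner among its own shards, a dict ends up with one copy per group, which is exactly what the test below verifies.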

@@ -945,7 +945,21 @@ filter_sstable_for_reader_by_ck(std::vector<shared_sstable>&& sstables, replica:
std::vector<frozen_sstable_run>
sstable_set_impl::all_sstable_runs() const {
throw_with_backtrace<std::bad_function_call>();
auto all_sstables = all();
std::unordered_map<sstables::run_id, sstable_run> runs_m;
std::vector<frozen_sstable_run> all_runs;
for (auto&& sst : *all_sstables) {
// When a run cannot accept an sstable due to overlap, treat the rejected sstable
// as a single-fragment run.
if (!runs_m[sst->run_identifier()].insert(sst)) {
all_runs.push_back(make_lw_shared<const sstable_run>(sst));
}
}
for (auto&& r : runs_m | std::views::values) {
all_runs.push_back(make_lw_shared<const sstable_run>(std::move(r)));
}
return all_runs;
}
mutation_reader
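The new `all_sstable_runs()` groups sstables by run ID and demotes any fragment a run rejects (because it overlaps an existing member) to its own single-fragment run. A simplified standalone sketch of that grouping, modelling fragments as half-open integer ranges (the `fragment`/`run` types here are illustrative, not the real sstable classes):

```cpp
#include <cassert>
#include <map>
#include <utility>
#include <vector>

// Illustrative stand-ins: a fragment is a half-open token range
// [first, last); a run is a set of non-overlapping fragments.
struct fragment {
    int run_id;
    int first, last;
};

struct run {
    std::vector<fragment> members;
    // Insert fails if the fragment overlaps any existing member.
    bool insert(const fragment& f) {
        for (const auto& m : members) {
            if (f.first < m.last && m.first < f.last) {
                return false;  // overlap: caller demotes it to its own run
            }
        }
        members.push_back(f);
        return true;
    }
};

// Mirrors all_sstable_runs(): group fragments by run id; a rejected
// (overlapping) fragment becomes a single-fragment run of its own.
std::vector<run> group_into_runs(const std::vector<fragment>& frags) {
    std::map<int, run> runs_by_id;
    std::vector<run> all_runs;
    for (const auto& f : frags) {
        if (!runs_by_id[f.run_id].insert(f)) {
            all_runs.push_back(run{{f}});
        }
    }
    for (auto& kv : runs_by_id) {
        all_runs.push_back(std::move(kv.second));
    }
    return all_runs;
}
```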

@@ -9,7 +9,6 @@
#include "message/messaging_service.hh"
#include "streaming/stream_blob.hh"
#include "streaming/stream_plan.hh"
#include "gms/inet_address.hh"
#include "utils/pretty_printers.hh"
#include "utils/error_injection.hh"
#include "locator/host_id.hh"
@@ -120,7 +119,7 @@ static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_i
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
gms::inet_address from,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
@@ -310,7 +309,7 @@ future<> stream_blob_handler(replica::database& db,
future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms,
gms::inet_address from,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source) {
@@ -374,7 +373,7 @@ namespace streaming {
// Send files in the files list to the nodes in targets list over network
// Returns number of bytes sent over network
future<size_t>
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool inject_errors) {
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, service::frozen_topology_guard topo_guard, bool inject_errors) {
size_t ops_total_size = 0;
if (targets.empty()) {
co_return ops_total_size;
@@ -387,7 +386,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
ops_id, sources.size(), sources, targets);
struct sink_and_source {
gms::inet_address node;
locator::host_id node;
rpc::sink<streaming::stream_blob_cmd_data> sink;
rpc::source<streaming::stream_blob_cmd_data> source;
bool sink_closed = false;
@@ -428,10 +427,9 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
for (auto& x : targets) {
const auto& node = x.node;
meta.dst_shard_id = x.shard;
auto ip = co_await host2ip(node);
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, ip, filename, targets);
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, node, filename, targets);
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
ss.push_back(sink_and_source{ip, std::move(sink), std::move(source)});
ss.push_back(sink_and_source{node, std::move(sink), std::move(source)});
}
// This fiber sends data to peer node
@@ -600,7 +598,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
}
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip) {
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
stream_files_response resp;
auto& table = db.find_column_family(req.table);
auto sstables = co_await table.take_storage_snapshot(req.range);
@@ -653,7 +651,7 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
req.ops_id, sstables.size(), files.size(), files, req.range);
auto ops_start_time = std::chrono::steady_clock::now();
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, std::move(host2ip), req.topo_guard);
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard);
resp.stream_bytes = stream_bytes;
auto duration = std::chrono::steady_clock::now() - ops_start_time;
blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} files={} range={} stream_bytes={} stream_time={} stream_bw={}",

@@ -116,13 +116,13 @@ struct stream_blob_info {
};
// The handler for the STREAM_BLOB verb.
seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, gms::inet_address from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, locator::host_id from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
// Exposed mainly for testing
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
gms::inet_address from,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
@@ -163,11 +163,9 @@ public:
size_t stream_bytes = 0;
};
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
// The handler for the TABLET_STREAM_FILES verb. The receiver of this verb will
// stream sstables files specified by the stream_files_request req.
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip);
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req);
// Ask the src node to stream sstables to dst node for table in the given token range using TABLET_STREAM_FILES verb.
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id, replica::table& table, const dht::token_range& range, const locator::host_id& src, const locator::host_id& dst, seastar::shard_id dst_shard_id, netw::messaging_service& ms, abort_source& as, service::frozen_topology_guard topo_guard);
@@ -178,7 +176,6 @@ future<size_t> tablet_stream_files(netw::messaging_service& ms,
std::vector<node_and_shard> targets,
table_id table,
file_stream_id ops_id,
host2ip_t host2ip,
service::frozen_topology_guard topo_guard,
bool may_inject_errors = false
);

@@ -294,7 +294,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
}
});
ms.register_stream_blob([this] (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
auto from = netw::messaging_service::get_source(cinfo).addr;
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = _ms.local().make_sink_for_stream_blob(source);
(void)stream_blob_handler(_db.local(), _ms.local(), from, meta, sink, source).handle_exception([ms = _ms.local().shared_from_this()] (std::exception_ptr eptr) {
sslog.warn("Failed to run stream blob handler: {}", eptr);
@@ -392,35 +392,14 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
sslog.debug("[Stream #{}] prepare requests nr={}, summaries nr={}", plan_id, nr_requests, summaries.size());
// prepare tasks
set_state(stream_session_state::PREPARING);
auto& db = manager().db();
for (auto& request : requests) {
// always flush on stream request
sslog.debug("[Stream #{}] prepare stream_request={}", plan_id, request);
const auto& ks = request.keyspace;
// Make sure cf requested by peer node exists
for (auto& cf : request.column_families) {
try {
db.find_column_family(ks, cf);
} catch (replica::no_such_column_family&) {
auto err = format("[Stream #{}] prepare requested ks={} cf={} does not exist", plan_id, ks, cf);
sslog.warn("{}", err.c_str());
throw std::runtime_error(err);
}
}
add_transfer_ranges(std::move(request.keyspace), std::move(request.ranges), std::move(request.column_families));
co_await coroutine::maybe_yield();
}
for (auto& summary : summaries) {
sslog.debug("[Stream #{}] prepare stream_summary={}", plan_id, summary);
auto cf_id = summary.cf_id;
// Make sure cf the peer node will send to us exists
try {
db.find_column_family(cf_id);
} catch (replica::no_such_column_family&) {
auto err = format("[Stream #{}] prepare cf_id={} does not exist", plan_id, cf_id);
sslog.warn("{}", err.c_str());
throw std::runtime_error(err);
}
prepare_receiving(summary);
}

@@ -335,6 +335,7 @@ add_scylla_test(combined_tests
secondary_index_test.cc
sessions_test.cc
sstable_compaction_test.cc
sstable_compressor_factory_test.cc
sstable_directory_test.cc
sstable_set_test.cc
statement_restrictions_test.cc

@@ -596,9 +596,10 @@ future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<>
cf_name))
.get();
f1.get();
e.get_system_keyspace().local().load_built_views().get();
auto f2 = e.local_view_builder().wait_until_built("ks", "index_cf_index");
e.execute_cql(seastar::format("CREATE INDEX index_{0} ON {0} (r1);", cf_name)).get();
f2.get();
}
}

@@ -76,7 +76,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
if (!verb_register) {
co_await smp::invoke_on_all([&] {
return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
auto from = netw::messaging_service::get_source(cinfo).addr;
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = global_ms.local().make_sink_for_stream_blob(source);
(void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
auto path = meta.filename + suffix;
@@ -115,10 +115,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
co_return make_file_input_stream(std::move(file), foptions);
};
}
auto host2ip = [&global_db] (locator::host_id id) -> future<gms::inet_address> {
co_return global_db.local().get_token_metadata().get_topology().my_address();
};
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, host2ip, service::null_topology_guard, inject_error);
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error);
co_await mark_tablet_stream_done(ops_id);
testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes);
ret = true;

@@ -812,27 +812,20 @@ SEASTAR_THREAD_TEST_CASE(background_reclaim) {
logalloc::shard_tracker().stop().get();
});
sleep(500ms).get(); // sleep a little, to give the reclaimer a head start
std::vector<managed_bytes> std_allocs;
size_t std_alloc_size = 1000000; // note that managed_bytes fragments these, even in std
for (int i = 0; i < 50; ++i) {
// Background reclaim is supposed to eventually ensure a certain amount of free memory.
while (memory::free_memory() < background_reclaim_free_memory_threshold) {
thread::maybe_yield();
}
auto compacted_pre = logalloc::shard_tracker().statistics().memory_compacted;
fmt::print("compacted {} items {} (pre)\n", compacted_pre, evictable_allocs.size());
std_allocs.emplace_back(managed_bytes::initialized_later(), std_alloc_size);
auto compacted_post = logalloc::shard_tracker().statistics().memory_compacted;
fmt::print("compacted {} items {} (post)\n", compacted_post, evictable_allocs.size());
BOOST_REQUIRE_EQUAL(compacted_pre, compacted_post);
// Pretend to do some work. Sleeping would be too easy, as the background reclaim group would use
// all that time.
//
// Use thread_cputime_clock to prevent overcommitted test machines from stealing CPU time
// and causing test failures.
auto deadline = thread_cputime_clock::now() + 100ms;
while (thread_cputime_clock::now() < deadline) {
thread::maybe_yield();
}
}
}
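The comment in the test above explains why it spins on `thread_cputime_clock` rather than sleeping: on an overcommitted machine, wall-clock time passes even while the thread gets no CPU, but CPU time only advances while the thread actually runs. A minimal POSIX sketch of the same busy-wait (assumes Linux's `CLOCK_THREAD_CPUTIME_ID`; the helper names are invented for illustration):

```cpp
#include <cassert>
#include <time.h>

// This thread's consumed CPU time in nanoseconds
// (Linux: CLOCK_THREAD_CPUTIME_ID).
long long thread_cpu_ns() {
    timespec ts;
    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
    return ts.tv_sec * 1'000'000'000LL + ts.tv_nsec;
}

// Busy-wait until this thread has burned `ns` nanoseconds of CPU time,
// like the test's `while (thread_cputime_clock::now() < deadline)` loop.
// Unlike sleeping, this only completes once the thread was actually
// scheduled for that long, so CPU stolen by other processes cannot
// make the "work" phase end early.
void burn_cpu_ns(long long ns) {
    const long long deadline = thread_cpu_ns() + ns;
    while (thread_cpu_ns() < deadline) {
        // spin: pretend to do some work
    }
}
```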

@@ -566,7 +566,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
}
SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
return sstables::test_env::do_with([] (sstables::test_env& env) {
return sstables::test_env::do_with_async([] (sstables::test_env& env) {
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("v", bytes_type)
@@ -581,7 +581,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
cfg.enable_incremental_backups = false;
cfg.cf_stats = &*cf_stats;
return with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) {
return seastar::async([&env, s, &cf] {
// populate
auto new_key = [&] {
@@ -645,7 +645,7 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
flushed.get();
});
}).then([cf_stats] {});
}).get();
});
}

@@ -530,6 +530,22 @@ static void reverse(schema_ptr s, mutation_partition& m) {
m = std::move(reverse(mutation(s, std::move(dk), std::move(m))).partition());
}
void assert_has_same_squashed_continuity(const mutation_partition& actual, mvcc_partition& expected) {
const schema& s = *expected.schema();
auto expected_cont = expected.entry().squashed_continuity(s);
auto actual_cont = actual.get_continuity(s);
bool actual_static_cont = actual.static_row_continuous();
bool expected_static_cont = expected.squashed().static_row_continuous();
if (actual_static_cont != expected_static_cont) {
BOOST_FAIL(format("Static row continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
expected_static_cont, actual_static_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
}
if (!expected_cont.equals(s, actual_cont)) {
BOOST_FAIL(format("Continuity doesn't match, expected: {}\nbut got: {}, partition entry (expected): {}\n ...and mutation (actual): {}",
expected_cont, actual_cont, partition_entry::printer(expected.entry()), mutation_partition::printer(s, actual)));
}
}
SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
// Tests that reading many versions using a cursor gives the logical mutation back.
return seastar::async([] {
@@ -558,7 +574,21 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) {
auto snap = e.read();
auto actual = read_using_cursor(*snap);
assert_that(s, actual).has_same_continuity(expected);
// Checks that the squashed continuity of `e` is equal to continuity of `actual`.
// Note: squashed continuity of an entry is slightly different than the continuity
// of a squashed entry.
//
// Squashed continuity is the union of continuities of all versions in the entry,
// and in particular it includes empty dummy rows resulting in the logical merge
// of version.
// The process of actually squashing an entry is allowed to
// remove those empty dummies, so the squashed entry can have slightly
// smaller continuity.
//
// Since a cursor isn't allowed to remove dummy rows, the strongest test
// we can do here is to compare the continuity of the cursor-read mutation
// with the squashed continuity of the entry.
assert_has_same_squashed_continuity(actual, e);
assert_that(s, actual).is_equal_to_compacted(expected);
// Reversed iteration

@@ -1666,7 +1666,7 @@ SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
}
SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
return test_env::do_with([] (test_env& env) {
return test_env::do_with_async([] (test_env& env) {
auto ts = 1451001601000L; // 2015-12-25 @ 00:00:01, in milliseconds
auto ts_in_ms = std::chrono::milliseconds(ts);
auto ts_in_us = std::chrono::duration_cast<std::chrono::microseconds>(ts_in_ms);
@@ -1702,7 +1702,6 @@ SEASTAR_TEST_CASE(time_window_strategy_ts_resolution_check) {
BOOST_REQUIRE(ret.second == expected);
}
return make_ready_future<>();
});
}
@@ -2319,6 +2318,7 @@ public:
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
private:
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<test_env> _env;
uint32_t _seed;
std::unique_ptr<tests::random_schema_specification> _random_schema_spec;
@@ -2336,7 +2336,7 @@ public:
compress))
, _random_schema(_seed, *_random_schema_spec)
{
_env.start().get();
_env.start(test_env_config(), std::ref(*scf)).get();
testlog.info("random_schema: {}", _random_schema.cql());
}
@@ -2402,12 +2402,14 @@ public:
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
private:
std::unique_ptr<sstable_compressor_factory> scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<test_env> _env;
public:
scrub_test_framework()
{
_env.start().get();
_env.start(test_env_config(), std::ref(*scf)).get();
}
~scrub_test_framework() {

@@ -0,0 +1,133 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#undef SEASTAR_TESTING_MAIN
#include <fmt/ranges.h>
#include <seastar/util/defer.hh>
#include <seastar/testing/thread_test_case.hh>
#include "sstables/sstable_compressor_factory.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
// 1. Create a random message.
// 2. Set this random message as the recommended dict.
// 3. On all shards, create compressors.
// 4. Check that they are using the recommended dict (i.e. that the original message compresses perfectly).
// 5. Check that the used dictionaries are owned by shards on the same NUMA node.
// 6. Check that the number of dictionary copies is equal to number of NUMA nodes.
// 7. Repeat this a few times for both lz4 and zstd.
void test_one_numa_topology(std::span<unsigned> shard_to_numa_mapping) {
testlog.info("Testing NUMA topology {}", shard_to_numa_mapping);
// Create a compressor factory.
SCYLLA_ASSERT(shard_to_numa_mapping.size() == smp::count);
auto config = default_sstable_compressor_factory::config{
.numa_config = std::vector(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()),
};
sharded<default_sstable_compressor_factory> sstable_compressor_factory;
sstable_compressor_factory.start(std::cref(config)).get();
auto stop_compressor_factory = defer([&sstable_compressor_factory] { sstable_compressor_factory.stop().get(); });
// The factory keeps recommended dicts (i.e. dicts for writing) per table ID.
auto table = table_id::create_random_id();
// Retry a few times just to check that it works more than once.
for (int retry = 0; retry < 3; ++retry) {
// Generate a random (and hence incompressible without a dict) message.
auto message = tests::random::get_sstring(4096);
auto dict_view = std::as_bytes(std::span(message));
// Set the message as the dict to make the message perfectly compressible.
sstable_compressor_factory.local().set_recommended_dict(table, dict_view).get();
// We'll put the owners here to check that the number of owners matches the number of NUMA nodes.
std::vector<unsigned> compressor_numa_nodes(smp::count);
std::vector<unsigned> decompressor_numa_nodes(smp::count);
// Try for both algorithms, just in case there are some differences in how dictionary
// distribution over shards is implemented between them.
for (const auto algo : {compressor::algorithm::lz4_with_dicts, compressor::algorithm::zstd_with_dicts}) {
sstable_compressor_factory.invoke_on_all(coroutine::lambda([&] (default_sstable_compressor_factory& local) -> seastar::future<> {
// Validate that the dictionaries work as intended,
// and check that their owner is as expected.
auto params = compression_parameters(algo);
auto compressor = co_await local.make_compressor_for_writing_for_tests(params, table);
auto decompressor = co_await local.make_compressor_for_reading_for_tests(params, dict_view);
auto our_numa_node = shard_to_numa_mapping[this_shard_id()];
auto compressor_numa_node = shard_to_numa_mapping[compressor->get_dict_owner_for_test().value()];
auto decompressor_numa_node = shard_to_numa_mapping[decompressor->get_dict_owner_for_test().value()];
// Check that the dictionary used by this shard lies on the same NUMA node.
// This is important to avoid cross-node memory accesses on the hot path.
BOOST_CHECK_EQUAL(our_numa_node, compressor_numa_node);
BOOST_CHECK_EQUAL(our_numa_node, decompressor_numa_node);
compressor_numa_nodes[this_shard_id()] = compressor_numa_node;
decompressor_numa_nodes[this_shard_id()] = decompressor_numa_node;
auto output_max_size = compressor->compress_max_size(message.size());
auto compressed = std::vector<char>(output_max_size);
auto compressed_size = compressor->compress(
reinterpret_cast<const char*>(message.data()), message.size(),
reinterpret_cast<char*>(compressed.data()), compressed.size());
BOOST_REQUIRE_GE(compressed_size, 0);
compressed.resize(compressed_size);
// Validate that the recommended dict was actually used.
BOOST_CHECK(compressed.size() < message.size() / 10);
auto decompressed = std::vector<char>(message.size());
auto decompressed_size = decompressor->uncompress(
reinterpret_cast<const char*>(compressed.data()), compressed.size(),
reinterpret_cast<char*>(decompressed.data()), decompressed.size());
BOOST_REQUIRE_GE(decompressed_size, 0);
decompressed.resize(decompressed_size);
// Validate that the roundtrip through compressor and decompressor
// resulted in the original message.
BOOST_CHECK_EQUAL_COLLECTIONS(message.begin(), message.end(), decompressed.begin(), decompressed.end());
})).get();
}
// Check that the number of owners (and hence, copies) is equal to the number
// of NUMA nodes.
// This isn't that important, but we don't want to duplicate dictionaries
// within a NUMA node unnecessarily.
BOOST_CHECK_EQUAL(
std::set(compressor_numa_nodes.begin(), compressor_numa_nodes.end()).size(),
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
);
BOOST_CHECK_EQUAL(
std::set(decompressor_numa_nodes.begin(), decompressor_numa_nodes.end()).size(),
std::set(shard_to_numa_mapping.begin(), shard_to_numa_mapping.end()).size()
);
}
}
SEASTAR_THREAD_TEST_CASE(test_numa_awareness) {
{
std::vector<unsigned> one_numa_node(smp::count);
test_one_numa_topology(one_numa_node);
}
{
std::vector<unsigned> two_numa_nodes(smp::count);
for (size_t i = 0; i < two_numa_nodes.size(); ++i) {
two_numa_nodes[i] = i % 2;
}
test_one_numa_topology(two_numa_nodes);
}
{
std::vector<unsigned> n_numa_nodes(smp::count);
for (size_t i = 0; i < n_numa_nodes.size(); ++i) {
n_numa_nodes[i] = i;
}
test_one_numa_topology(n_numa_nodes);
}
}

@@ -466,8 +466,8 @@ static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_p
}
SEASTAR_TEST_CASE(check_read_indexes) {
return test_env::do_with([] (test_env& env) {
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return test_env::do_with_async([] (test_env& env) {
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return seastar::async([&env, version] {
auto builder = schema_builder("test", "summary_test")
.with_column("a", int32_type, column_kind::partition_key);
@@ -478,7 +478,7 @@ SEASTAR_TEST_CASE(check_read_indexes) {
auto list = sstables::test(sst).read_indexes(env.make_reader_permit()).get();
BOOST_REQUIRE(list.size() == 130);
});
});
}).get();
});
}
@@ -499,8 +499,8 @@ SEASTAR_TEST_CASE(check_multi_schema) {
// d int,
// e blob
//);
return test_env::do_with([] (test_env& env) {
return for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return test_env::do_with_async([] (test_env& env) {
for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
return seastar::async([&env, version] {
auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
auto builder = schema_builder("test", "test_multi_schema")
@@ -532,7 +532,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
BOOST_REQUIRE(!m);
});
});
});
}).get();
});
}
@@ -2413,7 +2413,7 @@ SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
}
SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
return test_env::do_with([] (test_env& env) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
@@ -2441,8 +2441,6 @@ SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
BOOST_REQUIRE(insert(2, 2) == true);
BOOST_REQUIRE(insert(5, 5) == true);
BOOST_REQUIRE(run.all().size() == 5);
return make_ready_future<>();
});
}

@@ -32,7 +32,9 @@ static auto copy_sst_to_tmpdir(fs::path tmp_path, test_env& env, sstables::schem
SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
tmpdir tmp;
auto env = test_env();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
auto env = test_env({}, *scf);
auto stop_env = defer([&env] { env.stop().get(); });
sstables::sstable_generation_generator gen_generator{0};
@@ -56,7 +58,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move) {
SEASTAR_THREAD_TEST_CASE(test_sstable_move_idempotent) {
tmpdir tmp;
auto env = test_env();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
auto env = test_env({}, *scf);
auto stop_env = defer([&env] { env.stop().get(); });
sstables::sstable_generation_generator gen_generator{0};
@@ -100,7 +104,8 @@ static bool partial_create_links(sstable_ptr sst, fs::path dst_path, sstables::g
SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
tmpdir tmp;
auto env = test_env();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
auto env = test_env({}, *scf);
auto stop_env = defer([&env] { env.stop().get(); });
sstables::sstable_generation_generator gen_generator{0};
@@ -121,7 +126,9 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_move_replay) {
SEASTAR_THREAD_TEST_CASE(test_sstable_move_exists_failure) {
tmpdir tmp;
auto env = test_env();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
auto env = test_env({}, *scf);
auto stop_env = defer([&env] { env.stop().get(); });
// please note, the SSTables used by this test are stored under

@@ -14,11 +14,12 @@ from test.pylib.internal_types import HostID
import pytest
import asyncio
import logging
import time
from test.cluster.conftest import skip_mode
from test.cluster.util import get_topology_coordinator, find_server_by_host_id
from test.cluster.mv.tablets.test_mv_tablets import get_tablet_replicas
from test.cluster.util import new_test_keyspace
from test.cluster.util import new_test_keyspace, wait_for
logger = logging.getLogger(__name__)
@@ -44,6 +45,14 @@ async def test_tablet_mv_replica_pairing_during_replace(manager: ManagerClient):
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int)")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.tv AS SELECT * FROM {ks}.test WHERE c IS NOT NULL AND pk IS NOT NULL PRIMARY KEY (c, pk) WITH SYNCHRONOUS_UPDATES = TRUE")
async def replicas_balanced():
base_replicas = [replica[0] for replica in await get_tablet_replicas(manager, servers[0], ks, "test", 0)]
view_replicas = [replica[0] for replica in await get_tablet_replicas(manager, servers[0], ks, "tv", 0)]
return len(set(base_replicas) & set(view_replicas)) == 0 or None
# There's 4 nodes and 4 tablets, so even if the initial placement is not balanced,
# each node should get 1 replica after some time.
await wait_for(replicas_balanced, time.time() + 60)
# Disable migrations concurrent with replace since we don't handle nodes going down during migration yet.
# See https://github.com/scylladb/scylladb/issues/16527
await manager.api.disable_tablet_balancing(servers[0].ip_addr)

@@ -401,20 +401,21 @@ class topo:
self.dcs = dcs
@pytest.mark.asyncio
-@pytest.mark.parametrize("topology", [
-topo(rf = 1, nodes = 3, racks = 1, dcs = 1),
-topo(rf = 3, nodes = 5, racks = 1, dcs = 1),
-topo(rf = 1, nodes = 4, racks = 2, dcs = 1),
-topo(rf = 3, nodes = 6, racks = 2, dcs = 1),
-topo(rf = 3, nodes = 6, racks = 3, dcs = 1),
-topo(rf = 2, nodes = 8, racks = 4, dcs = 2)
+@pytest.mark.parametrize("topology_rf_validity", [
+(topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
+(topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
+(topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
+(topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
+(topo(rf = 3, nodes = 6, racks = 3, dcs = 1), True),
+(topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
])
-async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server, topology):
+async def test_restore_with_streaming_scopes(manager: ManagerClient, s3_server, topology_rf_validity):
'''Check that restoring of a cluster with stream scopes works'''
-logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks')
+topology, rf_rack_valid_keyspaces = topology_rf_validity
+logger.info(f'Start cluster with {topology.nodes} nodes in {topology.dcs} DCs, {topology.racks} racks, rf_rack_valid_keyspaces: {rf_rack_valid_keyspaces}')
objconf = MinioServer.create_conf(s3_server.address, s3_server.port, s3_server.region)
-cfg = { 'object_storage_endpoints': objconf, 'task_ttl_in_seconds': 300 }
+cfg = { 'object_storage_endpoints': objconf, 'task_ttl_in_seconds': 300, 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces }
cmd = [ '--logger-log-level', 'sstables_loader=debug:sstable_directory=trace:snapshots=trace:s3=trace:sstable=debug:http=debug' ]
servers = []
host_ids = {}


@@ -474,7 +474,7 @@ async def add_new_node(manager: ManagerClient,
yield
LOGGER.info("Add a new node to the cluster")
-await manager.server_add(timeout=TOPOLOGY_TIMEOUT)
+await manager.server_add(config={"rf_rack_valid_keyspaces": False}, timeout=TOPOLOGY_TIMEOUT)
yield


@@ -67,7 +67,7 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
async def four_nodes_cluster(manager: ManagerClient) -> None:
LOGGER.info("Booting initial 4-node cluster.")
for _ in range(4):
-server = await manager.server_add()
+server = await manager.server_add(config={"rf_rack_valid_keyspaces": False})
await manager.api.enable_injection(
node_ip=server.ip_addr,
injection="raft_server_set_snapshot_thresholds",
@@ -93,6 +93,8 @@ async def test_random_failures(manager: ManagerClient,
TESTS_COUNT, TESTS_SHUFFLE_SEED, ERROR_INJECTIONS_COUNT, CLUSTER_EVENTS_COUNT,
)
+rf_rack_cfg = {"rf_rack_valid_keyspaces": False}
table = await random_tables.add_table(ncolumns=5)
await table.insert_seq()
@@ -116,7 +118,7 @@ async def test_random_failures(manager: ManagerClient,
)
coordinator_log = await manager.server_open_log(server_id=coordinator.server_id)
coordinator_log_mark = await coordinator_log.mark()
-s_info = await manager.server_add(expected_server_up_state=ServerUpState.PROCESS_STARTED)
+s_info = await manager.server_add(config=rf_rack_cfg, expected_server_up_state=ServerUpState.PROCESS_STARTED)
await coordinator_log.wait_for(
pattern="topology_coordinator_pause_after_updating_cdc_generation: waiting",
from_mark=coordinator_log_mark,
@@ -128,7 +130,7 @@ async def test_random_failures(manager: ManagerClient,
)
else:
s_info = await manager.server_add(
-config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]},
+config={"error_injections_at_startup": [{"name": error_injection, "one_shot": True}]} | rf_rack_cfg,
expected_server_up_state=ServerUpState.PROCESS_STARTED,
)


@@ -6,6 +6,7 @@ extra_scylla_config_options:
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
enable_user_defined_functions: False
+rf_rack_valid_keyspaces: True
tablets_mode_for_new_keyspaces: enabled
run_first:
- test_raft_recovery_stuck


@@ -20,7 +20,7 @@ from test.pylib.util import wait_for_cql_and_get_hosts, wait_for
from test.cluster.util import reconnect_driver
logger = logging.getLogger(__name__)
-pytestmark = pytest.mark.prepare_3_nodes_cluster
+pytestmark = pytest.mark.prepare_3_racks_cluster
@@ -117,6 +117,9 @@ async def test_change_two(manager, random_tables, build_mode):
# IP-s before they are send back to servers[1] and servers[2],
# and the mentioned above code is not exercised by this test.
await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
+# sleep_before_start_gossiping injections are needed to reproduce #22777
+await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
+await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_start(servers[1].server_id)
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
if build_mode != 'release':


@@ -32,7 +32,7 @@ N_SERVERS = 2
@pytest.fixture
async def two_nodes_cluster(manager: ManagerClient) -> list[ServerNum]:
logger.info(f"Booting initial 2-nodes cluster")
-servers = [srv.server_id for srv in await manager.servers_add(N_SERVERS)]
+servers = [srv.server_id for srv in await manager.servers_add(N_SERVERS, auto_rack_dc="dc1")]
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
return servers


@@ -42,7 +42,7 @@ async def test_read_repair_with_conflicting_hash_keys(request: pytest.FixtureReq
"""
logger.info("Creating a new cluster")
-srvs = await manager.servers_add(3)
+srvs = await manager.servers_add(3, auto_rack_dc="dc1")
cql, _ = await manager.get_ready_cql(srvs)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};") as ks:


@@ -15,7 +15,7 @@ from cassandra.query import SimpleStatement # type: ignore
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
-from test.pylib.util import wait_for_cql_and_get_hosts
+from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
logger = logging.getLogger(__name__)
@@ -39,7 +39,7 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
"""
cmdline = ["--hinted-handoff-enabled", "0", "--cache-hit-rate-read-balancing", "0", "--logger-log-level", "debug_error_injection=trace"]
-nodes = await manager.servers_add(3, cmdline=cmdline)
+nodes = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
node1, node2, node3 = nodes
@@ -47,19 +47,6 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
host1, host2, host3 = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 30)
-def execute_with_tracing(cql, statement, *args, **kwargs):
-kwargs['trace'] = True
-query_result = cql.execute(statement, *args, **kwargs)
-tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
-page_traces = []
-for trace in tracing:
-trace_events = []
-for event in trace.events:
-trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
-page_traces.append("\n".join(trace_events))
-logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = { 'enabled': true }") as ks:
cql.execute(f"CREATE TABLE {ks}.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
" WITH speculative_retry = 'NONE'"
@@ -67,9 +54,9 @@ async def run_test_cache_tombstone_gc(manager: ManagerClient, statement_pairs: l
" AND compaction = {'class': 'NullCompactionStrategy'}")
for write_statement, delete_statement in statement_pairs:
-execute_with_tracing(cql, write_statement.format(ks=ks))
+execute_with_tracing(cql, write_statement.format(ks=ks), log = True)
await manager.api.enable_injection(node3.ip_addr, "database_apply", one_shot=False)
-execute_with_tracing(cql, delete_statement.format(ks=ks))
+execute_with_tracing(cql, delete_statement.format(ks=ks), log = True)
await manager.api.disable_injection(node3.ip_addr, "database_apply")
def check_data(host, data):


@@ -64,12 +64,15 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
cfg = {'tablets_mode_for_new_keyspaces' : 'enabled' if tablets_enabled else 'disabled'}
logger.info("Bootstrapping first two nodes")
-servers = await manager.servers_add(2, config=cfg)
+servers = await manager.servers_add(2, config=cfg, property_file=[
+{"dc": "dc1", "rack": "r1"},
+{"dc": "dc1", "rack": "r2"}
+])
# The third node is started as the last one, so we can be sure that is has
# the latest topology version
logger.info("Bootstrapping the last node")
-servers += [await manager.server_add(config=cfg)]
+servers += [await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": "r3"})]
# Disable load balancer as it might bump topology version, undoing the decrement below.
# This should be done before adding the last two servers,
@@ -123,9 +126,10 @@ async def test_fence_writes(request, manager: ManagerClient, tablets_enabled: bo
@skip_mode('release', 'error injections are not supported in release mode')
async def test_fence_hints(request, manager: ManagerClient):
logger.info("Bootstrapping cluster with three nodes")
-s0 = await manager.server_add(config={
-'error_injections_at_startup': ['decrease_hints_flush_period']
-}, cmdline=['--logger-log-level', 'hints_manager=trace'])
+s0 = await manager.server_add(
+config={'error_injections_at_startup': ['decrease_hints_flush_period']},
+cmdline=['--logger-log-level', 'hints_manager=trace'],
+property_file={"dc": "dc1", "rack": "r1"})
# Disable load balancer as it might bump topology version, potentially creating a race condition
# with read modify write below.
@@ -134,7 +138,10 @@ async def test_fence_hints(request, manager: ManagerClient):
# which the test relies on.
await manager.api.disable_tablet_balancing(s0.ip_addr)
-[s1, s2] = await manager.servers_add(2)
+[s1, s2] = await manager.servers_add(2, property_file=[
+{"dc": "dc1", "rack": "r2"},
+{"dc": "dc1", "rack": "r3"}
+])
logger.info(f'Creating test table')
random_tables = RandomTables(request.node.name, manager, unique_name(), 3)


@@ -10,7 +10,7 @@ import uuid
import logging
logger = logging.getLogger(__name__)
-pytestmark = pytest.mark.prepare_3_nodes_cluster
+pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio
@@ -21,8 +21,8 @@ async def test_global_ignored_nodes_list(manager: ManagerClient, random_tables)
since ignore node is permanent now and B is removed from the quorum early so it is enough to
have two live nodes for the quorum.
"""
-await manager.servers_add(2)
servers = await manager.running_servers()
+servers += await manager.servers_add(2, property_file=[servers[1].property_file(), servers[2].property_file()])
await manager.server_stop_gracefully(servers[3].server_id)
await manager.server_stop_gracefully(servers[4].server_id)
# test that non existing uuid is rejected
@@ -37,6 +37,6 @@ async def test_global_ignored_nodes_list(manager: ManagerClient, random_tables)
# is 2
await manager.server_stop_gracefully(servers[2].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = True)
-await manager.server_add(start=False, replace_cfg=replace_cfg)
+await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[2].property_file())


@@ -58,10 +58,10 @@ async def test_putget_2dc_with_rf(
table_name = "test_table_name"
columns = [Column("name", TextType), Column("value", TextType)]
logger.info("Create two servers in different DC's")
-for i in nodes_list:
+for rack_idx, dc_idx in enumerate(nodes_list):
s_info = await manager.server_add(
config=CONFIG,
-property_file={"dc": f"dc{i}", "rack": "myrack"},
+property_file={"dc": f"dc{dc_idx}", "rack": f"rack{rack_idx}"},
)
logger.info(s_info)
conn = manager.get_cql()


@@ -23,14 +23,15 @@ async def test_not_enough_token_owners(manager: ManagerClient):
"""
logging.info('Trying to add a zero-token server as the first server in the cluster')
await manager.server_add(config={'join_ring': False},
+property_file={"dc": "dc1", "rack": "rz"},
expected_error='Cannot start the first node in the cluster as zero-token')
logging.info('Adding the first server')
-server_a = await manager.server_add()
+server_a = await manager.server_add(property_file={"dc": "dc1", "rack": "r1"})
logging.info('Adding two zero-token servers')
# The second server is needed only to preserve the Raft majority.
-server_b = (await manager.servers_add(2, config={'join_ring': False}))[0]
+server_b = (await manager.servers_add(2, config={'join_ring': False}, property_file={"dc": "dc1", "rack": "rz"}))[0]
logging.info(f'Trying to decommission the only token owner {server_a}')
await manager.decommission_node(server_a.server_id,
@@ -47,7 +48,7 @@ async def test_not_enough_token_owners(manager: ManagerClient):
await manager.server_start(server_a.server_id)
logging.info('Adding a normal server')
-await manager.server_add()
+await manager.server_add(property_file={"dc": "dc1", "rack": "r2"})
cql = manager.get_cql()


@@ -43,6 +43,10 @@ async def test_raft_recovery_user_data(manager: ManagerClient, remove_dead_nodes
previous state. For replace, add a single node (a sanity check verifying that the cluster is functioning properly).
7. Stop sending writes.
"""
+# Currently, the constraints imposed by `rf_rack_valid_keyspaces` are quite strict
+# and adjusting this test to working with it may require significant changes in the test.
+# Let's disable the option explicitly until we do that.
+rf_rack_cfg = {'rf_rack_valid_keyspaces': False}
# Decrease failure_detector_timeout_in_ms from the default 20 s to speed up some graceful shutdowns in the test.
# Shutting down the CQL server can hang for failure_detector_timeout_in_ms in the presence of dead nodes and
# CQL requests.
@@ -50,7 +54,8 @@ async def test_raft_recovery_user_data(manager: ManagerClient, remove_dead_nodes
'endpoint_snitch': 'GossipingPropertyFileSnitch',
'tablets_mode_for_new_keyspaces': 'enabled',
'failure_detector_timeout_in_ms': 2000,
-}
+} | rf_rack_cfg
property_file_dc1 = {'dc': 'dc1', 'rack': 'rack1'}
property_file_dc2 = {'dc': 'dc2', 'rack': 'rack2'}
@@ -150,7 +155,7 @@ async def test_raft_recovery_user_data(manager: ManagerClient, remove_dead_nodes
for i, being_replaced in enumerate(dead_servers):
replace_cfg = ReplaceConfig(replaced_id=being_replaced.server_id, reuse_ip_addr=False, use_host_id=True,
ignore_dead_nodes=[dead_srv.ip_addr for dead_srv in dead_servers[i + 1:]])
-new_servers.append(await manager.server_add(replace_cfg=replace_cfg, property_file=property_file_dc2))
+new_servers.append(await manager.server_add(replace_cfg=replace_cfg, config=rf_rack_cfg, property_file=property_file_dc2))
logging.info(f'Unsetting the recovery_leader config option on {live_servers}')
for srv in live_servers:


@@ -15,9 +15,10 @@ from cassandra.cluster import ConsistencyLevel, Session # type: ignore
from cassandra.query import SimpleStatement # type: ignore
from cassandra.pool import Host # type: ignore
-from test.pylib.util import wait_for_cql_and_get_hosts
+from test.pylib.util import wait_for_cql_and_get_hosts, execute_with_tracing
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
@@ -309,13 +310,13 @@ async def test_incremental_read_repair(data_class: DataClass, manager: ManagerCl
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_read_repair_with_trace_logging(request, manager):
logger.info("Creating a new cluster")
-cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace"]
+cmdline = ["--hinted-handoff-enabled", "0", "--logger-log-level", "mutation_data=trace:debug_error_injection=trace"]
config = {"read_request_timeout_in_ms": 60000}
-for i in range(2):
-await manager.server_add(cmdline=cmdline, config=config)
+[node1, node2] = await manager.servers_add(2, cmdline=cmdline, config=config, auto_rack_dc="dc1")
cql = manager.get_cql()
srvs = await manager.running_servers()
@@ -326,13 +327,15 @@ async def test_read_repair_with_trace_logging(request, manager):
await cql.run_async(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 0)")
await manager.server_stop(srvs[0].server_id)
-prepared = cql.prepare(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 1)")
-prepared.consistency_level = ConsistencyLevel.ONE
-await cql.run_async(prepared)
+await manager.api.enable_injection(node1.ip_addr, "database_apply", one_shot=True)
+await cql.run_async(SimpleStatement(f"INSERT INTO {ks}.t (pk, c) VALUES (0, 1)", consistency_level = ConsistencyLevel.ONE))
await manager.server_start(srvs[0].server_id)
+tracing = execute_with_tracing(cql, SimpleStatement(f"SELECT * FROM {ks}.t WHERE pk = 0", consistency_level = ConsistencyLevel.ALL), log = True)
-prepared = cql.prepare(f"SELECT * FROM {ks}.t WHERE pk = 0")
-prepared.consistency_level = ConsistencyLevel.ALL
-await cql.run_async(prepared)
assert len(tracing) == 1 # 1 page
found_read_repair = False
for event in tracing[0]:
found_read_repair |= "digest mismatch, starting read repair" == event.description
assert found_read_repair


@@ -22,7 +22,7 @@ async def test_remove_rpc_client_with_pending_requests(request, manager: Manager
# Regression test for #17445
logger.info("starting first two nodes")
-servers = await manager.servers_add(2)
+servers = await manager.servers_add(2, auto_rack_dc="dc1")
logger.info(f"wait_for_cql_and_get_hosts for the first node {servers[0]}")
host0 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[0]], time.time() + 60))[0]
@@ -48,7 +48,7 @@ async def test_remove_rpc_client_with_pending_requests(request, manager: Manager
expected_data.sort()
logger.info(f"adding the third node")
-servers += [await manager.server_add(start=False)]
+servers += [await manager.server_add(start=False, property_file=servers[0].property_file())]
logger.info(f"starting the third node [{servers[2]}]")
third_node_future = asyncio.create_task(manager.server_start(servers[2].server_id))


@@ -40,8 +40,7 @@ async def test_enable_compacting_data_for_streaming_and_repair_live_update(manag
silently broken in the past.
"""
cmdline = ["--enable-compacting-data-for-streaming-and-repair", "0", "--smp", "1", "--logger-log-level", "api=trace"]
-node1 = await manager.server_add(cmdline=cmdline)
-node2 = await manager.server_add(cmdline=cmdline)
+node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()
@@ -89,8 +88,7 @@ async def test_tombstone_gc_for_streaming_and_repair(manager):
"--hinted-handoff-enabled", "0",
"--smp", "1",
"--logger-log-level", "api=trace:database=trace"]
-node1 = await manager.server_add(cmdline=cmdline)
-node2 = await manager.server_add(cmdline=cmdline)
+node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()
@@ -149,10 +147,7 @@ async def test_tombstone_gc_for_streaming_and_repair(manager):
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_succeeds_with_unitialized_bm(manager):
-await manager.server_add()
-await manager.server_add()
-servers = await manager.running_servers()
+servers = await manager.servers_add(2, auto_rack_dc="dc1")
cql = manager.get_cql()
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
@@ -171,8 +166,7 @@ async def do_batchlog_flush_in_repair(manager, cache_time_in_ms):
total_repair_duration = 0
cmdline = ["--repair-hints-batchlog-flush-cache-time-in-ms", str(cache_time_in_ms), "--smp", "1", "--logger-log-level", "api=trace"]
-node1 = await manager.server_add(cmdline=cmdline)
-node2 = await manager.server_add(cmdline=cmdline)
+node1, node2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
@@ -226,10 +220,7 @@ async def test_batchlog_flush_in_repair_without_cache(manager):
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_abort(manager):
cfg = {'tablets_mode_for_new_keyspaces': 'enabled'}
-await manager.server_add(config=cfg)
-await manager.server_add(config=cfg)
-servers = await manager.running_servers()
+servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
cql = manager.get_cql()
cql.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")


@@ -78,7 +78,7 @@ async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> Non
@pytest.mark.asyncio
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
"""Replace an existing node with new node using the same IP address"""
-servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
+servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
logger.info(f"creating test table")
@@ -90,7 +90,7 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
await manager.server_stop_gracefully(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = False)
-replace_future = asyncio.create_task(manager.server_add(replace_cfg))
+replace_future = asyncio.create_task(manager.server_add(replace_cfg, property_file=servers[0].property_file()))
start_time = time.time()
next_id = 0
logger.info(f"running write requests in a loop while the replacing node is starting")


@@ -27,7 +27,7 @@ async def test_reversed_queries_during_upgrade(manager: ManagerClient) -> None:
in order to test both native and legacy reversed formats.
"""
cmdline = ["--hinted-handoff-enabled", "0"]
-node1, _ = await manager.servers_add(2, cmdline)
+node1, _ = await manager.servers_add(2, cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()


@@ -61,7 +61,7 @@ async def test_basic(manager: ManagerClient) -> None:
'internode_compression': "all",
'internode_compression_zstd_max_cpu_fraction': 0.0}
logger.info(f"Booting initial cluster")
-servers = await manager.servers_add(servers_num=2, config=cfg)
+servers = await manager.servers_add(servers_num=2, config=cfg, auto_rack_dc="dc1")
cql = manager.get_cql()
@@ -108,7 +108,7 @@ async def test_dict_training(manager: ManagerClient) -> None:
'--logger-log-level=dict_training=trace'
]
logger.info(f"Booting initial cluster")
-servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline)
+servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()
@@ -170,7 +170,7 @@ async def test_external_dicts(manager: ManagerClient) -> None:
'--logger-log-level=advanced_rpc_compressor=debug'
]
logger.info(f"Booting initial cluster")
-servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline)
+servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()
@@ -233,7 +233,7 @@ async def test_external_dicts_sanity(manager: ManagerClient) -> None:
'--logger-log-level=advanced_rpc_compressor=debug',
]
logger.info(f"Booting initial cluster")
-servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline)
+servers = await manager.servers_add(servers_num=2, config=cfg, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()


@@ -16,7 +16,7 @@ from test.pylib.manager_client import ManagerClient
@pytest.mark.asyncio
async def test_sticky_coordinator_enforced(manager: ManagerClient) -> None:
-await manager.servers_add(2, cmdline=['--logger-log-level', 'paging=trace'])
+await manager.servers_add(2, cmdline=['--logger-log-level', 'paging=trace'], auto_rack_dc="dc1")
cql = manager.get_cql()


@@ -14,7 +14,7 @@ from cassandra.query import SimpleStatement # type: ignore # pylint
logger = logging.getLogger(__name__)
-pytestmark = pytest.mark.prepare_3_nodes_cluster
+pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio
@@ -39,7 +39,7 @@ async def test_snapshot(manager, random_tables):
await t.add_column()
manager.driver_close()
-server_d = await manager.server_add()
+server_d = await manager.server_add(property_file=server_a.property_file())
logger.info("Started D %s", server_d)
logger.info("Stopping A %s, B %s, and C %s", server_a, server_b, server_c)


@@ -54,14 +54,14 @@ async def test_autoretrain_dict(manager: ManagerClient):
uncompressed_size = blob_size * n_blobs * rf
logger.info("Bootstrapping cluster")
-servers = (await manager.servers_add(2, cmdline=[
+servers = await manager.servers_add(2, cmdline=[
'--logger-log-level=storage_service=debug',
'--logger-log-level=database=debug',
'--logger-log-level=sstable_dict_autotrainer=debug',
'--sstable-compression-dictionaries-retrain-period-in-seconds=1',
'--sstable-compression-dictionaries-autotrainer-tick-period-in-seconds=1',
f'--sstable-compression-dictionaries-min-training-dataset-bytes={int(uncompressed_size/2)}',
-]))
+], auto_rack_dc="dc1")
logger.info("Creating table")
cql = manager.get_cql()


@@ -74,7 +74,7 @@ async def test_retrain_dict(manager: ManagerClient):
servers = (await manager.servers_add(2, cmdline=[
*common_debug_cli_options,
-]))
+], auto_rack_dc="dc1"))
logger.info("Creating table")
cql = manager.get_cql()
@@ -185,7 +185,7 @@ async def test_estimate_compression_ratios(manager: ManagerClient):
servers = (await manager.servers_add(2, cmdline=[
*common_debug_cli_options,
-]))
+], auto_rack_dc="dc1"))
cql = manager.get_cql()
@@ -346,13 +346,19 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
LZ4WithDictsCompressor and ZstdWithDictsCompressor
to the Cassandra-compatible LZ4Compressor and ZstdCompressor.
"""
-servers = await manager.servers_add(1)
+servers = await manager.servers_add(1, [
+*common_debug_cli_options,
+])
# Create keyspace and table
logger.info("Creating tables")
cql = manager.get_cql()
-algorithms = ['LZ4WithDicts', 'ZstdWithDicts']
+dict_algorithms = ['LZ4WithDicts', 'ZstdWithDicts']
+nondict_algorithms = ['Snappy', 'LZ4', 'Deflate', 'Zstd']
+algorithms = dict_algorithms + nondict_algorithms
+no_compression = 'NoCompression'
+all_tables = dict_algorithms + nondict_algorithms + [no_compression]
await cql.run_async("""
CREATE KEYSPACE test
@@ -363,14 +369,19 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
CREATE TABLE test."{algo}" (pk int PRIMARY KEY, c blob)
WITH COMPRESSION = {{'sstable_compression': '{algo}Compressor'}};
''')
-for algo in algorithms
-])
+for algo in algorithms],
+cql.run_async(f'''
+CREATE TABLE test."{no_compression}" (pk int PRIMARY KEY, c blob)
+WITH COMPRESSION = {{}}
+''')
+)
# Populate data with
blob = random.randbytes(16*1024);
logger.info("Populating table")
n_blobs = 100
-for algo in algorithms:
+for algo in all_tables:
insert = cql.prepare(f'''INSERT INTO test."{algo}" (pk, c) VALUES (?, ?);''')
insert.consistency_level = ConsistencyLevel.ALL;
for pks in itertools.batched(range(n_blobs), n=100):
@@ -381,7 +392,7 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
async def validate_select():
cql = manager.get_cql()
-for algo in algorithms:
+for algo in all_tables:
select = cql.prepare(f'''SELECT c FROM test."{algo}" WHERE pk = ? BYPASS CACHE;''')
results = await cql.run_async(select, [42])
assert results[0][0] == blob
@@ -424,7 +435,7 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
names = set()
for table_info in sstable_info:
for sstable in table_info["sstables"]:
-for prop in sstable["extended_properties"]:
+for prop in sstable.get("extended_properties", []):
if prop["group"] == "compression_parameters":
for attr in prop["attributes"]:
if attr["key"] == "sstable_compression":
@@ -433,18 +444,24 @@ async def test_sstable_compression_dictionaries_enable_writing(manager: ManagerC
await asyncio.gather(*[
manager.api.retrain_dict(servers[0].ip_addr, "test", algo)
-for algo in algorithms
+for algo in all_tables
])
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
await manager.api.keyspace_upgrade_sstables(servers[0].ip_addr, "test")
+name_prefix = "org.apache.cassandra.io.compress."
-for algo in algorithms:
+for algo in dict_algorithms:
assert (await get_compressor_names(algo)) == {f"{algo}Compressor"}
+for algo in nondict_algorithms:
+assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
+assert (await get_compressor_names(no_compression)) == set()
await live_update_config(manager, servers, 'sstable_compression_dictionaries_enable_writing', "false")
await manager.api.keyspace_upgrade_sstables(servers[0].ip_addr, "test")
name_prefix = "org.apache.cassandra.io.compress."
assert (await get_compressor_names("LZ4WithDicts")) == {name_prefix + "LZ4Compressor"}
assert (await get_compressor_names("ZstdWithDicts")) == {name_prefix + "ZstdCompressor"}
+for algo in nondict_algorithms:
+assert (await get_compressor_names(algo)) == {name_prefix + f"{algo}Compressor"}
+assert (await get_compressor_names(no_compression)) == set()


@@ -232,6 +232,7 @@ def check_repairs(row_num_before: list[int], row_num_after: list[int], expected_
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.parametrize("included_host_count", [2, 1, 0])
async def test_tablet_repair_hosts_filter(manager: ManagerClient, included_host_count):
+injection = "handle_tablet_migration_repair_fail"
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
hosts_filter = "00000000-0000-0000-0000-000000000000"
if included_host_count == 1:
@@ -243,7 +244,7 @@ async def test_tablet_repair_hosts_filter(manager: ManagerClient, included_host_
token = -1
async def repair_task():
-await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
+await inject_error_on(manager, injection, servers)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, hosts_filter=hosts_filter)
async def check_filter():
@@ -255,7 +256,7 @@ async def test_tablet_repair_hosts_filter(manager: ManagerClient, included_host_
assert len(res) == 1
assert res[str(token)].repair_hosts_filter.split(",").sort() == hosts_filter.split(",").sort()
-await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers)
+await inject_error_off(manager, injection, servers)
await asyncio.gather(repair_task(), check_filter())
@@ -264,8 +265,8 @@ async def test_tablet_repair_hosts_filter(manager: ManagerClient, included_host_
async def prepare_multi_dc_repair(manager) -> tuple[list[ServerInfo], CassandraSession, list[Host], str, str]:
servers = [await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R1'}),
-await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R1'}),
-await manager.server_add(property_file = {'dc': 'DC2', 'rack' : 'R2'})]
+await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R2'}),
+await manager.server_add(property_file = {'dc': 'DC2', 'rack' : 'R3'})]
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', "
"'DC1': 2, 'DC2': 1} AND tablets = {'initial': 8};")


@@ -33,6 +33,12 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_tablet_replication_factor_enough_nodes(manager: ManagerClient):
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
# This test verifies that Scylla rejects creating a table if there are too few token-owning nodes.
# That means that a keyspace must already be in place, but that's impossible with RF-rack-valid
# keyspaces being enforced. We could work around this constraint by creating 3 nodes and then
# decommissioning one of them before attempting to create a table, but if we decide to constrain
# decommission later on, this test will have to be modified again. Let's simply disable the option.
cfg = cfg | {'rf_rack_valid_keyspaces': False}
servers = await manager.servers_add(2, config=cfg)
cql = manager.get_cql()
@@ -66,7 +72,12 @@ async def test_tablet_scaling_option_is_respected(manager: ManagerClient):
async def test_tablet_cannot_decommision_below_replication_factor(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
servers = await manager.servers_add(4, config=cfg)
servers = await manager.servers_add(4, config=cfg, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"}
])
logger.info("Creating table")
cql = manager.get_cql()
@@ -132,7 +143,7 @@ async def test_reshape_with_tablets(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_rf_change(manager: ManagerClient, direction):
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
servers = await manager.servers_add(3, config=cfg)
servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
for s in servers:
await manager.api.disable_tablet_balancing(s.ip_addr)
@@ -141,14 +152,14 @@ async def test_tablet_rf_change(manager: ManagerClient, direction):
this_dc = res[0].data_center
if direction == 'up':
rf_from = 2
rf_to = 3
rf_from = 1
rf_to = 2
if direction == 'down':
rf_from = 3
rf_to = 2
if direction == 'none':
rf_from = 2
rf_to = 2
rf_to = 1
if direction == 'none':
rf_from = 1
rf_to = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_from}}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
@@ -199,7 +210,11 @@ async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClien
not owned by the node is attempted to be read."""
cfg = {'enable_user_defined_functions': False,
'tablets_mode_for_new_keyspaces': 'enabled' }
servers = await manager.servers_add(3, config=cfg)
servers = await manager.servers_add(3, config=cfg, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"}
])
cql = manager.get_cql()
@@ -788,12 +803,14 @@ async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: Manager
and when there is another down node in the datacenter, leaving no normal token owners.
"""
servers: dict[str, list[ServerInfo]] = dict()
servers['dc1'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc1', 'rack': 'rack1'})
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
{'dc': 'dc1', 'rack': 'rack1_1'},
{'dc': 'dc1', 'rack': 'rack1_2'}])
# if testing without a zero-token node, add an additional node to dc2 to maintain raft quorum
extra_node = 0 if with_zero_token_node else 1
servers['dc2'] = await manager.servers_add(servers_num=2 + extra_node, property_file={'dc': 'dc2', 'rack': 'rack2'})
if with_zero_token_node:
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1'}))
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()
@@ -815,7 +832,7 @@ async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: Manager
logger.info(f"Replacing {node_to_replace} with a new node")
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
@@ -827,10 +844,12 @@ async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_tok
And then verify that that node can be replaced successfully.
"""
servers: dict[str, list[ServerInfo]] = dict()
servers['dc1'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc1', 'rack': 'rack1'})
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
{'dc': 'dc1', 'rack': 'rack1_1'},
{'dc': 'dc1', 'rack': 'rack1_2'}])
servers['dc2'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc2', 'rack': 'rack2'})
if with_zero_token_node:
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1'}))
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()
@@ -848,7 +867,7 @@ async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_tok
logger.info(f"Replacing {node_to_remove} with a new node")
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
@@ -861,12 +880,14 @@ async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient,
but other datacenters can be used to rebuild the data.
"""
servers: dict[str, list[ServerInfo]] = dict()
servers['dc1'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc1', 'rack': 'rack1'})
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
{'dc': 'dc1', 'rack': 'rack1_1'},
{'dc': 'dc1', 'rack': 'rack1_2'}])
# if testing without a zero-token node, add an additional node to dc2 to maintain raft quorum
extra_node = 0 if with_zero_token_node else 1
servers['dc2'] = await manager.servers_add(servers_num=2 + extra_node, property_file={'dc': 'dc2', 'rack': 'rack2'})
if with_zero_token_node:
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1'}))
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()
@@ -888,11 +909,11 @@ async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient,
logger.info(f"Replacing {nodes_to_replace[0]} with a new node")
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[0].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True,
ignore_dead_nodes=[replaced_host_id])
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
await manager.server_add(replace_cfg=replace_cfg, property_file=nodes_to_replace[0].property_file())
logger.info(f"Replacing {nodes_to_replace[1]} with a new node")
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[1].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
await manager.server_add(replace_cfg=replace_cfg, property_file=nodes_to_replace[1].property_file())
logger.info("Verifying data")
for node in servers['dc2']:
@@ -1115,15 +1136,15 @@ async def check_tablet_rebuild_with_repair(manager: ManagerClient, fail: bool):
host_ids = []
servers = []
async def make_server():
s = await manager.server_add(config=cfg)
async def make_server(rack: str):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server()
await make_server()
await make_server()
await make_server("r1")
await make_server("r1")
await make_server("r2")
cql = manager.get_cql()
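The recurring change across these hunks is giving every added server an explicit DC/rack placement, either via an explicit `property_file` list or the `auto_rack_dc` shorthand. A minimal sketch of how such per-server property files can be generated (helper name and shape are assumptions, not the test manager's actual API):

```python
from itertools import cycle

def rack_property_files(dc: str, racks: list[str], count: int) -> list[dict[str, str]]:
    # One {'dc', 'rack'} dict per server, cycling over the given racks so
    # nodes are spread as evenly as the rack list allows.
    rack_iter = cycle(racks)
    return [{"dc": dc, "rack": next(rack_iter)} for _ in range(count)]
```

With `racks=["r1", "r2"]` and `count=3`, two servers land in `r1` and one in `r2`, the same 2+1 split as the layouts above (though in a different order).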

@@ -3,6 +3,7 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from typing import Any
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
@@ -12,6 +13,7 @@ from test.pylib.util import wait_for_cql_and_get_hosts, unique_name
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, TabletReplicas
from test.cluster.conftest import skip_mode
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace
from test.cqlpy.cassandra_tests.validation.entities.secondary_index_test import dotestCreateAndDropIndex
import pytest
import asyncio
@@ -94,7 +96,7 @@ async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(m
'--logger-log-level', 'messaging_service=trace',
'--logger-log-level', 'rpc=trace',
]
servers = await manager.servers_add(3, cmdline=cmdline)
servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
s0 = servers[0].server_id
not_s0 = servers[1:]
@@ -495,7 +497,11 @@ async def test_tablet_repair(manager: ManagerClient):
'--logger-log-level', 'repair=trace',
'--task-ttl-in-seconds', '3600', # Make sure the test passes with non-zero task_ttl.
]
servers = await manager.servers_add(3, cmdline=cmdline)
servers = await manager.servers_add(3, cmdline=cmdline, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"}
])
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
@@ -558,7 +564,11 @@ async def test_concurrent_tablet_repair_and_split(manager: ManagerClient):
]
servers = await manager.servers_add(3, cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
})
}, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"}
])
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -622,9 +632,7 @@ async def test_tablet_missing_data_repair(manager: ManagerClient):
cmdline = [
'--hinted-handoff-enabled', 'false',
]
servers = [await manager.server_add(cmdline=cmdline),
await manager.server_add(cmdline=cmdline),
await manager.server_add(cmdline=cmdline)]
servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', "
@@ -660,7 +668,7 @@ async def test_tablet_missing_data_repair(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_repair_history(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()]
servers = await manager.servers_add(3, auto_rack_dc="dc1")
rf = 3
tablets = 8
@@ -686,7 +694,7 @@ async def test_tablet_repair_history(manager: ManagerClient):
@pytest.mark.asyncio
async def test_tablet_repair_ranges_selection(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = [await manager.server_add(), await manager.server_add()]
servers = await manager.servers_add(2, auto_rack_dc="dc1")
rf = 2
tablets = 4
@@ -1604,12 +1612,12 @@ async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClie
logger.info("Verify data is not resurrected")
await assert_empty_table()
async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, nodes_per_rack: int) -> dict[ServerNum, ServerInfo]:
async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, nodes_per_rack: int, config: dict[str, Any] | None = None) -> dict[ServerNum, ServerInfo]:
logger.debug(f"Creating cluster: num_dcs={num_dcs} num_racks={num_racks} nodes_per_rack={nodes_per_rack}")
servers: dict[ServerNum, ServerInfo] = dict()
for dc in range(1, num_dcs + 1):
for rack in range(1, num_racks + 1):
rack_servers = await manager.servers_add(nodes_per_rack, property_file={"dc": f"dc{dc}", "rack": f"rack{rack}"})
rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": f"dc{dc}", "rack": f"rack{rack}"})
for s in rack_servers:
servers[s.server_id] = s
logger.debug(f"Created servers={list(servers.values())}")
@@ -1683,7 +1691,12 @@ async def test_decommission_rack_basic(manager: ManagerClient):
nodes_per_rack = 2
rf = num_racks - 1
all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack)
# We need to disable this option to be able to create a keyspace. This can be ditched
# once we've implemented scylladb/scylladb#23426 and we can add new racks with the option enabled.
# Then we can create `rf` nodes, create the keyspace, and add another node.
config = {"rf_rack_valid_keyspaces": False}
all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack, config)
async with create_and_populate_table(manager, rf=rf) as ctx:
logger.info("Verify tablet replicas distribution")
tables = {ctx.ks: [ctx.table]}
@@ -1719,7 +1732,11 @@ async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
nodes_per_rack = 2
rf = initial_num_racks
initial_servers = await create_cluster(manager, 1, initial_num_racks, nodes_per_rack)
# With rf_rack_valid_keyspaces enabled, we can't add a new rack once a keyspace has been created.
# Once scylladb/scylladb#23426 has been implemented, this can be ditched.
config = {"rf_rack_valid_keyspaces": False}
initial_servers = await create_cluster(manager, 1, initial_num_racks, nodes_per_rack, config)
async with create_and_populate_table(manager, rf=rf) as ctx:
logger.debug("Temporarily disable tablet load balancing")
node1 = sorted(initial_servers.values(), key=lambda s: s.server_id)[0]
@@ -1729,7 +1746,7 @@ async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
new_rack = f"rack{num_racks}"
# copy initial_servers into all_servers, don't just assign it (by reference)
all_servers: list[ServerInfo] = list(initial_servers.values())
new_rack_servers = await manager.servers_add(nodes_per_rack, property_file={"dc": "dc1", "rack": new_rack})
new_rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": "dc1", "rack": new_rack})
all_servers.extend(new_rack_servers)
logger.info("Verify tablet replicas distribution")
@@ -1887,7 +1904,7 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
"""Test truncate operation during topology change."""
# Start 3 node cluster
servers = await manager.servers_add(3, config = { 'enable_tablets': True })
servers = await manager.servers_add(3, config = { 'enable_tablets': True }, auto_rack_dc="dc1")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (k int PRIMARY KEY, v int)")
@@ -1905,7 +1922,8 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
truncate_task = asyncio.create_task(truncate_table())
logger.info("Adding fourth node")
new_server = await manager.server_add(config={'error_injections_at_startup': ['delay_bootstrap_120s'], 'enable_tablets': True})
new_server = await manager.server_add(config={'error_injections_at_startup': ['delay_bootstrap_120s'], 'enable_tablets': True},
property_file=servers[0].property_file())
await truncate_task
# Wait for bootstrap completion
@@ -1913,3 +1931,41 @@ async def test_truncate_during_topology_change(manager: ManagerClient):
rows = await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test")
assert rows[0].count == 0, "Table should be empty after truncation"
# Reproducer for https://github.com/scylladb/scylladb/issues/22040.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_concurrent_schema_change_with_compaction_completion(manager: ManagerClient):
cmdline = ['--smp=2']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.api.enable_injection(servers[0].ip_addr, "sstable_list_builder_delay", one_shot=False)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
table = f"{ks}.test"
await cql.run_async(f"CREATE TABLE {table} (a int PRIMARY KEY, b int);")
stop_compaction = False
async def background_compaction():
while not stop_compaction:
await manager.api.keyspace_compaction(servers[0].ip_addr, ks)
compaction_task = asyncio.create_task(background_compaction())
for i in range(5):
dotestCreateAndDropIndex(cql, table, "CamelCase", False)
dotestCreateAndDropIndex(cql, table, "CamelCase2", True)
stop_compaction = True
await compaction_task
async def force_minor_compaction():
for i in range(4):
await cql.run_async(f"INSERT INTO {ks}.test (a, b) VALUES (1, 1);")
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'TimeWindowCompactionStrategy' }};")
await force_minor_compaction()
await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'IncrementalCompactionStrategy' }};")
await force_minor_compaction()
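The new test above drives schema changes while a compaction loop runs in the background, coordinated by a plain stop flag. The pattern in isolation, as a hedged sketch:

```python
import asyncio

async def run_with_background(work, tick):
    # Start a background loop that awaits tick() until the stop flag is set,
    # run the foreground work, then stop and join the background task.
    stop = False

    async def background():
        while not stop:
            await tick()

    task = asyncio.create_task(background())
    try:
        await work()
    finally:
        stop = True
        await task
```

A bare `bool` suffices here because both coroutines run on the same event loop thread, so no lock is needed; the flag is re-checked each time the loop resumes after an `await`.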

@@ -76,10 +76,10 @@ async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerCl
}
logger.info("starting a node (the leader)")
servers = [await manager.server_add(config=config)]
servers = [await manager.server_add(config=config, property_file={"dc": "dc1", "rack": "r1"})]
logger.info("starting a second node (the follower)")
servers += [await manager.server_add(config=config)]
servers += [await manager.server_add(config=config, property_file={"dc": "dc1", "rack": "r2"})]
async with new_test_keyspace(manager, "with "
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and "

@@ -27,15 +27,15 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
host_ids = []
servers = []
async def make_server():
s = await manager.server_add(config=cfg)
async def make_server(rack: str):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server()
await make_server()
await make_server()
await make_server("r1")
await make_server("r1")
await make_server("r2")
cql = manager.get_cql()
@@ -109,22 +109,22 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
host_ids = []
servers = []
async def make_server():
s = await manager.server_add(config=cfg)
async def make_server(rack: str):
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
servers.append(s)
host_ids.append(await manager.get_host_id(s.server_id))
await manager.api.disable_tablet_balancing(s.ip_addr)
await make_server()
await make_server("r1")
await make_server("r2")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
await make_server()
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
await make_server()
await make_server("r2")
if fail_stage in ["cleanup_target", "revert_migration"]:
# we'll stop 2 servers; to keep the group0 quorum we need five
@@ -136,8 +136,8 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail
# attempt to remove the 2nd node, to make cleanup_target stage
# go ahead, will step on the legacy API lock on storage_service,
# so we need to ask some other node to do it
for _ in range(2):
await make_server()
await make_server("r1")
await make_server("r2")
logger.info(f"Cluster is [{host_ids}]")

@@ -69,7 +69,8 @@ async def test_replace(manager: ManagerClient):
'--logger-log-level', 'raft_topology=trace',
]
servers = await manager.servers_add(3, cmdline=cmdline)
config = {"rf_rack_valid_keyspaces": False}
servers = await manager.servers_add(3, cmdline=cmdline, config=config)
cql = manager.get_cql()
@@ -118,7 +119,7 @@ async def test_replace(manager: ManagerClient):
logger.info('Replacing a node')
await manager.server_stop_gracefully(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
servers.append(await manager.server_add(replace_cfg))
servers.append(await manager.server_add(replace_cfg, config=config))
servers = servers[1:]
key_count = await finish_writes()
@@ -146,8 +147,10 @@ async def test_removenode(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = ['--logger-log-level', 'storage_service=trace']
config = {"rf_rack_valid_keyspaces": False}
# 4 nodes so that we can find new tablet replica for the RF=3 table on removenode
servers = await manager.servers_add(4, cmdline=cmdline)
servers = await manager.servers_add(4, cmdline=cmdline, config=config)
cql = manager.get_cql()
@@ -211,7 +214,13 @@ async def test_removenode_with_ignored_node(manager: ManagerClient):
# 5 nodes because we need a quorum with 2 nodes down.
# 4 nodes would be enough to not lose data with RF=3.
servers = await manager.servers_add(5, cmdline=cmdline)
servers = await manager.servers_add(5, cmdline=cmdline, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"}
])
cql = manager.get_cql()

@@ -13,7 +13,7 @@ import logging
import pytest
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_nodes_cluster
pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio

@@ -32,7 +32,7 @@ def get_expected_tombstone_gc_mode(rf, tablets):
@pytest.mark.parametrize("rf", [1, 2])
@pytest.mark.parametrize("tablets", [True, False])
async def test_default_tombstone_gc(manager: ManagerClient, rf: int, tablets: bool):
_ = [await manager.server_add() for _ in range(2)]
_ = await manager.servers_add(2, auto_rack_dc="dc1")
cql = manager.get_cql()
tablets_enabled = "true" if tablets else "false"
async with new_test_keyspace(manager, f"with replication = {{ 'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} and tablets = {{ 'enabled': {tablets_enabled} }}") as keyspace:
@@ -44,7 +44,7 @@ async def test_default_tombstone_gc(manager: ManagerClient, rf: int, tablets: bo
@pytest.mark.parametrize("rf", [1, 2])
@pytest.mark.parametrize("tablets", [True, False])
async def test_default_tombstone_gc_does_not_override(manager: ManagerClient, rf: int, tablets: bool):
_ = [await manager.server_add() for _ in range(2)]
_ = await manager.servers_add(2, auto_rack_dc="dc1")
cql = manager.get_cql()
tablets_enabled = "true" if tablets else "false"
async with new_test_keyspace(manager, f"with replication = {{ 'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} and tablets = {{ 'enabled': {tablets_enabled} }}") as keyspace:
@@ -92,7 +92,10 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
cfg = {
'group0_tombstone_gc_refresh_interval_in_ms': 1000, # this is 1 hour by default
}
servers = [await manager.server_add(cmdline=cmdline, config=cfg) for _ in range(3)]
servers = await manager.servers_add(3, cmdline=cmdline, config=cfg, property_file=[
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r2"}])
cql = manager.get_cql()
hosts = [(await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]

@@ -22,7 +22,8 @@ logger = logging.getLogger(__name__)
@pytest.mark.parametrize("tablets_enabled", [True, False])
async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bool):
"""Test basic topology operations using the topology coordinator."""
cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled'}
rf_rack_cfg = {'rf_rack_valid_keyspaces': False}
cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled'} | rf_rack_cfg
rf = 3
num_nodes = rf
if tablets_enabled:
@@ -57,7 +58,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
logger.info(f"Replacing node {servers[0]}")
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False)
servers = servers[1:] + [await manager.server_add(replace_cfg)]
servers = servers[1:] + [await manager.server_add(replace_cfg, config=rf_rack_cfg)]
await check_token_ring_and_group0_consistency(manager)
logger.info(f"Stopping node {servers[0]}")

@@ -25,9 +25,11 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
d.mkdir()
k = d / "system_key"
k.write_text('AES/CBC/PKCS5Padding:128:ApvJEoFpQmogvam18bb54g==')
rf_rack_cfg = {'rf_rack_valid_keyspaces': False}
cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled',
'user_info_encryption': {'enabled': True, 'key_provider': 'LocalFileSystemKeyProviderFactory'},
'system_key_directory': d.as_posix()}
cfg = cfg | rf_rack_cfg
rf = 3
num_nodes = rf
if tablets_enabled:
@@ -62,7 +64,7 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
logger.info(f"Replacing node {servers[0]}")
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False)
servers = servers[1:] + [await manager.server_add(replace_cfg)]
servers = servers[1:] + [await manager.server_add(replace_cfg, config=rf_rack_cfg)]
await check_token_ring_and_group0_consistency(manager)
logger.info(f"Stopping node {servers[0]}")

@@ -19,7 +19,7 @@ import pytest
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_nodes_cluster
pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio
@@ -27,7 +27,7 @@ async def test_remove_node_add_column(manager: ManagerClient, random_tables: Ran
"""Add a node, remove an original node, add a column"""
servers = await manager.running_servers()
table = await random_tables.add_table(ncolumns=5)
await manager.server_add()
await manager.server_add(property_file=servers[1].property_file())
await manager.server_stop_gracefully(servers[1].server_id) # stop [1]
await manager.remove_node(servers[0].server_id, servers[1].server_id) # Remove [1]
await check_token_ring_and_group0_consistency(manager)
@@ -53,7 +53,7 @@ async def test_decommission_node_add_column(manager: ManagerClient, random_table
# 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
await manager.api.enable_injection(
decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
bootstrapped_server = await manager.server_add()
bootstrapped_server = await manager.server_add(property_file=decommission_target.property_file())
async def no_joining_nodes():
joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
return not joining_nodes

@@ -54,11 +54,11 @@ async def test_nodes_with_different_smp(request: FixtureRequest, manager: Manage
]
logger.info(f'Adding --smp=3 server')
await manager.server_add(cmdline=['--smp', '3'] + log_args)
await manager.server_add(cmdline=['--smp', '3'] + log_args, property_file={"dc": "dc1", "rack": "r1"})
logger.info(f'Adding --smp=4 server')
await manager.server_add(cmdline=['--smp', '4'] + log_args)
await manager.server_add(cmdline=['--smp', '4'] + log_args, property_file={"dc": "dc1", "rack": "r2"})
logger.info(f'Adding --smp=5 server')
await manager.server_add(cmdline=['--smp', '5'] + log_args)
await manager.server_add(cmdline=['--smp', '5'] + log_args, property_file={"dc": "dc1", "rack": "r3"})
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

@@ -18,20 +18,20 @@ from test.cluster.util import create_new_test_keyspace
@pytest.mark.asyncio
@pytest.mark.parametrize('zero_token_nodes', [1, 2])
async def test_zero_token_nodes_multidc_basic(manager: ManagerClient, zero_token_nodes: int):
@pytest.mark.parametrize('rf_rack_valid_keyspaces', [False, True])
async def test_zero_token_nodes_multidc_basic(manager: ManagerClient, zero_token_nodes: int, rf_rack_valid_keyspaces: bool):
"""
Test the basic functionality of a DC with zero-token nodes:
- adding zero-token nodes to a new DC succeeds
- with tablets, ensuring enough replicas for tables depends on the number of token-owners in a DC, not all nodes
- client requests in the presence of zero-token nodes succeed (also when zero-token nodes coordinate)
"""
normal_cfg = {'endpoint_snitch': 'GossipingPropertyFileSnitch'}
zero_token_cfg = {'endpoint_snitch': 'GossipingPropertyFileSnitch', 'join_ring': False}
property_file_dc1 = {'dc': 'dc1', 'rack': 'rack'}
normal_cfg = {'endpoint_snitch': 'GossipingPropertyFileSnitch', 'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces}
zero_token_cfg = normal_cfg | {'join_ring': False}
property_file_dc2 = {'dc': 'dc2', 'rack': 'rack'}
logging.info('Creating dc1 with 2 token-owning nodes')
servers = await manager.servers_add(2, config=normal_cfg, property_file=property_file_dc1)
servers = await manager.servers_add(2, config=normal_cfg, auto_rack_dc='dc1')
normal_nodes_in_dc2 = 2 - zero_token_nodes
logging.info(f'Creating dc2 with {normal_nodes_in_dc2} token-owning and {zero_token_nodes} zero-token nodes')
@@ -47,7 +47,17 @@ async def test_zero_token_nodes_multidc_basic(manager: ManagerClient, zero_token
ks_names = list[str]()
logging.info('Trying to create tables for different replication factors')
for rf in range(3):
# With `rf_rack_valid_keyspaces` set to true, we cannot create a keyspace with RF > #racks.
# Because of that, the test would fail not at the stage when a TABLE is created, but already
# when the KEYSPACE is. We want to avoid that, hence the statement below.
#
# rf_rack_valid_keyspaces == False: We'll attempt to create tables in keyspaces with too few normal token owners.
# rf_rack_valid_keyspaces == True: We'll only create RF-rack-valid keyspaces and so all of the created tables
# will have enough normal token owners.
rf_range = (normal_nodes_in_dc2 + 1) if rf_rack_valid_keyspaces else 3
for rf in range(rf_range):
failed = False
ks_name = await create_new_test_keyspace(dc2_cql, f"""WITH replication =
{{'class': 'NetworkTopologyStrategy', 'replication_factor': 2, 'dc2': {rf}}}
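The comment block above describes the constraint enforced by `rf_rack_valid_keyspaces`: a keyspace whose per-DC replication factor exceeds the number of racks in that DC is rejected already at CREATE KEYSPACE time. Reduced to a sketch of just the rule as stated here (the real validation in Scylla covers more cases):

```python
def rejected_by_rf_rack_rule(rf: int, num_racks: int) -> bool:
    # Per the comment above: with rf_rack_valid_keyspaces enabled, a DC-level
    # replication factor greater than the DC's rack count is rejected.
    return rf > num_racks
```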

@@ -22,11 +22,11 @@ async def test_zero_token_nodes_no_replication(manager: ManagerClient):
Test that zero-token nodes aren't replicas in all non-local replication strategies with and without tablets.
"""
logging.info('Adding the first server')
server_a = await manager.server_add()
server_a = await manager.server_add(property_file={"dc": "dc1", "rack": "r1"})
logging.info('Adding the second server as zero-token')
server_b = await manager.server_add(config={'join_ring': False})
server_b = await manager.server_add(config={'join_ring': False}, property_file={"dc": "dc1", "rack": "r2"})
logging.info('Adding the third server')
await manager.server_add()
await manager.server_add(property_file={"dc": "dc1", "rack": "r3"})
logging.info(f'Initiating connections to {server_a} and {server_b}')
cql_a = cluster_con([server_a.ip_addr], 9042, False,

@@ -24,29 +24,34 @@ async def test_zero_token_nodes_topology_ops(manager: ManagerClient, tablets_ena
- topology operations in the Raft-based topology involving zero-token nodes succeed
- client requests to normal nodes in the presence of zero-token nodes (2 normal nodes, RF=2, CL=2) succeed
"""
def get_pf(rack: str) -> dict[str, str]:
return {"dc": "dc1", "rack": rack}
logging.info('Trying to add a zero-token server in the gossip-based topology')
await manager.server_add(config={'join_ring': False,
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled'},
property_file=get_pf("rz"),
expected_error='the raft-based topology is disabled')
normal_cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled'}
zero_token_cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled', 'join_ring': False}
logging.info('Adding the first server')
server_a = await manager.server_add(config=normal_cfg)
server_a = await manager.server_add(config=normal_cfg, property_file=get_pf("r1"))
logging.info('Adding the second server as zero-token')
server_b = await manager.server_add(config=zero_token_cfg)
server_b = await manager.server_add(config=zero_token_cfg, property_file=get_pf("rz"))
logging.info('Adding the third server')
server_c = await manager.server_add(config=normal_cfg)
server_c = await manager.server_add(config=normal_cfg, property_file=get_pf("r2"))
await wait_for_cql_and_get_hosts(manager.cql, [server_a, server_c], time.time() + 60)
finish_writes = await start_writes(manager.cql, 2, ConsistencyLevel.TWO)
logging.info('Adding the fourth server as zero-token')
await manager.server_add(config=zero_token_cfg) # Necessary to preserve the Raft majority.
await manager.server_add(config=zero_token_cfg, property_file=get_pf("rz")) # Necessary to preserve the Raft majority.
logging.info(f'Restarting {server_b}')
await manager.server_stop_gracefully(server_b.server_id)
@@ -57,23 +62,24 @@ async def test_zero_token_nodes_topology_ops(manager: ManagerClient, tablets_ena
replace_cfg_b = ReplaceConfig(replaced_id=server_b.server_id, reuse_ip_addr=False, use_host_id=False)
logging.info(f'Trying to replace {server_b} with a token-owning server')
await manager.server_add(replace_cfg_b, config=normal_cfg, expected_error='Cannot replace the zero-token node')
await manager.server_add(replace_cfg_b, config=normal_cfg, property_file=server_b.property_file(),
expected_error='Cannot replace the zero-token node')
logging.info(f'Replacing {server_b}')
server_b = await manager.server_add(replace_cfg_b, config=zero_token_cfg)
server_b = await manager.server_add(replace_cfg_b, config=zero_token_cfg, property_file=server_b.property_file())
logging.info(f'Stopping {server_b}')
await manager.server_stop_gracefully(server_b.server_id)
replace_cfg_b = ReplaceConfig(replaced_id=server_b.server_id, reuse_ip_addr=True, use_host_id=False)
logging.info(f'Replacing {server_b} with the same IP')
server_b = await manager.server_add(replace_cfg_b, config=zero_token_cfg)
server_b = await manager.server_add(replace_cfg_b, config=zero_token_cfg, property_file=server_b.property_file())
logging.info(f'Decommissioning {server_b}')
await manager.decommission_node(server_b.server_id)
logging.info('Adding two zero-token servers')
[server_b, server_d] = await manager.servers_add(2, config=zero_token_cfg)
[server_b, server_d] = await manager.servers_add(2, config=zero_token_cfg, property_file=get_pf("rz"))
logging.info(f'Rebuilding {server_b}')
await manager.rebuild_node(server_b.server_id)
@@ -92,21 +98,21 @@ async def test_zero_token_nodes_topology_ops(manager: ManagerClient, tablets_ena
await manager.remove_node(server_a.server_id, server_d.server_id)
logging.info('Adding a zero-token server')
await manager.server_add(config=zero_token_cfg)
await manager.server_add(config=zero_token_cfg, property_file=get_pf("rz"))
# FIXME: Finish writes after the last server_add call once scylladb/scylladb#19737 is fixed.
logging.info('Checking results of the background writes')
await finish_writes()
logging.info('Adding a normal server')
server_e = await manager.server_add(config=normal_cfg)
server_e = await manager.server_add(config=normal_cfg, property_file=get_pf("r1"))
logging.info(f'Stopping {server_e}')
await manager.server_stop_gracefully(server_e.server_id)
replace_cfg_e = ReplaceConfig(replaced_id=server_e.server_id, reuse_ip_addr=False, use_host_id=False)
logging.info(f'Trying to replace {server_e} with a zero-token server')
await manager.server_add(replace_cfg_e, config=zero_token_cfg,
await manager.server_add(replace_cfg_e, config=zero_token_cfg, property_file=server_e.property_file(),
expected_error='Cannot replace the token-owning node')
await check_node_log_for_failed_mutations(manager, server_a)


@@ -128,7 +128,7 @@ public:
static constexpr std::string_view ks_name = "ks";
static std::atomic<bool> active;
private:
std::unique_ptr<sstable_compressor_factory> _scf;
sharded<default_sstable_compressor_factory> _scf;
sharded<replica::database> _db;
sharded<gms::feature_service> _feature_service;
sharded<sstables::storage_manager> _sstm;
@@ -657,10 +657,14 @@ private:
auto stop_lang_manager = defer_verbose_shutdown("lang manager", [this] { _lang_manager.stop().get(); });
_lang_manager.invoke_on_all(&lang::manager::start).get();
_scf = make_sstable_compressor_factory();
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
_scf.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config, std::cref(*cfg), std::cref(numa_groups))).get();
auto stop_scf = defer_verbose_shutdown("sstable_compressor_factory", [this] {
_scf.stop().get();
});
_db_config = &*cfg;
_db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(*_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
_db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
auto stop_db = defer_verbose_shutdown("database", [this] {
_db.stop().get();
});
@@ -896,6 +900,9 @@ private:
_view_builder.stop().get();
});
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });
_ss.start(std::ref(abort_sources), std::ref(_db),
std::ref(_gossiper),
std::ref(_sys_ks),
@@ -995,9 +1002,6 @@ private:
}
});
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });
_sl_controller.invoke_on_all([this, &group0_client] (qos::service_level_controller& service) {
qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor =
::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(


@@ -109,7 +109,7 @@ public:
void maybe_start_compaction_manager(bool enable = true);
explicit test_env(test_env_config cfg = {}, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr);
explicit test_env(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr);
~test_env();
test_env(test_env&&) noexcept;
@@ -176,15 +176,6 @@ public:
replica::table::config make_table_config();
template <typename Func>
static inline auto do_with(Func&& func, test_env_config cfg = {}) {
return seastar::do_with(test_env(std::move(cfg)), [func = std::move(func)] (test_env& env) mutable {
return futurize_invoke(func, env).finally([&env] {
return env.stop();
});
});
}
static future<> do_with_async(noncopyable_function<void (test_env&)> func, test_env_config cfg = {});
static future<> do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)> func);
@@ -192,7 +183,8 @@ public:
template <typename T>
static future<T> do_with_async_returning(noncopyable_function<T (test_env&)> func) {
return seastar::async([func = std::move(func)] {
test_env env;
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
test_env env({}, *scf);
auto stop = defer([&] { env.stop().get(); });
return func(env);
});


@@ -201,7 +201,7 @@ struct test_env::impl {
::cache_tracker cache_tracker;
gms::feature_service feature_service;
db::nop_large_data_handler nop_ld_handler;
std::unique_ptr<sstable_compressor_factory> scf;
sstable_compressor_factory& scf;
test_env_sstables_manager mgr;
std::unique_ptr<test_env_compaction_manager> cmgr;
reader_concurrency_semaphore semaphore;
@@ -210,7 +210,7 @@ struct test_env::impl {
data_dictionary::storage_options storage;
abort_source abort;
impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tdir);
impl(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm, tmpdir* tdir);
impl(impl&&) = delete;
impl(const impl&) = delete;
@@ -219,16 +219,16 @@ struct test_env::impl {
}
};
test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tdir)
test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, sstables::storage_manager* sstm, tmpdir* tdir)
: local_dir(tdir == nullptr ? std::optional<tmpdir>(std::in_place) : std::optional<tmpdir>(std::nullopt))
, dir(tdir == nullptr ? local_dir.value() : *tdir)
, db_config(make_db_config(dir.path().native(), cfg.storage))
, dir_sem(1)
, feature_service(gms::feature_config_from_db_config(*db_config))
, scf(make_sstable_compressor_factory())
, scf(scfarg)
, mgr("test_env", cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config,
feature_service, cache_tracker, cfg.available_memory, dir_sem,
[host_id = locator::host_id::create_random_id()]{ return host_id; }, *scf, abort, current_scheduling_group(), sstm)
[host_id = locator::host_id::create_random_id()]{ return host_id; }, scf, abort, current_scheduling_group(), sstm)
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
, use_uuid(cfg.use_uuid)
, storage(std::move(cfg.storage))
@@ -242,8 +242,8 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm, tmpdi
}
}
test_env::test_env(test_env_config cfg, sstables::storage_manager* sstm, tmpdir* tmp)
: _impl(std::make_unique<impl>(std::move(cfg), sstm, tmp))
test_env::test_env(test_env_config cfg, sstable_compressor_factory& scf, sstables::storage_manager* sstm, tmpdir* tmp)
: _impl(std::make_unique<impl>(std::move(cfg), scf, sstm, tmp))
{
}
@@ -325,7 +325,8 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
sharded<sstables::storage_manager> sstm;
sstm.start(std::ref(*db_cfg), sstables::storage_manager::config{}).get();
auto stop_sstm = defer([&] { sstm.stop().get(); });
test_env env(std::move(cfg), &sstm.local());
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
test_env env(std::move(cfg), *scf, &sstm.local());
auto close_env = defer([&] { env.stop().get(); });
env.manager().plug_sstables_registry(std::make_unique<mock_sstables_registry>());
auto unplu = defer([&env] { env.manager().unplug_sstables_registry(); });
@@ -334,7 +335,8 @@ future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, te
}
return seastar::async([func = std::move(func), cfg = std::move(cfg)] () mutable {
test_env env(std::move(cfg));
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
test_env env(std::move(cfg), *scf);
auto close_env = defer([&] { env.stop().get(); });
func(env);
});
@@ -476,7 +478,8 @@ test_env::do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)>
return seastar::async([func = std::move(func)] {
tmpdir tdir;
sharded<test_env> env;
env.start(test_env_config{}, nullptr, &tdir).get();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
env.start(test_env_config{}, std::ref(*scf), nullptr, &tdir).get();
auto stop = defer([&] { env.stop().get(); });
func(env);
});


@@ -515,3 +515,41 @@ def test_status_with_zero_token_nodes(request, nodetool):
]
_do_test_status(request, nodetool, None, nodes)
def test_status_negative_load(request, nodetool):
nodes = [
Node(
endpoint="127.0.0.1",
host_id="78a9c1d0-b341-467e-a076-9eff4cf7ffc6",
load=-206015,
tokens=["-9175818098208185248", "-3983536194780899528"],
datacenter="datacenter1",
rack="rack1",
status=NodeStatus.Unknown,
state=NodeState.Joining,
),
Node(
endpoint="127.0.0.2",
host_id="ed341f60-b12a-4fd4-9917-e80977ded0f9",
load=277624,
tokens=["-1810801828328238220", "2983536194780899528"],
datacenter="datacenter1",
rack="rack2",
status=NodeStatus.Down,
state=NodeState.Normal,
),
Node(
endpoint="127.0.0.3",
host_id="1e77eb26-a372-4eb4-aeaa-72f224cf6b4c",
load=353236,
tokens=["3810801828328238220", "6810801828328238220"],
datacenter="datacenter1",
rack="rack3",
status=NodeStatus.Up,
state=NodeState.Normal,
),
]
status_target = StatusQueryTarget(keyspace="ks", table=None, uses_tablets=False)
_do_test_status(request, nodetool, status_target, nodes)


@@ -144,7 +144,8 @@ int scylla_sstable_main(int argc, char** argv) {
}
cfg.compaction_strategy = sstables::compaction_strategy::type(app.configuration()["compaction-strategy"].as<sstring>());
cfg.timestamp_range = app.configuration()["timestamp-range"].as<api::timestamp_type>();
test.start(std::move(cfg)).get();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
test.start(std::move(cfg), std::ref(*scf)).get();
auto stop_test = deferred_stop(test);
switch (mode) {


@@ -195,7 +195,9 @@ private:
}
public:
perf_sstable_test_env(conf cfg) : _cfg(std::move(cfg))
perf_sstable_test_env(conf cfg, sstable_compressor_factory& scf)
: _env({}, scf)
, _cfg(std::move(cfg))
, s(create_schema(cfg.compaction_strategy))
, _distribution('@', '~')
, _mt(make_lw_shared<replica::memtable>(s))


@@ -45,13 +45,16 @@ async def load_tablet_repair_task_infos(cql, host, table_id):
return repair_task_infos
async def create_table_insert_data_for_repair(manager, rf=3, tablets=8, fast_stats_refresh=True, nr_keys=256, disable_flush_cache_time=False, cmdline=None) -> (list[ServerInfo], CassandraSession, list[Host], str, str):
assert rf <= 3, "A keyspace with RF > 3 will be RF-rack-invalid if there are fewer racks than the RF"
if fast_stats_refresh:
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
else:
config = {}
if disable_flush_cache_time:
config.update({'repair_hints_batchlog_flush_cache_time_in_ms': 0})
servers = [await manager.server_add(config=config, cmdline=cmdline), await manager.server_add(config=config, cmdline=cmdline), await manager.server_add(config=config, cmdline=cmdline)]
servers = await manager.servers_add(3, config=config, cmdline=cmdline,
property_file=[{"dc": "dc1", "rack": f"r{i % rf}"} for i in range(rf)])
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, "WITH replication = {{'class': 'NetworkTopologyStrategy', "
"'replication_factor': {}}} AND tablets = {{'initial': {}}};".format(rf, tablets))


@@ -24,6 +24,7 @@ from typing import Optional, TypeVar, Any
from cassandra.cluster import NoHostAvailable, Session, Cluster # type: ignore # pylint: disable=no-name-in-module
from cassandra.protocol import InvalidRequest # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import Statement # type: ignore # pylint: disable=no-name-in-module
from cassandra import DriverException, ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
from test import BUILD_DIR, TOP_SRC_DIR
@@ -315,3 +316,29 @@ async def gather_safely(*awaitables: Awaitable):
def get_xdist_worker_id() -> str | None:
return os.environ.get("PYTEST_XDIST_WORKER")
def execute_with_tracing(cql : Session, statement : str | Statement, log : bool = False, *cql_execute_extra_args, **cql_execute_extra_kwargs):
""" Execute statement via cql session and log the tracing output. """
cql_execute_extra_kwargs['trace'] = True
query_result = cql.execute(statement, *cql_execute_extra_args, **cql_execute_extra_kwargs)
tracing = query_result.get_all_query_traces(max_wait_sec_per=900)
ret = []
page_traces = []
for trace in tracing:
ret.append(trace.events)
if not log:
continue
trace_events = []
for event in trace.events:
trace_events.append(f" {event.source} {event.source_elapsed} {event.description}")
page_traces.append("\n".join(trace_events))
if log:
logger.debug("Tracing {}:\n{}\n".format(statement, "\n".join(page_traces)))
return ret


@@ -49,7 +49,7 @@ tools::tablets_t do_load_system_tablets(const db::config& dbcfg,
std::string_view table_name,
reader_permit permit) {
sharded<sstable_manager_service> sst_man;
auto scf = make_sstable_compressor_factory();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
auto stop_sst_man_service = deferred_stop(sst_man);


@@ -396,7 +396,7 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
reader_concurrency_semaphore rcs_sem(reader_concurrency_semaphore::no_limits{}, __FUNCTION__, reader_concurrency_semaphore::register_metrics::no);
auto stop_semaphore = deferred_stop(rcs_sem);
auto scf = make_sstable_compressor_factory();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sharded<sstable_manager_service> sst_man;
sst_man.start(std::ref(dbcfg), std::ref(*scf)).get();
auto stop_sst_man_service = deferred_stop(sst_man);
@@ -500,7 +500,7 @@ schema_ptr do_load_schema_from_sstable(const db::config& dbcfg, std::filesystem:
cache_tracker tracker;
sstables::directory_semaphore dir_sem(1);
abort_source abort;
auto scf = make_sstable_compressor_factory();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, dbcfg, feature_service, tracker,
memory::stats().total_memory(), dir_sem,
[host_id = locator::host_id::create_random_id()] { return host_id; }, *scf, abort);


@@ -67,13 +67,13 @@ static std::ostream& operator<<(std::ostream& os, const std::vector<sstring>& v)
// mimic the behavior of FileUtils::stringifyFileSize
struct file_size_printer {
uint64_t value;
int64_t value;
bool human_readable;
bool use_correct_units;
// Cassandra nodetool uses base_2 and base_10 units interchangeably, some
// commands use this, some that. Let's accommodate this for now, and maybe
// fix this mess at one point in the future, after the rewrite is done.
file_size_printer(uint64_t value, bool human_readable = true, bool use_correct_units = false)
file_size_printer(int64_t value, bool human_readable = true, bool use_correct_units = false)
: value{value}
, human_readable{human_readable}
, use_correct_units{use_correct_units}
@@ -87,15 +87,15 @@ struct fmt::formatter<file_size_printer> : fmt::formatter<string_view> {
return fmt::format_to(ctx.out(), "{}", size.value);
}
using unit_t = std::tuple<uint64_t, std::string_view, std::string_view>;
using unit_t = std::tuple<int64_t, std::string_view, std::string_view>;
const unit_t units[] = {
{1UL << 40, "TiB", "TB"},
{1UL << 30, "GiB", "GB"},
{1UL << 20, "MiB", "MB"},
{1UL << 10, "KiB", "KB"},
{1LL << 40, "TiB", "TB"},
{1LL << 30, "GiB", "GB"},
{1LL << 20, "MiB", "MB"},
{1LL << 10, "KiB", "KB"},
};
for (auto [n, base_2, base_10] : units) {
if (size.value > n) {
if ((size.value > n) || (size.value < -n)) {
auto d = static_cast<float>(size.value) / n;
auto postfix = size.use_correct_units ? base_2 : base_10;
return fmt::format_to(ctx.out(), "{:.2f} {}", d, postfix);
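The hunk above widens `file_size_printer` to signed values and adds the `size.value < -n` branch so a negative load (as in the new `test_status_negative_load` data) still selects a unit. A hedged Python model of that unit selection — not the C++ formatter itself, just its threshold logic:

```python
def format_size(value: int, use_correct_units: bool = False) -> str:
    # Mirrors the unit table in file_size_printer: base-2 thresholds,
    # base-10 labels unless use_correct_units is set.
    units = [(1 << 40, "TiB", "TB"), (1 << 30, "GiB", "GB"),
             (1 << 20, "MiB", "MB"), (1 << 10, "KiB", "KB")]
    for n, base_2, base_10 in units:
        # The signed comparison is the fix: a negative value whose
        # magnitude exceeds the threshold also matches this unit.
        if value > n or value < -n:
            return f"{value / n:.2f} {base_2 if use_correct_units else base_10}"
    # Small magnitudes fall through and are printed as a bare number.
    return str(value)
```

For example, the `-206015` load from the test formats as `-201.19 KB` instead of overflowing into an enormous unsigned value.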
@@ -2197,7 +2197,7 @@ void status_operation(scylla_rest_client& client, const bpo::variables_map& vm)
const auto joining = get_nodes_of_state(client, "joining");
const auto leaving = get_nodes_of_state(client, "leaving");
const auto moving = get_nodes_of_state(client, "moving");
const auto endpoint_load = rjson_to_map<size_t>(client.get("/storage_service/load_map"));
const auto endpoint_load = rjson_to_map<ssize_t>(client.get("/storage_service/load_map"));
const auto tablets_keyspace = keyspace && keyspace_uses_tablets(client, *keyspace);


@@ -3559,7 +3559,7 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
}
gms::feature_service feature_service(gms::feature_config_from_db_config(dbcfg));
auto scf = make_sstable_compressor_factory();
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
cache_tracker tracker;
sstables::directory_semaphore dir_sem(1);
abort_source abort;


@@ -406,7 +406,7 @@ class background_reclaimer {
promise<>* _main_loop_wait = nullptr;
future<> _done;
bool _stopping = false;
static constexpr size_t free_memory_threshold = 60'000'000;
static constexpr size_t free_memory_threshold = background_reclaim_free_memory_threshold;
private:
bool have_work() const {
#ifndef SEASTAR_DEFAULT_ALLOCATOR


@@ -29,6 +29,8 @@ constexpr int segment_size_shift = 17; // 128K; see #151, #152
constexpr size_t segment_size = 1 << segment_size_shift;
constexpr size_t max_zone_segments = 256;
constexpr size_t background_reclaim_free_memory_threshold = 60'000'000;
//
// Frees some amount of objects from the region to which it's attached.
//