Compare commits

...

58 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
c06760cf15 Fix multiple issues in test_out_of_space_prevention.py
- Fix variable name error: host[0] → hosts[0] on line 98
- Add missing await keywords for async operations on lines 209 and 385
- Rename class random_content_file to RandomContentFile (PascalCase)
- Fix function name typo: test_autotoogle_compaction → test_autotoggle_compaction

Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-12-23 09:25:16 +00:00
copilot-swe-agent[bot]
c684456eba Initial plan 2025-12-23 09:21:06 +00:00
Pavel Emelyanov
cd2568ad00 test: Merge and parametrize test_backup_to_non_existent_something tests
There are three tests in cluster/object_store suite that check how
backup fails in case either of its parameters doesn't really exists. All
three greatly duplicate each other, it makes sense to merge them into
one larger parametrized test.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#27695
2025-12-23 07:02:18 +02:00
Avi Kivity
7586c5ccbd Merge 'system.clients: add client_options map column' from Vladislav Zolotarov
This pull request introduces a new caching mechanism for client options in the Alternator and transport layers, refactors how client metadata is stored and accessed, and extends the `system.clients` virtual table to surface richer client information. The changes improve efficiency by deduplicating commonly used strings (like driver names/versions and client options), and ensure that client data is handled in a way that's safe for cross-shard access. Additionally, the test suite and virtual table schema are updated to reflect the new client options data.

**Caching and client metadata refactoring:**

* The largest and most repeatable items in the connection state before this PR were a `driver_name` and a `driver_version` which were stored as an `sstring` object which means that the corresponding memory consumption was 16 bytes per each such value at least (the smallest size of the `seastar`'s `sstring` object) **per-connection**. In reality the driver name is usually longer than 15 characters, e.g. "ScyllaDB Python Driver" is 23 characters and this is not the longest driver name there is. In such cases the actual memory usage of a corresponding `sstring` object jumps to 8 + 4 + 1 + (string length, 23 in our example) + 1.
So, for "ScyllaDB Python Driver" it would be 37 bytes (in reality it would be a bit more due to natural alignment of other allocations since the `contents` size is not well aligned (13 bytes), but let's ignore this for now).
* These bytes add up quickly as there are more connections and, sometimes we are talking about millions of connections per-shard.
* Using a smart pointer (`lw_shared_ptr`) referencing a corresponding cached value will effectively reduce the per-connection memory usage to be 8 bytes (a size of a pointer on 64-bit CPU platform) for each such value. While storing a corresponding `sstring` value only once.
* This will would reduce the "variable" (per-connection) memory usage by **at least 50%**. And in case of "ScyllaDB Python Driver" driver version - by 78%!
* And all this for a price of a single `loading_shared_values` object **per-shard** (implements a hash table) and a minor overhead for each value **stored** in it.

* Introduced a new cache type (`client_options_cache_type`) for deduplicating and sharing client option strings, and refactored `client_data`, `client_state`, and related classes to use `foreign_ptr<std::unique_ptr<client_data>>` and cached entry types for fields like driver name, driver version, and client options. (`client_data.hh`, `service/client_state.hh`, `alternator/server.hh`, `alternator/controller.hh`, `transport/controller.hh`, `transport/protocol_server.hh`) [[1]](diffhunk://#diff-664a3b19e905481bdf8eb3843fc4d34691067bb97ab11cfd6e652e74aac51d9fR33-R36) [[2]](diffhunk://#diff-664a3b19e905481bdf8eb3843fc4d34691067bb97ab11cfd6e652e74aac51d9fL40-R56) [[3]](diffhunk://#diff-daadce1a2de3667511e59558f3a8f077b5ee30a14bcc6a99d588db90d0fcd2bdL105-R107) [[4]](diffhunk://#diff-daadce1a2de3667511e59558f3a8f077b5ee30a14bcc6a99d588db90d0fcd2bdL154-R182) [[5]](diffhunk://#diff-5fce246edf5abffb2351bd02e2eb1e9850880f7a00607ccaa90c3eee7ef57c6bL91-R92) [[6]](diffhunk://#diff-5fce246edf5abffb2351bd02e2eb1e9850880f7a00607ccaa90c3eee7ef57c6bL110-R111) [[7]](diffhunk://#diff-31730ba8e7374f784a88dc27c1512291cf73b7f24e08768f7466a3c8cfcc7a1aL96-R96) [[8]](diffhunk://#diff-19a97c0247cc08155ee49b277e43859ca32d6ef8cbff0ed7368ec5fa19e0a11eL172-R172) [[9]](diffhunk://#diff-eea7e2db5d799a25e717a72ac8ce5842bd4adb72b694d38d8f47166d9cd926faL356-R356) [[10]](diffhunk://#diff-d0b4ec3a144bbc5dc993866cf0b940850a457ff6156064f7e2b4b10ad0a95fefL80-R80) [[11]](diffhunk://#diff-4293b94c444d9bd5ecd17ce7eda8c00685d35ecf6e07f844efc91a91bbe85be1L46-R48)

* Updated the methods for setting and getting driver name, driver version, and client options in `client_state` to be asynchronous and use the new cache. (`service/client_state.hh`, `service/client_state.cc`) [[1]](diffhunk://#diff-daadce1a2de3667511e59558f3a8f077b5ee30a14bcc6a99d588db90d0fcd2bdL154-R182) [[2]](diffhunk://#diff-99634aae22e2573f38b4e2f050ed2ac4f8173ff27f0ae8b3609d1f0cc1aeb775R347-R362)

**Virtual table and API enhancements:**

* Extended the `system.clients` virtual table schema and implementation to include a new `client_options` column (a map of option key/value pairs), and updated the table population logic to use the new cached types and foreign pointers. (`db/virtual_tables.cc`) [[1]](diffhunk://#diff-05f7bff3edb39fb8759c90b445e860189f2f30e04717ed58bae42716082af3d1R752) [[2]](diffhunk://#diff-05f7bff3edb39fb8759c90b445e860189f2f30e04717ed58bae42716082af3d1L769-R770) [[3]](diffhunk://#diff-05f7bff3edb39fb8759c90b445e860189f2f30e04717ed58bae42716082af3d1L809-R816) [[4]](diffhunk://#diff-05f7bff3edb39fb8759c90b445e860189f2f30e04717ed58bae42716082af3d1L828-R879)

**API and interface changes:**

* Changed the signatures of `get_client_data` methods throughout the codebase to return vectors of `foreign_ptr<std::unique_ptr<client_data>>` instead of plain `client_data` objects, to ensure safe cross-shard access. (`alternator/controller.hh`, `alternator/controller.cc`, `alternator/server.hh`, `alternator/server.cc`, `transport/controller.hh`, `transport/protocol_server.hh`) [[1]](diffhunk://#diff-31730ba8e7374f784a88dc27c1512291cf73b7f24e08768f7466a3c8cfcc7a1aL96-R96) [[2]](diffhunk://#diff-19a97c0247cc08155ee49b277e43859ca32d6ef8cbff0ed7368ec5fa19e0a11eL172-R172) [[3]](diffhunk://#diff-5fce246edf5abffb2351bd02e2eb1e9850880f7a00607ccaa90c3eee7ef57c6bL110-R111) [[4]](diffhunk://#diff-a7e2cda866c03a75afcf3b087de1c1dcd2e7aa996214db67f9a11ed6451e596dL988-R995) [[5]](diffhunk://#diff-eea7e2db5d799a25e717a72ac8ce5842bd4adb72b694d38d8f47166d9cd926faL356-R356) [[6]](diffhunk://#diff-d0b4ec3a144bbc5dc993866cf0b940850a457ff6156064f7e2b4b10ad0a95fefL80-R80) [[7]](diffhunk://#diff-4293b94c444d9bd5ecd17ce7eda8c00685d35ecf6e07f844efc91a91bbe85be1L46-R48)

**Testing and validation:**

* Updated the Python test for the `system.clients` table to verify the new `client_options` column and its contents, ensuring that driver name and version are present in the options map. (`test/cqlpy/test_virtual_tables.py`) [[1]](diffhunk://#diff-6dd8bd4a6a82cd642252a29dc70726f89a46ceefb991c3e63fc67e283f323f03R79) [[2]](diffhunk://#diff-6dd8bd4a6a82cd642252a29dc70726f89a46ceefb991c3e63fc67e283f323f03R88-R90)

Closes scylladb/scylladb#25746

* github.com:scylladb/scylladb:
  transport/server: declare a new "CLIENT_OPTIONS" option as supported
  service/client_state and alternator/server: use cached values for driver_name and driver_version fields
  system.clients: add a client_options column
  controller: update get_client_data to use foreign_ptr for client_data
2025-12-22 20:02:40 +02:00
Emil Maskovsky
d60b908a8e test/raft: improve reporting in the randomized_nemesis_test digest functions
The Boost ASSERTs in the digest functions of the randomized_nemesis_test
were not working well inside the state machine digest functions, leading
to unhelpful boost::execution_exception errors that terminated the apply
fiber, and didn't provide any helpful information.

Replaced by explicit checks with on_fatal_internal_error calls that
provide more context about the failure. Also added validation of the
digest value after appending or removing an element, which allows to
determine which operation resulted in causing the wrong value.

This effectively reverts the changes done in https://github.com/scylladb/scylladb/pull/19282,
but adds improved error reporting.

Refs: scylladb/scylladb#27307
Refs: scylladb/scylladb#17030

Closes scylladb/scylladb#27791
2025-12-22 20:02:40 +02:00
Nikos Dragazis
20ff2fcc18 docs: Amend limitations for keyspace RF changes
The doc about DDL statements claims that an `ALTER KEYSPACE` will fail
in the presence of an ongoing global topology operation.

This limitation was specifically referring to RF changes, which Scylla
implements as global topology requests (`keyspace_rf_change`), and it
was true when it was first introduced (1b913dd880) because there was
no global topology request queue at that time, so only one ongoing
global request was allowed in the cluster.

This limitation was lifted with the introduction of the global topology
request queue (6489308ebc), and it was re-introduced again very
recently (2e7ba1f8ce) in a slightly different form; it now applies only
to RF changes (not to any request type) and only those that affect the
same keyspace. None of these two changes were ever reflected in the doc.

Synchronize the doc with the current state.

Fixes #27776.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>

Closes scylladb/scylladb#27786
2025-12-22 20:02:40 +02:00
Andrei Chekun
6ffdada0ea test.py: modify JUnit report for easier rerun on CI
This will allow to add custom XML attribute to the JUnit report. In this
case there will be path to the function that can be used to run with
pytest command. Parametrized tests will have path to the function
excluding parameter.

Closes scylladb/scylladb#27707
2025-12-22 20:02:40 +02:00
Anna Stuchlik
4c247a5d08 doc: document support for i8g and i8ge instances
Fixes https://github.com/scylladb/scylladb/issues/27703

Closes scylladb/scylladb#27754
2025-12-22 20:02:40 +02:00
copilot-swe-agent[bot]
288d4b49e9 Skip backtrace in lsa-timing logs for preemptible reclaim
Preemptible reclaim is only done from the background reclaimer,
so backtrace is not useful. It's also normal that it takes a long time.
Skip the backtrace when reclaim is preemptible to reduce log noise.

Fixes the issue where background reclaim was printing unnecessary
backtraces in lsa-timing logs when operations took longer than the
stall detection threshold.

Closes: #27692

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-22 20:02:40 +02:00
Pavel Emelyanov
e304d912b4 Merge 'db/view/view_building_worker: follow-ups' from Michał Jadwiszczak
This patch consists of a few smaller follow-ups to the view building worker:
- catch general execption in staging task registrator
- remove unnecessary CV broadcast
- don't pollute function context with conditionally compiled variable
- avoid creating a copy of tasks map
- fix some typos

Refs https://github.com/scylladb/scylladb/issues/25929
Refs https://github.com/scylladb/scylladb/pull/26897

This PR doesn't fix any bugs but recently we're backporting some PRs to 2025.4, so let's also backport this one to avoid painful conflicts.

Closes scylladb/scylladb#26558

* github.com:scylladb/scylladb:
  docs/dev/view-building-coordinator: fix typos
  db/view/view_building_worker: remove unnnecessary empty lines
  db/view/view_building_worker: fix typo
  db/view/view_building_worker: avoid creating a copy of tasks map
  db/view/view_building_worker: wrap conditionally compiled code in a scope
  db/view/view_building_worker: remove unnecessary CV broadcast
  db/view/view_building_worker: catch general execption in staging task registrator
2025-12-22 20:02:40 +02:00
Botond Dénes
846a6e700b Merge 'get_snapshot_details: process also staging directory' from Benny Halevy
Currently, we determine the live vs. total snapshot size by listing all files in the snapshot directory,
and for each name, look it up in the base table directory and see if it exists there, and if so, if it's the same file
as in the snapshot by looking to the fstat data for the dev id and inode number.

However, we do not look the names in the staging directory so staging sstable
would skew the results as the will falsely contribute to the live size, since they
wouldn't be found in the base directory.

This change processes both the staging directory and base table directory
and keeps the file capacity in a map, indexed by the files inode number, allowing us to easily
detect hard links and be resilient against concurrent move of files from the staging sub-directory
back into the base table directory.

Fixes #27635

* Minor issue, no backport required

Closes scylladb/scylladb#27636

* github.com:scylladb/scylladb:
  table: get_snapshot_details: add FIXME comments
  table: get_snapshot_details: lookup entries also in the staging directory
  table: get_snapshot_details: optimize using the entry number_of_links
  table: get_snapshot_details: continue loop for manifest and schema entries
  table: get_snapshot_details: use directory_lister
2025-12-22 20:02:40 +02:00
Botond Dénes
af5e73def9 Merge 'test/cqlpy: remove unused variables' from Nadav Har'El
These patches fix a bunch of variables defined in test/cqlpy tests, but not used. Besides wasting a few bytes on disk, these unused variables can add confusion for readers who see them and might think they have some use which they are missing.

All these unused variables were found by Copilot's "code quality" scanner, but I considered each of them, and fixed them manually.

Closes scylladb/scylladb#27667

* github.com:scylladb/scylladb:
  test/cqlpy: remove unused variables
  test/cqlpy: use unique partition in test
2025-12-22 20:02:39 +02:00
Anna Stuchlik
9793a45288 doc: add a Vector Search page under Features
This commit adds a page with an overview of Vector Search under the Features section.
It includes a link to the VS documentation in ScyllaDB Cloud,
as the feature is only available in ScyllaDB Cloud.

The purpose of the page is to raise awareness of the feature.

Fixes https://scylladb.atlassian.net/browse/VECTOR-215

Closes scylladb/scylladb#27787
2025-12-22 15:29:45 +02:00
Alex
033579ad6f db: api: service: Fix ClientConnectorError in test_client_routes The bug was caused by capturing local variables by reference in lambdas passed to with_retry(), which is a coroutine. When the coroutine suspends, the lambda frame exits and the referenced locals are destroyed, leading to use-after-lifetime issues. This change fixes the problem by ensuring safe ownership across suspension points and also refactors how route_keys and route_entries are passed from the caller. Previously they were passed as const lvalue references, which cannot be moved and therefore ended up being repeatedly copied across function calls and lambda invocations. The new approach avoids unnecessary copies and makes the lifetime semantics explicit and safe.
Fixes: 27792

no backport needed private link is only in master branch

Closes scylladb/scylladb#27795
2025-12-22 14:52:47 +02:00
Yaniv Michael Kaul
c1da552fa4 test/pylib/scylla_cluster.py:get_scylla_2025_1_executable() - retry curl download of 2025.1
For some reason, we might fail. Retry 10 times, and fail with an error code instead of 404 or whatnot.

Benign, I hope - no need to backport.
Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>

Closes scylladb/scylladb#27746
2025-12-22 14:45:06 +02:00
Piotr Smaron
cb3b96b8f4 raft: correct lease->least typo in a comment
Funny, when researching if our raft implementation relies on the 'lease'
mechanism, I noticed this typo.

Closes scylladb/scylladb#27803
2025-12-22 14:39:55 +02:00
Avi Kivity
b105ad8379 build: drop -fexperimental-assignment-tracking clang option
`-fexperimental-assignment-tracking` was added in fdd8b03d4b to
make coroutine debugging work.

However, since then, it became unnecessary, perhaps due to 87c0adb2fe,
or perhaps to a toolchain fix.

Drop it, so we can benefit from assignment tracking (whatever it is),
and to improve compatibility with sccache, which rejects this option.

I verified that the test added in fdd8b03d4b fails without the option
and passes with this patch; in other words we're not introducing a
regression here.

Closes scylladb/scylladb#27763
2025-12-22 14:33:48 +02:00
Karol Nowacki
addac8b3f7 vector_search: test: Fix flaky DNS resolution test
The `vector_store_client_test_dns_resolving_repeated` test had race
conditions causing it to be flaky. Two main issues were identified:

1. Race between initial refresh and manual trigger: The test assumes
    a specific resolution sequence, but timing variations between the
    initial DNS refresh (on client creation) and the first manual
    trigger (in the test loop) can cause unexpected delayed scheduling.

2. Extra triggers from resolve_hostname fiber: During the client
    refresh phase, the background DNS fiber clears the client list.
    If resolve_hostname executes in the window after clearing but
    before the update completes, pending triggers are processed,
    incrementing the resolution count unexpectedly. At count 6, the
    mock resolver returns a valid address (count % 3 == 0), causing
    the test to fail.

The fix relaxes test assertions to verify retry behavior and client
clearing on DNS address loss, rather than enforcing exact resolution
counts.

Fixes: #27074

Closes scylladb/scylladb#27685
2025-12-21 20:02:16 +02:00
Vlad Zolotarov
ea95cdaaec transport/server: declare a new "CLIENT_OPTIONS" option as supported
Declare support for a 'CLIENT_OPTIONS' startup key.
This key is meant to be used by drivers for sending client-specific
configurations like request timeouts values, retry policy configuration, etc.
The value of this key can be any string in general (according to the CQL binary protocol),
however, it's expected to be some structured format, e.g. JSON.

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
2025-12-20 12:26:22 -05:00
Vlad Zolotarov
28cbaef110 service/client_state and alternator/server: use cached values for driver_name and driver_version fields
Optimize memory usage changing types of driver_name and driver_version be
a reference to a cached value instead of an sstring.

These fields very often have the same value among different connections hence
it makes sense to cache these values and use references to them instead of duplicating
such strings in each connection state.

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
2025-12-20 12:26:22 -05:00
Vlad Zolotarov
85adf6bdb1 system.clients: add a client_options column
This new column is going to contain all OPTIONS sent in the
STARTUP frame of the corresponding CQL session.

The new column has a `frozen<map<text, text>>` type, and
we are also optimizing the amount of required memory for storing
corresponding keys and values by caching them on each shard level.

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
2025-12-20 12:26:15 -05:00
Vlad Zolotarov
3a54bab193 controller: update get_client_data to use foreign_ptr for client_data
get_client_data() is used to assemble `client_data` objects from each connection
on each CPU in the context of generation of the `system.clients` virtual table data.

After collected, `client_data` objects were std::moved and arranged into a
different structure to match the table's sorting requirements.

This didn't allow having not-cross-shard-movable objects as fields in the `client_data`,
e.g. lw_shared_ptr objects.

Since we are planning to add such fields to `client_data` in following patches this patch
is solving the limitation above by making get_client_data() return `foreign_ptr<std::unique_ptr<client_data>>`
objects instead of naked `client_data` ones.

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
2025-12-19 11:01:41 -05:00
Anna Stuchlik
f65db4e8eb doc: remove the links to the Download Center
This commit removes the remaining links to the Download Center on the website.
We no longer use it for installation, and we don't want users to infer that
something like that still exists.

Fixes https://github.com/scylladb/scylladb/issues/27753

Closes scylladb/scylladb#27756
2025-12-19 12:53:40 +01:00
Botond Dénes
df2ac0f257 Merge 'test: dtest: schema_management_test.py: migrate from dtest' from Dario Mirovic
This PR migrates schema management tests from dtest to this repository.

One reason is that there is an ongoing effort to migrate tests from dtest to here.

Test `TestLargePartitionAlterSchema.test_large_partition_with_drop_column` failed with timeout error once. The main suspect so far are infra related problems, like infra congestion. The [logs from the test execution](https://jenkins.scylladb.com/job/scylla-master/job/dtest-release/1062/testReport/junit/schema_management_test/TestLargePartitionAlterSchema/Run_Dtest_Parallel_Cloud_Machines___Dtest___full_split001___test_large_partition_with_drop_column/), linked in the issue [test_large_partition_with_drop_column failed on TimeoutError #26932](https://github.com/scylladb/scylladb/issues/26932) show the following:
- `populate` works as intended - it starts, then during populate/insert drop column happened, then an exception is raised and intentionally ignored in the test, so no `Finish populate DB` for 50 x 1490 records - expected
- drop column works as intended - interrupts `populate` and proceeds to flush
- flush **probably** works as intended - logs are consistent with what we expect and what I got in local test runs
- `read` is the only thing that visibly got stuck, all the way until timeout happened, 5 minutes after the start

Migrating the test to this repo will also give us test start and end times on CI machines, in the sql report database. It has start and end timestamp for each test executed. We will be able to see how long does it usually take when the test is successful. It can not be seen from the logs, because logs are not kept for successful tests.

Another thing this PR does is adding a log message at the end of `database::flush_all_tables`. This will let us know if a thread got stuck inside or finished successfully. This addresses the **probably** part of the flush analysis step described above. If the issue reoccurs, we will have more information.

The test `test_large_partition_with_add_column` has not been executing for ~5 years. It was never migrated to pytest. The name was left as `large_partition_with_add_column_test`, and was skipped. Now it is enabled and updated.

Both `test_large_partition_with_add_column` and `test_large_partition_with_drop_column` are improved.
Small performance improvements:
- Regex compilation extracted from the stress function to the module level, to avoid recompilation.
- Do not materialize list in `stress_object` for loop. Use a generator expression.

The tests in `TestLargePartitionAlterSchema` are `test_large_partition_with_add_column`
and `test_large_partition_with_drop_column`.

These tests need to replicate the following conditions that led to a bug before a fix from around 5 years ago.

The scenario in which the problem could have happened has to involve:
- a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
- appending writes to the partition (not overwrites)
- scans of the partition
- schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
  column lands in the middle (in alphabetical order) of the old column set.

The way the test is set up is:
- fixed number of writes per populate call
- fixed number of reads

This has the following implications:
- if the machine executing the test is fast, all the writes are done before the 10 seconds sleep
- there are too many reads - most of them get executed after the test logic is done

This patch solves these issues in the following way:
- populate lazily generates write data, and stops when instructed by `stop_populating` event
- read, which is done sequentially, stops when instructed by `stop_reading` event
- number of max operations is increased significantly, but the operations are stopped 1 second
  after node flush; this makes sure there are enough operations during the test, but also that
  the test does not take unnecessary time

Test execution time has been reduced severalfold. On dev machine the time the tests take is
reduced from 110 seconds to 34 seconds.

scylla-dtest PR that removes migrated tests:
[schema_management_test.py: remove tests already ported to scylladb repo #6427](https://github.com/scylladb/scylla-dtest/pull/6427)

Fixes #26932

This is a migration of existing tests to this repository. No need for backport.

Closes scylladb/scylladb#27106

* github.com:scylladb/scylladb:
  test: dtest: schema_management_test.py: speed up `TestLargePartitionAlterSchema` tests
  test: dtest: schema_management_test.py: fix large partition add column test
  test: dtest: schema_management_test.py: add `TestSchemaManagement.prepare`
  test: dtest: schema_management_test.py: test enhancements
  test: dtest: schema_management_test.py: make the tests work
  test: dtest: migrate setup and tools from dtest
  test: dtest: copy unmodified schema_management_test.py
  replica: database: flush_all_tables log on completion
2025-12-19 12:30:00 +02:00
Botond Dénes
093e97a539 Merge 'test: increase num of requests in driver_service_level tests' from Andrzej Jackowski
`_verify_tasks_processed_metrics()` is used to check that the correct
service level is used to process requests. It takes two service levels
as arguments and executes numerous requests. After that, the number
of tasks processed by one of the service levels is expected to rise
by at least the number of executed requests. In contrast,
the second service level is expected to process fewer tasks than
the number of requests.

Unfortunately, background noise may cause some tasks to be executed
on the service level that is not supposed to process requests.
This patch increases the number of executed requests to eliminate
the chance of noise causing test failures.

Additionally, this commit extends logging to make future investigation
easier.

Fixes: https://github.com/scylladb/scylladb/issues/27715

No backport, fix for test on master.

Closes scylladb/scylladb#27735

* github.com:scylladb/scylladb:
  test: remove unused `get_processed_tasks_for_group`
  test: increase num of requests in driver_service_level tests
2025-12-19 10:54:14 +02:00
Emil Maskovsky
fa6e5d0754 test/random_failures: fix handling of banned notification
After 39cec4a node join may fail with either "init - Startup failed"
notification or occasionally because it was banned, depending on timing.

The change updates the test to handle both cases.

Fixes: scylladb/scylladb#27697

No backport: This failure is only present in master.

Closes scylladb/scylladb#27768
2025-12-19 09:55:31 +02:00
Emil Maskovsky
08518b2c12 test/raft: fix test_joining_old_node_fails flakiness
When a node without the required feature attempts to join a Raft-based
cluster with the feature enabled, there is a race between the join
rejection response ("Feature check failed") and the ban notification
("received notification of being banned"). Depending on timing, either
message may appear in the joining node's log.

This starts to happen after 39cec4a (which introduced informing the
nodes about being banned).

Updated the test to accept both error messages as valid, making the test
robust against this race condition, which is more likely in debug mode
or under slow execution.

Fixes: scylladb/scylladb#27603

No backport: This failure is only present in master.

Closes scylladb/scylladb#27760
2025-12-19 09:44:09 +02:00
Emil Maskovsky
2a75b1374e test/raft: fix race condition in failure_detector_test
The test had a sporadic failure due to a broken promise exception.
The issue was in `test_pinger::ping()` which captured the promise by
move into the subscription lambda, causing the promise to be destroyed
when the lambda was destroyed during coroutine unwinding.

Simplify `test_pinger::ping()` by replacing manual abort_source/promise
logic with `seastar::sleep_abortable()`.
This removes the risk of promise lifetime/race issues and makes the code
simpler and more robust.

Fixes: scylladb/scylladb#27136

Backport to active branches: This fixes a CI test issue, so it is
beneficial to backport the fix. As this is a test-only fix, it is a low
risk change.

Closes scylladb/scylladb#27737
2025-12-19 09:42:19 +02:00
Łukasz Paszkowski
2cb9bb8f3a test_user_writes_rejection: Disable speculative retries
This test starts a 3-node cluster and creates a large blob file so that one
node reaches critical disk utilization, triggering write rejections on that
node. The test then writes data with CL=QUORUM and validates that the data:
- did not reach the critically utilized node
- did reach the remaining two nodes

By default, tables use speculative retries to determine when coordinators may
query additional replicas.

Since the validation uses CL=ONE, it is possible that an additional request
is sent to satisfy the consistency level. As a result:
- the first check may fail if the additional request is sent to a node that
  already contains data, making it appear as if data reached the critically
  utilized node
- the second check may fail if the additional request is sent to the critically
  utilized node, making it appear as if data did not reach the healthy node

The patch fixes the flakiness by disabling the speculative retries.

Fixes https://github.com/scylladb/scylladb/issues/27212

Closes scylladb/scylladb#27488
2025-12-19 09:39:09 +02:00
Dario Mirovic
f1d63d014c test: dtest: schema_management_test.py: speed up TestLargePartitionAlterSchema tests
The tests in `TestLargePartitionAlterSchema` are `test_large_partition_with_add_column`
and `test_large_partition_with_drop_column`.

These tests need to replicate the following conditions that led to a bug before a fix from around 5 years ago.

The scenario in which the problem could have happened has to involve:
- a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
- appending writes to the partition (not overwrites)
- scans of the partition
- schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
  column lands in the middle (in alphabetical order) of the old column set.

The way the test is set up is:
- fixed number of writes per populate call
- fixed number of reads

This has the following implications:
- if the machine executing the test is fast, all the writes are done before the 10 seconds sleep
- there are too many reads - most of them get executed after the test logic is done

This patch solves these issues in the following way:
- populate lazily generates write data, and stops when instructed by `stop_populating` event
- read, which is done sequentially, stops when instructed by `stop_reading` event
- number of max operations is increased significantly, but the operations are stopped 1 second
  after node flush; this makes sure there are enough operations during the test, but also that
  the test does not take unnecessary time

Test execution time has been reduced severalfold. On dev machine the time the tests take is
reduced from 110 seconds to 34 seconds.

The patch also introduces a few small improvements:
- `cs_run` renamed to `run_stress` for clarity
- Stopped checking if cluster is `ScyllaCluster`, since it is the only one we use
- `case_map` removed from `test_alter_table_in_parallel_to_read_and_write`, used `mixed` param directly
- Added explanation comment on why we do `data[i].append(None)`
- Replaced `alter_table` inner function with its body, for simplicity
- Removed unnecessary `ck_rows` variable in `populate`
- Removed unnecessary `isinstance(self.cluster. ScyllaCluster)`
- Adjusted `ThreadPoolExecutor` size in several places where 5 workers are not needed
- Replaced functional programming style expressions for `new_versions` and `columns_list` with
  comprehension/generator statement python style code, improving readability

Refs #26932

fix
2025-12-18 17:07:27 +01:00
Michael Litvak
33f7bc28da docs: document restrictions of colocated tables
Currently some things are not supported for colocated tables: it's not
possible to repair a colocated table, and due to this it's also not
possible to use the tombstone_gc=repair mode on a colocated table.

Extend the documentation to explain what colocated tables are and
document these restrictions.

Fixes scylladb/scylladb#27261

Closes scylladb/scylladb#27516
2025-12-18 15:38:29 +01:00
Dario Mirovic
f831ca5ab5 test: dtest: schema_management_test.py: fix large partition add column test
`large_partition_with_add_column_test` and `large_partition_with_drop_column_test`
were added on August 17th, 2020 in scylladb/scylla-dtest#1569.

Only `large_partition_with_drop_column_test` was migrated to pytest, and renamed
to `test_large_partition_with_drop_column` on March 31st, 2021 in scylladb/scylla-dtest#2051.
Since then this test has not been running.

This patch fixes it - the test is updated and renamed and the testing environment
now properly picks it up.

Refs #26932
2025-12-18 12:54:43 +01:00
Dario Mirovic
1fe0509a9b test: dtest: schema_management_test.py: add TestSchemaManagement.prepare
Extract repeated cluster initialization code in `TestSchemaManagement`
into a separate `prepare` method. It holds all the common code for
cluster preparation, with just the necessary parameters.

Refs #26932
2025-12-18 12:54:43 +01:00
Dario Mirovic
e7d76fd8f3 test: dtest: schema_management_test.py: test enhancements
Extract regex compilation from the stress functions to the module level,
to avoid unnecessary regex compilation repetition.

Add descriptions to the stress functions.

Do not materialize list in `stress_object` for loop. Use a generator expression.

Make `_set_stress_val` an object method.

Refs #26932
2025-12-18 12:54:43 +01:00
Dario Mirovic
700853740d test: dtest: schema_management_test.py: make the tests work
Remove unused function markers.
Add wait_other_notice=True to cluster start method in
TestSchemaHistory.prepare function to make the test stable.

Enable the test in suite.yaml for dev and debug modes.

Fixes #26932
2025-12-18 12:54:43 +01:00
Dario Mirovic
3c5dd5e5ae test: dtest: migrate setup and tools from dtest
Migrate several functionalities from dtest. These will be used by
the schema_management_test.py tests when they are enabled.

Refs #26932
2025-12-18 12:54:43 +01:00
Dario Mirovic
5971b2ad97 test: dtest: copy unmodified schema_management_test.py
Copy schema_management_test.py from scylla-dtest to
test/cluster/dtest/schema_management_test.py.

Add license header.

Disable it for debug, dev, and release mode.

Refs #26932
2025-12-18 12:54:42 +01:00
Dario Mirovic
f89315d02f replica: database: flush_all_tables log on completion
In database::flush_all_tables add log on completion.
This slightly improves the readability of logs when debugging an issue.

Refs #26932
2025-12-18 12:54:42 +01:00
Patryk Jędrzejczak
d5c205194b Merge 'topology: Make removenode use left_token_ring state for global barrier' from Emil Maskovsky
Make the removenode operation go through the `left_token_ring` state, similar to decommission. This ensures that when removenode completes, all nodes in the cluster are aware of the topology change through a global token metadata barrier.

Previously, removenode would skip the `left_token_ring` state and go directly from `write_both_read_new` to `left` state. This meant that when the operation completed, some nodes might not yet know about the topology change, potentially causing issues with subsequent data plane requests.

Key changes:
- Both decommission and removenode now transition to `left_token_ring` state in the `write_both_read_new` handler
- In `left_token_ring` state, only decommissioning nodes receive the shutdown RPC (removed nodes are already dead)
- Updated documentation to reflect that both operations use this state

This change improves consistency guarantees for removenode operations by ensuring cluster-wide awareness before completion.

The change is protected by "REMOVENODE_WITH_LEFT_TOKEN_RING" feature flag to also support mixed clusters during e.g. upgrade.

Fixes:  scylladb/scylladb#25530

No backport: This fixes and issue found in tests. It can theoretically happen in production too, but wasn't reported in any customer issue, so a backport is not needed.

Closes scylladb/scylladb#26931

* https://github.com/scylladb/scylladb:
  topology: make removenode use left_token_ring state for global barrier
  topology: allow removing nodes not having tokens
  features: add feature flag for removenode via left token ring
2025-12-18 09:34:38 +01:00
Andrzej Jackowski
6ad10b141a test: remove unused get_processed_tasks_for_group
The function `get_processed_tasks_for_group` was defined twice in
`test_raft_service_levels.py`. This change removes the unused
definition to avoid confusion and clean up the code.
2025-12-17 20:45:53 +01:00
Andrzej Jackowski
8cf8e6c87d test: increase num of requests in driver_service_level tests
`_verify_tasks_processed_metrics()` is used to check that the correct
service level is used to process requests. It takes two service levels
as arguments and executes numerous requests. After that, the number
of tasks processed by one of the service levels is expected to rise
by at least the number of executed requests. In contrast,
the second service level is expected to process fewer tasks than
the number of requests.

Unfortunately, background noise may cause some tasks to be executed
on the service level that is not supposed to process requests.
This patch increases the number of executed requests to eliminate
the chance of noise causing test failures.

Additionally, this commit extends logging to make future investigation
easier.

Fixes: scylladb/scylladb#27715
2025-12-17 20:45:48 +01:00
Emil Maskovsky
1642c686c2 topology: make removenode use left_token_ring state for global barrier
Make the removenode operation go through the `left_token_ring` state,
similar to decommission. This ensures that when removenode completes,
all nodes in the cluster are aware of the topology change through a
global token metadata barrier.

Previously, removenode would skip the `left_token_ring` state and go
directly from `write_both_read_new` to `left` state. This meant that
when the operation completed, some nodes might not yet know about the
topology change, potentially causing issues with subsequent data plane
requests.

Key changes:
- Both decommission and removenode now transition to `left_token_ring`
  state in the `write_both_read_new` handler
- In `left_token_ring` state, only decommissioning nodes receive the
  shutdown RPC (removed nodes may already be dead)
- Updated documentation to reflect that both operations use this state

This change improves consistency guarantees for removenode operations
by ensuring cluster-wide awareness before completion.

Fixes:  scylladb/scylladb#25530
2025-12-17 13:31:11 +01:00
Emil Maskovsky
9431826c52 topology: allow removing nodes not having tokens
For the changes to go through the left_token_ring state when
REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled, we need to allow
removing nodes to not have any tokens (similarly to decommissioning
nodes, which use the same sequence of states).

This means the tests also need to change to allow for this new behavior
- it can temporarily happen that a removing node has no tokens but is
still part of Raft group 0 (so there may be a temporary mismatch between
the token ring and group 0 membership).

Therefore, the `check_token_ring_and_group0_consistency` function is
replaced by `wait_for_token_ring_and_group0_consistency`, which waits
up to 30 seconds for consistency to be reached.
2025-12-17 13:31:11 +01:00
Emil Maskovsky
ba6fabfc88 features: add feature flag for removenode via left token ring
To improve the behavior of the removenode operation, we want to issue
a global topology barrier after the removenode has been applied.
However, this requires changing the topology state machine to add a new
state (left_token_ring) to the removenode flow, which is not supported
by older nodes.

To allow rolling upgrades, we add a feature flag
REMOVENODE_WITH_LEFT_TOKEN_RING that controls whether the new removenode
flow is used.
2025-12-17 13:31:11 +01:00
Benny Halevy
798714183e table: get_snapshot_details: add FIXME comments
Ref https://github.com/scylladb/seastar/pull/3163

We can optimize the stat calls we use here by
using open_directory to open the snapshot,
base, and staging directory once, and using statat
calls for the relative name instead of the full
blown file_stat that needs to traverse the whole
path prefix for every call (the dirents are likely
to be cached, but still why waste cpu cycles on that
over and over again).

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-16 18:45:56 +02:00
Benny Halevy
f5ca3657e2 table: get_snapshot_details: lookup entries also in the staging directory
Since the sstables in the snapshot may still be in the staging dir.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-16 18:42:05 +02:00
Benny Halevy
dc00461adf table: get_snapshot_details: optimize using the entry number_of_links
If the number_of_linkes equals 1, we can be sure that
the file exists only in the snapshot directory so there is no need
to look it up in the data directory.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-16 18:42:05 +02:00
Benny Halevy
be6d87648c table: get_snapshot_details: continue loop for manifest and schema entries
Now that we're using a simple loop in the coroutine just continue
the loop for files we want to ignore.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-16 18:42:05 +02:00
Benny Halevy
004c08f525 table: get_snapshot_details: use directory_lister
It is more efficient to use the coroutine generator
to list the directory.

Brewing changes in seastar would make the generator buffered
as well as adding an extended generation that would
return the file stat data for each entry, that would become
useful in the next patch that optimizes the algorithm by
considering the entry's link count.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2025-12-16 18:42:05 +02:00
Nadav Har'El
4e106b9820 test/cqlpy: remove unused variables
Copilot detected a few cases of cqlpy tests setting a variable which
they don't use. In all the cases in this patch, we can just remove
the variable. Although the AI found all these unused variables, I
verified each case carefully before changing it in this patch.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2025-12-15 18:11:04 +02:00
Nadav Har'El
4fa4f40712 test/cqlpy: use unique partition in test
It is traditional to use a unique (or random) partition key in cqlpy
tests, to allow multiple tests to share the same table and make the test
suite a bit faster. One of the tests, test_multi_column_relation_desc,
set up a unique key "k", but then forgot to use it and used partition
key 0 instead. Fix the test to use this k.

This problem was spotted by Copilot, who saw the unused variable k.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2025-12-15 17:08:51 +02:00
Michał Jadwiszczak
aa908ba99c docs/dev/view-building-coordinator: fix typos 2025-12-04 12:52:42 +01:00
Michał Jadwiszczak
529cd25c51 db/view/view_building_worker: remove unnnecessary empty lines 2025-12-04 12:52:42 +01:00
Michał Jadwiszczak
4fc5fcaec4 db/view/view_building_worker: fix typo 2025-12-04 12:52:42 +01:00
Michał Jadwiszczak
3253b05ec9 db/view/view_building_worker: avoid creating a copy of tasks map
The loop can be converted to use an iterator and avoid creating a copy
of the tasks map.
2025-12-04 12:52:41 +01:00
Michał Jadwiszczak
597a2ce5f9 db/view/view_building_worker: wrap conditionally compiled code in a scope
The code creates a local variable, so it's better to wrap it in a local
scope, to the conditionally compiled variable doesn't pollute the
external scope.
2025-12-04 12:52:41 +01:00
Michał Jadwiszczak
a5f19af050 db/view/view_building_worker: remove unnecessary CV broadcast
After scylladb/scylladb#26897 was merged, the worker doesn't use the
view building state machine CV to manage lifetime of batches, so the
broadcast is not needed.
2025-12-04 12:52:41 +01:00
Michał Jadwiszczak
b4fe565f07 db/view/view_building_worker: catch general execption in staging task registrator
In case of general exception in `view_building_worker::create_staging_sstable_tasks()`,
catch it, print it with error level and sleep 1s before retrying.
This will allow for the registrator to retry its work in case of failure
and it should be easier to detect any bugs in the method.
2025-12-04 12:52:37 +01:00
63 changed files with 1474 additions and 326 deletions

View File

@@ -169,7 +169,7 @@ future<> controller::request_stop_server() {
});
}
future<utils::chunked_vector<client_data>> controller::get_client_data() {
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
return _server.local().get_client_data();
}

View File

@@ -93,7 +93,7 @@ public:
// This virtual function is called (on each shard separately) when the
// virtual table "system.clients" is read. It is expected to generate a
// list of clients connected to this server (on this shard).
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
};
}

View File

@@ -708,8 +708,12 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// As long as the system_clients_entry object is alive, this request will
// be visible in the "system.clients" virtual table. When requested, this
// entry will be formatted by server::ongoing_request::make_client_data().
auto user_agent_header = co_await _connection_options_keys_and_values.get_or_load(req->get_header("User-Agent"), [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
auto system_clients_entry = _ongoing_requests.emplace(
req->get_client_address(), req->get_header("User-Agent"),
req->get_client_address(), std::move(user_agent_header),
username, current_scheduling_group(),
req->get_protocol_name() == "https");
@@ -985,10 +989,10 @@ client_data server::ongoing_request::make_client_data() const {
return cd;
}
future<utils::chunked_vector<client_data>> server::get_client_data() {
utils::chunked_vector<client_data> ret;
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> server::get_client_data() {
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
co_await _ongoing_requests.for_each_gently([&ret] (const ongoing_request& r) {
ret.emplace_back(r.make_client_data());
ret.emplace_back(make_foreign(std::make_unique<client_data>(r.make_client_data())));
});
co_return ret;
}

View File

@@ -55,6 +55,7 @@ class server : public peering_sharded_service<server> {
// though it isn't really relevant for Alternator which defines its own
// timeouts separately. We can create this object only once.
updateable_timeout_config _timeout_config;
client_options_cache_type _connection_options_keys_and_values;
alternator_callbacks_map _callbacks;
@@ -88,7 +89,7 @@ class server : public peering_sharded_service<server> {
// is called when reading the "system.clients" virtual table.
struct ongoing_request {
socket_address _client_address;
sstring _user_agent;
client_options_cache_entry_type _user_agent;
sstring _username;
scheduling_group _scheduling_group;
bool _is_https;
@@ -107,7 +108,7 @@ public:
// table "system.clients" is read. It is expected to generate a list of
// clients connected to this server (on this shard). This function is
// called by alternator::controller::get_client_data().
future<utils::chunked_vector<client_data>> get_client_data();
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
private:
void set_routes(seastar::httpd::routes& r);
// If verification succeeds, returns the authenticated user's username

View File

@@ -100,9 +100,8 @@ rest_set_client_routes(http_context& ctx, sharded<service::client_routes_service
rapidjson::Document root;
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
const auto route_entries = parse_set_client_array(root);
co_await cr.local().set_client_routes(route_entries);
co_await cr.local().set_client_routes(parse_set_client_array(root));
co_return seastar::json::json_void();
}
@@ -132,8 +131,7 @@ rest_delete_client_routes(http_context& ctx, sharded<service::client_routes_serv
auto content = co_await util::read_entire_stream_contiguous(*req->content_stream);
root.Parse(content.c_str());
const auto route_keys = parse_delete_client_array(root);
co_await cr.local().delete_client_routes(route_keys);
co_await cr.local().delete_client_routes(parse_delete_client_array(root));
co_return seastar::json::json_void();
}

View File

@@ -10,7 +10,9 @@
#include <seastar/net/inet_address.hh>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
#include "utils/loading_shared_values.hh"
#include <list>
#include <optional>
enum class client_type {
@@ -27,6 +29,20 @@ enum class client_connection_stage {
ready,
};
// We implement a keys cache using a map-like utils::loading_shared_values container by storing empty values.
struct options_cache_value_type {};
using client_options_cache_type = utils::loading_shared_values<sstring, options_cache_value_type>;
using client_options_cache_entry_type = client_options_cache_type::entry_ptr;
using client_options_cache_key_type = client_options_cache_type::key_type;
// This struct represents a single OPTION key-value pair from the client's connection options.
// Both key and value are represented by corresponding "references" to their cached values.
// Each "reference" is effectively a lw_shared_ptr value.
struct client_option_key_value_cached_entry {
client_options_cache_entry_type key;
client_options_cache_entry_type value;
};
sstring to_string(client_connection_stage ct);
// Representation of a row in `system.clients'. std::optionals are for nullable cells.
@@ -37,8 +53,8 @@ struct client_data {
client_connection_stage connection_stage = client_connection_stage::established;
int32_t shard_id; /// ID of server-side shard which is processing the connection.
std::optional<sstring> driver_name;
std::optional<sstring> driver_version;
std::optional<client_options_cache_entry_type> driver_name;
std::optional<client_options_cache_entry_type> driver_version;
std::optional<sstring> hostname;
std::optional<int32_t> protocol_version;
std::optional<sstring> ssl_cipher_suite;
@@ -46,6 +62,7 @@ struct client_data {
std::optional<sstring> ssl_protocol;
std::optional<sstring> username;
std::optional<sstring> scheduling_group_name;
std::list<client_option_key_value_cached_entry> client_options;
sstring stage_str() const { return to_string(connection_stage); }
sstring client_type_str() const { return to_string(ct); }

View File

@@ -125,10 +125,6 @@ if(target_arch)
add_compile_options("-march=${target_arch}")
endif()
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
add_compile_options("SHELL:-Xclang -fexperimental-assignment-tracking=disabled")
endif()
function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
math(EXPR _stack_usage_threshold_in_bytes "${stack_usage_threshold_in_KB} * 1024")
set(_stack_usage_threshold_flag "-Wstack-usage=${_stack_usage_threshold_in_bytes}")

View File

@@ -2251,15 +2251,6 @@ def get_extra_cxxflags(mode, mode_config, cxx, debuginfo):
if debuginfo and mode_config['can_have_debug_info']:
cxxflags += ['-g', '-gz']
if 'clang' in cxx:
# Since AssignmentTracking was enabled by default in clang
# (llvm/llvm-project@de6da6ad55d3ca945195d1cb109cb8efdf40a52a)
# coroutine frame debugging info (`coro_frame_ty`) is broken.
#
# It seems that we aren't losing much by disabling AssigmentTracking,
# so for now we choose to disable it to get `coro_frame_ty` back.
cxxflags.append('-Xclang -fexperimental-assignment-tracking=disabled')
return cxxflags

View File

@@ -3157,7 +3157,10 @@ static bool must_have_tokens(service::node_state nst) {
// A decommissioning node doesn't have tokens at the end, they are
// removed during transition to the left_token_ring state.
case service::node_state::decommissioning: return false;
case service::node_state::removing: return true;
// A removing node might or might not have tokens depending on whether
// REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
// cases, we allow removing nodes to not have tokens.
case service::node_state::removing: return false;
case service::node_state::rebuilding: return true;
case service::node_state::normal: return true;
case service::node_state::left: return false;

View File

@@ -198,6 +198,7 @@ future<> view_building_worker::register_staging_sstable_tasks(std::vector<sstabl
future<> view_building_worker::run_staging_sstables_registrator() {
while (!_as.abort_requested()) {
bool sleep = false;
try {
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
co_await create_staging_sstable_tasks();
@@ -214,6 +215,14 @@ future<> view_building_worker::run_staging_sstables_registrator() {
vbw_logger.warn("Got group0_concurrent_modification while creating staging sstable tasks");
} catch (raft::request_aborted&) {
vbw_logger.warn("Got raft::request_aborted while creating staging sstable tasks");
} catch (...) {
vbw_logger.error("Exception while creating staging sstable tasks: {}", std::current_exception());
sleep = true;
}
if (sleep) {
vbw_logger.debug("Sleeping after exception.");
co_await seastar::sleep_abortable(1s, _as).handle_exception([] (auto x) { return make_ready_future<>(); });
}
}
}
@@ -417,9 +426,12 @@ future<> view_building_worker::check_for_aborted_tasks() {
auto my_host_id = vbw._db.get_token_metadata().get_topology().my_host_id();
auto my_replica = locator::tablet_replica{my_host_id, this_shard_id()};
auto tasks_map = vbw._state._batch->tasks; // Potentially, we'll remove elements from the map, so we need a copy to iterate over it
for (auto& [id, t]: tasks_map) {
auto task_opt = building_state.get_task(t.base_id, my_replica, id);
auto it = vbw._state._batch->tasks.begin();
while (it != vbw._state._batch->tasks.end()) {
auto id = it->first;
auto task_opt = building_state.get_task(it->second.base_id, my_replica, id);
++it; // Advance the iterator before potentially removing the entry from the map.
if (!task_opt || task_opt->get().aborted) {
co_await vbw._state._batch->abort_task(id);
}
@@ -449,7 +461,7 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
}) | std::ranges::to<std::unordered_set>();;
}
// If `state::processing_base_table` is diffrent that the `view_building_state::currently_processed_base_table`,
// If `state::processing_base_table` is different that the `view_building_state::currently_processed_base_table`,
// clear the state, save and flush new base table
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
if (processing_base_table != building_state.currently_processed_base_table) {
@@ -571,8 +583,6 @@ future<> view_building_worker::batch::do_work() {
break;
}
}
_vbw.local()._vb_state_machine.event.broadcast();
}
future<> view_building_worker::do_build_range(table_id base_id, std::vector<table_id> views_ids, dht::token last_token, abort_source& as) {
@@ -774,13 +784,15 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
tasks.insert({id, *task_opt});
}
#ifdef SEASTAR_DEBUG
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
{
auto& some_task = tasks.begin()->second;
for (auto& [_, t]: tasks) {
SCYLLA_ASSERT(t.base_id == some_task.base_id);
SCYLLA_ASSERT(t.last_token == some_task.last_token);
SCYLLA_ASSERT(t.replica == some_task.replica);
SCYLLA_ASSERT(t.type == some_task.type);
SCYLLA_ASSERT(t.replica.shard == this_shard_id());
}
}
#endif
@@ -811,25 +823,6 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
co_return collect_completed_tasks();
}
}
}

View File

@@ -749,6 +749,7 @@ class clients_table : public streaming_virtual_table {
.with_column("ssl_protocol", utf8_type)
.with_column("username", utf8_type)
.with_column("scheduling_group", utf8_type)
.with_column("client_options", map_type_impl::get_instance(utf8_type, utf8_type, false))
.with_hash_version()
.build();
}
@@ -766,7 +767,7 @@ class clients_table : public streaming_virtual_table {
future<> execute(reader_permit permit, result_collector& result, const query_restrictions& qr) override {
// Collect
using client_data_vec = utils::chunked_vector<client_data>;
using client_data_vec = utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>;
using shard_client_data = std::vector<client_data_vec>;
std::vector<foreign_ptr<std::unique_ptr<shard_client_data>>> cd_vec;
cd_vec.resize(smp::count);
@@ -806,13 +807,13 @@ class clients_table : public streaming_virtual_table {
for (unsigned i = 0; i < smp::count; i++) {
for (auto&& ps_cdc : *cd_vec[i]) {
for (auto&& cd : ps_cdc) {
if (cd_map.contains(cd.ip)) {
cd_map[cd.ip].emplace_back(std::move(cd));
if (cd_map.contains(cd->ip)) {
cd_map[cd->ip].emplace_back(std::move(cd));
} else {
dht::decorated_key key = make_partition_key(cd.ip);
dht::decorated_key key = make_partition_key(cd->ip);
if (this_shard_owns(key) && contains_key(qr.partition_range(), key)) {
ips.insert(decorated_ip{std::move(key), cd.ip});
cd_map[cd.ip].emplace_back(std::move(cd));
ips.insert(decorated_ip{std::move(key), cd->ip});
cd_map[cd->ip].emplace_back(std::move(cd));
}
}
co_await coroutine::maybe_yield();
@@ -825,39 +826,58 @@ class clients_table : public streaming_virtual_table {
co_await result.emit_partition_start(dip.key);
auto& clients = cd_map[dip.ip];
std::ranges::sort(clients, [] (const client_data& a, const client_data& b) {
return a.port < b.port || a.client_type_str() < b.client_type_str();
std::ranges::sort(clients, [] (const foreign_ptr<std::unique_ptr<client_data>>& a, const foreign_ptr<std::unique_ptr<client_data>>& b) {
return a->port < b->port || a->client_type_str() < b->client_type_str();
});
for (const auto& cd : clients) {
clustering_row cr(make_clustering_key(cd.port, cd.client_type_str()));
set_cell(cr.cells(), "shard_id", cd.shard_id);
set_cell(cr.cells(), "connection_stage", cd.stage_str());
if (cd.driver_name) {
set_cell(cr.cells(), "driver_name", *cd.driver_name);
clustering_row cr(make_clustering_key(cd->port, cd->client_type_str()));
set_cell(cr.cells(), "shard_id", cd->shard_id);
set_cell(cr.cells(), "connection_stage", cd->stage_str());
if (cd->driver_name) {
set_cell(cr.cells(), "driver_name", cd->driver_name->key());
}
if (cd.driver_version) {
set_cell(cr.cells(), "driver_version", *cd.driver_version);
if (cd->driver_version) {
set_cell(cr.cells(), "driver_version", cd->driver_version->key());
}
if (cd.hostname) {
set_cell(cr.cells(), "hostname", *cd.hostname);
if (cd->hostname) {
set_cell(cr.cells(), "hostname", *cd->hostname);
}
if (cd.protocol_version) {
set_cell(cr.cells(), "protocol_version", *cd.protocol_version);
if (cd->protocol_version) {
set_cell(cr.cells(), "protocol_version", *cd->protocol_version);
}
if (cd.ssl_cipher_suite) {
set_cell(cr.cells(), "ssl_cipher_suite", *cd.ssl_cipher_suite);
if (cd->ssl_cipher_suite) {
set_cell(cr.cells(), "ssl_cipher_suite", *cd->ssl_cipher_suite);
}
if (cd.ssl_enabled) {
set_cell(cr.cells(), "ssl_enabled", *cd.ssl_enabled);
if (cd->ssl_enabled) {
set_cell(cr.cells(), "ssl_enabled", *cd->ssl_enabled);
}
if (cd.ssl_protocol) {
set_cell(cr.cells(), "ssl_protocol", *cd.ssl_protocol);
if (cd->ssl_protocol) {
set_cell(cr.cells(), "ssl_protocol", *cd->ssl_protocol);
}
set_cell(cr.cells(), "username", cd.username ? *cd.username : sstring("anonymous"));
if (cd.scheduling_group_name) {
set_cell(cr.cells(), "scheduling_group", *cd.scheduling_group_name);
set_cell(cr.cells(), "username", cd->username ? *cd->username : sstring("anonymous"));
if (cd->scheduling_group_name) {
set_cell(cr.cells(), "scheduling_group", *cd->scheduling_group_name);
}
auto map_type = map_type_impl::get_instance(
utf8_type,
utf8_type,
false
);
auto prepare_client_options = [] (const auto& client_options) {
map_type_impl::native_type tmp;
for (auto& co: client_options) {
auto map_element = std::make_pair(data_value(co.key.key()), data_value(co.value.key()));
tmp.push_back(std::move(map_element));
}
return tmp;
};
set_cell(cr.cells(), "client_options",
make_map_value(map_type, prepare_client_options(cd->client_options)));
co_await result.emit_row(std::move(cr));
}
co_await result.emit_partition_end();

View File

@@ -365,7 +365,7 @@ Modifying a keyspace with tablets enabled is possible and doesn't require any sp
- The replication factor (RF) can be increased or decreased by at most 1 at a time. To reach the desired RF value, modify the RF repeatedly.
- The ``ALTER`` statement rejects the ``replication_factor`` tag. List the DCs explicitly when altering a keyspace. See :ref:`NetworkTopologyStrategy <replication-strategy>`.
- If there's any other ongoing global topology operation, executing the ``ALTER`` statement will fail (with an explicit and specific error) and needs to be repeated.
- An RF change cannot be requested while another RF change is pending for the same keyspace. Attempting to execute an ``ALTER`` statement in this scenario will fail with an explicit error. Wait for the ongoing RF change to complete before issuing another ``ALTER`` statement.
- The ``ALTER`` statement may take longer than the regular query timeout, and even if it times out, it will continue to execute in the background.
- The replication strategy cannot be modified, as keyspaces with tablets only support ``NetworkTopologyStrategy``.
- The ``ALTER`` statement will fail if it would make the keyspace :term:`RF-rack-invalid <RF-rack-valid keyspace>`.
@@ -1043,6 +1043,8 @@ The following modes are available:
* - ``immediate``
- Tombstone GC is immediately performed. There is no wait time or repair requirement. This mode is useful for a table that uses the TWCS compaction strategy with no user deletes. After data is expired after TTL, ScyllaDB can perform compaction to drop the expired data immediately.
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
.. _cql-per-table-tablet-options:
Per-table tablet options

View File

@@ -74,6 +74,8 @@ The keys and values are:
as an indicator to which shard client wants to connect. The desired shard number
is calculated as: `desired_shard_no = client_port % SCYLLA_NR_SHARDS`.
Its value is a decimal representation of type `uint16_t`, by default `19142`.
- `CLIENT_OPTIONS` is a string containing a JSON object representation that
contains CQL Driver configuration, e.g. load balancing policy, retry policy, timeouts, etc.
Currently, one `SCYLLA_SHARDING_ALGORITHM` is defined,
`biased-token-round-robin`. To apply the algorithm,

View File

@@ -86,6 +86,7 @@ stateDiagram-v2
de_left_token_ring --> [*]
}
state removing {
re_left_token_ring : left_token_ring
re_tablet_draining : tablet_draining
re_tablet_migration : tablet_migration
re_write_both_read_old : write_both_read_old
@@ -98,7 +99,8 @@ stateDiagram-v2
re_tablet_draining --> re_write_both_read_old
re_write_both_read_old --> re_write_both_read_new: streaming completed
re_write_both_read_old --> re_rollback_to_normal: rollback
re_write_both_read_new --> [*]
re_write_both_read_new --> re_left_token_ring
re_left_token_ring --> [*]
}
rebuilding --> normal: streaming completed
decommissioning --> left: operation succeeded
@@ -122,9 +124,10 @@ Note that these are not all states, as there are other states specific to tablet
Writes to vnodes-based tables are going to both new and old replicas (new replicas means calculated according
to modified token ring), reads are using old replicas.
- `write_both_read_new` - as above, but reads are using new replicas.
- `left_token_ring` - the decommissioning node left the token ring, but we still need to wait until other
nodes observe it and stop sending writes to this node. Then, we tell the node to shut down and remove
it from group 0. We also use this state to rollback a failed bootstrap or decommission.
- `left_token_ring` - the decommissioning or removing node left the token ring, but we still need to wait until other
nodes observe it and stop sending writes to this node. For decommission, we tell the node to shut down,
then remove it from group 0. For removenode, the node is already down, so we skip the shutdown step.
We also use this state to rollback a failed bootstrap or decommission.
- `rollback_to_normal` - the decommission or removenode operation failed. Rollback the operation by
moving the node we tried to decommission/remove back to the normal state.
- `lock` - the topology stays in this state until externally changed (to null state), preventing topology
@@ -141,7 +144,9 @@ reads that started before this point exist in the system. Finally we remove the
transitioning state.
Decommission, removenode and replace work similarly, except they don't go through
`commit_cdc_generation`.
`commit_cdc_generation`. Both decommission and removenode go through the
`left_token_ring` state to run a global barrier ensuring all nodes are aware
of the topology change before the operation completes.
The state machine may also go only through the `commit_cdc_generation` state
after getting a request from the user to create a new CDC generation if the

View File

@@ -41,12 +41,12 @@ Unless the task was aborted, the worker will eventually reply that the task was
it temporarily saves list of ids of finished tasks and removes those tasks from group0 state (pernamently marking them as finished) in 200ms intervals. (*)
This batching of removing finished tasks is done in order to reduce number of generated group0 operations.
On the other hand, view buildind tasks can can also be aborted due to 2 main reasons:
On the other hand, view building tasks can can also be aborted due to 2 main reasons:
- a keyspace/view was dropped
- tablet operations (see [tablet operations section](#tablet-operations))
In the first case we simply delete relevant view building tasks as they are no longer needed.
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task informations
to created a new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
But if a task needs to be aborted due to tablet operation, we're firstly setting the `aborted` flag to true. We need to do this because we need the task information
to create new adjusted tasks (if the operation succeeded) or rollback them (if the operation failed).
Once a task is aborted by setting the flag, this cannot be revoked, so rolling back a task means creating its duplicate and removing the original task.
(*) - Because there is a time gap between when the coordinator learns that a task is finished (from the RPC response) and when the task is marked as completed,

View File

@@ -17,6 +17,7 @@ This document highlights ScyllaDB's key data modeling features.
Workload Prioritization </features/workload-prioritization>
Backup and Restore </features/backup-and-restore>
Incremental Repair </features/incremental-repair/>
Vector Search </features/vector-search/>
.. panel-box::
:title: ScyllaDB Features
@@ -43,3 +44,5 @@ This document highlights ScyllaDB's key data modeling features.
* :doc:`Incremental Repair </features/incremental-repair/>` provides a much more
efficient and lightweight approach to maintaining data consistency by
repairing only the data that has changed since the last repair.
* :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables
similarity-based queries on vector embeddings.

View File

@@ -0,0 +1,55 @@
=================================
Vector Search in ScyllaDB
=================================
.. note::
This feature is currently available only in `ScyllaDB Cloud <https://cloud.docs.scylladb.com/>`_.
What Is Vector Search
-------------------------
Vector Search enables similarity-based queries over high-dimensional data,
such as text, images, audio, or user behavior. Instead of searching for exact
matches, it allows applications to find items that are semantically similar to
a given input.
To do this, Vector Search works on vector embeddings, which are numerical
representations of data that capture semantic meaning. This enables queries
such as:
* “Find documents similar to this paragraph”
* “Find products similar to what the user just viewed”
* “Find previous tickets related to this support request”
Rather than relying on exact values or keywords, Vector Search returns results
based on distance or similarity between vectors. This capability is
increasingly used in modern workloads such as AI-powered search, recommendation
systems, and retrieval-augmented generation (RAG).
Why Vector Search Matters
------------------------------------
Many applications already rely on ScyllaDB for high throughput, low and
predictable latency, and large-scale data storage.
Vector Search complements these strengths by enabling new classes of workloads,
including:
* Semantic search over text or documents
* Recommendations based on user or item similarity
* AI and ML applications, including RAG pipelines
* Anomaly and pattern detection
With Vector Search, ScyllaDB can serve as the similarity search backend for
AI-driven applications.
Availability
--------------
Vector Search is currently available only in ScyllaDB Cloud, the fully managed
ScyllaDB service.
👉 For details on using Vector Search, refer to the
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/index.html>`_.

View File

@@ -20,7 +20,10 @@ You can run your ScyllaDB workloads on AWS, GCE, and Azure using a ScyllaDB imag
Amazon Web Services (AWS)
-----------------------------
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`, :ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`, and :ref:`i7ie <system-requirements-i7ie-instances>`.
The recommended instance types are :ref:`i3en <system-requirements-i3en-instances>`,
:ref:`i4i <system-requirements-i4i-instances>`, :ref:`i7i <system-requirements-i7i-instances>`,
:ref:`i7ie <system-requirements-i7ie-instances>`, :ref:`i8g<system-requirements-i8g-instances>`,
and :ref:`i8ge <system-requirements-i8ge-instances>`.
.. note::
@@ -195,6 +198,118 @@ All i7i instances have the following specs:
See `Amazon EC2 I7i Instances <https://aws.amazon.com/ec2/instance-types/i7i/>`_ for details.
.. _system-requirements-i8g-instances:
i8g instances
^^^^^^^^^^^^^^
The following i8g instances are supported.
.. list-table::
:widths: 30 20 20 30
:header-rows: 1
* - Model
- vCPU
- Mem (GiB)
- Storage (GB)
* - i8g.large
- 2
- 16
- 1 x 468 GB
* - i8g.xlarge
- 4
- 32
- 1 x 937 GB
* - i8g.2xlarge
- 8
- 64
- 1 x 1,875 GB
* - i8g.4xlarge
- 16
- 128
- 1 x 3,750 GB
* - i8g.8xlarge
- 32
- 256
- 2 x 3,750 GB
* - i8g.12xlarge
- 48
- 384
- 3 x 3,750 GB
* - i8g.16xlarge
- 64
- 512
- 4 x 3,750 GB
All i8g instances have the following specs:
* Powered by AWS Graviton4 processors
* 3rd generation AWS Nitro SSD storage
* DDR5-5600 memory for improved throughput
* Up to 100 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
Amazon Elastic Block Store (EBS)
* Instance sizes offer up to 45 TB of total local NVMe instance storage
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
.. _system-requirements-i8ge-instances:
i8ge instances
^^^^^^^^^^^^^^
The following i8ge instances are supported.
.. list-table::
:widths: 30 20 20 30
:header-rows: 1
* - Model
- vCPU
- Mem (GiB)
- Storage (GB)
* - i8ge.large
- 2
- 16
- 1 x 1,250 GB
* - i8ge.xlarge
- 4
- 32
- 1 x 2,500 GB
* - i8ge.2xlarge
- 8
- 64
- 2 x 2,500 GB
* - i8ge.3xlarge
- 12
- 96
- 1 x 7,500 GB
* - i8ge.6xlarge
- 24
- 192
- 2 x 7,500 GB
* - i8ge.12xlarge
- 48
- 384
- 4 x 7,500 GB
* - i8ge.18xlarge
- 72
- 576
- 6 x 7,500 GB
All i8ge instances have the following specs:
* Powered by AWS Graviton4 processors
* 3rd generation AWS Nitro SSD storage
* DDR5-5600 memory for improved throughput
* Up to 300 Gbps of networking bandwidth and up to 60 Gbps of bandwidth to
Amazon Elastic Block Store (EBS)
* Instance sizes offer up to 120 TB of total local NVMe instance storage
See `Amazon EC2 I8g Instances <https://aws.amazon.com/ec2/instance-types/i8g/>`_ for details.
Im4gn and Is4gen instances
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ScyllaDB supports Arm-based Im4gn and Is4gen instances. See `Amazon EC2 Im4gn and Is4gen instances <https://aws.amazon.com/ec2/instance-types/i4g/>`_ for specification details.

View File

@@ -25,8 +25,7 @@ Getting Started
:id: "getting-started"
:class: my-panel
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/#core>`_ - Links to the ScyllaDB Download Center
* :doc:`Install ScyllaDB </getting-started/install-scylla/index/>`
* :doc:`Configure ScyllaDB </getting-started/system-configuration/>`
* :doc:`Run ScyllaDB in a Shared Environment </getting-started/scylla-in-a-shared-environment>`
* :doc:`Create a ScyllaDB Cluster - Single Data Center (DC) </operating-scylla/procedures/cluster-management/create-cluster/>`

View File

@@ -3,8 +3,7 @@
ScyllaDB Housekeeping and how to disable it
============================================
It is always recommended to run the latest version of ScyllaDB.
The latest stable release version is always available from the `Download Center <https://www.scylladb.com/download/>`_.
It is always recommended to run the latest stable version of ScyllaDB.
When you install ScyllaDB, it installs by default two services: **scylla-housekeeping-restart** and **scylla-housekeeping-daily**. These services check for the latest ScyllaDB version and prompt the user if they are using a version that is older than what is publicly available.
Information about your ScyllaDB deployment, including the ScyllaDB version currently used, as well as unique user and server identifiers, are collected by a centralized service.

View File

@@ -9,6 +9,8 @@ Running ``cluster repair`` on a **single node** synchronizes all data on all nod
To synchronize all data in clusters that have both tablets-based and vnodes-based keyspaces, run :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair/>` on **all**
of the nodes in the cluster, and :doc:`nodetool cluster repair </operating-scylla/nodetool-commands/cluster/repair/>` on **any** of the nodes in the cluster.
.. warning:: :term:`Colocated Tables <Colocated Table>` cannot be synchronized using cluster repair in this version.
To check if a keyspace enables tablets, use:
.. code-block:: cql

View File

@@ -202,3 +202,7 @@ Glossary
The name comes from two basic operations, multiply (MU) and rotate (R), used in its inner loop.
The MurmurHash3 version used in ScyllaDB originated from `Apache Cassandra <https://commons.apache.org/proper/commons-codec/apidocs/org/apache/commons/codec/digest/MurmurHash3.html>`_, and is **not** identical to the `official MurmurHash3 calculation <https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/MurmurHash.java#L31-L33>`_. More `here <https://github.com/russss/murmur3-cassandra>`_.
Colocated Table
An internal table of a special type in a :doc:`tablets </architecture/tablets>` enabled keyspace that is colocated with another base table, meaning it always has the same tablet replicas as the base table.
Current types of colocated tables include CDC log tables, local indexes, and materialized views that have the same partition key as their base table.

View File

@@ -177,6 +177,7 @@ public:
gms::feature driver_service_level { *this, "DRIVER_SERVICE_LEVEL"sv };
gms::feature strongly_consistent_tables { *this, "STRONGLY_CONSISTENT_TABLES"sv };
gms::feature client_routes { *this, "CLIENT_ROUTES"sv };
gms::feature removenode_with_left_token_ring { *this, "REMOVENODE_WITH_LEFT_TOKEN_RING"sv };
public:
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;

View File

@@ -176,7 +176,7 @@ void fsm::become_leader() {
_last_election_time = _clock.now();
_ping_leader = false;
// a new leader needs to commit at lease one entry to make sure that
// a new leader needs to commit at least one entry to make sure that
// all existing entries in its log are committed as well. Also it should
// send append entries RPC as soon as possible to establish its leadership
// (3.4). Do both of those by committing a dummy entry.

View File

@@ -2793,6 +2793,7 @@ future<> database::flush_all_tables() {
});
_all_tables_flushed_at = db_clock::now();
co_await _commitlog->wait_for_pending_deletes();
dblog.info("Forcing new commitlog segment and flushing all tables complete");
}
future<db_clock::time_point> database::get_all_tables_flushed_at(sharded<database>& sharded_db) {

View File

@@ -3385,16 +3385,15 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
continue;
}
lister::scan_dir(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>(), [datadir, &all_snapshots] (fs::path snapshots_dir, directory_entry de) {
auto snapshot_name = de.name;
auto lister = directory_lister(snapshots_dir, lister::dir_entry_types::of<directory_entry_type::directory>());
while (auto de = lister.get().get()) {
auto snapshot_name = de->name;
all_snapshots.emplace(snapshot_name, snapshot_details());
return get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).then([&all_snapshots, snapshot_name] (auto details) {
auto& sd = all_snapshots.at(snapshot_name);
sd.total += details.total;
sd.live += details.live;
return make_ready_future<>();
});
}).get();
auto details = get_snapshot_details(snapshots_dir / fs::path(snapshot_name), datadir).get();
auto& sd = all_snapshots.at(snapshot_name);
sd.total += details.total;
sd.live += details.live;
}
}
return all_snapshots;
});
@@ -3402,38 +3401,61 @@ future<std::unordered_map<sstring, table::snapshot_details>> table::get_snapshot
future<table::snapshot_details> table::get_snapshot_details(fs::path snapshot_dir, fs::path datadir) {
table::snapshot_details details{};
std::optional<fs::path> staging_dir = snapshot_dir / sstables::staging_dir;
if (!co_await file_exists(staging_dir->native())) {
staging_dir.reset();
}
co_await lister::scan_dir(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>(), [datadir, &details] (fs::path snapshot_dir, directory_entry de) -> future<> {
auto sd = co_await io_check(file_stat, (snapshot_dir / de.name).native(), follow_symlink::no);
auto lister = directory_lister(snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());
while (auto de = co_await lister.get()) {
const auto& name = de->name;
// FIXME: optimize stat calls by keeping the base directory open and use statat instead, here and below.
// See https://github.com/scylladb/seastar/pull/3163
auto sd = co_await io_check(file_stat, (snapshot_dir / name).native(), follow_symlink::no);
auto size = sd.allocated_size;
// The manifest and schema.sql files are the only files expected to be in this directory not belonging to the SSTable.
//
// All the others should just generate an exception: there is something wrong, so don't blindly
// add it to the size.
if (de.name != "manifest.json" && de.name != "schema.cql") {
if (name != "manifest.json" && name != "schema.cql") {
details.total += size;
if (sd.number_of_links == 1) {
// File exists only in the snapshot directory.
details.live += size;
continue;
}
// If the number of linkes is greater than 1, it is still possible that the file is linked to another snapshot
// So check the datadir for the file too.
} else {
size = 0;
continue;
}
try {
auto exists_in_dir = [&] (fs::path path) -> future<bool> {
try {
// File exists in the main SSTable directory. Snapshots are not contributing to size
auto psd = co_await io_check(file_stat, (datadir / de.name).native(), follow_symlink::no);
auto psd = co_await io_check(file_stat, path.native(), follow_symlink::no);
// File in main SSTable directory must be hardlinked to the file in the snapshot dir with the same name.
if (psd.device_id != sd.device_id || psd.inode_number != sd.inode_number) {
dblog.warn("[{} device_id={} inode_number={} size={}] is not the same file as [{} device_id={} inode_number={} size={}]",
(datadir / de.name).native(), psd.device_id, psd.inode_number, psd.size,
(snapshot_dir / de.name).native(), sd.device_id, sd.inode_number, sd.size);
details.live += size;
(datadir / name).native(), psd.device_id, psd.inode_number, psd.size,
(snapshot_dir / name).native(), sd.device_id, sd.inode_number, sd.size);
co_return false;
}
} catch (std::system_error& e) {
co_return true;
} catch (std::system_error& e) {
if (e.code() != std::error_code(ENOENT, std::system_category())) {
throw;
}
co_return false;
}
};
// Check staging dir first, as files might be moved from there to the datadir concurrently to this check
if ((!staging_dir || !co_await exists_in_dir(*staging_dir / name)) &&
!co_await exists_in_dir(datadir / name)) {
details.live += size;
}
});
}
co_return details;
}

View File

@@ -82,7 +82,7 @@ seastar::future<> service::client_routes_service::set_client_routes_inner(const
auto guard = co_await _group0_client.start_operation(_abort_source, service::raft_timeout{});
utils::chunked_vector<canonical_mutation> cmuts;
for (auto& entry : route_entries) {
for (const auto& entry : route_entries) {
auto mut = co_await make_update_client_route_mutation(guard.write_timestamp(), entry);
cmuts.emplace_back(std::move(mut));
}
@@ -103,24 +103,24 @@ seastar::future<> service::client_routes_service::delete_client_routes_inner(con
co_await _group0_client.add_entry(std::move(cmd), std::move(guard), _abort_source);
}
seastar::future<> service::client_routes_service::set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) -> future<> {
return cr.with_retry([&] {
seastar::future<> service::client_routes_service::set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries) {
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) mutable -> future<> {
return cr.with_retry([&cr, route_entries = std::move(route_entries)] {
return cr.set_client_routes_inner(route_entries);
});
});
}
seastar::future<> service::client_routes_service::delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) -> future<> {
return cr.with_retry([&] {
seastar::future<> service::client_routes_service::delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys) {
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) mutable -> future<> {
return cr.with_retry([&cr, route_keys = std::move(route_keys)] {
return cr.delete_client_routes_inner(route_keys);
});
});
}
template <typename Func>
seastar::future<> service::client_routes_service::with_retry(Func&& func) const {
seastar::future<> service::client_routes_service::with_retry(Func func) const {
int retries = 10;
while (true) {
try {

View File

@@ -66,8 +66,8 @@ public:
future<mutation> make_remove_client_route_mutation(api::timestamp_type ts, const service::client_routes_service::client_route_key& key);
future<mutation> make_update_client_route_mutation(api::timestamp_type ts, const client_route_entry& entry);
future<std::vector<client_route_entry>> get_client_routes() const;
seastar::future<> set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
seastar::future<> delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys);
seastar::future<> set_client_routes(std::vector<service::client_routes_service::client_route_entry> route_entries);
seastar::future<> delete_client_routes(std::vector<service::client_routes_service::client_route_key> route_keys);
// notifications
@@ -76,7 +76,7 @@ private:
seastar::future<> set_client_routes_inner(const std::vector<service::client_routes_service::client_route_entry>& route_entries);
seastar::future<> delete_client_routes_inner(const std::vector<service::client_routes_service::client_route_key>& route_keys);
template <typename Func>
seastar::future<> with_retry(Func&& func) const;
seastar::future<> with_retry(Func func) const;
abort_source& _abort_source;
gms::feature_service& _feature_service;

View File

@@ -344,3 +344,17 @@ void service::client_state::update_per_service_level_params(qos::service_level_o
_workload_type = slo.workload;
}
future<> service::client_state::set_client_options(
client_options_cache_type& keys_and_values_cache,
const std::unordered_map<sstring, sstring>& client_options) {
for (const auto& [key, value] : client_options) {
auto cached_key = co_await keys_and_values_cache.get_or_load(key, [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
auto cached_value = co_await keys_and_values_cache.get_or_load(value, [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
_client_options.emplace_back(std::move(cached_key), std::move(cached_value));
}
}

View File

@@ -18,6 +18,7 @@
#include "auth/authenticated_user.hh"
#include "auth/authenticator.hh"
#include "auth/permission.hh"
#include "client_data.hh"
#include "transport/cql_protocol_extension.hh"
#include "service/qos/service_level_controller.hh"
@@ -102,7 +103,8 @@ private:
private volatile String keyspace;
#endif
std::optional<auth::authenticated_user> _user;
std::optional<sstring> _driver_name, _driver_version;
std::optional<client_options_cache_entry_type> _driver_name, _driver_version;
std::list<client_option_key_value_cached_entry> _client_options;
auth_state _auth_state = auth_state::UNINITIALIZED;
bool _control_connection = false;
@@ -151,18 +153,33 @@ public:
return _control_connection = true;
}
std::optional<sstring> get_driver_name() const {
std::optional<client_options_cache_entry_type> get_driver_name() const {
return _driver_name;
}
void set_driver_name(sstring driver_name) {
_driver_name = std::move(driver_name);
future<> set_driver_name(client_options_cache_type& keys_and_values_cache, const sstring& driver_name) {
_driver_name = co_await keys_and_values_cache.get_or_load(driver_name, [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
}
std::optional<sstring> get_driver_version() const {
const auto& get_client_options() const {
return _client_options;
}
future<> set_client_options(
client_options_cache_type& keys_and_values_cache,
const std::unordered_map<sstring, sstring>& client_options);
std::optional<client_options_cache_entry_type> get_driver_version() const {
return _driver_version;
}
void set_driver_version(sstring driver_version) {
_driver_version = std::move(driver_version);
future<> set_driver_version(
client_options_cache_type& keys_and_values_cache,
const sstring& driver_version)
{
_driver_version = co_await keys_and_values_cache.get_or_load(driver_version, [] (const client_options_cache_key_type&) {
return make_ready_future<options_cache_value_type>(options_cache_value_type{});
});
}
client_state(external_tag,

View File

@@ -588,12 +588,16 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
}
break;
case node_state::decommissioning:
// A decommissioning node loses its tokens when topology moves to left_token_ring.
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
break;
}
[[fallthrough]];
case node_state::removing:
// A decommissioning or removing node loses its tokens when topology moves to left_token_ring.
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
if (rs.state == node_state::removing && !_feature_service.removenode_with_left_token_ring) {
on_internal_error(
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
}
break;
}
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
// no need for double writes anymore since op failed
co_await process_normal_node(id, host_id, ip, rs);

View File

@@ -2672,6 +2672,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
while (utils::get_local_injector().enter("topology_coordinator_pause_after_streaming")) {
co_await sleep_abortable(std::chrono::milliseconds(10), _as);
}
const bool removenode_with_left_token_ring = _feature_service.removenode_with_left_token_ring;
auto node = get_node_to_work_on(std::move(guard));
bool barrier_failed = false;
// In this state writes goes to old and new replicas but reads start to be done from new replicas
@@ -2726,7 +2727,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
break;
case node_state::removing: {
co_await utils::get_local_injector().inject("delay_node_removal", utils::wait_for_message(std::chrono::minutes(5)));
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
if (!removenode_with_left_token_ring) {
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
}
}
[[fallthrough]];
case node_state::decommissioning: {
@@ -2734,7 +2737,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node_state next_state;
utils::chunked_vector<canonical_mutation> muts;
muts.reserve(2);
if (node.rs->state == node_state::decommissioning) {
if (removenode_with_left_token_ring || node.rs->state == node_state::decommissioning) {
// Both decommission and removenode go through left_token_ring state
// to ensure a global barrier is executed before the request is marked as done.
// This ensures all nodes have observed the topology change.
next_state = node.rs->state;
builder.set_transition_state(topology::transition_state::left_token_ring);
} else {
@@ -2809,6 +2815,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case topology::transition_state::left_token_ring: {
auto node = get_node_to_work_on(std::move(guard));
// Need to be captured as the node variable might become invalid (e.g. moved out) at particular points.
const auto node_rs_state = node.rs->state;
const bool is_removenode = node_rs_state == node_state::removing;
if (is_removenode && !_feature_service.removenode_with_left_token_ring) {
on_internal_error(
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
}
auto finish_left_token_ring_transition = [&](node_to_work_on& node) -> future<> {
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
@@ -2828,9 +2844,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
muts.push_back(builder.build());
co_await remove_view_build_statuses_on_left_node(muts, node.guard, node.id);
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(node.id.uuid()), muts);
auto str = node.rs->state == node_state::decommissioning
? ::format("finished decommissioning node {}", node.id)
: ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
auto str = std::invoke([&]() {
switch (node_rs_state) {
case node_state::decommissioning:
return ::format("finished decommissioning node {}", node.id);
case node_state::removing:
return ::format("finished removing node {}", node.id);
default:
return ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
}
});
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
};
@@ -2843,6 +2866,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
if (node.id == _raft.id()) {
// Removed node must be dead, so it shouldn't enter here (it can't coordinate its own removal).
if (is_removenode) {
on_internal_error(rtlogger, "removenode operation cannot be coordinated by the removed node itself");
}
// Someone else needs to coordinate the rest of the decommission process,
// because the decommissioning node is going to shut down in the middle of this state.
rtlogger.info("coordinator is decommissioning; giving up leadership");
@@ -2856,8 +2884,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
bool barrier_failed = false;
// Wait until other nodes observe the new token ring and stop sending writes to this node.
auto excluded_nodes = get_excluded_nodes_for_topology_request(node);
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
// Removed node is added to ignored nodes, so it should be automatically excluded.
if (is_removenode && !excluded_nodes.contains(node.id)) {
on_internal_error(rtlogger, "removenode operation must have the removed node in excluded_nodes");
}
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), std::move(excluded_nodes)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
@@ -2874,15 +2907,17 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
if (barrier_failed) {
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
// If barrier above failed it means there may be unfinished writes to a decommissioned node,
// or some nodes might not have observed the new topology yet (one purpose of the barrier
// is to make sure all nodes observed the new topology before completing the request).
// Lets wait for the ring delay for those writes to complete and new topology to propagate
// before continuing.
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
// Make decommissioning node a non voter before reporting operation completion below.
// Otherwise the decommissioned node may see the completion and exit before it is removed from
// Make decommissioning/removed node a non voter before reporting operation completion below.
// Otherwise the node may see the completion and exit before it is removed from
// the config at which point the removal from the config will hang if the cluster had only two
// nodes before the decommission.
co_await _voter_handler.on_node_removed(node.id, _as);
@@ -2893,7 +2928,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state");
// Tell the node to shut down.
// For decommission/rollback: Tell the node to shut down.
// This is done to improve user experience when there are no failures.
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
// so there's no guarantee that it would learn about entering that state even if it was still
@@ -2902,15 +2937,19 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// There is the possibility that the node will never get the message
// and decommission will hang on that node.
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
//
// For removenode: The node is already dead, no need to send shutdown command.
auto node_id = node.id;
bool shutdown_failed = false;
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
if (!is_removenode) {
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
}
}
if (shutdown_failed) {
node = retake_node(co_await start_operation(), node_id);

View File

@@ -604,18 +604,14 @@ async def test_driver_service_creation_failure(manager: ManagerClient) -> None:
service_level_names = [sl.service_level for sl in service_levels]
assert "driver" not in service_level_names
def get_processed_tasks_for_group(metrics, group):
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
if res is None:
return 0
return res
@pytest.mark.asyncio
async def _verify_tasks_processed_metrics(manager, server, used_group, unused_group, func):
number_of_requests = 1000
number_of_requests = 3000
def get_processed_tasks_for_group(metrics, group):
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
logger.info(f"group={group}, tasks_processed={res}")
if res is None:
return 0
return res
@@ -627,8 +623,10 @@ async def _verify_tasks_processed_metrics(manager, server, used_group, unused_gr
await asyncio.gather(*[asyncio.to_thread(func) for i in range(number_of_requests)])
metrics = await manager.metrics.query(server.ip_addr)
assert get_processed_tasks_for_group(metrics, used_group) - initial_tasks_processed_by_used_group > number_of_requests
assert get_processed_tasks_for_group(metrics, unused_group) - initial_tasks_processed_by_unused_group < number_of_requests
tasks_processed_by_used_group = get_processed_tasks_for_group(metrics, used_group)
tasks_processed_by_unused_group = get_processed_tasks_for_group(metrics, unused_group)
assert tasks_processed_by_used_group - initial_tasks_processed_by_used_group > number_of_requests
assert tasks_processed_by_unused_group - initial_tasks_processed_by_unused_group < number_of_requests
@pytest.mark.asyncio
async def test_driver_service_level_not_used_for_user_queries(manager: ManagerClient) -> None:

View File

@@ -52,6 +52,18 @@ KNOWN_LOG_LEVELS = {
"OFF": "info",
}
# Captures the aggregate metric before the "[READ ..., WRITE ...]" block.
STRESS_SUMMARY_PATTERN = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*')
# Extracts the READ metric number inside the "[READ ..., WRITE ...]" block.
STRESS_READ_PATTERN = re.compile(r'.*READ:\s*([\d\.\,]+\d?)[^\d].*')
# Extracts the WRITE metric number inside the "[READ ..., WRITE ...]" block.
STRESS_WRITE_PATTERN = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*')
# Splits a "key : value" line into key and value.
STRESS_KEY_VALUE_PATTERN = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$')
class NodeError(Exception):
def __init__(self, msg: str, process: int | None = None):
@@ -528,6 +540,15 @@ class ScyllaNode:
return self.cluster.manager.server_get_workdir(server_id=self.server_id)
def stress(self, stress_options: list[str], **kwargs):
"""
Run `cassandra-stress` against this node.
This method does not do any result parsing.
:param stress_options: List of options to pass to `cassandra-stress`.
:param kwargs: Additional arguments to pass to `subprocess.Popen()`.
:return: Named tuple with `stdout`, `stderr`, and `rc` (return code).
"""
cmd_args = ["cassandra-stress"] + stress_options
if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")):
@@ -549,6 +570,73 @@ class ScyllaNode:
except KeyboardInterrupt:
pass
def _set_stress_val(self, key, val, res):
"""
Normalize a stress result string and populate aggregate/read/write metrics.
Removes comma-thousands separators from numbers, converts to float,
stores the aggregate metric under `key`.
If the value contains a "[READ ..., WRITE ...]" block, also stores the
read and write metrics under `key:read` and `key:write`.
:param key: The metric name
:param val: The metric value string
:param res: The dictionary to populate
"""
def parse_num(s):
return float(s.replace(',', ''))
if "[" in val:
p = STRESS_SUMMARY_PATTERN
m = p.match(val)
if m:
res[key] = parse_num(m.group(1))
p = STRESS_READ_PATTERN
m = p.match(val)
if m:
res[key + ":read"] = parse_num(m.group(1))
p = STRESS_WRITE_PATTERN
m = p.match(val)
if m:
res[key + ":write"] = parse_num(m.group(1))
else:
try:
res[key] = parse_num(val)
except ValueError:
res[key] = val
def stress_object(self, stress_options=None, ignore_errors=None, **kwargs):
"""
Run stress test and return results as a structured metrics dictionary.
Runs `stress()`, finds the `Results:` section in `stdout`, and then
processes each `key : value` line, putting it into a dictionary.
:param stress_options: List of stress options to pass to `stress()`.
:param ignore_errors: Deprecated (no effect).
:param kwargs: Additional arguments to pass to `stress()`.
:return: Dictionary of stress test results.
"""
if ignore_errors:
self.warning("passing `ignore_errors` to stress_object() is deprecated")
ret = self.stress(stress_options, **kwargs)
p = STRESS_KEY_VALUE_PATTERN
res = {}
start = False
for line in (s.strip() for s in ret.stdout.splitlines()):
if start:
m = p.match(line)
if m:
self._set_stress_val(m.group(1).strip().lower(), m.group(2).strip(), res)
else:
if line == 'Results:':
start = True
return res
def flush(self, ks: str | None = None, table: str | None = None, **kwargs) -> None:
cmd = ["flush"]
if ks:

View File

@@ -0,0 +1,690 @@
#
# Copyright (C) 2015-present The Apache Software Foundation
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import functools
import logging
import string
import threading
import time
from concurrent import futures
from typing import NamedTuple
import pytest
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import SimpleStatement, dict_factory
from concurrent.futures import ThreadPoolExecutor
from dtest_class import Tester, create_cf, create_ks, read_barrier
from tools.assertions import assert_all, assert_invalid
from tools.cluster_topology import generate_cluster_topology
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list
logger = logging.getLogger(__name__)
class TestSchemaManagement(Tester):
def prepare(self, racks_num: int, has_config: bool = True):
cluster = self.cluster
cluster_topology = generate_cluster_topology(rack_num=racks_num)
if has_config:
config = {
"ring_delay_ms": 5000,
}
cluster.set_configuration_options(values=config)
cluster.populate(cluster_topology)
cluster.start(wait_other_notice=True)
return cluster
def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema...")
create_ks(session, "ks", 3)
session.execute(
"""
CREATE TABLE users (
id int,
firstname text,
lastname text,
PRIMARY KEY (id)
);
"""
)
insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
insert_statement.consistency_level = ConsistencyLevel.ALL
session.execute(insert_statement, [0])
logger.debug("Altering schema")
session.execute("ALTER TABLE users WITH comment = 'updated'")
logger.debug("Restarting node2")
node2.stop(gently=True)
node2.start(wait_for_binary_proto=True)
logger.debug("Restarting node3")
node3.stop(gently=True)
node3.start(wait_for_binary_proto=True, wait_other_notice=True)
n_partitions = 20
for i in range(n_partitions):
session.execute(insert_statement, [i])
rows = session.execute("SELECT * FROM users")
res = sorted(rows)
assert len(res) == n_partitions
for i in range(n_partitions):
expected = [i, "A", "B"]
assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"
def test_dropping_keyspace_with_many_columns(self):
"""
Exploits https://github.com/scylladb/scylla/issues/1484
"""
cluster = self.prepare(racks_num=1, has_config=False)
node1 = cluster.nodelist()[0]
session = self.patient_cql_connection(node1)
session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
for i in range(8):
session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
session.execute("drop keyspace testxyz")
for node in cluster.nodelist():
s = self.patient_cql_connection(node)
s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
s.execute("drop keyspace testxyz")
def test_multiple_create_table_in_parallel(self):
"""
Run multiple create table statements via different nodes
1. Create a cluster of 3 nodes
2. Run create table with different table names in parallel - check all complete
3. Run create table with the same table name in parallel - check if they complete
"""
logger.debug("1. Create a cluster of 3 nodes")
nodes_count = 3
cluster = self.prepare(racks_num=nodes_count)
sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
ks = "ks"
create_ks(sessions[0], ks, nodes_count)
def create_table(session, table_name):
create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
logger.debug(f"create_statement {create_statement}")
session.execute(create_statement)
logger.debug("2. Run create table with different table names in parallel - check all complete")
step2_tables = [f"t{i}" for i in range(nodes_count)]
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
list(executor.map(create_table, sessions, step2_tables))
for table in step2_tables:
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
logger.debug("3. Run create table with the same table name in parallel - check if they complete")
step3_table = "test"
step3_tables = [step3_table for i in range(nodes_count)]
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
for res_future in res_futures:
try:
res_future.result()
except AlreadyExists as e:
logger.info(f"expected cassandra.AlreadyExists error {e}")
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
@pytest.mark.parametrize("case", ("write", "read", "mixed"))
def test_alter_table_in_parallel_to_read_and_write(self, case):
"""
Create a table and write into while altering the table
1. Create a cluster of 3 nodes and populate a table
2. Run write/read/read_and_write" statement in a loop
3. Alter table while inserts are running
"""
logger.debug("1. Create a cluster of 3 nodes and populate a table")
cluster = self.prepare(racks_num=3)
col_number = 20
[node1, node2, node3] = cluster.nodelist()
session = self.patient_exclusive_cql_connection(node1)
def run_stress(stress_type, col=col_number - 2):
node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])
logger.debug("Populate")
run_stress("write", col_number)
with ThreadPoolExecutor(max_workers=1) as executor:
logger.debug(f"2. Run {case} statement in a loop")
statement_future = executor.submit(functools.partial(run_stress, case))
logger.debug(f"let's {case} statement work some time")
time.sleep(2)
logger.debug("3. Alter table while inserts are running")
alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
logger.debug(f"alter_statement {alter_statement}")
alter_result = session.execute(alter_statement)
logger.debug(alter_result.all())
logger.debug(f"wait till {case} statement finished")
statement_future.result()
rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"
logger.debug("read and check data")
run_stress("read")
@pytest.mark.skip("unimplemented")
def commitlog_replays_after_schema_change(self):
"""
Commitlog can be replayed even though schema has been changed
1. Create a table and insert data
2. Alter table
3. Kill node
4. Boot node and verify that commitlog have been replayed and that all data is restored
"""
raise NotImplementedError
@pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
def test_update_schema_while_node_is_killed(self, case):
"""
Check that a node that is killed durring a table creation/alter/drop is able to rejoin and to synch on schema
"""
logger.debug("1. Create a cluster and insert data")
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
def create_table_case():
try:
logger.debug("Creating table")
create_c1c2_table(session)
logger.debug("Populating")
insert_c1c2(session, n=10)
except AlreadyExists:
# the CQL command can be called multiple time case of retries
pass
def alter_table_case():
try:
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
except InvalidRequest as exc:
# the CQL command can be called multiple time case of retries
assert "Invalid column name c3" in str(exc)
def drop_table_case():
try:
session.execute("DROP TABLE cf;", timeout=180)
except InvalidRequest as exc:
# the CQL command can be called multiple time case of retries
assert "Cannot drop non existing table" in str(exc)
logger.debug("Creating keyspace")
create_ks(session, "ks", 3)
if case != "create_table":
create_table_case()
case_map = {
"create_table": create_table_case,
"alter_table": alter_table_case,
"drop_table": drop_table_case,
}
with ThreadPoolExecutor(max_workers=1) as executor:
logger.debug(f"2. kill node during {case}")
kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
case_map[case]()
kill_node_future.result()
logger.debug("3. Start the stopped node2")
node2.start(wait_for_binary_proto=True)
session = self.patient_exclusive_cql_connection(node2)
read_barrier(session)
def create_or_alter_table_expected_result(col_mun):
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
assert len(rows_to_list(rows)[0]) == col_mun, f"Expected {col_mun} columns but got rows:{rows} instead"
for key in range(10):
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)
expected_case_result_map = {
"create_table": functools.partial(create_or_alter_table_expected_result, 3),
"alter_table": functools.partial(create_or_alter_table_expected_result, 4),
"drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
}
logger.debug("verify that commitlog has been replayed and that all data is restored")
expected_case_result_map[case]()
@pytest.mark.parametrize("is_gently_stop", [True, False])
def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
"""
Nodes rejoining the cluster synch on schema changes
1. Create a cluster and insert data
2. Stop a node
3. Alter table
4. Insert additional data
5. Start the stopped node
6. Verify the stopped node synchs on the updated schema
"""
logger.debug("1. Create a cluster and insert data")
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
create_c1c2_table(session)
create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})
logger.debug("Populating")
insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)
logger.debug("2 Stop a node1")
node1.stop(gently=is_gently_stop, wait_other_notice=True)
logger.debug("3 Alter table")
session = self.patient_cql_connection(node2)
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
logger.debug("4 Insert additional data")
session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))
logger.debug("5. Start the stopped node1")
node1.start(wait_for_binary_proto=True)
logger.debug("6. Verify the stopped node synchs on the updated schema")
session = self.patient_exclusive_cql_connection(node1)
read_barrier(session)
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
expected = [["test", "test", "test", "test"]]
assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
for key in range(10):
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)
def test_reads_schema_recreated_while_node_down(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Populating")
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
logger.debug("Stopping node2")
node2.stop(gently=True)
logger.debug("Re-creating schema")
session.execute("DROP TABLE cf;")
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_cql_connection(node2)
read_barrier(session2)
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"
def test_writes_schema_recreated_while_node_down(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Populating")
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
logger.debug("Stopping node2")
node2.stop(gently=True, wait_other_notice=True)
logger.debug("Re-creating schema")
session.execute("DROP TABLE cf;")
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_cql_connection(node2)
read_barrier(session2)
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
expected = [[2, "2"]]
assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"
class TestLargePartitionAlterSchema(Tester):
# Issue scylladb/scylla: #5135:
#
# Issue: Cache reads may miss some writes if schema alter followed by a read happened concurrently with preempted
# partition entry update
# Affects only tables with multi-row partitions, which are the only ones that can experience the update of partition
# entry being preempted.
#
# The scenario in which the problem could have happened has to involve:
# - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
# - appending writes to the partition (not overwrites)
# - scans of the partition
# - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
# column lands in the middle (in alphabetical order) of the old column set.
#
# Memtable flush has to happen after a schema alter concurrently with a read.
#
# The bug could result in cache corruption which manifests as some past writes being missing (not visible to reads).
PARTITIONS = 50
STRING_VALUE = string.ascii_lowercase
def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
if not self.cluster.nodelist():
self.cluster.populate(cluster_topology)
self.cluster.start(wait_other_notice=True)
node1 = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node=node1)
self.create_schema(session=session, rf=rf)
return session
def create_schema(self, session, rf):
logger.debug("Creating schema")
create_ks(session=session, name="ks", rf=rf)
session.execute(
"""
CREATE TABLE lp_table (
pk int,
ck1 int,
val1 text,
val2 text,
PRIMARY KEY (pk, ck1)
);
"""
)
def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event = None):
ck = ck_start
def _populate_loop():
nonlocal ck
while True:
if stop_populating is not None and stop_populating.is_set():
return
if ck_end is not None and ck >= ck_end:
return
for pk in range(self.PARTITIONS):
row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
data.append(row)
yield tuple(row)
ck += 1
records_written = ck - ck_start
logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")
parameters = _populate_loop()
stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")
execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)
logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
return data
def read(self, session, ck_max, stop_reading: threading.Event = None):
def _read_loop():
while True:
for ck in range(ck_max):
for pk in range(self.PARTITIONS):
if stop_reading is not None and stop_reading.is_set():
return
session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
if stop_reading is None:
return
logger.debug(f"Start reading..")
_read_loop()
logger.debug(f"Finish reading..")
def add_column(self, session, column_name, column_type):
logger.debug(f"Add {column_name} column")
session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")
def drop_column(self, session, column_name):
logger.debug(f"Drop {column_name} column")
session.execute(f"ALTER TABLE lp_table DROP {column_name}")
def test_large_partition_with_add_column(self):
cluster_topology = generate_cluster_topology()
session = self.prepare(cluster_topology, rf=1)
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
threads = []
timeout = 300
ck_end = 5000
if self.cluster.scylla_mode == "debug":
timeout = 900
ck_end = 500
with ThreadPoolExecutor(max_workers=2) as executor:
stop_populating = threading.Event()
stop_reading = threading.Event()
# Insert new rows in background
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
# Wait for running load
time.sleep(10)
self.add_column(session, "new_clmn", "int")
# Memtable flush has to happen after a schema alter concurrently with a read
logger.debug("Flush data")
self.cluster.nodelist()[0].flush()
# Stop populating and reading soon after flush
time.sleep(1)
logger.debug("Stop populating and reading")
stop_populating.set()
stop_reading.set()
for future in futures.as_completed(threads, timeout=timeout):
try:
future.result()
except Exception as exc: # noqa: BLE001
pytest.fail(f"Generated an exception: {exc}")
# Add 'null' values for the new column `new_clmn` in the expected data
for i, _ in enumerate(data):
data[i].append(None)
assert_all(session, f"select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)
def test_large_partition_with_drop_column(self):
cluster_topology = generate_cluster_topology()
session = self.prepare(cluster_topology, rf=1)
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
threads = []
timeout = 300
ck_end = 5000
if self.cluster.scylla_mode == "debug":
timeout = 900
ck_end = 500
with ThreadPoolExecutor(max_workers=2) as executor:
stop_populating = threading.Event()
stop_reading = threading.Event()
# Insert new rows in background
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
# Wait for running load
time.sleep(10)
self.drop_column(session=session, column_name="val1")
# Memtable flush has to happen after a schema alter concurrently with a read
logger.debug("Flush data")
self.cluster.nodelist()[0].flush()
# Stop populating and reading soon after flush
time.sleep(1)
logger.debug("Stop populating and reading")
stop_populating.set()
stop_reading.set()
result = []
for future in futures.as_completed(threads, timeout=timeout):
try:
result.append(future.result())
except Exception as exc: # noqa: BLE001
# "Unknown identifier val1" is expected error
if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
pytest.fail(f"Generated an exception: {exc}")
class HistoryVerifier:
def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
"""
Initialize parameters for further verification of schema history.
:param table_name: table thats we change it's schema and verify schema history accordingly.
"""
self.table_name = table_name
self.keyspace_name = keyspace_name
self.versions = []
self.versions_dict = {}
self.query = ""
def verify(self, session, expected_current_diff, expected_prev_diff, query):
"""
Verify current schema history entry by comparing to previous schema entry.
:param session: python cql session
:param expected_current_diff: difference of current schema from previous schema
:param expected_prev_diff: difference of previous schema from current schema
:param query: The query that created new schema
"""
def get_table_id(session, keyspace_name, table_name):
assert keyspace_name, f"Input kesyspcase should have value, keyspace_name={keyspace_name}"
assert table_name, f"Input table_name should have value, table_name={table_name}"
query = "select keyspace_name,table_name,id from system_schema.tables"
query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
current_rows = session.execute(query).current_rows
assert len(current_rows) == 1, f"Not found table description, ks={keyspace_name} table_name={table_name}"
res = current_rows[0]
return res["id"]
def read_schema_history_table(session, cf_id):
"""
read system.scylla_table_schema_history and verify current version diff from previous vesion
:param session: python cql session
:param cf_id: uuid of the table we changed it's schema
"""
query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
res = session.execute(query).current_rows
new_versions = list({
entry["schema_version"]
for entry in res
if str(entry["schema_version"]) not in self.versions
})
msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
assert len(new_versions) == 1, msg
current_version = str(new_versions[0])
logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
columns_list = (
{"column_name": entry["column_name"], "type": entry["type"]}
for entry in res
if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
)
self.versions_dict[current_version] = {}
for item in columns_list:
self.versions_dict[current_version][item["column_name"]] = item["type"]
self.versions.append(current_version)
if len(self.versions) > 1:
current_id = self.versions[-1]
previous_id = self.versions[-2]
set_current = set(self.versions_dict[current_id].items())
set_previous = set(self.versions_dict[previous_id].items())
current_diff = set_current - set_previous
previous_diff = set_previous - set_current
msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
if current_diff != expected_current_diff:
logger.debug(msg1 + msg2)
assert current_diff == expected_current_diff, msg1 + msg2
msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
assert previous_diff == expected_prev_diff, msg1 + msg2
self.query = query
cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
read_schema_history_table(session, cf_id)
class DDL(NamedTuple):
ddl_command: str
expected_current_diff: set | None
expected_prev_diff: set | None
class TestSchemaHistory(Tester):
def prepare(self):
cluster = self.cluster
# in case support tablets and rf-rack-valid-keyspaces
# create cluster with 3 racks with 1 node in each rack
cluster_topology = generate_cluster_topology(rack_num=3)
rf = 3
cluster.populate(cluster_topology).start(wait_other_notice=True)
self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
create_ks(self.session, "lwt_load_ks", rf)
def test_schema_history_alter_table(self):
"""test schema history changes following alter table cql commands"""
self.prepare()
verifier = HistoryVerifier(table_name="table2")
queries_and_expected_diffs = [
DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
]
for ddl in queries_and_expected_diffs:
self.session.execute(ddl.ddl_command)
verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)

View File

@@ -218,6 +218,18 @@ def assert_row_count_in_select_less(
assert count < max_rows_expected, f'Expected a row count < of {max_rows_expected} in query "{query}", but got {count}'
def assert_length_equal(object_with_length, expected_length):
"""
Assert an object has a specific length.
@param object_with_length The object whose length will be checked
@param expected_length The expected length of the object
Examples:
assert_length_equal(res, nb_counter)
"""
assert len(object_with_length) == expected_length, f"Expected {object_with_length} to have length {expected_length}, but instead is of length {len(object_with_length)}"
def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
"""
asserts that the contents of the two provided lists are equal

View File

@@ -14,6 +14,7 @@ from cassandra.query import SimpleStatement
from cassandra.concurrent import execute_concurrent_with_args
from test.cluster.dtest.dtest_class import create_cf
from test.cluster.dtest.tools import assertions
logger = logging.getLogger(__name__)
@@ -51,6 +52,27 @@ def insert_c1c2( # noqa: PLR0913
execute_concurrent_with_args(session, statement, [[f"k{k}"] for k in keys], concurrency=concurrency)
def query_c1c2( # noqa: PLR0913
session,
key,
consistency=ConsistencyLevel.QUORUM,
tolerate_missing=False,
must_be_missing=False,
c1_value="value1",
c2_value="value2",
ks="ks",
cf="cf",
):
query = SimpleStatement(f"SELECT c1, c2 FROM {ks}.{cf} WHERE key='k{key}'", consistency_level=consistency)
rows = list(session.execute(query))
if not tolerate_missing and not must_be_missing:
assertions.assert_length_equal(rows, 1)
res = rows[0]
assert len(res) == 2 and res[0] == c1_value and res[1] == c2_value, res
if must_be_missing:
assertions.assert_length_equal(rows, 0)
def rows_to_list(rows):
new_list = [list(row) for row in rows]
return new_list

View File

@@ -131,8 +131,9 @@ async def test_backup_move(manager: ManagerClient, object_storage, move_files):
@pytest.mark.asyncio
async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_storage):
'''backup should fail if the destination bucket does not exist'''
@pytest.mark.parametrize("ne_parameter", [ "endpoint", "bucket", "snapshot" ])
async def test_backup_with_non_existing_parameters(manager: ManagerClient, object_storage, ne_parameter):
'''backup should fail if either of the parameters does not exist'''
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
@@ -142,7 +143,8 @@ async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_stor
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
server = await manager.server_add(config=cfg, cmdline=cmd)
ks, cf = await prepare_snapshot_for_backup(manager, server)
backup_snap_name = 'backup'
ks, cf = await prepare_snapshot_for_backup(manager, server, snap_name = backup_snap_name)
workdir = await manager.server_get_workdir(server.server_id)
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
@@ -150,39 +152,18 @@ async def test_backup_to_non_existent_bucket(manager: ManagerClient, object_stor
assert len(files) > 0
prefix = f'{cf}/backup'
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', object_storage.address, "non-existant-bucket", prefix)
tid = await manager.api.backup(server.ip_addr, ks, cf,
backup_snap_name if ne_parameter != 'snapshot' else 'no-such-snapshot',
object_storage.address if ne_parameter != 'endpoint' else 'no-such-endpoint',
object_storage.bucket_name if ne_parameter != 'bucket' else 'no-such-bucket',
prefix)
status = await manager.api.wait_task(server.ip_addr, tid)
assert status is not None
assert status['state'] == 'failed'
#assert 'S3 request failed. Code: 15. Reason: Access Denied.' in status['error']
if ne_parameter == 'endpoint':
assert status['error'] == 'std::invalid_argument (endpoint no-such-endpoint not found)'
@pytest.mark.asyncio
async def test_backup_to_non_existent_endpoint(manager: ManagerClient, object_storage):
'''backup should fail if the endpoint is invalid/inaccessible'''
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options'],
'task_ttl_in_seconds': 300
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
server = await manager.server_add(config=cfg, cmdline=cmd)
ks, cf = await prepare_snapshot_for_backup(manager, server)
workdir = await manager.server_get_workdir(server.server_id)
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup'))
assert len(files) > 0
prefix = f'{cf}/backup'
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', "does_not_exist", object_storage.bucket_name, prefix)
status = await manager.api.wait_task(server.ip_addr, tid)
assert status is not None
assert status['state'] == 'failed'
assert status['error'] == 'std::invalid_argument (endpoint does_not_exist not found)'
async def do_test_backup_abort(manager: ManagerClient, object_storage,
breakpoint_name, min_files, max_files = None):
'''helper for backup abort testing'''
@@ -236,38 +217,6 @@ async def do_test_backup_abort(manager: ManagerClient, object_storage,
assert max_files is None or uploaded_count < max_files
@pytest.mark.asyncio
async def test_backup_to_non_existent_snapshot(manager: ManagerClient, object_storage):
'''backup should fail if the snapshot does not exist'''
objconf = object_storage.create_endpoint_conf()
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options'],
'task_ttl_in_seconds': 300
}
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace:api=info']
server = await manager.server_add(config=cfg, cmdline=cmd)
ks, cf = await prepare_snapshot_for_backup(manager, server)
prefix = f'{cf}/backup'
tid = await manager.api.backup(server.ip_addr, ks, cf, 'nonexistent-snapshot',
object_storage.address, object_storage.bucket_name, prefix)
# The task is expected to fail immediately due to invalid snapshot name.
# However, since internal implementation details may change, we'll wait for
# task completion if immediate failure doesn't occur.
actual_state = None
for status_api in [manager.api.get_task_status,
manager.api.wait_task]:
status = await status_api(server.ip_addr, tid)
assert status is not None
actual_state = status['state']
if actual_state == 'failed':
break
else:
assert actual_state == 'failed'
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_backup_is_abortable(manager: ManagerClient, object_storage):

View File

@@ -181,11 +181,14 @@ async def test_random_failures(manager: ManagerClient,
LOGGER.info("Found following message in the coordinator's log:\n\t%s", matches[-1][0])
await manager.server_stop(server_id=s_info.server_id)
BANNED_NOTIFICATION = "received notification of being banned from the cluster from"
STARTUP_FAILED_PATTERN = f"init - Startup failed:|{BANNED_NOTIFICATION}"
if s_info in await manager.running_servers():
LOGGER.info("Wait until the new node initialization completes or fails.")
await server_log.wait_for("init - (Startup failed:|Scylla version .* initialization completed)", timeout=120)
await server_log.wait_for(f"init - (Startup failed:|Scylla version .* initialization completed)|{BANNED_NOTIFICATION}", timeout=120)
if await server_log.grep("init - Startup failed:"):
if await server_log.grep(STARTUP_FAILED_PATTERN):
LOGGER.info("Check that the new node is dead.")
expected_statuses = [psutil.STATUS_DEAD]
else:
@@ -216,7 +219,7 @@ async def test_random_failures(manager: ManagerClient,
else:
if s_info in await manager.running_servers():
LOGGER.info("The new node is dead. Check if it failed to startup.")
assert await server_log.grep("init - Startup failed:")
assert await server_log.grep(STARTUP_FAILED_PATTERN)
await manager.server_stop(server_id=s_info.server_id) # remove the node from the list of running servers
LOGGER.info("Try to remove the dead new node from the cluster.")

View File

@@ -26,6 +26,7 @@ skip_in_release:
- test_raft_cluster_features
- test_cluster_features
- dtest/limits_test
- dtest/schema_management_test
skip_in_debug:
- test_shutdown_hang
- test_replace

View File

@@ -146,13 +146,13 @@ async def test_joining_old_node_fails(manager: ManagerClient) -> None:
# Try to add a node that doesn't support the feature - should fail
new_server_info = await manager.server_add(start=False, property_file=servers[0].property_file())
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
# Try to replace with a node that doesn't support the feature - should fail
await manager.server_stop_gracefully(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=False, use_host_id=False)
new_server_info = await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[0].property_file())
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
@pytest.mark.asyncio

View File

@@ -131,7 +131,7 @@ async def test_major_compaction_flush_all_tables(manager: ManagerClient, compact
await manager.api.keyspace_compaction(server.ip_addr, ks, cf)
flush_log = await log.grep("Forcing new commitlog segment and flushing all tables", from_mark=mark)
assert len(flush_log) == (1 if expect_all_table_flush else 0)
assert len(flush_log) == (2 if expect_all_table_flush else 0)
# all tables should be flushed the first time unless compaction_flush_all_tables_before_major_seconds == 0
await check_all_table_flush_in_major_compaction(compaction_flush_all_tables_before_major_seconds != 0)

View File

@@ -67,7 +67,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
logger.info(f"Removing node {servers[0]} using {servers[1]}")
await manager.remove_node(servers[1].server_id, servers[0].server_id)
await check_token_ring_and_group0_consistency(manager)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
servers = servers[1:]
logger.info("Checking results of the background writes")

View File

@@ -74,7 +74,7 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
logger.info(f"Removing node {servers[0]} using {servers[1]}")
await manager.remove_node(servers[1].server_id, servers[0].server_id)
await check_token_ring_and_group0_consistency(manager)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
servers = servers[1:]
logger.info("Checking results of the background writes")

View File

@@ -74,7 +74,6 @@ def test_cast_int_literal_with_type_hint_to_blob(cql, table1, scylla_only):
# An int can always be converted to a valid blob, but blobs might have wrong amount of bytes
# and can't be converted to a valid int.
def test_cast_blob_literal_to_int(cql, table1):
pk = unique_key_int()
with pytest.raises(InvalidRequest, match='HEX'):
cql.execute(f"INSERT INTO {table1} (pk) VALUES (0xBAAAAAAD)")
with pytest.raises(InvalidRequest, match='blob'):

View File

@@ -61,7 +61,7 @@ def test_select_default_order(cql, table_int_desc):
def test_multi_column_relation_desc(cql, table2):
k = unique_key_int()
stmt = cql.prepare(f'INSERT INTO {table2} (p, c1, c2) VALUES (?, ?, ?)')
cql.execute(stmt, [0, 1, 0])
cql.execute(stmt, [0, 1, 1])
cql.execute(stmt, [0, 1, 2])
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = 0 AND (c1, c2) >= (1, 1)'))
cql.execute(stmt, [k, 1, 0])
cql.execute(stmt, [k, 1, 1])
cql.execute(stmt, [k, 1, 2])
assert [(1, 2), (1, 1)] == list(cql.execute(f'SELECT c1,c2 FROM {table2} WHERE p = {k} AND (c1, c2) >= (1, 1)'))

View File

@@ -352,7 +352,7 @@ def test_storage_options_alter_type(cql, scylla_only):
ksdef_local = "WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : '1' } " \
"AND STORAGE = { 'type' : 'S3', 'bucket' : '/b1', 'endpoint': 'localhost'}"
with pytest.raises(InvalidRequest):
res = cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
cql.execute(f"ALTER KEYSPACE {keyspace} {ksdef_local}")
# Reproducer for scylladb#14139
def test_alter_keyspace_preserves_udt(cql):

View File

@@ -171,7 +171,6 @@ def test_grant_revoke_data_permissions(cql, test_keyspace):
# Test that permissions for user-defined functions are serialized in a Cassandra-compatible way
def test_udf_permissions_serialization(cql):
schema = "a int primary key"
user = "cassandra"
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace, new_user(cql) as user:
with new_test_table(cql, keyspace, schema) as table:
# Creating a bilingual function makes this test case work for both Scylla and Cassandra
@@ -247,7 +246,6 @@ def test_udf_permissions_quoted_names(cassandra_bug, cql):
# permissions. Cassandra erroneously reports the unrelated missing permissions.
# Reported to Cassandra as CASSANDRA-19005.
def test_drop_udf_with_same_name(cql, cassandra_bug):
schema = "a int primary key"
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
body1_lua = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE lua AS 'return 42;'"
body1_java = "(i int) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS 'return 42L;'"
@@ -288,7 +286,6 @@ def test_drop_udf_with_same_name(cql, cassandra_bug):
# Tests for ALTER are separate, because they are qualified as cassandra_bug
def test_grant_revoke_udf_permissions(cql):
schema = "a int primary key, b list<int>"
user = "cassandra"
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'replication_factor': 1 }") as keyspace:
with new_test_table(cql, keyspace, schema) as table:
fun_body_lua = "(i int, l list<int>) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"
@@ -335,7 +332,6 @@ def test_grant_revoke_udf_permissions(cql):
# and yet it's not enforced
def test_grant_revoke_alter_udf_permissions(cassandra_bug, cql):
schema = "a int primary key"
user = "cassandra"
with new_test_keyspace(cql, "WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }") as keyspace:
with new_test_table(cql, keyspace, schema) as table:
fun_body_lua = "(i int) CALLED ON NULL INPUT RETURNS int LANGUAGE lua AS 'return 42;'"

View File

@@ -90,8 +90,6 @@ def test_attached_service_level(scylla_only, cql):
assert res_one.role == cql.cluster.auth_provider.username and res_one.service_level == sl
def test_list_effective_service_level(scylla_only, cql):
sl1 = "sl1"
sl2 = "sl2"
timeout = "10s"
workload_type = "batch"
@@ -120,8 +118,6 @@ def test_list_effective_service_level(scylla_only, cql):
assert row.value == "batch"
def test_list_effective_service_level_shares(scylla_only, cql):
sl1 = "sl1"
sl2 = "sl2"
shares1 = 500
shares2 = 200
@@ -184,8 +180,6 @@ def test_default_shares_in_listings(scylla_only, cql):
# and that the messages Scylla returns are informative.
def test_manipulating_default_service_level(cql, scylla_only):
default_sl = "default"
# Service levels are case-sensitive (if used with quotation marks).
fake_default_sl = '"DeFaUlT"'
with new_user(cql) as role:
# Creation.

View File

@@ -76,6 +76,7 @@ def test_clients(scylla_only, cql):
'ssl_enabled',
'ssl_protocol',
'username',
'client_options',
])
cls = list(cql.execute(f"SELECT {columns} FROM system.clients"))
# There must be at least one connection - the one that sent this SELECT
@@ -84,6 +85,9 @@ def test_clients(scylla_only, cql):
for cl in cls:
assert(cl[0] == '127.0.0.1')
assert(cl[2] == 'cql')
client_options = cl[13]
assert(client_options.get('DRIVER_NAME') == cl[4])
assert(client_options.get('DRIVER_VERSION') == cl[5])
# We only want to check that the table exists with the listed columns, to assert
# backwards compatibility.

View File

@@ -23,7 +23,7 @@ def scylla_with_wasm_only(scylla_only, cql, test_keyspace):
try:
f42 = unique_name()
f42_body = f'(module(func ${f42} (param $n i64) (result i64)(return i64.const 42))(export "{f42}" (func ${f42})))'
res = cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
cql.execute(f"DROP FUNCTION {test_keyspace}.{f42}")
except NoHostAvailable as err:
if "not enabled" in str(err):
@@ -373,8 +373,7 @@ def test_pow(cql, test_keyspace, table1, scylla_with_wasm_only):
assert len(res) == 1 and res[0].result == 177147
# Test that only compilable input is accepted
def test_compilable(cql, test_keyspace, table1, scylla_with_wasm_only):
table = table1
def test_compilable(cql, test_keyspace, scylla_with_wasm_only):
wrong_source = f"""
Dear wasmtime compiler, please return a function which returns its float argument increased by 1
"""
@@ -384,8 +383,7 @@ Dear wasmtime compiler, please return a function which returns its float argumen
# Test that not exporting a function with matching name
# results in an error
def test_not_exported(cql, test_keyspace, table1, scylla_with_wasm_only):
table = table1
def test_not_exported(cql, test_keyspace, scylla_with_wasm_only):
wrong_source = f"""
(module
(type (;0;) (func (param f32) (result f32)))
@@ -403,8 +401,7 @@ def test_not_exported(cql, test_keyspace, table1, scylla_with_wasm_only):
f"AS '{wrong_source}'")
# Test that trying to use something that is exported, but is not a function, won't work
def test_not_a_function(cql, test_keyspace, table1, scylla_with_wasm_only):
table = table1
def test_not_a_function(cql, test_keyspace, scylla_with_wasm_only):
wrong_source = f"""
(module
(type (;0;) (func (param f32) (result f32)))

View File

@@ -49,6 +49,9 @@ RUN_ID = pytest.StashKey[int]()
logger = logging.getLogger(__name__)
# Store pytest config globally so we can access it in hooks that only receive report
_pytest_config: pytest.Config | None = None
def pytest_addoption(parser: pytest.Parser) -> None:
parser.addoption('--mode', choices=ALL_MODES, action="append", dest="modes",
@@ -184,6 +187,52 @@ def pytest_sessionstart(session: pytest.Session) -> None:
)
@pytest.hookimpl(trylast=True)
def pytest_runtest_logreport(report):
"""Add custom XML attributes to JUnit testcase elements.
This hook wraps the node_reporter's to_xml method to add custom attributes
when the XML element is created. This approach works with pytest-xdist because
it modifies the XML element directly when it's generated, rather than trying
to modify attrs before finalize() is called.
Attributes added:
- function_path: The function path of the test case (excluding parameters).
Uses trylast=True to run after LogXML's hook has created the node_reporter.
"""
from _pytest.junitxml import xml_key
# Only process call phase
if report.when != "call":
return
# Get the XML reporter
config = _pytest_config
if config is None:
return
xml = config.stash.get(xml_key, None)
if xml is None:
return
node_reporter = xml.node_reporter(report)
nodeid = report.nodeid
function_path = f'test/{nodeid.rsplit('.', 2)[0].rsplit('[', 1)[0]}'
# Wrap the to_xml method to add custom attributes to the element
original_to_xml = node_reporter.to_xml
def custom_to_xml():
"""Wrapper that adds custom attributes to the testcase element."""
element = original_to_xml()
element.set("function_path", function_path)
return element
node_reporter.to_xml = custom_to_xml
def pytest_sessionfinish(session: pytest.Session) -> None:
if not session.config.getoption("--test-py-init"):
return
@@ -196,6 +245,9 @@ def pytest_sessionfinish(session: pytest.Session) -> None:
def pytest_configure(config: pytest.Config) -> None:
global _pytest_config
_pytest_config = config
config.build_modes = get_modes_to_run(config)
if testpy_run_id := config.getoption("--run_id"):

View File

@@ -243,7 +243,7 @@ async def get_scylla_2025_1_executable(build_mode: str) -> str:
if not unpacked_marker.exists():
if not downloaded_marker.exists():
archive_path.unlink(missing_ok=True)
await run_process(["curl", "--silent", "--show-error", "--output", archive_path, url])
await run_process(["curl", "--retry", "10", "--fail", "--silent", "--show-error", "--output", archive_path, url])
downloaded_marker.touch()
shutil.rmtree(unpack_dir, ignore_errors=True)
unpack_dir.mkdir(exist_ok=True, parents=True)

View File

@@ -40,16 +40,8 @@ struct test_pinger: public direct_failure_detector::pinger {
co_return;
}
promise<> p;
auto f = p.get_future();
auto sub = as.subscribe([&, p = std::move(p)] () mutable noexcept {
p.set_value();
});
if (!sub) {
throw abort_requested_exception{};
}
co_await std::move(f);
throw abort_requested_exception{};
// Simulate a blocking ping that only returns when aborted.
co_await sleep_abortable(std::chrono::hours(1), as);
}, as);
co_return ret;
}

View File

@@ -2930,6 +2930,18 @@ private:
static constexpr elem_t magic = 54313;
static void check_digest_value(elem_t d) {
if (d < 0 || d >= magic) {
on_fatal_internal_error(tlogger, fmt::format("Digest value out of range: {}", d));
}
}
static void validate_digest_value(elem_t d_new, elem_t d_old, elem_t x) {
if (d_new < 0 || d_new >= magic) {
on_fatal_internal_error(tlogger, fmt::format("Digest value invalid after appending/removing element: d_new {}, d_old {}, x {}", d_new, d_old, x));
}
}
public:
append_seq(std::vector<elem_t> v) : _seq{make_lw_shared<std::vector<elem_t>>(std::move(v))}, _end{_seq->size()}, _digest{0} {
for (auto x : *_seq) {
@@ -2938,20 +2950,26 @@ public:
}
static elem_t digest_append(elem_t d, elem_t x) {
BOOST_REQUIRE_LE(0, d);
BOOST_REQUIRE_LT(d, magic);
check_digest_value(d);
auto y = (d + x) % magic;
SCYLLA_ASSERT(digest_remove(y, x) == d);
validate_digest_value(y, d, x);
return y;
}
static elem_t digest_remove(elem_t d, elem_t x) {
BOOST_REQUIRE_LE(0, d);
BOOST_REQUIRE_LT(d, magic);
check_digest_value(d);
auto y = (d - x) % magic;
return y < 0 ? y + magic : y;
if (y < 0) {
y += magic;
}
validate_digest_value(y, d, x);
return y;
}
elem_t digest() const {

View File

@@ -28,7 +28,7 @@ def write_generator(table, size_in_kb: int):
yield f"INSERT INTO {table} (pk, t) VALUES ({idx}, '{'x' * 1020}')"
class random_content_file:
class RandomContentFile:
def __init__(self, path: str, size_in_bytes: int):
path = pathlib.Path(path)
self.filename = path if path.is_file() else path / str(uuid.uuid4())
@@ -64,11 +64,11 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
for server in servers:
await manager.api.disable_autocompaction(server.ip_addr, ks)
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text", " WITH speculative_retry = 'NONE'") as cf:
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -95,7 +95,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
assert await log.grep("database - Set critical disk utilization mode: false", from_mark=mark) == []
try:
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=hosts[0], execution_profile=cl_one_profile).result()
except Exception:
pass
else:
@@ -111,7 +111,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
@pytest.mark.asyncio
async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
cmdline = [*global_cmdline,
"--logger-log-level", "compaction=debug"]
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers:
@@ -134,7 +134,7 @@ async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Ca
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
@@ -175,7 +175,7 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory:
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain")
@@ -198,7 +198,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark)
@@ -206,7 +206,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
s2_mark = await s2_log.mark()
cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")
s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
await s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
assert await s1_log.grep(f"compaction .* Split {cf}", from_mark=s1_mark) == []
@@ -236,7 +236,7 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
@@ -315,7 +315,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
mark = await log.mark()
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -371,7 +371,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
@@ -382,7 +382,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
coord_log = await manager.server_open_log(coord_serv.server_id)
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
await manager.server_restart(servers[0].server_id, wait_others=2)

View File

@@ -198,47 +198,62 @@ SEASTAR_TEST_CASE(vector_store_client_test_dns_resolving_repeated) {
auto cfg = config();
cfg.vector_store_primary_uri.set("http://good.authority.here:6080");
auto vs = vector_store_client{cfg};
auto count = 0;
bool fail_dns_resolution = true;
auto as = abort_source_timeout();
auto address = inet_address("127.0.0.1");
configure(vs)
.with_dns_refresh_interval(milliseconds(10))
.with_wait_for_client_timeout(milliseconds(20))
.with_dns_resolver([&count](auto const& host) -> future<std::optional<inet_address>> {
.with_dns_resolver([&](auto const& host) -> future<std::optional<inet_address>> {
BOOST_CHECK_EQUAL(host, "good.authority.here");
count++;
if (count % 3 != 0) {
if (fail_dns_resolution) {
co_return std::nullopt;
}
co_return inet_address(format("127.0.0.{}", count));
co_return address;
});
vs.start_background_tasks();
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.size() == 1;
}));
BOOST_CHECK_EQUAL(count, 3);
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.3");
vector_store_client_tester::trigger_dns_resolver(vs);
// Wait for the DNS resolution to fail
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.empty();
}));
fail_dns_resolution = false;
// Wait for the DNS resolution to succeed
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.size() == 1;
}));
BOOST_CHECK_EQUAL(count, 6);
addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
auto addrs1 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs1.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs1[0]), "127.0.0.1");
BOOST_REQUIRE_EQUAL(addrs.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs[0]), "127.0.0.6");
fail_dns_resolution = true;
// Trigger DNS resolver to check for address changes
// Resolver will not re-check automatically after successful resolution
vector_store_client_tester::trigger_dns_resolver(vs);
// Wait for the DNS resolution to fail again
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.empty();
}));
// Resolve to a different address
address = inet_address("127.0.0.2");
fail_dns_resolution = false;
// Wait for the DNS resolution to succeed
BOOST_CHECK(co_await repeat_until(seconds(1), [&vs, &as]() -> future<bool> {
auto addrs = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
co_return addrs.size() == 1;
}));
auto addrs2 = co_await vector_store_client_tester::resolve_hostname(vs, as.reset());
BOOST_REQUIRE_EQUAL(addrs2.size(), 1);
BOOST_CHECK_EQUAL(print_addr(addrs2[0]), "127.0.0.2");
co_await vs.stop();
}

View File

@@ -353,7 +353,7 @@ future<> controller::set_cql_ready(bool ready) {
return _gossiper.local().add_local_application_state(gms::application_state::RPC_READY, gms::versioned_value::cql_ready(ready));
}
future<utils::chunked_vector<client_data>> controller::get_client_data() {
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> controller::get_client_data() {
return _server ? _server->local().get_client_data() : protocol_server::get_client_data();
}

View File

@@ -77,7 +77,7 @@ public:
virtual future<> start_server() override;
virtual future<> stop_server() override;
virtual future<> request_stop_server() override;
virtual future<utils::chunked_vector<client_data>> get_client_data() override;
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() override;
future<> update_connections_scheduling_group();
future<std::vector<connection_service_level_params>> get_connections_service_level_params();

View File

@@ -10,6 +10,7 @@
#include "seastarx.hh"
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/net/socket_defs.hh>
#include <vector>
#include "client_data.hh"
@@ -43,8 +44,8 @@ public:
/// This variant is used by the REST API so failure is acceptable.
virtual future<> request_stop_server() = 0;
virtual future<utils::chunked_vector<client_data>> get_client_data() {
return make_ready_future<utils::chunked_vector<client_data>>(utils::chunked_vector<client_data>());
virtual future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data() {
return make_ready_future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>>();
}
protocol_server(seastar::scheduling_group sg) noexcept : _sched_group(std::move(sg)) {}

View File

@@ -691,6 +691,7 @@ client_data cql_server::connection::make_client_data() const {
cd.connection_stage = client_connection_stage::authenticating;
}
cd.scheduling_group_name = _current_scheduling_group.name();
cd.client_options = _client_state.get_client_options();
cd.ssl_enabled = _ssl_enabled;
cd.ssl_protocol = _ssl_protocol;
@@ -958,12 +959,17 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_st
}
if (auto driver_ver_opt = options.find("DRIVER_VERSION"); driver_ver_opt != options.end()) {
_client_state.set_driver_version(driver_ver_opt->second);
co_await _client_state.set_driver_version(_server._connection_options_keys_and_values, driver_ver_opt->second);
}
if (auto driver_name_opt = options.find("DRIVER_NAME"); driver_name_opt != options.end()) {
_client_state.set_driver_name(driver_name_opt->second);
co_await _client_state.set_driver_name(_server._connection_options_keys_and_values, driver_name_opt->second);
}
// Store all received client options for later exposure in the system.clients 'client_options' column
// (a frozen map<text, text>). Options are cached to reduce memory overhead by deduplicating
// identical key/value sets across multiple connections (e.g., same driver name/version).
co_await _client_state.set_client_options(_server._connection_options_keys_and_values, options);
cql_protocol_extension_enum_set cql_proto_exts;
for (cql_protocol_extension ext : supported_cql_protocol_extensions()) {
if (options.contains(protocol_extension_name(ext))) {
@@ -1647,6 +1653,9 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_supported(int
opts.insert({"CQL_VERSION", cql3::query_processor::CQL_VERSION});
opts.insert({"COMPRESSION", "lz4"});
opts.insert({"COMPRESSION", "snappy"});
// CLIENT_OPTIONS value is a JSON string that can be used to pass client-specific configuration,
// e.g. CQL driver configuration.
opts.insert({"CLIENT_OPTIONS", ""});
if (_server._config.allow_shard_aware_drivers) {
opts.insert({"SCYLLA_SHARD", format("{:d}", this_shard_id())});
opts.insert({"SCYLLA_NR_SHARDS", format("{:d}", smp::count)});
@@ -2308,11 +2317,11 @@ const cql3::cql_metadata_id_type& cql_metadata_id_wrapper::get_response_metadata
return _response_metadata_id.value();
}
future<utils::chunked_vector<client_data>> cql_server::get_client_data() {
utils::chunked_vector<client_data> ret;
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> cql_server::get_client_data() {
utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>> ret;
co_await for_each_gently([&ret] (const generic_server::connection& c) {
const connection& conn = dynamic_cast<const connection&>(c);
ret.emplace_back(conn.make_client_data());
ret.emplace_back(make_foreign(std::make_unique<client_data>(conn.make_client_data())));
});
co_return ret;
}

View File

@@ -206,6 +206,7 @@ private:
seastar::metrics::metric_groups _metrics;
std::unique_ptr<event_notifier> _notifier;
private:
client_options_cache_type _connection_options_keys_and_values;
transport_stats _stats;
auth::service& _auth_service;
qos::service_level_controller& _sl_controller;
@@ -234,7 +235,7 @@ public:
return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
}
future<utils::chunked_vector<client_data>> get_client_data();
future<utils::chunked_vector<foreign_ptr<std::unique_ptr<client_data>>>> get_client_data();
future<> update_connections_scheduling_group();
future<> update_connections_service_level_params();
future<std::vector<connection_service_level_params>> get_connections_service_level_params();

View File

@@ -1547,8 +1547,8 @@ void reclaim_timer::report() const noexcept {
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
auto info_level = _stall_detected ? log_level::info : log_level::debug;
auto MiB = 1024*1024;
auto msg_extra = extra_msg_when_stall_detected(_stall_detected,
_stall_detected ? current_backtrace() : saved_backtrace{});
auto msg_extra = extra_msg_when_stall_detected(_stall_detected && !_preemptible,
(_stall_detected && !_preemptible) ? current_backtrace() : saved_backtrace{});
timing_logger.log(time_level, "{} took {} us, trying to release {:.3f} MiB {}preemptibly, reserve: {{goal: {}, max: {}}}{}",
_name, (_duration + 500ns) / 1us, (float)_memory_to_release / MiB, _preemptible ? "" : "non-",