Compare commits


47 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
ba6e3f87e4 Update iterator methods and utility methods (range, bounds, tombstone_for_row, row_count)
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:38:26 +00:00
copilot-swe-agent[bot]
03005ab209 Update critical row access methods (clustered_row, insert_row, find_row, empty)
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:36:36 +00:00
copilot-swe-agent[bot]
1ee1e9acd3 Update more mutation_partition methods for variant storage
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:34:07 +00:00
copilot-swe-agent[bot]
501bb812c1 Update mutation_partition constructors for variant storage (WIP)
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:31:35 +00:00
copilot-swe-agent[bot]
11d66bb9fb Add variant infrastructure to mutation_partition (WIP - compilation will fail)
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-24 21:29:08 +00:00
copilot-swe-agent[bot]
5829b4d102 Initial plan
2025-12-24 21:22:11 +00:00
Michał Hudobski
ce3320a3ff auth: add system table permissions to VECTOR_SEARCH_INDEXING
Due to the recent changes in the vector store service,
the service needs to read two of the system tables
to function correctly. This was not accounted for
when the new permission was added. This patch fixes that
by allowing these tables (group0_history and versions)
to be read with the VECTOR_SEARCH_INDEXING permission.

We also add a test that validates this behavior.

Fixes: SCYLLADB-73

Closes scylladb/scylladb#27546
2025-12-23 15:53:07 +02:00
Pawel Pery
caa0cbe328 vector_search_validator: move high availability tests from vector-store.git
Initially, tests for high availability were implemented in the vector-store.git
repository. High availability is now implemented in the scylladb.git
repository, so that repository is the better place to store them. This
commit copies these tests into scylladb.git.

The commit copies validator-vector-store/src/high_availability.rs (test logic)
and validator-tests/src/common.rs (test utilities) into the local crate
validator-scylla. common.rs is copied so that reviewers can see the shared
test code; since this code is likely to change frequently, it would be hard
to maintain one common version between two repositories.

The commit also updates the README for vector_search_validator; it briefly
describes the validator modules.

The commit updates the reference to the latest vector-store.git master.

As a next step, high_availability.rs will be removed from vector-store.git
and common.rs moved from validator-tests into validator-vector-store.

References: VECTOR-394

Closes scylladb/scylladb#27499
2025-12-23 15:53:07 +02:00
Yaron Kaikov
bad2fe72b6 .github/workflows: Add email validator workflow
This workflow validates that all commits in a pull request use email
addresses ending in @scylladb.com. For each commit with an author or
committer email that doesn't match this pattern, the workflow automatically
adds a comment to the pull request with a warning.

This serves two purposes:

1. Alert maintainers when external contributors submit code (which is
   acceptable, but good to be aware of)

2. Help ScyllaDB developers catch cases where they haven't configured
   their git email correctly

When a non-@scylladb.com email is detected, the workflow posts this
comment on the pull request:
```
    ⚠️ Non-@scylladb.com Email Addresses Detected

    Found commit(s) with author or committer emails that don't end with
    @scylladb.com.

    This indicates either:

    - An external contributor (acceptable, but maintainer should be aware)
    - A developer who hasn't configured their git email correctly

    For ScyllaDB developers:

    If you're a ScyllaDB employee, please configure your git email globally:

        git config --global user.email "your.name@scylladb.com"

    If only your most recent commit is invalid, you can amend it:

        git commit --amend --reset-author --no-edit
        git push --force

    If you have multiple invalid commits, you need to rewrite them all:

        git rebase -i <base-commit>
        # Mark each invalid commit as 'edit', then for each:
        git commit --amend --reset-author --no-edit
        git rebase --continue
        # Repeat for each invalid commit
        git push --force
```
Fixes: https://scylladb.atlassian.net/browse/RELENG-35

Closes scylladb/scylladb#27796
2025-12-23 15:53:06 +02:00
Pavel Emelyanov
ec15a1b602 table: Do not move foreign string when writing snapshot
table::seal_snapshot() accepts a vector of sstable filenames and
writes them into the manifest file. For that, it iterates over the vector
and moves all filenames from it into the streamer object.

The problem is that the vector contains foreign pointers to sets of
sstrings. Not only are the sets foreign; the sstrings in them are foreign
too. It's not correct to std::move() them on the local CPU.

The fix is to make the streamer object work on string_views and populate it
with non-owning references to the sstrings from the aforementioned sets.
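
The idea can be sketched with standard types; this is a hedged illustration, not the actual patch: std::string stands in for seastar::sstring, and `manifest_streamer`/`make_streamer` are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <vector>

// Illustrative stand-ins: the real code holds seastar::sstring values in sets
// owned by other shards ("foreign" memory). Names here are hypothetical.
struct manifest_streamer {
    // Non-owning views; the memory stays with the owning container/shard.
    std::vector<std::string_view> filenames;

    void add(std::string_view name) { filenames.push_back(name); }
};

// Populate the streamer with views instead of std::move()-ing foreign strings:
// the owning container keeps its allocations, so no cross-shard free occurs.
manifest_streamer make_streamer(const std::vector<std::string>& foreign_names) {
    manifest_streamer s;
    for (const auto& n : foreign_names) {
        s.add(n);  // borrows, does not take ownership
    }
    return s;
}
```

The caller must keep the owning sets alive for as long as the streamer is in use, which holds here since the manifest is written before seal_snapshot() returns.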

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#27755
2025-12-23 15:53:06 +02:00
Pavel Emelyanov
ecef158345 api: Use ranges library to process views in get_built_indexes()
No functional changes; this just makes the loop shorter and more
self-contained.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#27742
2025-12-23 15:53:06 +02:00
Israel Fruchter
53abf93bd8 Update tools/cqlsh submodule
* tools/cqlsh 22401228...9e5a91d7 (7):
  > Add pip retry configuration to handle network timeouts
  > Clean up unwanted build artifacts and update .gitignore
  > test_legacy_auth: update to pytest format
  > Add support for disabling compression via CLI and cqlshrc
  > Update scylla-driver to 3.29.7 (#144)
  > Update scylla-driver version to 3.29.6
  > Revert "Migrate workflows to Blacksmith"

Closes scylladb/scylladb#27567

[avi: build optimized clang 21.1.7
      regenerate frozen toolchain with optimized clang from

      https://devpkg.scylladb.com/clang/clang-21.1.7-Fedora-43-aarch64.tar.gz
      https://devpkg.scylladb.com/clang/clang-21.1.7-Fedora-43-x86_64.tar.gz
]

Closes scylladb/scylladb#27734
2025-12-23 15:53:06 +02:00
Aleksandra Martyniuk
bbe64e0e2a test: rename duplicate tests
There are two tests named test_repair_options_hosts_tablets in
test/nodetool/test_cluster_repair.py and two named test_repair_keyspace
in test/nodetool/test_repair.py. Because of that, one of each pair is ignored.

Rename the tests so that they are unique.

Fixes: https://github.com/scylladb/scylladb/issues/27701.

Closes scylladb/scylladb#27720
2025-12-23 15:53:06 +02:00
Anna Stuchlik
7198191aa9 doc: fix the license information on DockerHub
This commit removes the OSS-related information from DockerHub.
It adds the link to the Source Available license.

Fixes https://github.com/scylladb/scylladb/issues/22440

Closes scylladb/scylladb#27706
2025-12-23 15:53:06 +02:00
Calle Wilund
d5f72cd5fc test::pylib::encryption_provider: Push up setting system_key_directory to all providers
Fixes #27694

Unless set by config, the location will default to /etc/scylla, which is not a good
place to write things for tests. Push the config properly, and the directory (but
_not_ its creation), to the provider base type.

Closes scylladb/scylladb#27696
2025-12-23 15:53:06 +02:00
Dawid Mędrek
afde5f668a test: Implement describing Boost tests in JSON format
The Boost.Test framework offers a way to describe tests written in it
by running them with the option `--list_content`. It can be
parametrized by either HRF (Human Readable Format) or DOT (the Graphviz
graph format) [1]. Thanks to that, we can learn the test tree structure
and collect additional information about the tests (e.g. labels [2]).

We currently employ that feature of the framework to collect and run
Boost tests in Scylla. Unfortunately, both formats have their
shortcomings:

* HRF: the format is simple to parse, but it doesn't contain all
       relevant information, e.g. labels.
* DOT: the format is designed for creating graphical visualizations,
       and it's relatively difficult to parse.

To address those problems, we implement a custom extension of the feature.
It produces output in JSON and carries more than the most basic
information about the tests, while remaining easy to browse and parse.

To obtain that output, the user needs to call a Boost.Test executable
with the option `--list_json_content`. For example:

```
$ ./path/to/test/exec -- --list_json_content
```

Note that the argument should be preceded by `--` to indicate that
it targets user code, not Boost.Test itself.

---

The structure of the new format looks like this (top-level downwards):

- File name
- Test suite(s) & free test cases
- Test cases wrapped in test suites

Note that this differs from the output the default Boost.Test formats
produce: they organize information within test suites, which can
potentially span multiple files [3]. The JSON format makes test files
the primary object of interest, and test suites from different files
are always considered distinct.

Example of the output (after applying some formatting):

```
$ ./build/dev/test/boost/canonical_mutation_test -- --list_json_content
[{"file":"test/boost/canonical_mutation_test.cc", "content": {
  "suites": [],
  "tests": [
    {"name": "test_conversion_back_and_forth", "labels": ""},
    {"name": "test_reading_with_different_schemas", "labels": ""}
  ]
}}]
```

---

The implementation may be seen as a bit ugly, and it's effectively
a hack. It's based on registering a global fixture [4] and linking
that code to every Boost.Test executable.

Unfortunately, there doesn't seem to be any better way. That would
require more extensive changes in the test files (e.g. enforcing
going through the same entry point in all of them).

This implementation is a compromise between simplicity and
effectiveness. The changes are kept minimal, while the developers
writing new tests shouldn't need to remember to do anything special.
Everything should work out of the box (at least as long as there's
no non-trivial linking involved).
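
The registration trick can be illustrated without Boost: a global object's constructor runs before main(), so linking one extra translation unit into every test executable is enough to observe startup. All names below are hypothetical; only the JSON shape mirrors the example above.

```cpp
#include <string>
#include <vector>

// Hypothetical, Boost-free analogue of the global-fixture hack.
struct test_registry {
    std::vector<std::string> tests;
    static test_registry& instance() { static test_registry r; return r; }
};

// A global object of this type registers a test name before main() runs,
// analogous to what a linked-in global fixture can observe in Boost.Test.
struct register_test {
    register_test(std::string name) {
        test_registry::instance().tests.push_back(std::move(name));
    }
};

// Emit the same JSON shape shown above for a single file.
std::string list_json_content(const std::string& file) {
    std::string out = "[{\"file\":\"" + file + "\", \"content\": {\"suites\": [], \"tests\": [";
    bool first = true;
    for (const auto& t : test_registry::instance().tests) {
        if (!first) out += ", ";
        first = false;
        out += "{\"name\": \"" + t + "\", \"labels\": \"\"}";
    }
    return out + "]}}]";
}

static register_test r1{"test_conversion_back_and_forth"};
static register_test r2{"test_reading_with_different_schemas"};
```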

Fixes scylladb/scylladb#25415

---

References:
[1] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/utf_reference/rt_param_reference/list_content.html
[2] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/tests_grouping.html
[3] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree/test_suite.html
[4] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/fixtures/global.html

Closes scylladb/scylladb#27527
2025-12-23 15:53:06 +02:00
Asias He
140858fc22 tablet-mon.py: Add repair support
Add repair support in the tablet monitor.

Fixes #24824

Closes scylladb/scylladb#27400
2025-12-23 15:53:06 +02:00
Botond Dénes
58b5d43538 Merge 'test: multi LWT and counters test during tablets resize and migration' from Yauheni Khatsianevich
This PR extends BaseLWTTester with optional counter-table configuration and
verification, enabling randomized LWT tests over tablets with counters.

It also introduces a new LWT-with-counters test during tablets resize and migration:
- Workload: N workers perform CAS updates
- Update the counter table each time CAS was successful
- Enable balancing and increase min_tablet_count to force a split,
  and lower min_tablet_count to merge
- Run a tablets migration loop
- Stop the workload and verify data consistency

Refs: https://github.com/scylladb/qa-tasks/issues/1918
Refs: https://github.com/scylladb/qa-tasks/issues/1988
Refs https://github.com/scylladb/scylladb/issues/18068

Closes scylladb/scylladb#27170

* github.com:scylladb/scylladb:
  test: new LWT with counters test during tablets migration/resize
  test/lwt: add counter-table support to BaseLWTTester
2025-12-23 07:29:35 +02:00
Botond Dénes
bfdd4f7776 Merge 'Synchronize incremental repair and tablet split' from Raphael Raph Carvalho
Split prepare can run concurrently with repair.

Consider this:

1) split prepare starts
2) incremental repair starts
3) split prepare finishes
4) incremental repair produces unsplit sstable
5) split is not happening on sstable produced by repair
        5.1) that sstable is not marked as repaired yet
        5.2) might belong to repairing set (has compaction disabled)
6) split executes
7) repairing or repaired set has unsplit sstable

If split was acked to coordinator (meaning prepare phase finished),
repair must make sure that all sstables produced by it are split.
It's not happening today with incremental repair because it disables
split on sstables belonging to repairing group. And there's a window
where sstables produced by repair belong to that group.

To solve the problem, we want the invariant that all sealed sstables
will be split.
To achieve this, streaming consumers are patched to produce unsealed
sstables, and the new variant add_new_sstable_and_update_cache() will
take care of splitting the sstable while it's unsealed.
If no split is needed, the new sstable will be sealed and attached.

This solution was also needed to interact nicely with out-of-space
prevention. If disk usage is critical, the split must not happen on
restart, and the aforementioned invariant allows for that, since any
unsplit sstable left unsealed will be discarded on restart.
The streaming consumer will fail if disk usage is critical too.

The reason the interposer consumer doesn't fully solve the problem is
that incremental repair can start before the split, and the sstable
being produced when the split decision was emitted must be split before
being attached. So we need a solution which covers both scenarios.

Fixes #26041.
Fixes #27414.

Should be backported to 2025.4, which contains incremental repair.

Closes scylladb/scylladb#26528

* github.com:scylladb/scylladb:
  test: Add reproducer for split vs intra-node migration race
  test: Verify split failure on behalf of repair during critical disk utilization
  test: boost: Add failure_when_adding_new_sstable_test
  test: Add reproducer for split vs incremental repair race condition
  compaction: Fail split of new sstable if manager is disabled
  replica: Don't split in do_add_sstable_and_update_cache()
  streaming: Leave sstables unsealed until attached to the table
  replica: Wire add_new_sstables_and_update_cache() into intra-node streaming
  replica: Wire add_new_sstable_and_update_cache() into file streaming consumer
  replica: Wire add_new_sstable_and_update_cache() into streaming consumer
  replica: Document old add_sstable_and_update_cache() variants
  replica: Introduce add_new_sstables_and_update_cache()
  replica: Introduce add_new_sstable_and_update_cache()
  replica: Account for sstables being added before ACKing split
  replica: Remove repair read lock from maybe_split_new_sstable()
  compaction: Preserve state of input sstable in maybe_split_new_sstable()
  Rename maybe_split_sstable() to maybe_split_new_sstable()
  sstables: Allow storage::snapshot() to leave destination sstable unsealed
  sstables: Add option to leave sstable unsealed in the stream sink
  test: Verify unsealed sstable can be compacted
  sstables: Allow unsealed sstable to be loaded
  sstables: Restore sstable_writer_config::leave_unsealed
2025-12-23 07:28:56 +02:00
Botond Dénes
bf9640457e Merge 'test: add crash detection during tests' from Cezar Moise
After tests end, an extra check is performed, looking into node logs for crashes, aborts and similar issues.
The test directory is also scanned for coredumps.
If any of the above are found, the test will fail with an error.

The following checks are made:
- Any log line matching `Assertion.*failed` or containing `AddressSanitizer` is marked as a critical error
- Lines matching `Aborting on shard` will only be marked as a critical error if the patterns in `manager.ignore_cores_log_patterns` are not found in that log
- If any critical error is found, the log is also scanned for backtraces
- Any backtraces found are decoded and saved
- If the test is marked with `@pytest.mark.check_nodes_for_errors`, the logs are checked for any `ERROR` lines
- Any pattern in `manager.ignore_log_patterns` and `manager.ignore_cores_log_patterns` will cause the above check to ignore that line
- The `expected_error` value that many methods, like `manager.decommission_node`, accept is automatically appended to `manager.ignore_log_patterns`
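
The checks above can be sketched roughly as a line classifier. This is a simplification: the real harness is Python, keeps separate ignore lists for critical errors and plain errors, and scans for backtraces and coredumps as well; the names below are hypothetical and the patterns are only those quoted above.

```cpp
#include <regex>
#include <string>
#include <vector>

enum class severity { ok, error, critical };

// Classify one log line; ignore patterns suppress any match (simplified).
severity classify_line(const std::string& line,
                       const std::vector<std::string>& ignore_patterns) {
    for (const auto& p : ignore_patterns) {
        if (std::regex_search(line, std::regex(p))) {
            return severity::ok;
        }
    }
    // Patterns treated as critical errors in the description above.
    static const std::regex critical{
        R"(Assertion.*failed|AddressSanitizer|Aborting on shard)"};
    if (std::regex_search(line, critical)) {
        return severity::critical;
    }
    // Plain ERROR lines only fail tests marked check_nodes_for_errors.
    static const std::regex err{R"(\bERROR\b)"};
    if (std::regex_search(line, err)) {
        return severity::error;
    }
    return severity::ok;
}
```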

refs: https://github.com/scylladb/qa-tasks/issues/1804

---

[Examples](https://jenkins.scylladb.com/job/scylla-staging/job/cezar/job/byo_build_tests_dtest/46/testReport/):
The following examples were run on a separate branch where changes were made to enable these failures.

`test_unfinished_writes_during_shutdown`
- Errors are found in logs and are not ignored
```
failed on teardown with "Failed:
Server 2096: found 1 error(s) (log: scylla-2096.log)
  ERROR 2025-12-15 14:20:06,563 [shard 0: gms] raft_topology - raft_topology_cmd barrier_and_drain failed with: std::runtime_error (raft topology: command::barrier_and_drain, the version has changed, version 11, current_version 12, the topology change coordinator  had probably migrated to another node)
Server 2101: found 4 error(s) (log: scylla-2101.log)
  ERROR 2025-12-15 14:20:04,674 [shard 0:strm] repair - repair[c434c0c0-68da-472c-ba3e-ed80960ce0d5]: Repair 1 out of 4 ranges, keyspace=system_distributed, table=view_build_status, range=(minimum token,maximum token), peers=[27c027a6-603d-49d0-8766-1b085d8c7d29, b549cb36-fae8-490b-a19e-86d42e7aa07a, f7049967-81ff-4296-9be7-9d6a4d33a29e], live_peers=[b549cb36-fae8-490b-a19e-86d42e7aa07a, f7049967-81ff-4296-9be7-9d6a4d33a29e], status=failed: mandatory neighbor=27c027a6-603d-49d0-8766-1b085d8c7d29 is not alive
  ERROR 2025-12-15 14:20:04,674 [shard 1:strm] repair - repair[c434c0c0-68da-472c-ba3e-ed80960ce0d5]: Repair 1 out of 4 ranges, keyspace=system_distributed, table=view_build_status, range=(minimum token,maximum token), peers=[27c027a6-603d-49d0-8766-1b085d8c7d29, b549cb36-fae8-490b-a19e-86d42e7aa07a, f7049967-81ff-4296-9be7-9d6a4d33a29e], live_peers=[b549cb36-fae8-490b-a19e-86d42e7aa07a, f7049967-81ff-4296-9be7-9d6a4d33a29e], status=failed: mandatory neighbor=27c027a6-603d-49d0-8766-1b085d8c7d29 is not alive
  ERROR 2025-12-15 14:20:04,675 [shard 0: gms] raft_topology - raft_topology_cmd stream_ranges failed with: std::runtime_error (["shard 0: std::runtime_error (repair[c434c0c0-68da-472c-ba3e-ed80960ce0d5]: 1 out of 4 ranges failed, keyspace=system_distributed, tables=[\"view_build_status\", \"cdc_generation_timestamps\", \"service_levels\", \"cdc_streams_descriptions_v2\"], repair_reason=bootstrap, nodes_down_during_repair={27c027a6-603d-49d0-8766-1b085d8c7d29}, aborted_by_user=false, failed_because=std::runtime_error (Repair mandatory neighbor=27c027a6-603d-49d0-8766-1b085d8c7d29 is not alive, keyspace=system_distributed, mandatory_neighbors=[27c027a6-603d-49d0-8766-1b085d8c7d29, b549cb36-fae8-490b-a19e-86d42e7aa07a, f7049967-81ff-4296-9be7-9d6a4d33a29e]))", "shard 1: std::runtime_error (repair[c434c0c0-68da-472c-ba3e-ed80960ce0d5]: 1 out of 4 ranges failed, keyspace=system_distributed, tables=[\"view_build_status\", \"cdc_generation_timestamps\", \"service_levels\", \"cdc_streams_descriptions_v2\"], repair_reason=bootstrap, nodes_down_during_repair={27c027a6-603d-49d0-8766-1b085d8c7d29}, aborted_by_user=false, failed_because=std::runtime_error (Repair mandatory neighbor=27c027a6-603d-49d0-8766-1b085d8c7d29 is not alive, keyspace=system_distributed, mandatory_neighbors=[27c027a6-603d-49d0-8766-1b085d8c7d29, b549cb36-fae8-490b-a19e-86d42e7aa07a, f7049967-81ff-4296-9be7-9d6a4d33a29e]))"])
  ERROR 2025-12-15 14:20:06,812 [shard 0:main] init - Startup failed: std::runtime_error (Bootstrap failed. See earlier errors (Rolled back: Failed stream ranges: std::runtime_error (failed status returned from 9dd942aa-acec-4105-9719-9bda403e8e94)))
Server 2094: found 1 error(s) (log: scylla-2094.log)
  ERROR 2025-12-15 14:20:04,675 [shard 0: gms] raft_topology - send_raft_topology_cmd(stream_ranges) failed with exception (node state is bootstrapping): std::runtime_error (failed status returned from 9dd942aa-acec-4105-9719-9bda403e8e94)"
```

`test_kill_coordinator_during_op`
- aborts caused by injection
- `ignore_cores_log_patterns` is not set
- while there are errors in logs and `ignore_log_patterns` is not set, they are ignored automatically due to the `expected_error` parameter, such as in `await manager.decommission_node(server_id=other_nodes[-1].server_id, expected_error="Decommission failed. See earlier errors")`
```
failed on teardown with "Failed:
Server 1105: found 1 critical error(s), 1 backtrace(s) (log: scylla-1105.log)
  Aborting on shard 0, in scheduling group gossip.
  1 backtrace(s) saved in scylla-1105-backtraces.txt
Server 1106: found 1 critical error(s), 1 backtrace(s) (log: scylla-1106.log)
  Aborting on shard 0, in scheduling group gossip.
  1 backtrace(s) saved in scylla-1106-backtraces.txt
Server 1113: found 1 critical error(s), 1 backtrace(s) (log: scylla-1113.log)
  Aborting on shard 0, in scheduling group gossip.
  1 backtrace(s) saved in scylla-1113-backtraces.txt
Server 1148: found 1 critical error(s), 1 backtrace(s) (log: scylla-1148.log)
  Aborting on shard 0, in scheduling group gossip.
  1 backtrace(s) saved in scylla-1148-backtraces.txt"
```
Decoded backtrace can be found in [failed_test_logs](https://jenkins.scylladb.com/job/scylla-staging/job/cezar/job/byo_build_tests_dtest/46/artifact/testlog/x86_64/dev/failed_test/test_kill_coordinator_during_op.dev.1)

Closes scylladb/scylladb#26177

* github.com:scylladb/scylladb:
  test: add logging to crash_coordinator_before_stream injection
  test: add crash detection during tests
  test.py: add pid to ServerInfo
2025-12-23 07:27:58 +02:00
Cezar Moise
0ef8ca4c57 test: add logging to crash_coordinator_before_stream injection
In order for the test to ignore crashes caused by the injection,
it needs to log its occurrence.
2025-12-18 16:28:13 +02:00
Cezar Moise
95d0782f89 test: add crash detection during tests
After tests end, an extra check is performed, looking into node logs.
By default, it only searches for critical errors and scans for coredumps.
If the test has the fixture `check_nodes_for_errors`, it will search for all errors.
Both checks can be made to ignore lines by setting `ignore_cores_log_patterns` and `ignore_log_patterns`.
If any of the above are found, the test will fail with an error.
2025-12-18 16:28:13 +02:00
Yauheni Khatsianevich
07867a9a0d test: new LWT with counters test during tablets migration/resize
- Workload: N workers perform CAS updates
- Update counter table each time CAS was successful
- Enable balancing and increase min_tablet_count to force split,
 and lower min_tablet_count to merge.
- Run tablets migrations loop
- Stop workload and verify data consistency
2025-12-15 14:32:30 +01:00
Raphael S. Carvalho
a0a7941eb1 test: Add reproducer for split vs intra-node migration race
This is a problem caught after removing the split from
add_sstable_and_update_cache(), which was used by
intra-node migration when loading new sstables
into the destination shard.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 17:01:18 -03:00
Raphael S. Carvalho
e3b9abdb30 test: Verify split failure on behalf of repair during critical disk utilization
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 17:01:18 -03:00
Raphael S. Carvalho
bc772b791d test: boost: Add failure_when_adding_new_sstable_test
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 17:01:18 -03:00
Raphael S. Carvalho
77a4f95eb8 test: Add reproducer for split vs incremental repair race condition
Refs #26041.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 17:01:16 -03:00
Raphael S. Carvalho
992bfb9f63 compaction: Fail split of new sstable if manager is disabled
If manager has been disabled due to out of space prevention, it's
important to throw an exception rather than silently not
splitting the new sstable.

Not splitting an sstable when needed can cause correctness issues
when finalizing the split later.
It's better to fail the writer (e.g. the repair one), which will be
retried, than to make the caller think everything succeeded.

The new replica::table::add_new_sstable_and_update_cache() will
now unlink the new sstable on failure, so the table dir will
not be left with sstables not loaded.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:51 -03:00
Raphael S. Carvalho
ee3a743dc4 replica: Don't split in do_add_sstable_and_update_cache()
Now, only the sstable loader on boot and refresh-from-upload use this
procedure. The idea is that maybe_split_new_sstable() will throw
when compaction cannot run due to e.g. out-of-space prevention.
It may fail the repair writer, but we don't want it to fail boot.

As for refresh-from-upload, it's not supposed to work when the tablet
map at the time of backup is not the same as when restoring.
Even before this change, refresh would fail if a split had already
executed; it would only happen here if the split was still ongoing.
We need token-range stability for local restore. The safe variant will
always be load-and-stream.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:51 -03:00
Raphael S. Carvalho
48d243f32f streaming: Leave sstables unsealed until attached to the table
We want the invariant that after ACK, all sealed sstables will be split.

This guarantees that on restart, no unsplit sstables will be found
sealed.

The paths that generate unsplit sstables are streaming and file
streaming consumers. It includes intra-node streaming, which
is local but can clone an unsplit sstable into destination.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:51 -03:00
Raphael S. Carvalho
d9d58780e2 replica: Wire add_new_sstables_and_update_cache() into intra-node streaming
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:51 -03:00
Raphael S. Carvalho
ddb27488fa replica: Wire add_new_sstable_and_update_cache() into file streaming consumer
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:51 -03:00
Raphael S. Carvalho
10225ee434 replica: Wire add_new_sstable_and_update_cache() into streaming consumer
After the wiring, failure to attach the new sstable in the streaming
consumer will unlink the sstable automatically.

Fixes #27414.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:51 -03:00
Raphael S. Carvalho
a72025bbf6 replica: Document old add_sstable_and_update_cache() variants
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:51 -03:00
Raphael S. Carvalho
3f8363300a replica: Introduce add_new_sstables_and_update_cache()
Piggyback on new add_new_sstable_and_update_cache(), replacing
the previous add_sstables_and_update_cache().

Will be used by intra-node migration since we want it to be
safe when loading the cloned sstables. An unsplit sstable
can be cloned into destination which already ACKed split,
so we need this variant which splits sstable if needed,
while it's unsealed.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
63d1d6c39b replica: Introduce add_new_sstable_and_update_cache()
Failure to load sstable in streaming can leave sealed sstables
on disk since they're not unlinked on failure.

This can result in several problems:
1) Data resurrection, since the sstable may contain deleted data
2) Split issues, since finalization requires all sstables to be split
3) Disk usage issues, since the sstables hold space and streaming
retries can keep accumulating these files.

This new procedure will be later wired into streaming consumers, in
order to fix those problems.

Another benefit of the interface is that if there's split when adding
the new sstable, the output sstables will be returned to the caller,
allowing them to register the actual loaded sstables into e.g.
the view builder.
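
The unlink-on-failure behavior can be sketched as a small RAII guard over a file path. This is a hedged illustration: the real code operates on sstable objects inside the table attach path, and the class name here is hypothetical.

```cpp
#include <filesystem>
#include <fstream>
#include <utility>

namespace fs = std::filesystem;

// Removes the file unless commit() was called, so a failed attach does not
// leave the new sstable behind on disk.
class unlink_on_failure {
    fs::path _path;
    bool _committed = false;
public:
    explicit unlink_on_failure(fs::path p) : _path(std::move(p)) {}
    void commit() { _committed = true; }  // call once the sstable is attached
    ~unlink_on_failure() {
        if (!_committed) {
            std::error_code ec;
            fs::remove(_path, ec);  // best effort; ignore errors in cleanup
        }
    }
};

// Test helper: create an empty file at the given path.
void touch(const fs::path& p) { std::ofstream(p).put('x'); }
```

A usage sketch: construct the guard right after the sstable file is written, attempt the attach (which may throw), and call commit() only on success.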

Refs #27414.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
27d460758f replica: Account for sstables being added before ACKing split
We want the invariant that after ACK, all sealed sstables will be split.
If check-and-attach is not atomic, this sequence is possible:

1) no split decision set.
2) Unsplit sstable is checked, no need to split, sealed.
3) split decision is set and ACKed
4) unsplit sstable is attached
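
The race can be avoided by making the decision check and the attach a single critical section, as in this hypothetical sketch (ints stand in for sstables; the real code synchronizes inside the table, not with a bare mutex like this):

```cpp
#include <mutex>
#include <vector>

struct table_state {
    std::mutex m;
    bool split_decided = false;
    std::vector<int> attached;  // ints stand in for sealed sstables

    // Returns true if attached unsplit; false means the caller must
    // split the sstable first. Check and attach happen under one lock,
    // so a split decision cannot slip in between steps 2 and 4 above.
    bool try_attach_unsplit(int sst) {
        std::lock_guard<std::mutex> g(m);
        if (split_decided) {
            return false;
        }
        attached.push_back(sst);
        return true;
    }

    void decide_split() {
        std::lock_guard<std::mutex> g(m);
        split_decided = true;
    }
};
```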

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
794e03856a replica: Remove repair read lock from maybe_split_new_sstable()
The lock is intended to serialize some maintenance compactions,
such as major, with repair. But maybe_split_new_sstable() is
restricted solely to new sstables that aren't part of the
sstable set.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
2dae0a7380 compaction: Preserve state of input sstable in maybe_split_new_sstable()
This is crucial with MVs, since the splitting must preserve the state of
the original sstable. We want the sstable to be in staging dir, so it's
excluded when calculating the diff for performing pushes to view
replicas.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
1fdc410e24 Rename maybe_split_sstable() to maybe_split_new_sstable()
Since the function must only be used on new sstables, rename it to
something conveying that its usage is restricted.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
1a077a80f1 sstables: Allow storage::snapshot() to leave destination sstable unsealed
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
c5e840e460 sstables: Add option to leave sstable unsealed in the stream sink
That will be needed for file streaming to leave the output sstable unsealed.

We want the invariant that all sealed sstables are split after the split
was ACKed.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
c10486a5e9 test: Verify unsealed sstable can be compacted
This is crucial for splitting before sealing the sstable produced by
repair. This way, unsplit sstables won't be left sealed on disk.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
ab82428228 sstables: Allow unsealed sstable to be loaded
File streaming will have to load an unsealed sstable, so we need
to be able to parse components from temporary TOC instead.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Raphael S. Carvalho
b1be4ba2fc sstables: Restore sstable_writer_config::leave_unsealed
This option was retired in commit 0959739216, but
it will be needed again in order to implement split-before-sealing.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2025-12-12 16:59:50 -03:00
Cezar Moise
7c8ab3d3d3 test.py: add pid to ServerInfo
Adding pid info to servers allows matching coredumps with servers.

Other improvements:
- When replacing just some fields of ServerInfo, use `_replace` instead of
building a new object. This way the code is agnostic to changes in the object.
- When building ServerInfo from a list, the types defined for its fields are
not enforced, so ServerInfo(*list) works fine and does not need to be changed
if fields are added or removed.
2025-12-12 15:11:03 +02:00
Yauheni Khatsianevich
f12adfc292 test/lwt: add counter-table support to BaseLWTTester
Extend BaseLWTTester with optional counter-table configuration and
verification, enabling randomized LWT tests over tablets with counters.
2025-12-08 15:57:33 +01:00
64 changed files with 2942 additions and 259 deletions

View File

@@ -0,0 +1,13 @@
name: validate_pr_author_email
on:
pull_request_target:
types:
- opened
- synchronize
- reopened
jobs:
validate_pr_author_email:
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main

View File

@@ -547,17 +547,13 @@ void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_build
vp.insert(b.second);
}
}
std::vector<sstring> res;
replica::database& db = vb.local().get_db();
auto uuid = validate_table(db, ks, cf_name);
replica::column_family& cf = db.find_column_family(uuid);
res.reserve(cf.get_index_manager().list_indexes().size());
for (auto&& i : cf.get_index_manager().list_indexes()) {
if (vp.contains(secondary_index::index_table_name(i.metadata().name()))) {
res.emplace_back(i.metadata().name());
}
}
co_return res;
co_return cf.get_index_manager().list_indexes()
| std::views::transform([] (const auto& i) { return i.metadata().name(); })
| std::views::filter([&vp] (const auto& n) { return vp.contains(secondary_index::index_table_name(n)); })
| std::ranges::to<std::vector>();
});
}

View File

@@ -12,6 +12,7 @@
#include <seastar/core/condition-variable.hh>
#include "schema/schema_fwd.hh"
#include "sstables/open_info.hh"
#include "compaction_descriptor.hh"
class reader_permit;
@@ -44,7 +45,7 @@ public:
virtual compaction_strategy_state& get_compaction_strategy_state() noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
virtual sstables::shared_sstable make_sstable() const = 0;
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const = 0;
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
virtual api::timestamp_type min_memtable_timestamp() const = 0;
virtual api::timestamp_type min_memtable_live_timestamp() const = 0;

View File

@@ -416,7 +416,9 @@ future<compaction_result> compaction_task_executor::compact_sstables(compaction_
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
}
descriptor.creator = [&t] (shard_id) {
return t.make_sstable();
// All compaction types going through this path will work on normal input sstables only.
// Off-strategy, for example, waits until the sstables move out of staging state.
return t.make_sstable(sstables::sstable_state::normal);
};
descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) {
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
@@ -1847,6 +1849,10 @@ protected:
throw make_compaction_stopped_exception();
}
}, false);
if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) {
throw make_compaction_stopped_exception();
}
co_return co_await do_rewrite_sstable(std::move(sst));
}
};
@@ -2284,12 +2290,16 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
}
future<std::vector<sstables::shared_sstable>>
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
co_return std::vector<sstables::shared_sstable>{sst};
}
if (!can_proceed(&t)) {
co_return std::vector<sstables::shared_sstable>{sst};
// Throw an error if the split cannot be performed due to e.g. out-of-space prevention.
// We don't want to block the split just because compaction is temporarily disabled on a view for synchronization,
// which is unneeded for new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
if (is_disabled()) {
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
"reason might be out of space prevention", sst->get_filename()))));
}
std::vector<sstables::shared_sstable> ret;
@@ -2297,8 +2307,11 @@ compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction
compaction_progress_monitor monitor;
compaction_data info = create_compaction_data();
compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
desc.creator = [&t] (shard_id _) {
return t.make_sstable();
desc.creator = [&t, sst] (shard_id _) {
// NOTE: preserves the sstable state, since we want the output to be on the same state as the original.
// For example, if base table has views, it's important that sstable produced by repair will be
// in the staging state.
return t.make_sstable(sst->state());
};
desc.replacer = [&] (compaction_completion_desc d) {
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));

View File

@@ -376,7 +376,8 @@ public:
// Splits a single SSTable by segregating all its data according to the classifier.
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
// Exception is thrown if the input sstable cannot be split due to e.g. out of space prevention.
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
// Run a custom job for a given table, defined by a function
// it completes when future returned by job is ready or returns immediately
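The split path above makes each output sstable inherit the input's state, so a repair-produced staging sstable stays in staging until view building consumes it. A toy sketch of that state-preserving creator callback, with hypothetical simplified stand-ins for `sstables::sstable_state` and the descriptor's creator (not the real ScyllaDB types):

```cpp
#include <cassert>
#include <functional>

// Hypothetical simplified model of sstables::sstable_state.
enum class sstable_state { normal, staging };

struct sstable {
    sstable_state st;
    sstable_state state() const { return st; }
};

// Like desc.creator in the patch: capture the input sstable's state so every
// output fragment of the split is created in the same state as the original.
std::function<sstable()> make_split_creator(const sstable& input) {
    auto preserved = input.state();
    return [preserved] { return sstable{preserved}; };
}
```

This mirrors the contrast in the patch: the regular compaction path always creates `sstable_state::normal` outputs, while the split path forwards `sst->state()`.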

View File

@@ -1698,6 +1698,18 @@ deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vect
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
# We need to link these files to all Boost tests to make sure that
# we can execute `--list_json_content` on them. That will produce
# a similar result as calling `--list_content={HRF,DOT}`.
# Unfortunately, to be able to do that, we're forced to link the
# relevant code by hand.
for key in deps.keys():
for prefix in boost_tests_prefixes:
if key.startswith(prefix):
deps[key] += ["test/lib/boost_tree_lister_injector.cc", "test/lib/boost_test_tree_lister.cc"]
wasm_deps = {}
wasm_deps['wasm/return_input.wat'] = 'test/resource/wasm/rust/return_input.rs'

View File

@@ -200,6 +200,7 @@ public:
static constexpr auto DICTS = "dicts";
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
static constexpr auto CLIENT_ROUTES = "client_routes";
static constexpr auto VERSIONS = "versions";
// auth
static constexpr auto ROLES = "roles";

View File

@@ -605,8 +605,8 @@ public:
}
static schema_ptr build_schema() {
auto id = generate_legacy_id(system_keyspace::NAME, "versions");
return schema_builder(system_keyspace::NAME, "versions", std::make_optional(id))
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
.with_column("key", utf8_type, column_kind::partition_key)
.with_column("version", utf8_type)
.with_column("build_mode", utf8_type)

View File

@@ -2,8 +2,11 @@
## What is ScyllaDB?
ScyllaDB is a high-performance NoSQL database system, fully compatible with Apache Cassandra.
ScyllaDB is released under the GNU Affero General Public License version 3 and the Apache License, ScyllaDB is free and open-source software.
ScyllaDB is a high-performance NoSQL database optimized for speed and scalability.
It is designed to efficiently handle large volumes of data with minimal latency,
making it ideal for data-intensive applications.
ScyllaDB is distributed under the [ScyllaDB Source Available License](https://github.com/scylladb/scylladb/blob/master/LICENSE-ScyllaDB-Source-Available.md).
> [ScyllaDB](http://www.scylladb.com/)

View File

@@ -45,7 +45,9 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
: _tombstone(x._tombstone)
, _static_row(s, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows()
, _rows(use_single_row_storage(s) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _row_tombstones(x._row_tombstones)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
@@ -54,10 +56,30 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
return current_allocator().construct<rows_entry>(s, *x);
};
_rows.clone_from(x._rows, cloner, current_deleter<rows_entry>());
if (use_single_row_storage(s)) {
// Copy single row if it exists
if (x.uses_single_row_storage()) {
const auto& x_row = x.get_single_row_storage();
if (x_row) {
get_single_row_storage() = deletable_row(s, *x_row);
}
} else if (!x.get_rows_storage().empty()) {
// Converting from multi-row to single-row - take the first row
// This shouldn't normally happen as schema doesn't change this way
on_internal_error(mplog, "mutation_partition: cannot convert multi-row partition to single-row");
}
} else {
// Multi-row storage
if (x.uses_single_row_storage()) {
// Converting from single-row to multi-row - this shouldn't normally happen
on_internal_error(mplog, "mutation_partition: cannot convert single-row partition to multi-row");
} else {
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
return current_allocator().construct<rows_entry>(s, *x);
};
get_rows_storage().clone_from(x.get_rows_storage(), cloner, current_deleter<rows_entry>());
}
}
}
mutation_partition::mutation_partition(const mutation_partition& x, const schema& schema,
@@ -65,7 +87,9 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
: _tombstone(x._tombstone)
, _static_row(schema, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows()
, _rows(use_single_row_storage(schema) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(schema.version())
@@ -74,19 +98,37 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
_rows.insert_before_hint(_rows.end(), std::move(ce), rows_entry::tri_compare(schema));
if (use_single_row_storage(schema)) {
// Single-row storage: just copy the row if it exists
if (x.uses_single_row_storage()) {
const auto& x_row = x.get_single_row_storage();
if (x_row) {
get_single_row_storage() = deletable_row(schema, *x_row);
}
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
_row_tombstones.apply(schema, rt.tombstone());
} else {
// Filtering from multi-row - shouldn't happen with consistent schema
on_internal_error(mplog, "mutation_partition: filtering from multi-row to single-row storage");
}
} else {
// Multi-row storage with filtering
if (x.uses_single_row_storage()) {
on_internal_error(mplog, "mutation_partition: filtering from single-row to multi-row storage");
} else {
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(ce), rows_entry::tri_compare(schema));
}
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
_row_tombstones.apply(schema, rt.tombstone());
}
}
} catch (...) {
get_rows_storage().clear_and_dispose(current_deleter<rows_entry>());
throw;
}
}
} catch (...) {
_rows.clear_and_dispose(current_deleter<rows_entry>());
throw;
}
}
@@ -104,14 +146,20 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
#ifdef SEASTAR_DEBUG
SCYLLA_ASSERT(x._schema_version == _schema_version);
#endif
{
auto deleter = current_deleter<rows_entry>();
auto it = _rows.begin();
for (auto&& range : ck_ranges.ranges()) {
_rows.erase_and_dispose(it, lower_bound(schema, range), deleter);
it = upper_bound(schema, range);
if (use_single_row_storage(schema)) {
// Single-row storage: no filtering needed, row either exists or doesn't
// The move constructor has already moved the row if it exists
} else {
// Multi-row storage: filter the rows
if (!uses_single_row_storage()) {
auto deleter = current_deleter<rows_entry>();
auto it = get_rows_storage().begin();
for (auto&& range : ck_ranges.ranges()) {
get_rows_storage().erase_and_dispose(it, lower_bound(schema, range), deleter);
it = upper_bound(schema, range);
}
get_rows_storage().erase_and_dispose(it, get_rows_storage().end(), deleter);
}
_rows.erase_and_dispose(it, _rows.end(), deleter);
}
{
for (auto&& range : ck_ranges.ranges()) {
@@ -127,7 +175,11 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
}
mutation_partition::~mutation_partition() {
_rows.clear_and_dispose(current_deleter<rows_entry>());
if (uses_single_row_storage()) {
// Single-row storage: optional destructor handles cleanup
} else {
get_rows_storage().clear_and_dispose(current_deleter<rows_entry>());
}
}
mutation_partition&
@@ -141,10 +193,14 @@ mutation_partition::operator=(mutation_partition&& x) noexcept {
void mutation_partition::ensure_last_dummy(const schema& s) {
check_schema(s);
if (_rows.empty() || !_rows.rbegin()->is_last_dummy()) {
if (uses_single_row_storage()) {
// Single-row storage doesn't use dummy entries
return;
}
if (get_rows_storage().empty() || !get_rows_storage().rbegin()->is_last_dummy()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::yes));
_rows.insert_before(_rows.end(), std::move(e));
get_rows_storage().insert_before(get_rows_storage().end(), std::move(e));
}
}
@@ -419,9 +475,18 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
check_schema(schema);
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
auto j = _rows.find(key, rows_entry::tri_compare(schema));
if (j != _rows.end()) {
t.apply(j->row().deleted_at(), j->row().marker());
if (use_single_row_storage(schema)) {
// Single-row storage: check if the single row exists and has tombstone
const auto& row_opt = get_single_row_storage();
if (row_opt) {
t.apply(row_opt->deleted_at(), row_opt->marker());
}
} else {
// Multi-row storage: search in B-tree
auto j = get_rows_storage().find(key, rows_entry::tri_compare(schema));
if (j != get_rows_storage().end()) {
t.apply(j->row().deleted_at(), j->row().marker());
}
}
return t;
@@ -504,97 +569,178 @@ void mutation_partition::apply_insert(const schema& s, clustering_key_view key,
clustered_row(s, key).apply(row_marker(created_at, ttl, expiry));
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key, std::move(row)));
_rows.insert_before_hint(_rows.end(), std::move(e), rows_entry::tri_compare(s));
if (use_single_row_storage(s)) {
// Single-row storage: just set the row
get_single_row_storage() = std::move(row);
} else {
// Multi-row storage: insert into B-tree
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key, std::move(row)));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(e), rows_entry::tri_compare(s));
}
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
check_schema(s);
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, key, row));
_rows.insert_before_hint(_rows.end(), std::move(e), rows_entry::tri_compare(s));
if (use_single_row_storage(s)) {
// Single-row storage: just copy the row
get_single_row_storage() = row;
} else {
// Multi-row storage: insert into B-tree
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, key, row));
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(e), rows_entry::tri_compare(s));
}
}
const row*
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
check_schema(s);
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
if (use_single_row_storage(s)) {
// Single-row storage: return the single row's cells if it exists
const auto& row_opt = get_single_row_storage();
if (row_opt) {
return &row_opt->cells();
}
return nullptr;
} else {
// Multi-row storage: search in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
return nullptr;
}
return &i->row().cells();
}
return &i->row().cells();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(std::move(key)));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(std::move(key)));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
return i->row();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
return i->row();
}
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
check_schema(s);
check_row_key(s, key, is_dummy::no);
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
if (use_single_row_storage(s)) {
// Single-row storage: create row if it doesn't exist
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
// Multi-row storage: find or insert in B-tree
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return i->row();
}
return i->row();
}
rows_entry&
mutation_partition::clustered_rows_entry(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
check_row_key(s, pos, dummy);
auto i = _rows.find(pos, rows_entry::tri_compare(s));
if (i == _rows.end()) {
if (use_single_row_storage(s)) {
// Single-row storage doesn't use rows_entry - this shouldn't be called
on_internal_error(mplog, "mutation_partition::clustered_rows_entry() called with single-row storage");
}
auto i = get_rows_storage().find(pos, rows_entry::tri_compare(s));
if (i == get_rows_storage().end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
}
return *i;
}
deletable_row&
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
return clustered_rows_entry(s, pos, dummy, continuous).row();
if (use_single_row_storage(s)) {
// Single-row storage: ignore dummy/continuous flags, just get/create the row
check_row_key(s, pos, dummy);
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
} else {
return clustered_rows_entry(s, pos, dummy, continuous).row();
}
}
deletable_row&
mutation_partition::append_clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
check_row_key(s, pos, dummy);
if (use_single_row_storage(s)) {
// Single-row storage: just create/get the row
auto& row_opt = get_single_row_storage();
if (!row_opt) {
row_opt = deletable_row();
}
return *row_opt;
}
const auto cmp = rows_entry::tri_compare(s);
auto i = _rows.end();
if (!_rows.empty() && (cmp(*std::prev(i), pos) >= 0)) {
auto i = get_rows_storage().end();
if (!get_rows_storage().empty() && (cmp(*std::prev(i), pos) >= 0)) {
on_internal_error(mplog, format("mutation_partition::append_clustered_row(): cannot append clustering row with key {} to the partition"
", last clustering row is equal or greater: {}", pos, std::prev(i)->position()));
}
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
i = _rows.insert_before_hint(i, std::move(e), cmp).first;
i = get_rows_storage().insert_before_hint(i, std::move(e), cmp).first;
return i->row();
}
@@ -602,19 +748,33 @@ mutation_partition::append_clustered_row(const schema& s, position_in_partition_
mutation_partition::rows_type::const_iterator
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
if (!r.start()) {
return std::cbegin(_rows);
if (use_single_row_storage(schema)) {
// Single-row storage: always return end iterator (empty range)
static const rows_type empty_rows;
return empty_rows.end();
}
return _rows.lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
if (!r.start()) {
return std::cbegin(get_rows_storage());
}
return get_rows_storage().lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
}
mutation_partition::rows_type::const_iterator
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) const {
check_schema(schema);
if (!r.end()) {
return std::cend(_rows);
if (use_single_row_storage(schema)) {
// Single-row storage: always return end iterator (empty range)
static const rows_type empty_rows;
return empty_rows.end();
}
return _rows.lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
if (!r.end()) {
return std::cend(get_rows_storage());
}
return get_rows_storage().lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
}
std::ranges::subrange<mutation_partition::rows_type::const_iterator>
@@ -625,17 +785,32 @@ mutation_partition::range(const schema& schema, const query::clustering_range& r
std::ranges::subrange<mutation_partition::rows_type::iterator>
mutation_partition::range(const schema& schema, const query::clustering_range& r) {
return unconst(_rows, static_cast<const mutation_partition*>(this)->range(schema, r));
if (use_single_row_storage(schema)) {
// Single-row storage: return empty range (rows_entry iteration not applicable)
static rows_type empty_rows;
return std::ranges::subrange(empty_rows.begin(), empty_rows.end());
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->range(schema, r));
}
mutation_partition::rows_type::iterator
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) {
return unconst(_rows, static_cast<const mutation_partition*>(this)->lower_bound(schema, r));
if (use_single_row_storage(schema)) {
// Single-row storage: return end iterator (empty range)
static rows_type empty_rows;
return empty_rows.end();
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->lower_bound(schema, r));
}
mutation_partition::rows_type::iterator
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) {
return unconst(_rows, static_cast<const mutation_partition*>(this)->upper_bound(schema, r));
if (use_single_row_storage(schema)) {
// Single-row storage: return end iterator (empty range)
static rows_type empty_rows;
return empty_rows.end();
}
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->upper_bound(schema, r));
}
template<typename Func>
@@ -1377,7 +1552,15 @@ bool mutation_partition::empty() const
if (_tombstone.timestamp != api::missing_timestamp) {
return false;
}
return !_static_row.size() && _rows.empty() && _row_tombstones.empty();
if (_static_row.size() || !_row_tombstones.empty()) {
return false;
}
if (uses_single_row_storage()) {
return !get_single_row_storage().has_value();
} else {
return get_rows_storage().empty();
}
}
bool
@@ -1422,7 +1605,11 @@ mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_t
uint64_t
mutation_partition::row_count() const {
return _rows.calculate_size();
if (uses_single_row_storage()) {
return get_single_row_storage().has_value() ? 1 : 0;
} else {
return get_rows_storage().calculate_size();
}
}
rows_entry::rows_entry(rows_entry&& o) noexcept
@@ -2219,15 +2406,22 @@ public:
mutation_partition::mutation_partition(mutation_partition::incomplete_tag, const schema& s, tombstone t)
: _tombstone(t)
, _static_row_continuous(!s.has_static_columns())
, _rows()
, _rows(use_single_row_storage(s) ?
rows_storage_type(std::optional<deletable_row>{}) :
rows_storage_type(rows_type{}))
, _row_tombstones(s)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
#endif
{
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
_rows.insert_before(_rows.end(), std::move(e));
if (use_single_row_storage(s)) {
// Single-row storage: no dummy entries needed, leave row as empty optional
} else {
// Multi-row storage: add last dummy entry for discontinuous partition
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
get_rows_storage().insert_before(get_rows_storage().end(), std::move(e));
}
}
bool mutation_partition::is_fully_continuous() const {

View File

@@ -9,6 +9,7 @@
#pragma once
#include <iosfwd>
#include <variant>
#include <boost/intrusive/parent_from_member.hpp>
#include <seastar/util/optimized_optional.hh>
@@ -1188,6 +1189,12 @@ inline void check_row_key(const schema& s, position_in_partition_view pos, is_du
}
}
// Returns true if the schema has no clustering keys, meaning partitions can have at most one row.
// When true, mutation_partition uses std::optional<deletable_row> instead of full rows_type container.
inline bool use_single_row_storage(const schema& s) {
return s.clustering_key_size() == 0;
}
// Represents a set of writes made to a single partition.
//
// The object is schema-dependent. Each instance is governed by some
@@ -1228,20 +1235,45 @@ inline void check_row_key(const schema& s, position_in_partition_view pos, is_du
class mutation_partition final {
public:
using rows_type = rows_entry::container_type;
using rows_storage_type = std::variant<rows_type, std::optional<deletable_row>>;
friend class size_calculator;
private:
tombstone _tombstone;
lazy_row _static_row;
bool _static_row_continuous = true;
rows_type _rows;
rows_storage_type _rows;
// Contains only strict prefixes so that we don't have to lookup full keys
// in both _row_tombstones and _rows.
// Note: empty when using single-row storage (std::optional<deletable_row> variant)
range_tombstone_list _row_tombstones;
#ifdef SEASTAR_DEBUG
table_schema_version _schema_version;
#endif
friend class converting_mutation_partition_applier;
// Returns true if this partition uses single-row storage
bool uses_single_row_storage() const {
return std::holds_alternative<std::optional<deletable_row>>(_rows);
}
// Get reference to rows container (multi-row storage)
rows_type& get_rows_storage() {
return std::get<rows_type>(_rows);
}
const rows_type& get_rows_storage() const {
return std::get<rows_type>(_rows);
}
// Get reference to single row storage
std::optional<deletable_row>& get_single_row_storage() {
return std::get<std::optional<deletable_row>>(_rows);
}
const std::optional<deletable_row>& get_single_row_storage() const {
return std::get<std::optional<deletable_row>>(_rows);
}
public:
struct copy_comparators_only {};
struct incomplete_tag {};
@@ -1251,14 +1283,14 @@ public:
return mutation_partition(incomplete_tag(), s, t);
}
mutation_partition(const schema& s)
: _rows()
: _rows(use_single_row_storage(s) ? rows_storage_type(std::optional<deletable_row>{}) : rows_storage_type(rows_type{}))
, _row_tombstones(s)
#ifdef SEASTAR_DEBUG
, _schema_version(s.version())
#endif
{ }
mutation_partition(mutation_partition& other, copy_comparators_only)
: _rows()
: _rows(other._rows.index() == 0 ? rows_storage_type(rows_type{}) : rows_storage_type(std::optional<deletable_row>{}))
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(other._schema_version)
@@ -1269,6 +1301,8 @@ public:
mutation_partition(const mutation_partition&, const schema&, query::clustering_key_filter_ranges);
mutation_partition(mutation_partition&&, const schema&, query::clustering_key_filter_ranges);
~mutation_partition();
// Returns the mutation_partition containing the given rows_type.
// Can only be used when the mutation_partition uses multi-row storage.
static mutation_partition& container_of(rows_type&);
mutation_partition& operator=(mutation_partition&& x) noexcept;
bool equal(const schema&, const mutation_partition&) const;
@@ -1462,9 +1496,31 @@ public:
const lazy_row& static_row() const { return _static_row; }
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
const rows_type& clustered_rows() const noexcept { return _rows; }
utils::immutable_collection<rows_type> clustered_rows() noexcept { return _rows; }
rows_type& mutable_clustered_rows() noexcept { return _rows; }
// For single-row storage (clustering_key_size() == 0), returns an empty container.
// Callers should check uses_single_row_storage() and use get_single_row() for the single-row case.
const rows_type& clustered_rows() const noexcept {
if (uses_single_row_storage()) {
static const rows_type empty_rows;
return empty_rows;
}
return get_rows_storage();
}
utils::immutable_collection<rows_type> clustered_rows() noexcept {
return const_cast<const mutation_partition*>(this)->clustered_rows();
}
rows_type& mutable_clustered_rows() noexcept {
// Should only be called when NOT using single-row storage
return get_rows_storage();
}
// Access the single row when using single-row storage (clustering_key_size() == 0)
const std::optional<deletable_row>& get_single_row() const {
return get_single_row_storage();
}
std::optional<deletable_row>& get_single_row() {
return get_single_row_storage();
}
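The dual storage scheme above, a rows tree for general partitions versus a single optional row when the schema has no clustering key, can be sketched in isolation with `std::variant`. All names below are illustrative stand-ins, not the ScyllaDB types:

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <variant>

// Illustrative stand-ins for rows_type / deletable_row.
struct Row { std::string value; };
using RowsMap = std::map<int, Row>;                            // multi-row storage
using RowsStorage = std::variant<RowsMap, std::optional<Row>>; // index 0: tree, index 1: single row

class Partition {
    RowsStorage _rows;
public:
    explicit Partition(bool single_row)
        : _rows(single_row ? RowsStorage(std::optional<Row>{})
                           : RowsStorage(RowsMap{})) {}

    bool uses_single_row_storage() const { return _rows.index() == 1; }

    // Typed accessors, as in the diff: callers must check the active
    // alternative first; std::get throws std::bad_variant_access otherwise.
    RowsMap& rows() { return std::get<RowsMap>(_rows); }
    std::optional<Row>& single_row() { return std::get<std::optional<Row>>(_rows); }

    std::size_t row_count() const {
        if (uses_single_row_storage()) {
            return std::get<std::optional<Row>>(_rows).has_value() ? 1 : 0;
        }
        return std::get<RowsMap>(_rows).size();
    }
};
```

As in the diff, methods that only make sense for one alternative dispatch on the active index first rather than relying on `std::get` to fail.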
const range_tombstone_list& row_tombstones() const noexcept { return _row_tombstones; }
utils::immutable_collection<range_tombstone_list> row_tombstones() noexcept { return _row_tombstones; }
@@ -1482,8 +1538,14 @@ public:
rows_type::iterator upper_bound(const schema& schema, const query::clustering_range& r);
std::ranges::subrange<rows_type::iterator> range(const schema& schema, const query::clustering_range& r);
// Returns an iterator range of rows_entry, with only non-dummy entries.
// For single-row storage, returns an empty range.
auto non_dummy_rows() const {
return std::ranges::subrange(_rows.begin(), _rows.end())
static const rows_type empty_rows;
// Deduce the return type from a single return statement: returning from two
// branches with two distinct filter lambdas would give `auto` two different
// types and fail to compile.
const rows_type& rows = uses_single_row_storage() ? empty_rows : get_rows_storage();
return std::ranges::subrange(rows.begin(), rows.end())
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
}
void accept(const schema&, mutation_partition_visitor&) const;
@@ -1517,7 +1579,21 @@ private:
inline
mutation_partition& mutation_partition::container_of(rows_type& rows) {
return *boost::intrusive::get_parent_from_member(&rows, &mutation_partition::_rows);
// Only valid for multi-row storage, i.e. when rows_type is the active variant
// alternative. Common std::variant implementations store the alternatives at
// offset 0, so the active rows_type and its enclosing variant share an address,
// letting us recover the mutation_partition with pointer arithmetic.
static_assert(std::is_same_v<std::variant_alternative_t<0, rows_storage_type>, rows_type>,
"rows_type must be the first alternative in rows_storage_type");
rows_storage_type* variant_ptr = reinterpret_cast<rows_storage_type*>(&rows);
return *boost::intrusive::get_parent_from_member(variant_ptr, &mutation_partition::_rows);
}
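The `container_of` trick above chains two steps: member-to-variant (relying on the first variant alternative sitting at offset 0, which common implementations provide but the standard does not guarantee) and variant-to-parent (what `boost::intrusive::get_parent_from_member` does). A minimal sketch under those same assumptions, with illustrative names:

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <optional>
#include <type_traits>
#include <variant>

// Illustrative stand-ins; not the ScyllaDB types.
struct Row {};
using RowsMap = std::map<int, Row>;
using RowsStorage = std::variant<RowsMap, std::optional<Row>>;

struct Partition {
    int id = 0;
    RowsStorage rows{RowsMap{}};

    // Offset of `rows` within Partition, measured on a probe object;
    // this stands in for boost::intrusive::get_parent_from_member.
    static std::ptrdiff_t rows_offset() {
        static Partition probe;
        return reinterpret_cast<const char*>(&probe.rows)
             - reinterpret_cast<const char*>(&probe);
    }

    // Mirrors the diff's container_of(): only valid while RowsMap is the
    // active alternative, and assumes the first variant alternative is
    // stored at offset 0 inside the variant (true on libstdc++/libc++,
    // not guaranteed by the standard).
    static Partition& container_of(RowsMap& m) {
        static_assert(std::is_same_v<std::variant_alternative_t<0, RowsStorage>, RowsMap>,
                      "RowsMap must be the first alternative");
        auto* storage = reinterpret_cast<RowsStorage*>(&m);
        return *reinterpret_cast<Partition*>(
            reinterpret_cast<char*>(storage) - rows_offset());
    }
};
```

Both `reinterpret_cast`s depend on layout assumptions, which is why the original guards the first step with a `static_assert` on the alternative order.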
bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tombstone tomb = tombstone(),

View File

@@ -1195,6 +1195,8 @@ private:
rlogger.info("{}", msg);
throw std::runtime_error(msg);
}
co_await utils::get_local_injector().inject("incremental_repair_prepare_wait", utils::wait_for_message(60s));
auto reenablers_and_holders = co_await table.get_compaction_reenablers_and_lock_holders_for_repair(_db.local(), _frozen_topology_guard, _range);
for (auto& lock_holder : reenablers_and_holders.lock_holders) {
_rs._repair_compaction_locks[_frozen_topology_guard].push_back(std::move(lock_holder));

View File

@@ -84,6 +84,10 @@ class compaction_group {
seastar::named_gate _async_gate;
// Gates flushes.
seastar::named_gate _flush_gate;
// Gates sstables being added to the group.
// This prevents the group from being considered empty while sstables are being added.
// Crucial for tablet split, which ACKs the split for a table when all pre-split groups are empty.
seastar::named_gate _sstable_add_gate;
bool _tombstone_gc_enabled = true;
std::optional<compaction::compaction_backlog_tracker> _backlog_tracker;
repair_classifier_func _repair_sstable_classifier;
@@ -248,6 +252,10 @@ public:
return _flush_gate;
}
seastar::named_gate& sstable_add_gate() noexcept {
return _sstable_add_gate;
}
compaction::compaction_manager& get_compaction_manager() noexcept;
const compaction::compaction_manager& get_compaction_manager() const noexcept;
@@ -434,7 +442,7 @@ public:
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) = 0;
virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;

View File

@@ -604,9 +604,28 @@ public:
data_dictionary::table as_data_dictionary() const;
// The use of these functions is restricted to preexisting sstables that aren't being
// moved anywhere, so they should never be used in the context of file streaming or
// intra-node migration. The only user today is the distributed loader, which populates the
// sstables for each column family on boot.
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
// Restricted to new sstables produced by external processes such as repair.
// The sstable might undergo split if table is in split mode.
// If no split is needed, the input sstable will only be attached to the sstable set.
// If split happens, the output sstables will be attached and the input sstable unlinked.
// On failure, the input sstable is unlinked and exception propagated to the caller.
// The on_add callback will be called on all sstables to be added into the set.
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
std::function<future<>(sstables::shared_sstable)> on_add,
sstables::offstrategy offstrategy = sstables::offstrategy::no);
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
std::function<future<>(sstables::shared_sstable)> on_add);
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
sstables::shared_sstable make_sstable();
void set_truncation_time(db_clock::time_point truncated_at) noexcept {
@@ -724,7 +743,9 @@ private:
return _config.enable_cache && _schema->caching_options().enabled();
}
void update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept;
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy, bool trigger_compaction);
// This function can throw even if the sstable was added into the set. When the sstable was successfully
// added, the sstable ptr @sst will be set to nullptr, allowing the caller to optionally discard the sstable.
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable& sst, sstables::offstrategy, bool trigger_compaction);
future<> do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction);
// Helpers which add sstable on behalf of a compaction group and refreshes compound set.
void add_sstable(compaction_group& cg, sstables::shared_sstable sstable);
@@ -1358,7 +1379,8 @@ public:
// Clones storage of a given tablet. Memtable is flushed first to guarantee that the
// snapshot (list of sstables) will include all the data written up to the time it was taken.
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid);
// If leave_unsealed is set, all the destination sstables will be left unsealed.
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed);
friend class compaction_group;
friend class compaction::compaction_task_impl;

View File

@@ -721,7 +721,7 @@ public:
bool all_storage_groups_split() override { return true; }
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override { return make_ready_future(); }
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override {
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) override {
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
}
dht::token_range get_token_range_after_split(const dht::token&) const noexcept override { return dht::token_range(); }
@@ -879,7 +879,7 @@ public:
bool all_storage_groups_split() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override;
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) override;
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
return tablet_map().get_token_range_after_split(token);
}
@@ -1130,7 +1130,8 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id
}
future<std::vector<sstables::shared_sstable>>
tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) {
tablet_storage_group_manager::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
co_await utils::get_local_injector().inject("maybe_split_new_sstable_wait", utils::wait_for_message(120s));
if (!tablet_map().needs_split()) {
co_return std::vector<sstables::shared_sstable>{sst};
}
@@ -1138,8 +1139,7 @@ tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable
auto& cg = compaction_group_for_sstable(sst);
auto holder = cg.async_gate().hold();
auto& view = cg.view_for_sstable(sst);
auto lock_holder = co_await _t.get_compaction_manager().get_incremental_repair_read_lock(view, "maybe_split_sstable");
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, view, co_await split_compaction_options());
co_return co_await _t.get_compaction_manager().maybe_split_new_sstable(sst, view, co_await split_compaction_options());
}
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
@@ -1149,7 +1149,7 @@ future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
future<std::vector<sstables::shared_sstable>> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
auto holder = async_gate().hold();
co_return co_await _sg_manager->maybe_split_sstable(sst);
co_return co_await _sg_manager->maybe_split_new_sstable(sst);
}
dht::token_range table::get_token_range_after_split(const dht::token& token) const noexcept {
@@ -1330,7 +1330,7 @@ future<utils::chunked_vector<sstables::shared_sstable>> table::take_sstable_set_
}
future<utils::chunked_vector<sstables::entry_descriptor>>
table::clone_tablet_storage(locator::tablet_id tid) {
table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) {
utils::chunked_vector<sstables::entry_descriptor> ret;
auto holder = async_gate().hold();
@@ -1342,7 +1342,7 @@ table::clone_tablet_storage(locator::tablet_id tid) {
// by compaction while we are waiting for the lock.
auto deletion_guard = co_await get_sstable_list_permit();
co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
ret.push_back(co_await sst->clone(calculate_generation_for_new_table()));
ret.push_back(co_await sst->clone(calculate_generation_for_new_table(), leave_unsealed));
});
co_return ret;
}
@@ -1354,10 +1354,10 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no
}
future<>
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy,
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable& sst, sstables::offstrategy offstrategy,
bool trigger_compaction) {
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept {
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () mutable noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
// atomically load all opened sstables into column family.
if (!offstrategy) {
@@ -1369,6 +1369,8 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss
if (trigger_compaction) {
try_trigger_compaction(cg);
}
// Resetting the sstable ptr to inform the caller that the sstable has been loaded successfully.
sst = nullptr;
}), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true}), [sst, schema = _schema] (const dht::decorated_key& key) {
return sst->filter_has_key(sstables::key::from_partition_key(*schema, key.key()));
});
@@ -1376,12 +1378,10 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss
future<>
table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
for (auto sst : co_await maybe_split_new_sstable(new_sst)) {
auto& cg = compaction_group_for_sstable(sst);
// Hold gate to make sure the compaction group is alive.
auto holder = cg.async_gate().hold();
co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction);
}
auto& cg = compaction_group_for_sstable(new_sst);
// Hold gate to make sure the compaction group is alive.
auto holder = cg.async_gate().hold();
co_await do_add_sstable_and_update_cache(cg, new_sst, offstrategy, trigger_compaction);
}
future<>
@@ -1399,6 +1399,85 @@ table::add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>
trigger_compaction();
}
future<std::vector<sstables::shared_sstable>>
table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
std::function<future<>(sstables::shared_sstable)> on_add,
sstables::offstrategy offstrategy) {
std::vector<sstables::shared_sstable> ret, ssts;
std::exception_ptr ex;
try {
bool trigger_compaction = offstrategy == sstables::offstrategy::no;
auto& cg = compaction_group_for_sstable(new_sst);
// This prevents the compaction group from being considered empty until the holder is released.
// Helpful for tablet split, where the split is acked for a table when all pre-split groups are empty.
auto sstable_add_holder = cg.sstable_add_gate().hold();
ret = ssts = co_await maybe_split_new_sstable(new_sst);
// On successful split, the input sstable is unlinked.
new_sst = nullptr;
for (auto& sst : ssts) {
auto& cg = compaction_group_for_sstable(sst);
// Hold gate to make sure compaction group is alive.
auto holder = cg.async_gate().hold();
co_await on_add(sst);
// If do_add_sstable_and_update_cache() throws after sstable has been loaded, the pointer
// sst passed by reference will be set to nullptr, so it won't be unlinked in the exception
// handler below.
co_await do_add_sstable_and_update_cache(cg, sst, offstrategy, trigger_compaction);
sst = nullptr;
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
// On failed split, the input sstable is unlinked here.
if (new_sst) {
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", new_sst->get_filename(), new_sst->get_origin(), ex);
co_await new_sst->unlink();
}
// On failure after a successful split, sstables not yet attached will be unlinked.
co_await coroutine::parallel_for_each(ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
if (sst) {
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
co_await sst->unlink();
}
});
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_return std::move(ret);
}
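The ownership contract implemented above (input consumed on successful split, every not-yet-attached output unlinked on failure, exception propagated) can be modeled without Seastar. `FakeSSTable`, `split`, and `attach` are hypothetical stand-ins for the real types and steps:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <stdexcept>
#include <vector>

// Illustrative model of the add_new_sstable_and_update_cache() contract.
struct FakeSSTable {
    bool unlinked = false;
};
using SST = std::shared_ptr<FakeSSTable>;

std::vector<SST> add_new_sstable(SST input,
                                 const std::function<std::vector<SST>(SST)>& split,
                                 const std::function<void(SST)>& attach) {
    std::vector<SST> ret, pending;
    try {
        ret = pending = split(input); // ret keeps the full list for the caller
        input = nullptr;              // successful split: input is consumed
        for (auto& sst : pending) {
            attach(sst);              // may throw
            sst = nullptr;            // attached: this one no longer needs cleanup
        }
    } catch (...) {
        if (input) {
            input->unlinked = true;   // split itself failed: unlink the input
        }
        for (auto& sst : pending) {
            if (sst) {
                sst->unlinked = true; // attach failed mid-way: unlink the rest
            }
        }
        throw;                        // propagate to the caller, as in the diff
    }
    return ret;
}
```

Nulling each entry right after it is attached is what keeps the cleanup loop from unlinking sstables that already belong to the set.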
future<std::vector<sstables::shared_sstable>>
table::add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
std::function<future<>(sstables::shared_sstable)> on_add) {
std::exception_ptr ex;
std::vector<sstables::shared_sstable> ret;
// We rely on add_new_sstable_and_update_cache() to unlink the sstable fed into it,
// so the exception handling below will only have to unlink sstables not processed yet.
try {
for (auto& sst: new_ssts) {
auto ssts = co_await add_new_sstable_and_update_cache(std::exchange(sst, nullptr), on_add);
std::ranges::move(ssts, std::back_inserter(ret));
}
} catch (...) {
ex = std::current_exception();
}
if (ex) {
co_await coroutine::parallel_for_each(new_ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
if (sst) {
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
co_await sst->unlink();
}
});
co_await coroutine::return_exception_ptr(std::move(ex));
}
co_return std::move(ret);
}
future<>
table::update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts) {
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
@@ -2612,8 +2691,8 @@ public:
sstables::sstables_manager& get_sstables_manager() noexcept override {
return _t.get_sstables_manager();
}
sstables::shared_sstable make_sstable() const override {
return _t.make_sstable();
sstables::shared_sstable make_sstable(sstables::sstable_state state) const override {
return _t.make_sstable(state);
}
sstables::sstable_writer_config configure_writer(sstring origin) const override {
auto cfg = _t.get_sstables_manager().configure_writer(std::move(origin));
@@ -2731,6 +2810,7 @@ future<> compaction_group::stop(sstring reason) noexcept {
auto flush_future = co_await seastar::coroutine::as_future(flush());
co_await _flush_gate.close();
co_await _sstable_add_gate.close();
// FIXME: indentation
_compaction_disabler_for_views.clear();
co_await utils::get_local_injector().inject("compaction_group_stop_wait", utils::wait_for_message(60s));
@@ -2744,7 +2824,7 @@ future<> compaction_group::stop(sstring reason) noexcept {
}
bool compaction_group::empty() const noexcept {
return _memtables->empty() && live_sstable_count() == 0;
return _memtables->empty() && live_sstable_count() == 0 && _sstable_add_gate.get_count() == 0;
}
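The reason `empty()` now consults `_sstable_add_gate` can be shown with a toy counting gate (a simplified stand-in for `seastar::named_gate`, without close/wait semantics): an in-flight addition holds the gate, so the group reports non-empty even though no sstable is visible yet.

```cpp
#include <cassert>
#include <cstddef>

// Toy counting gate: each holder keeps the count non-zero for its lifetime.
class Gate {
    std::size_t _count = 0;
public:
    class Holder {
        Gate* _g;
    public:
        explicit Holder(Gate& g) : _g(&g) { ++g._count; }
        Holder(Holder&& o) noexcept : _g(o._g) { o._g = nullptr; }
        Holder(const Holder&) = delete;
        ~Holder() { if (_g) { --_g->_count; } }
    };
    Holder hold() { return Holder(*this); }
    std::size_t get_count() const { return _count; }
};

struct Group {
    std::size_t live_sstables = 0;
    Gate sstable_add_gate;

    // Mirrors the amended compaction_group::empty(): a group with an
    // sstable addition in flight is not considered empty.
    bool empty() const {
        return live_sstables == 0 && sstable_add_gate.get_count() == 0;
    }
};
```

This is the window tablet split cares about: the sstable exists but is not yet in the set, and only the gate count makes it observable.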
const schema_ptr& compaction_group::schema() const {
@@ -3200,7 +3280,7 @@ db::replay_position table::highest_flushed_replay_position() const {
}
struct manifest_json : public json::json_base {
json::json_chunked_list<sstring> files;
json::json_chunked_list<std::string_view> files;
manifest_json() {
register_params();
@@ -3224,7 +3304,7 @@ table::seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets)
manifest_json manifest;
for (const auto& fsp : file_sets) {
for (auto& rf : *fsp) {
manifest.files.push(std::move(rf));
manifest.files.push(std::string_view(rf));
}
}
auto streamer = json::stream_object(std::move(manifest));

View File

@@ -390,9 +390,11 @@ dark_green = (195, 215, 195)
light_red = (255, 200, 200)
light_green = (200, 255, 200)
light_gray = (240, 240, 240)
scylla_blue = (87, 209, 229)
tablet_colors = {
(Tablet.STATE_NORMAL, None): GRAY,
(Tablet.STATE_NORMAL, 'repair'): scylla_blue,
(Tablet.STATE_JOINING, 'allow_write_both_read_old'): dark_green,
(Tablet.STATE_LEAVING, 'allow_write_both_read_old'): dark_red,
(Tablet.STATE_JOINING, 'write_both_read_old'): dark_green,
@@ -532,6 +534,8 @@ def update_from_cql(initial=False):
state = (Tablet.STATE_JOINING, tablet.stage)
elif replica in leaving:
state = (Tablet.STATE_LEAVING, tablet.stage)
elif tablet.stage == 'repair':
state = (Tablet.STATE_NORMAL, tablet.stage)
else:
state = (Tablet.STATE_NORMAL, None)

View File

@@ -224,7 +224,13 @@ future<> service::client_state::has_access(const sstring& ks, auth::command_desc
ks + " can be granted only SELECT or DESCRIBE permissions to a non-superuser.");
}
if (cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) {
static const std::unordered_set<auth::resource> vector_search_system_resources = {
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY),
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::VERSIONS),
};
if ((cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) ||
(cmd.permission == auth::permission::SELECT && vector_search_system_resources.contains(cmd.resource))) {
co_return co_await ensure_has_permission<auth::command_desc_with_permission_set>({auth::permission_set::of<auth::permission::SELECT, auth::permission::VECTOR_SEARCH_INDEXING>(), cmd.resource});

View File

@@ -6526,14 +6526,19 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
leaving.host, pending.host));
}
auto d = co_await smp::submit_to(leaving.shard, [this, tablet] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
// All sstables cloned locally will be left unsealed, until they're loaded into the table.
// This is to guarantee no unsplit sstables will be left sealed on disk, which could
// cause problems if unsplit sstables are found after split was ACKed to coordinator.
bool leave_unsealed = true;
auto d = co_await smp::submit_to(leaving.shard, [this, tablet, leave_unsealed] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
auto& table = _db.local().find_column_family(tablet.table);
auto op = table.stream_in_progress();
co_return co_await table.clone_tablet_storage(tablet.tablet);
co_return co_await table.clone_tablet_storage(tablet.tablet, leave_unsealed);
});
rtlogger.debug("Cloned storage of tablet {} from leaving replica {}, {} sstables were found", tablet, leaving, d.size());
auto load_sstable = [] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
auto load_sstable = [leave_unsealed] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
auto& mng = t.get_sstables_manager();
auto sst = mng.make_sstable(t.schema(), t.get_storage_options(), d.generation, d.state.value_or(sstables::sstable_state::normal),
d.version, d.format, db_clock::now(), default_io_error_handler_gen());
@@ -6541,7 +6546,8 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
// will still point to leaving replica at this stage in migration. If node goes down,
// SSTables will be loaded at pending replica and migration is retried, so correctness
// wise, we're good.
auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true };
auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true,
.unsealed_sstable = leave_unsealed };
co_await sst->load(sharder, cfg);
co_return sst;
};
@@ -6549,16 +6555,23 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
co_await smp::submit_to(pending.shard, [this, tablet, load_sstable, d = std::move(d)] () mutable -> future<> {
// Loads cloned sstables from leaving replica into pending one.
auto& table = _db.local().find_column_family(tablet.table);
auto& sstm = table.get_sstables_manager();
auto op = table.stream_in_progress();
dht::auto_refreshing_sharder sharder(table.shared_from_this());
std::vector<sstables::shared_sstable> ssts;
ssts.reserve(d.size());
std::unordered_set<sstables::shared_sstable> ssts;
for (auto&& sst_desc : d) {
ssts.push_back(co_await load_sstable(sharder, table, std::move(sst_desc)));
ssts.insert(co_await load_sstable(sharder, table, std::move(sst_desc)));
}
co_await table.add_sstables_and_update_cache(ssts);
_view_building_worker.local().load_sstables(tablet.table, ssts);
auto on_add = [&ssts, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
if (ssts.contains(loading_sst)) {
auto cfg = sstm.configure_writer(loading_sst->get_origin());
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(std::vector(ssts.begin(), ssts.end()), on_add);
_view_building_worker.local().load_sstables(tablet.table, loaded_ssts);
});
rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending);
}

View File

@@ -1931,6 +1931,10 @@ public:
const auto& table_groups = _tm->tablets().all_table_groups();
auto finalize_decision = [&] {
if (utils::get_local_injector().enter("tablet_resize_finalization_postpone")) {
return;
}
_stats.for_cluster().resizes_finalized++;
resize_plan.finalize_resize.insert(table);
};

View File

@@ -2623,7 +2623,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await _voter_handler.on_node_removed(replaced_node_id, _as);
}
}
utils::get_local_injector().inject("crash_coordinator_before_stream", [] { abort(); });
utils::get_local_injector().inject("crash_coordinator_before_stream", [] {
rtlogger.info("crash_coordinator_before_stream: aborting");
abort();
});
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
auto state = node.rs->state;
try {

View File

@@ -1696,7 +1696,9 @@ void writer::consume_end_of_stream() {
.map = _collector.get_ext_timestamp_stats()
});
_sst.write_scylla_metadata(_shard, std::move(identifier), std::move(ld_stats), std::move(ts_stats));
_sst.seal_sstable(_cfg.backup).get();
if (!_cfg.leave_unsealed) {
_sst.seal_sstable(_cfg.backup).get();
}
}
uint64_t writer::data_file_position_for_tests() const {

View File

@@ -83,6 +83,8 @@ struct sstable_open_config {
bool current_shard_as_sstable_owner = false;
// Do not move the sharding metadata to the sharder, keeping it in the scylla metadata.
bool keep_sharding_metadata = false;
// Allows an unsealed sstable to be loaded; its components must then be read from the temporary TOC instead.
bool unsealed_sstable = false;
};
}

View File

@@ -836,13 +836,14 @@ future<std::vector<sstring>> sstable::read_and_parse_toc(file f) {
// This is small enough, and well-defined. Easier to just read it all
// at once
future<> sstable::read_toc() noexcept {
future<> sstable::read_toc(sstable_open_config cfg) noexcept {
if (_recognized_components.size()) {
co_return;
}
try {
co_await do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> {
auto toc_type = cfg.unsealed_sstable ? component_type::TemporaryTOC : component_type::TOC;
co_await do_read_simple(toc_type, [&] (version_types v, file f) -> future<> {
auto comps = co_await read_and_parse_toc(f);
for (auto& c: comps) {
// accept trailing newlines
@@ -900,8 +901,8 @@ future<std::unordered_map<component_type, file>> sstable::readable_file_for_all_
co_return std::move(files);
}
future<entry_descriptor> sstable::clone(generation_type new_generation) const {
co_await _storage->snapshot(*this, _storage->prefix(), storage::absolute_path::yes, new_generation);
future<entry_descriptor> sstable::clone(generation_type new_generation, bool leave_unsealed) const {
co_await _storage->snapshot(*this, _storage->prefix(), storage::absolute_path::yes, new_generation, storage::leave_unsealed(leave_unsealed));
co_return entry_descriptor(new_generation, _version, _format, component_type::TOC, _state);
}
@@ -1725,7 +1726,7 @@ void sstable::disable_component_memory_reload() {
}
future<> sstable::load_metadata(sstable_open_config cfg) noexcept {
co_await read_toc();
co_await read_toc(cfg);
// read scylla-meta after toc. Might need it to parse
// rest (hint extensions)
co_await read_scylla_metadata();
@@ -3960,11 +3961,13 @@ class sstable_stream_sink_impl : public sstable_stream_sink {
shared_sstable _sst;
component_type _type;
bool _last_component;
bool _leave_unsealed;
public:
sstable_stream_sink_impl(shared_sstable sst, component_type type, bool last_component)
sstable_stream_sink_impl(shared_sstable sst, component_type type, sstable_stream_sink_cfg cfg)
: _sst(std::move(sst))
, _type(type)
, _last_component(last_component)
, _last_component(cfg.last_component)
, _leave_unsealed(cfg.leave_unsealed)
{}
private:
future<> load_metadata() const {
@@ -4011,10 +4014,12 @@ public:
co_return co_await make_file_output_stream(std::move(f), stream_options);
}
future<shared_sstable> close_and_seal() override {
future<shared_sstable> close() override {
if (_last_component) {
// If we are the last component in a sequence, we can seal the table.
co_await _sst->_storage->seal(*_sst);
if (!_leave_unsealed) {
co_await _sst->_storage->seal(*_sst);
}
co_return std::move(_sst);
}
_sst = {};
@@ -4031,7 +4036,7 @@ public:
}
};
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstables_manager& sstm, const data_dictionary::storage_options& s_opts, sstable_state state, std::string_view component_filename, bool last_component) {
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstables_manager& sstm, const data_dictionary::storage_options& s_opts, sstable_state state, std::string_view component_filename, sstable_stream_sink_cfg cfg) {
auto desc = parse_path(component_filename, schema->ks_name(), schema->cf_name());
auto sst = sstm.make_sstable(schema, s_opts, desc.generation, state, desc.version, desc.format);
@@ -4042,7 +4047,7 @@ std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstab
type = component_type::TemporaryTOC;
}
return std::make_unique<sstable_stream_sink_impl>(std::move(sst), type, last_component);
return std::make_unique<sstable_stream_sink_impl>(std::move(sst), type, cfg);
}
generation_type

View File

@@ -109,6 +109,7 @@ struct sstable_writer_config {
size_t promoted_index_auto_scale_threshold;
uint64_t max_sstable_size = std::numeric_limits<uint64_t>::max();
bool backup = false;
bool leave_unsealed = false;
mutation_fragment_stream_validation_level validation_level;
std::optional<db::replay_position> replay_position;
std::optional<int> sstable_level;
@@ -417,8 +418,8 @@ public:
return component_basename(_schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
}
component_name get_filename() const {
return component_name(*this, component_type::Data);
component_name get_filename(component_type f = component_type::Data) const {
return component_name(*this, f);
}
component_name toc_filename() const {
@@ -693,7 +694,7 @@ private:
future<> update_info_for_opened_data(sstable_open_config cfg = {});
future<> read_toc() noexcept;
future<> read_toc(sstable_open_config cfg = {}) noexcept;
future<> read_summary() noexcept;
void write_summary() {
@@ -1069,8 +1070,9 @@ public:
future<std::unordered_map<component_type, file>> readable_file_for_all_components() const;
// Clones this sstable with a new generation, under the same location as the original one.
// If leave_unsealed is true, the destination sstable is left unsealed.
// Implementation is underlying storage specific.
future<entry_descriptor> clone(generation_type new_generation) const;
future<entry_descriptor> clone(generation_type new_generation, bool leave_unsealed = false) const;
struct lesser_reclaimed_memory {
// comparator class to be used by the _reclaimed set in sstables manager
@@ -1244,13 +1246,18 @@ public:
// closes this component. If this is the last component in a set (see "last_component" in the creation method below)
// the table on disk will be sealed.
// Returns the sealed sstable if last, or nullptr otherwise.
virtual future<shared_sstable> close_and_seal() = 0;
virtual future<shared_sstable> close() = 0;
virtual future<> abort() = 0;
};
struct sstable_stream_sink_cfg {
bool last_component = false;
bool leave_unsealed = false;
};
// Creates a sink object which can receive a component file sourced from above source object data.
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr, sstables_manager&, const data_dictionary::storage_options&, sstable_state, std::string_view component_filename, bool last_component);
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr, sstables_manager&, const data_dictionary::storage_options&, sstable_state, std::string_view component_filename, sstable_stream_sink_cfg cfg);
} // namespace sstables
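The switch from a bare `last_component` bool to `sstable_stream_sink_cfg` can be modeled in a few lines of Python. This is a hypothetical sketch, not the actual C++; in particular, mapping `leave_unsealed` to a `TemporaryTOC` component is an assumption inferred from the hunk above.

```python
from dataclasses import dataclass

@dataclass
class SstableStreamSinkCfg:
    # Python model of the new sstable_stream_sink_cfg struct, which replaces
    # the old single last_component flag passed to create_stream_sink().
    last_component: bool = False
    leave_unsealed: bool = False

def toc_component_for(cfg: SstableStreamSinkCfg) -> str:
    # Assumption for illustration: a sink asked to leave the destination
    # unsealed writes a TemporaryTOC instead of the final TOC, so the
    # on-disk sstable is not yet considered complete.
    return "TemporaryTOC" if cfg.leave_unsealed else "TOC"

assert toc_component_for(SstableStreamSinkCfg(leave_unsealed=True)) == "TemporaryTOC"
assert toc_component_for(SstableStreamSinkCfg(last_component=True)) == "TOC"
```

Bundling both flags in one struct also keeps future sink options from widening the `create_stream_sink` signature again.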

View File

@@ -50,7 +50,14 @@ class filesystem_storage final : public sstables::storage {
std::optional<std::filesystem::path> _temp_dir; // Valid while the sstable is being created, until sealed
private:
using mark_for_removal = bool_class<class mark_for_removal_tag>;
struct mark_for_removal_tag {};
struct leave_unsealed_tag {};
enum class link_mode {
default_mode,
mark_for_removal,
leave_unsealed,
};
template <typename Comp>
requires std::is_same_v<Comp, component_type> || std::is_same_v<Comp, sstring>
@@ -61,7 +68,9 @@ private:
future<> check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
future<> remove_temp_dir();
virtual future<> create_links(const sstable& sst, const std::filesystem::path& dir) const override;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal mark_for_removal) const;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, link_mode mode) const;
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal_tag) const;
future<> create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen, leave_unsealed_tag) const;
future<> create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> dst_gen) const;
future<> touch_temp_dir(const sstable& sst);
future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay) override;
@@ -83,7 +92,7 @@ public:
{}
virtual future<> seal(const sstable& sst) override;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const override;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed) const override;
virtual future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
virtual void open(sstable& sst) override;
@@ -356,8 +365,13 @@ future<> filesystem_storage::check_create_links_replay(const sstable& sst, const
/// \param sst - the sstable to work on
/// \param dst_dir - the destination directory.
/// \param generation - the generation of the destination sstable
/// \param mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, mark_for_removal mark_for_removal) const {
/// \param mode - what to do after all components have been linked
/// mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
/// leave_unsealed - leaves the destination sstable unsealed
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, link_mode mode) const {
// They're mutually exclusive, so we can assume only one is set.
bool mark_for_removal = mode == link_mode::mark_for_removal;
bool leave_unsealed = mode == link_mode::leave_unsealed;
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", sst.get_filename(), dst_dir, generation, mark_for_removal);
auto comps = sst.all_components();
co_await check_create_links_replay(sst, dst_dir, generation, comps);
@@ -366,7 +380,11 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
co_await sst.sstable_write_io_check(idempotent_link_file, fmt::to_string(sst.filename(component_type::TOC)), std::move(dst));
auto dir = opened_directory(dst_dir);
co_await dir.sync(sst._write_error_handler);
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation] (auto p) {
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation, leave_unsealed] (auto p) {
// Skip linking the TOC file if the destination will be left unsealed.
if (leave_unsealed && p.first == component_type::TOC) {
return make_ready_future<>();
}
auto src = filename(sst, _dir.native(), sst._generation, p.second);
auto dst = filename(sst, dst_dir, generation, p.second);
return sst.sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
@@ -379,9 +397,10 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
auto src_temp_toc = filename(sst, _dir.native(), sst._generation, component_type::TemporaryTOC);
co_await sst.sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc));
co_await _dir.sync(sst._write_error_handler);
} else {
} else if (!leave_unsealed) {
// Now that the source sstable is linked to dir, remove
// the TemporaryTOC file at the destination.
// This is bypassed if the destination will be left unsealed.
co_await sst.sstable_write_io_check(remove_file, std::move(dst_temp_toc));
}
co_await dir.sync(sst._write_error_handler);
@@ -389,15 +408,23 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
sstlog.trace("create_links: {} -> {} generation={}: done", sst.get_filename(), dst_dir, generation);
}
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal_tag) const {
return create_links_common(sst, dst_dir, dst_gen, link_mode::mark_for_removal);
}
future<> filesystem_storage::create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen, leave_unsealed_tag) const {
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), link_mode::leave_unsealed);
}
future<> filesystem_storage::create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen) const {
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), mark_for_removal::no);
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), link_mode::default_mode);
}
future<> filesystem_storage::create_links(const sstable& sst, const std::filesystem::path& dir) const {
return create_links_common(sst, dir.native(), sst._generation, mark_for_removal::no);
return create_links_common(sst, dir.native(), sst._generation, link_mode::default_mode);
}
future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed leave_unsealed) const {
std::filesystem::path snapshot_dir;
if (abs) {
snapshot_dir = dir;
@@ -405,7 +432,11 @@ future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_
snapshot_dir = _dir.path() / dir;
}
co_await sst.sstable_touch_directory_io_check(snapshot_dir);
co_await create_links_common(sst, snapshot_dir, std::move(gen));
if (leave_unsealed) {
co_await create_links_common(sst, snapshot_dir, std::move(gen), leave_unsealed_tag{});
} else {
co_await create_links_common(sst, snapshot_dir, std::move(gen));
}
}
future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
@@ -413,7 +444,7 @@ future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generatio
sstring old_dir = _dir.native();
sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
sst.get_filename(), sst._generation, new_dir, new_generation, delay_commit == nullptr);
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal::yes);
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal_tag{});
co_await change_dir(new_dir);
generation_type old_generation = sst._generation;
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, old_generation, old_dir] (auto p) {
@@ -598,7 +629,7 @@ public:
{}
future<> seal(const sstable& sst) override;
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type>) const override;
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type>, storage::leave_unsealed) const override;
future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
// runs in async context
void open(sstable& sst) override;
@@ -815,7 +846,7 @@ future<> object_storage_base::unlink_component(const sstable& sst, component_typ
}
}
future<> object_storage_base::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
future<> object_storage_base::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed) const {
on_internal_error(sstlog, "Snapshotting S3 objects not implemented");
co_return;
}
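The three link modes and the TOC-skipping behavior of `create_links_common` can be sketched standalone. This is a simplified Python model of the C++ logic, not the real implementation; component names are illustrative strings.

```python
from enum import Enum, auto

class LinkMode(Enum):
    # Mirrors filesystem_storage::link_mode from the diff above.
    DEFAULT = auto()
    MARK_FOR_REMOVAL = auto()
    LEAVE_UNSEALED = auto()

def components_to_link(components: list[str], mode: LinkMode) -> list[str]:
    # Mirrors the leave_unsealed branch in create_links_common: when the
    # destination must stay unsealed, the TOC hard link is skipped so the
    # destination sstable remains incomplete until it is sealed later
    # (e.g. when it is added to the sstable set).
    if mode is LinkMode.LEAVE_UNSEALED:
        return [c for c in components if c != "TOC"]
    return list(components)

comps = ["TOC", "Data", "Index", "Summary"]
assert components_to_link(comps, LinkMode.LEAVE_UNSEALED) == ["Data", "Index", "Summary"]
assert components_to_link(comps, LinkMode.MARK_FOR_REMOVAL) == comps
```

Note that `mark_for_removal` and `leave_unsealed` are mutually exclusive, which is why a single enum replaces the earlier `bool_class` flag.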

View File

@@ -97,9 +97,10 @@ public:
using absolute_path = bool_class<class absolute_path_tag>; // FIXME -- should go away eventually
using sync_dir = bool_class<struct sync_dir_tag>; // meaningful only to filesystem storage
using leave_unsealed = bool_class<struct leave_unsealed_tag>;
virtual future<> seal(const sstable& sst) = 0;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen = {}) const = 0;
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen = {}, leave_unsealed lu = leave_unsealed::no) const = 0;
virtual future<> change_state(const sstable& sst, sstable_state to, generation_type generation, delayed_commit_changes* delay) = 0;
// runs in async context
virtual void open(sstable& sst) = 0;

View File

@@ -63,30 +63,45 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
}
schema_ptr s = reader.schema();
// The SSTable will only be sealed when added to the sstable set, so we make sure unsplit sstables aren't
// left sealed in the table directory.
auto cfg = cf->get_sstables_manager().configure_writer(origin);
cfg.leave_unsealed = true;
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
cfg, encoding_stats{}).then([sst] {
return sst->open_data();
}).then([cf, sst, offstrategy, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard] -> future<> {
if (repaired_at && sstables::repair_origin == origin) {
sst->being_repaired = frozen_guard;
if (sstable_list_to_mark_as_repaired) {
sstable_list_to_mark_as_repaired->insert(sst);
}).then([cf, sst, offstrategy, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] -> future<std::vector<sstables::shared_sstable>> {
auto on_add = [sst, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] (sstables::shared_sstable loading_sst) -> future<> {
if (repaired_at && sstables::repair_origin == origin) {
loading_sst->being_repaired = frozen_guard;
if (sstable_list_to_mark_as_repaired) {
sstable_list_to_mark_as_repaired->insert(loading_sst);
}
}
}
if (loading_sst == sst) {
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
if (offstrategy && sstables::repair_origin == origin) {
sstables::sstlog.debug("Enabled automatic off-strategy trigger for table {}.{}",
cf->schema()->ks_name(), cf->schema()->cf_name());
cf->enable_off_strategy_trigger();
}
co_await cf->add_sstable_and_update_cache(sst, offstrategy);
}).then([cf, s, sst, use_view_update_path, &vb, &vbw]() mutable -> future<> {
co_return co_await cf->add_new_sstable_and_update_cache(sst, on_add, offstrategy);
}).then([cf, s, sst, use_view_update_path, &vb, &vbw] (std::vector<sstables::shared_sstable> new_sstables) mutable -> future<> {
auto& vb_ = vb;
auto new_sstables_ = std::move(new_sstables);
auto table = cf;
if (use_view_update_path == db::view::sstable_destination_decision::staging_managed_by_vbc) {
return vbw.local().register_staging_sstable_tasks({sst}, cf->schema()->id());
co_return co_await vbw.local().register_staging_sstable_tasks(new_sstables_, cf->schema()->id());
} else if (use_view_update_path == db::view::sstable_destination_decision::staging_directly_to_generator) {
return vb.local().register_staging_sstable(sst, std::move(cf));
co_await coroutine::parallel_for_each(new_sstables_, [&vb_, &table] (sstables::shared_sstable sst) -> future<> {
return vb_.local().register_staging_sstable(sst, table);
});
}
return make_ready_future<>();
co_return;
});
};
if (!offstrategy) {

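The reworked consumer passes an `on_add` callback that runs for every sstable the table actually adds, since the original may be split into several. A minimal Python sketch of that decision follows; `"repair"` stands in for `sstables::repair_origin`, and the function is a hypothetical simplification of the lambda above.

```python
def on_add_plan(added_sstables, original, origin, repaired_at):
    # Every added sstable may be marked as being repaired, but only the
    # originally written (unsplit) sstable is sealed here -- split outputs
    # are handled by their own write path.
    marked_repaired = []
    sealed = []
    for sst in added_sstables:
        if repaired_at and origin == "repair":
            marked_repaired.append(sst)
        if sst == original:
            sealed.append(sst)
    return marked_repaired, sealed

marked, sealed = on_add_plan(["orig", "split_a", "split_b"], "orig", "repair", 123)
assert marked == ["orig", "split_a", "split_b"]
assert sealed == ["orig"]
```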
View File

@@ -52,8 +52,16 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d
auto erm = t.get_effective_replication_map();
auto& sstm = t.get_sstables_manager();
auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format);
co_await sst->load(erm->get_sharder(*t.schema()));
co_await t.add_sstable_and_update_cache(sst);
sstables::sstable_open_config cfg { .unsealed_sstable = true };
co_await sst->load(erm->get_sharder(*t.schema()), cfg);
auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
if (loading_sst == sst) {
auto cfg = sstm.configure_writer(sst->get_origin());
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add);
blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename());
if (state == sstables::sstable_state::staging) {
@@ -64,7 +72,7 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d
// so then, the view building coordinator can decide to process it once the migration
// is finished.
// (Instead of registering the sstable to view update generator which may process it immediately.)
co_await sharded_vbw.local().register_staging_sstable_tasks({sst}, t.schema()->id());
co_await sharded_vbw.local().register_staging_sstable_tasks(new_sstables, t.schema()->id());
}
});
}
@@ -343,7 +351,11 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work
auto& table = db.find_column_family(meta.table);
auto& sstm = table.get_sstables_manager();
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, meta.fops == file_ops::load_sstables);
// The SSTable will only be sealed when added to the sstable set, so we make sure unsplit sstables aren't
// left sealed in the table directory.
sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables,
.leave_unsealed = true };
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, cfg);
auto out = co_await sstable_sink->output(foptions, stream_options);
co_return output_result{
[sstable_sink = std::move(sstable_sink), &meta, &db, &vbw](store_result res) -> future<> {
@@ -351,7 +363,7 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work
co_await sstable_sink->abort();
co_return;
}
auto sst = co_await sstable_sink->close_and_seal();
auto sst = co_await sstable_sink->close();
if (sst) {
blogger.debug("stream_sstables[{}] Loading sstable {} on shard {}", meta.ops_id, sst->toc_filename(), meta.dst_shard_id);
auto desc = sst->get_descriptor(sstables::component_type::TOC);

View File

@@ -110,7 +110,7 @@ public:
virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
virtual reader_permit make_compaction_reader_permit() const override { return _semaphore.make_permit(); }
virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
virtual sstables::shared_sstable make_sstable() const override { return _sstable_factory(); }
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return _sstable_factory(); }
virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return _sst_man.configure_writer(std::move(origin)); }
virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }

View File

@@ -387,4 +387,27 @@ SEASTAR_TEST_CASE(select_from_vector_indexed_table) {
enable_tablets(db_config_with_auth()));
}
SEASTAR_TEST_CASE(select_from_vector_search_system_table) {
return do_with_cql_env_thread(
[](auto&& env) {
create_user_if_not_exists(env, bob);
with_user(env, bob, [&env] {
BOOST_REQUIRE_EXCEPTION(env.execute_cql("SELECT * FROM system.group0_history").get(), exceptions::unauthorized_exception,
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
});
with_user(env, bob, [&env] {
BOOST_REQUIRE_EXCEPTION(env.execute_cql("SELECT * FROM system.versions").get(), exceptions::unauthorized_exception,
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
});
cquery_nofail(env, "GRANT VECTOR_SEARCH_INDEXING ON ALL KEYSPACES TO bob");
with_user(env, bob, [&env] {
cquery_nofail(env, "SELECT * FROM system.group0_history");
});
with_user(env, bob, [&env] {
cquery_nofail(env, "SELECT * FROM system.versions");
});
},
db_config_with_auth());
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -6275,11 +6275,11 @@ SEASTAR_TEST_CASE(splitting_compaction_test) {
auto& cm = t->get_compaction_manager();
auto split_opt = compaction::compaction_type_options::split{classify_fn};
auto new_ssts = cm.maybe_split_sstable(input, t.as_compaction_group_view(), split_opt).get();
auto new_ssts = cm.maybe_split_new_sstable(input, t.as_compaction_group_view(), split_opt).get();
BOOST_REQUIRE(new_ssts.size() == expected_output_size);
for (auto& sst : new_ssts) {
// split sstables don't require further split.
auto ssts = cm.maybe_split_sstable(sst, t.as_compaction_group_view(), split_opt).get();
auto ssts = cm.maybe_split_new_sstable(sst, t.as_compaction_group_view(), split_opt).get();
BOOST_REQUIRE(ssts.size() == 1);
BOOST_REQUIRE(ssts.front() == sst);
}
@@ -6291,9 +6291,97 @@ SEASTAR_TEST_CASE(splitting_compaction_test) {
}
return classify_fn(t);
};
BOOST_REQUIRE_THROW(cm.maybe_split_sstable(input, t.as_compaction_group_view(), compaction::compaction_type_options::split{throwing_classifier}).get(),
BOOST_REQUIRE_THROW(cm.maybe_split_new_sstable(input, t.as_compaction_group_view(), compaction::compaction_type_options::split{throwing_classifier}).get(),
std::runtime_error);
});
}
SEASTAR_TEST_CASE(unsealed_sstable_compaction_test) {
BOOST_REQUIRE(smp::count == 1);
return test_env::do_with_async([] (test_env& env) {
auto s = schema_builder("tests", "unsealed_sstable_compaction_test")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();
auto t = env.make_table_for_tests(s);
auto close_t = deferred_stop(t);
t->start();
mutation mut(s, partition_key::from_exploded(*s, {to_bytes("alpha")}));
mut.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 0);
sstable_writer_config sst_cfg = env.manager().configure_writer();
sst_cfg.leave_unsealed = true;
auto unsealed_sstable = make_sstable_easy(env, make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(mut)), sst_cfg);
BOOST_REQUIRE(file_exists(unsealed_sstable->get_filename(sstables::component_type::TemporaryTOC).format()).get());
auto sst_gen = env.make_sst_factory(s);
auto info = compact_sstables(env, compaction::compaction_descriptor({ unsealed_sstable }), t, sst_gen).get();
BOOST_REQUIRE(info.new_sstables.size() == 1);
});
}
SEASTAR_TEST_CASE(sstable_clone_leaving_unsealed_dest_sstable) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
auto pk = ss.make_pkey();
auto mut1 = mutation(s, pk);
mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
auto sst = make_sstable_containing(env.make_sstable(s), {std::move(mut1)});
auto table = env.make_table_for_tests(s);
auto close_table = deferred_stop(table);
sstables::sstable_generation_generator gen_generator;
bool leave_unsealed = true;
auto d = sst->clone(gen_generator(), leave_unsealed).get();
auto sst2 = env.make_sstable(s, d.generation, d.version, d.format);
sst2->load(s->get_sharder(), sstable_open_config{ .unsealed_sstable = leave_unsealed }).get();
BOOST_REQUIRE(!file_exists(sst2->get_filename(sstables::component_type::TOC).format()).get());
BOOST_REQUIRE(file_exists(sst2->get_filename(sstables::component_type::TemporaryTOC).format()).get());
leave_unsealed = false;
d = sst->clone(gen_generator(), leave_unsealed).get();
auto sst3 = env.make_sstable(s, d.generation, d.version, d.format);
sst3->load(s->get_sharder(), sstable_open_config{ .unsealed_sstable = leave_unsealed }).get();
BOOST_REQUIRE(file_exists(sst3->get_filename(sstables::component_type::TOC).format()).get());
BOOST_REQUIRE(!file_exists(sst3->get_filename(sstables::component_type::TemporaryTOC).format()).get());
});
}
SEASTAR_TEST_CASE(failure_when_adding_new_sstable_test) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
auto pk = ss.make_pkey();
auto mut1 = mutation(s, pk);
mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
auto sst = make_sstable_containing(env.make_sstable(s), {mut1});
auto table = env.make_table_for_tests(s);
auto close_table = deferred_stop(table);
auto on_add = [] (sstables::shared_sstable) { throw std::runtime_error("fail to seal"); return make_ready_future<>(); };
BOOST_REQUIRE_THROW(table->add_new_sstable_and_update_cache(sst, on_add).get(), std::runtime_error);
// Verify new sstable was unlinked on failure.
BOOST_REQUIRE(!file_exists(sst->get_filename(sstables::component_type::Data).format()).get());
auto sst2 = make_sstable_containing(env.make_sstable(s), {mut1});
auto sst3 = make_sstable_containing(env.make_sstable(s), {mut1});
BOOST_REQUIRE_THROW(table->add_new_sstables_and_update_cache({sst2, sst3}, on_add).get(), std::runtime_error);
// Verify both sstables are unlinked on failure.
BOOST_REQUIRE(!file_exists(sst2->get_filename(sstables::component_type::Data).format()).get());
BOOST_REQUIRE(!file_exists(sst3->get_filename(sstables::component_type::Data).format()).get());
});
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -17,6 +17,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing import Event
from pathlib import Path
from typing import TYPE_CHECKING
from test import TOP_SRC_DIR, path_to
from test.pylib.runner import testpy_test_fixture_scope
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name
@@ -58,6 +59,20 @@ logger = logging.getLogger(__name__)
print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
async def decode_backtrace(build_mode: str, input: str):
executable = Path(path_to(build_mode, "scylla"))
proc = await asyncio.create_subprocess_exec(
(TOP_SRC_DIR / "seastar" / "scripts" / "seastar-addr2line").absolute(),
"-e",
executable.absolute(),
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate(input=input.encode())
return f"{stdout.decode()}\n{stderr.decode()}"
def pytest_addoption(parser):
parser.addoption('--manager-api', action='store',
help='Manager unix socket path')
@@ -243,6 +258,11 @@ async def manager(request: pytest.FixtureRequest,
# test failure.
report = request.node.stash[PHASE_REPORT_KEY]
failed = report.when == "call" and report.failed
# Check if the test has the check_nodes_for_errors marker
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
failed = failed or found_errors
if failed:
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
@@ -266,10 +286,44 @@ async def manager(request: pytest.FixtureRequest,
await manager_client.stop() # Stop client session and close driver after each test
if cluster_status["server_broken"]:
pytest.fail(
f"test case {test_case_name} leave unfinished tasks on Scylla server. Server marked as broken,"
f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"
f" server_broken_reason: {cluster_status["message"]}"
)
if found_errors:
full_message = []
for server, data in found_errors.items():
summary = []
detailed = []
if criticals := data.get("critical", []):
summary.append(f"{len(criticals)} critical error(s)")
detailed.extend(map(str.rstrip, criticals))
if backtraces := data.get("backtraces", []):
summary.append(f"{len(backtraces)} backtrace(s)")
with open(failed_test_dir_path / f"scylla-{server.server_id}-backtraces.txt", "w") as bt_file:
for backtrace in backtraces:
bt_file.write(backtrace + "\n\n")
decoded_bt = await decode_backtrace(build_mode, backtrace)
bt_file.write(decoded_bt + "\n\n")
detailed.append(f"{len(backtraces)} backtrace(s) saved in {Path(bt_file.name).name}")
if errors := data.get("error", []):
summary.append(f"{len(errors)} error(s)")
detailed.extend(map(str.rstrip, errors))
if cores := data.get("cores", []):
summary.append(f"{len(cores)} core(s): {', '.join(cores)}")
if summary:
summary_line = f"Server {server.server_id}: found {', '.join(summary)} (log: {data['log']})"
detailed = [f" {line}" for line in detailed]
full_message.append(summary_line)
full_message.extend(detailed)
with open(failed_test_dir_path / "found_errors.txt", "w") as f:
f.write("\n".join(full_message))
pytest.fail(f"\n{'\n'.join(full_message)}")
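The per-server aggregation added to the manager fixture can be distilled to a small pure function. This sketch assumes the same dict shape returned by `check_all_errors`; `summarize_server_errors` is a hypothetical helper name, not part of the fixture.

```python
def summarize_server_errors(server_id, data, log_path):
    # Model of the per-server summary line built above: count criticals,
    # backtraces, errors and cores into one human-readable line.
    summary = []
    if criticals := data.get("critical", []):
        summary.append(f"{len(criticals)} critical error(s)")
    if backtraces := data.get("backtraces", []):
        summary.append(f"{len(backtraces)} backtrace(s)")
    if errors := data.get("error", []):
        summary.append(f"{len(errors)} error(s)")
    if cores := data.get("cores", []):
        summary.append(f"{len(cores)} core(s): {', '.join(cores)}")
    if not summary:
        return None
    return f"Server {server_id}: found {', '.join(summary)} (log: {log_path})"

line = summarize_server_errors(1, {"critical": ["oops"], "error": ["e1", "e2"]}, "scylla-1.log")
assert line == "Server 1: found 1 critical error(s), 2 error(s) (log: scylla-1.log)"
```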
# "cql" fixture: set up client object for communicating with the CQL API.
# Since connection is managed by manager just return that object

View File

@@ -110,6 +110,9 @@ def fixture_dtest_setup(request: FixtureRequest,
except Exception as e: # noqa: BLE001
logger.error("Error stopping cluster: %s", str(e))
manager.ignore_log_patterns.extend(dtest_setup.ignore_log_patterns)
manager.ignore_cores_log_patterns.extend(dtest_setup.ignore_cores_log_patterns)
try:
if not dtest_setup.allow_log_errors:
exclude_errors = []

View File

@@ -14,7 +14,7 @@ import time
from collections import defaultdict
from functools import cached_property
from functools import wraps
from typing import List, Dict, Callable
from typing import List, Dict, Callable, Optional, Tuple
from cassandra import ConsistencyLevel
from cassandra import WriteTimeout, ReadTimeout, OperationTimedOut
@@ -64,6 +64,7 @@ class Worker:
"""
A single worker increments its dedicated column `s{i}` via LWT:
UPDATE .. SET s{i}=? WHERE pk=? IF <guards on other cols> AND s{i}=?
and bumps the global phase-ops counter via on_applied().
It checks for applied state and retries on "uncertainty" timeouts.
"""
def __init__(
@@ -76,8 +77,10 @@ class Worker:
other_columns: List[int],
get_lower_bound: Callable[[int, int], int],
on_applied: Callable[[int, int, int], None],
stop_event: asyncio.Event
stop_event: asyncio.Event,
counter_update_statement: Optional[PreparedStatement] = None,
counters_random_delta: bool = False,
counters_max_delta: int = 5,
):
self.stop_event = stop_event
self.success_counts: Dict[int, int] = {pk: 0 for pk in pks}
@@ -91,7 +94,11 @@ class Worker:
self.cql = cql
self.get_lower_bound = get_lower_bound
self.on_applied = on_applied
# counters
self.counter_update_statement = counter_update_statement
self.counters_random_delta = counters_random_delta
self.counters_max_delta = max(1, counters_max_delta)
self.counter_deltas: Dict[int, int] = {pk: 0 for pk in pks}
async def verify_update_through_select(self, pk, new_val, prev_val):
"""
@@ -106,6 +113,24 @@ class Worker:
assert current_val == new_val or current_val == prev_val
return current_val == new_val
def _next_counter_delta(self) -> int:
"""
Compute the next delta to apply to the counter table.
If random mode is disabled -> always +1.
If random mode is enabled -> random value from
[-max_delta..-1] U [1..max_delta].
"""
if not self.counters_random_delta:
return 1
# Avoid 0 by choosing magnitude in [1, max] and random sign.
mag = self.rng.randint(1, self.counters_max_delta)
sign = -1 if self.rng.random() < 0.5 else 1
return sign * mag
async def _inc_counter(self, pk: int, delta: int) -> None:
stmt = self.counter_update_statement.bind([delta, pk])
stmt.consistency_level = ConsistencyLevel.LOCAL_QUORUM
await self.cql.run_async(stmt)
def stop(self) -> None:
self.stop_event.set()
@@ -170,6 +195,11 @@ class Worker:
self.on_applied(pk, self.worker_id, new_val)
self.success_counts[pk] += 1
if self.counter_update_statement:
delta = self._next_counter_delta()
self.counter_deltas[pk] += delta
await self._inc_counter(pk, delta)
await asyncio.sleep(0.1)
except Exception:
@@ -187,7 +217,8 @@ class BaseLWTTester:
def __init__(
self, manager: ManagerClient, ks: str, tbl: str,
num_workers: int = DEFAULT_WORKERS, num_keys: int = DEFAULT_NUM_KEYS
num_workers: int = DEFAULT_WORKERS, num_keys: int = DEFAULT_NUM_KEYS, use_counters: bool = False,
counters_random_delta: bool = False, counters_max_delta: int = 5, counter_tbl: Optional[str] = None
):
self.ks = ks
self.tbl = tbl
@@ -202,6 +233,12 @@ class BaseLWTTester:
self.migrations = 0
self.phase = "warmup" # "warmup" -> "migrating" -> "post"
self.phase_ops = defaultdict(int)
# counters config
self.use_counters = use_counters
self.counters_random_delta = counters_random_delta
self.counters_max_delta = counters_max_delta
self.counter_tbl = counter_tbl or (f"{tbl}_ctr" if use_counters else None)
def _get_lower_bound(self, pk: int, col_idx: int) -> int:
return self.lb_counts[pk][col_idx]
@@ -233,6 +270,14 @@ class BaseLWTTester:
def create_workers(self, stop_event) -> List[Worker]:
workers: List[Worker] = []
counter_stmt: Optional[PreparedStatement] = None
if self.use_counters:
counter_stmt = self.cql.prepare(
f"UPDATE {self.ks}.{self.counter_tbl} "
f"SET c = c + ? WHERE pk = ?"
)
for i in range(self.num_workers):
other_columns = [j for j in range(self.num_workers) if j != i]
cond = " AND ".join([*(f"s{j} >= ?" for j in other_columns), f"s{i} = ?"])
@@ -247,6 +292,9 @@ class BaseLWTTester:
other_columns=other_columns,
get_lower_bound=self._get_lower_bound,
on_applied=self._on_applied,
counter_update_statement=counter_stmt,
counters_random_delta=self.counters_random_delta,
counters_max_delta=self.counters_max_delta,
)
workers.append(worker)
return workers
@@ -258,6 +306,11 @@ class BaseLWTTester:
f"CREATE TABLE {self.ks}.{self.tbl} (pk int PRIMARY KEY, {cols_def})"
)
logger.info("Created table %s.%s with %d columns", self.ks, self.tbl, self.num_workers)
if self.use_counters:
await self.cql.run_async(
f"CREATE TABLE {self.ks}.{self.counter_tbl} (pk int PRIMARY KEY, c counter)"
)
logger.info("Created counter table %s.%s", self.ks, self.counter_tbl)
async def initialize_rows(self):
"""
@@ -296,7 +349,7 @@ class BaseLWTTester:
assert not errs, f"worker errors: {errs}"
logger.info("All workers stopped")
-async def verify_consistency(self):
+async def _verify_base_table(self):
"""Ensure every (pk, column) reflects the number of successful CAS writes."""
# Run SELECTs for all PKs in parallel using prepared statement
tasks = []
@@ -320,6 +373,35 @@ class BaseLWTTester:
total_ops = sum(sum(w.success_counts.values()) for w in self.workers)
logger.info("Consistency verified %d total successful CAS operations", total_ops)
async def _verify_counters(self):
if not self.use_counters:
return
stmt = SimpleStatement(
f"SELECT pk, c FROM {self.ks}.{self.counter_tbl}",
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
)
rows = await self.cql.run_async(stmt)
db_values: Dict[int, int] = {row.pk: row.c for row in rows}
mismatches = []
for pk in self.pks:
actual = db_values.get(pk, 0)
expected = sum(worker.counter_deltas.get(pk, 0) for worker in self.workers)
if actual != expected:
mismatches.append(
f"counter mismatch pk={pk} c={actual}, expected={expected}"
)
assert not mismatches, "Counter consistency violations: " + "; ".join(mismatches)
total_delta = sum(sum(worker.counter_deltas.values()) for worker in self.workers)
logger.info("Counter table consistency verified total delta=%d", total_delta)
async def verify_consistency(self):
await self._verify_base_table()
await self._verify_counters()
async def get_token_for_pk(cql, ks: str, tbl: str, pk: int) -> int:
"""Get the token for a given primary key"""


@@ -0,0 +1,412 @@
# Copyright (C) 2025-present ScyllaDB
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import asyncio
import logging
import random
import re
import time
from typing import Dict
import pytest
from test.cluster.conftest import skip_mode
from test.cluster.lwt.lwt_common import (
BaseLWTTester,
DEFAULT_WORKERS,
DEFAULT_NUM_KEYS,
wait_for_tablet_count
)
from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError
from test.pylib.tablets import get_tablet_count
from test.pylib.tablets import get_tablet_replicas
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
TARGET_RESIZE_COUNT = 20
NUM_MIGRATIONS = 20
WARMUP_LWT_CNT = 100
POST_LWT_CNT = 100
PHASE_WARMUP = "warmup"
PHASE_RESIZE = "resize"
PHASE_POST = "post"
MIN_TABLETS = 1
MAX_TABLETS = 20
RESIZE_TIMEOUT = 240
MIGRATE_ONE_TIMEOUT_S = 60
NO_REPLICA_RE = re.compile(r"has no replica on", re.IGNORECASE)
DST_REPLICA_RE = re.compile(r"has replica on", re.IGNORECASE)
def _err_code(e: Exception):
return getattr(e, "code", None)
def _err_text(e: Exception):
return getattr(e, "text", "") or str(e)
def _is_tablet_in_transition_http_error(e: Exception) -> bool:
return isinstance(e, HTTPError) and _err_code(e) == 500 and "in transition" in _err_text(e).lower()
def _is_no_replica_on_src_error(e: Exception) -> bool:
return isinstance(e, HTTPError) and _err_code(e) == 500 and NO_REPLICA_RE.search(_err_text(e)) is not None
def _is_dst_already_replica_error(e: Exception) -> bool:
return isinstance(e, HTTPError) and _err_code(e) == 500 and DST_REPLICA_RE.search(_err_text(e)) is not None
async def _move_tablet_with_retry(manager, src_server, ks, tbl,
src_host_id, src_shard, dst_host_id, dst_shard, token,
*, timeout_s=MIGRATE_ONE_TIMEOUT_S, base_sleep=0.1, max_sleep=2.0):
deadline = time.time() + timeout_s
sleep = base_sleep
while True:
try:
await manager.api.move_tablet(
src_server.ip_addr, ks, tbl,
src_host_id, src_shard, dst_host_id, dst_shard, token
)
return
except Exception as e:
if _is_tablet_in_transition_http_error(e) and time.time() + sleep < deadline:
logger.info("Token %s in transition, retry in %.2fs", token, sleep)
await asyncio.sleep(sleep + random.uniform(0, sleep))
sleep = min(sleep * 1.7, max_sleep)
continue
raise
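The retry loop above grows its sleep geometrically (factor 1.7, capped at `max_sleep`) and adds up to 100% random jitter on each attempt. Ignoring the jitter, the deterministic schedule can be sketched in isolation (`backoff_schedule` is an illustrative helper, not part of the test harness):

```python
def backoff_schedule(base=0.1, factor=1.7, cap=2.0, steps=8):
    """Deterministic part of the retry backoff: geometric growth capped at `cap`."""
    delays = []
    sleep = base
    for _ in range(steps):
        delays.append(sleep)
        sleep = min(sleep * factor, cap)
    return delays

# The first few delays: 0.1, 0.17, 0.289, ... until the 2.0 cap is reached.
```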
async def tablet_migration_ops(
stop_event: asyncio.Event,
manager: ManagerClient,
servers,
tester: BaseLWTTester,
table: str,
num_ops: int,
pause_range=(0.5, 2.0),
*,
server_properties,
) -> None:
logger.info("Starting tablet migration ops for %s.%s: target=%d", tester.ks, table, num_ops)
migration_count = 0
intranode_ratio = 0.3
# server_id -> rack
server_id_to_rack: Dict[str, str] = {
s.server_id: prop["rack"] for s, prop in zip(servers, server_properties)
}
host_ids = await asyncio.gather(
*(manager.get_host_id(s.server_id) for s in servers)
)
# server_id -> host_id and host_id -> server
server_id_to_host_id: Dict[str, str] = {
s.server_id: hid for s, hid in zip(servers, host_ids)
}
host_id_to_server = {
hid: s for s, hid in zip(servers, host_ids)
}
attempt = 0
while not stop_event.is_set() and migration_count < num_ops:
attempt += 1
sample_pk = random.choice(tester.pks)
token = tester.pk_to_token[sample_pk]
replicas = await get_tablet_replicas(
manager, servers[0], tester.ks, table, token
)
src_host_id, src_shard = random.choice(replicas)
src_server = host_id_to_server.get(src_host_id)
assert src_server is not None, (
f"Source host_id {src_host_id} for token {token} not found in host_id_to_server (attempt {attempt})"
)
if random.random() < intranode_ratio:
dst_host_id = src_host_id
dst_server = src_server
dst_shard = 0 if src_shard != 0 else 1
else:
replica_hids = {h for (h, _sh) in replicas}
src_rack = server_id_to_rack[src_server.server_id]
same_rack_candidates = [
s for s in servers if server_id_to_rack[s.server_id] == src_rack
and server_id_to_host_id[s.server_id] not in replica_hids
]
assert same_rack_candidates, (
f"No same-rack non-replica candidate for token {token} (attempt {attempt})"
)
dst_server = random.choice(same_rack_candidates)
dst_host_id = server_id_to_host_id[dst_server.server_id]
dst_shard = 0
try:
await _move_tablet_with_retry(
manager, src_server, tester.ks, table,
src_host_id, src_shard, dst_host_id, dst_shard, token,
timeout_s=60,
)
migration_count += 1
logger.info(
"Completed migration #%d (token=%s -> %s:%d) for %s.%s",
migration_count, token, dst_server.ip_addr, dst_shard, tester.ks, table,
)
await asyncio.sleep(random.uniform(*pause_range))
continue
except Exception as e:
if _is_tablet_in_transition_http_error(e):
logger.info("Token %s in transition, switching token (attempt %d)",
token, attempt)
continue
if _is_no_replica_on_src_error(e) or _is_dst_already_replica_error(e):
logger.info("Src replica vanished for token %s, re-pick (attempt %d)",
token, attempt)
continue
raise
assert migration_count == num_ops, f"Only completed {migration_count}/{num_ops} migrations for {tester.ks}.{table}"
logger.info("Completed tablet migration ops for %s.%s: %d/%d", tester.ks, table, migration_count, num_ops)
def powers_of_two_in_range(lo: int, hi: int):
if lo > hi or hi < 1:
return []
lo = max(1, lo)
start_e = (lo - 1).bit_length()
end_e = hi.bit_length()
return [1 << e for e in range(start_e, end_e + 1) if (1 << e) <= hi]
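For reference, the helper yields every power of two p with lo <= p <= hi; the `(lo - 1).bit_length()` trick picks the exponent of the smallest power of two >= lo. A standalone copy for experimentation:

```python
def powers_of_two_in_range(lo: int, hi: int):
    # Powers of two p with lo <= p <= hi; empty when the range is empty or hi < 1.
    if lo > hi or hi < 1:
        return []
    lo = max(1, lo)
    start_e = (lo - 1).bit_length()  # exponent of the smallest power of two >= lo
    end_e = hi.bit_length()
    return [1 << e for e in range(start_e, end_e + 1) if (1 << e) <= hi]

# With MIN_TABLETS=1 and MAX_TABLETS=20 this produces [1, 2, 4, 8, 16].
```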
async def run_random_resizes(
stop_event_: asyncio.Event,
manager: ManagerClient,
servers,
tester: BaseLWTTester,
ks: str,
table: str,
counter_table: str,
target_steps: int = TARGET_RESIZE_COUNT,
pause_range=(0.5, 2.0),
):
"""
Perform randomized tablet count changes (splits/merges) on the main LWT table
and its counter table. Runs until target resize count is reached or stop_event_
is set. Returns a dict with simple stats.
"""
split_count = 0
merge_count = 0
current_resize_count = 0
pow2_targets = powers_of_two_in_range(MIN_TABLETS, MAX_TABLETS)
while not stop_event_.is_set() and current_resize_count < target_steps:
# Drive resize direction from the main table.
current_main = await get_tablet_count(manager, servers[0], ks, table)
candidates = [t for t in pow2_targets if t != current_main]
target_cnt = random.choice(candidates)
direction = "split" if target_cnt > current_main else "merge"
logger.info(
"[%s] starting: %s.%s=%d, %s.%s -> target %d",
direction.upper(), ks, table, current_main, ks,
counter_table, target_cnt
)
tables = [table, counter_table]
# Apply ALTER TABLE to both tables.
for tbl in tables:
await tester.cql.run_async(
f"ALTER TABLE {ks}.{tbl} "
f"WITH tablets = {{'min_tablet_count': {target_cnt}}}"
)
if direction == "split":
predicate = lambda c, tgt=target_cnt: c >= tgt
else:
predicate = lambda c, tgt=target_cnt: c <= tgt
# Wait for both tables to converge.
main_after, counter_after = await asyncio.gather(
wait_for_tablet_count(
manager,
servers[0],
tester.ks,
table,
predicate=predicate,
target=target_cnt,
timeout_s=RESIZE_TIMEOUT,
),
wait_for_tablet_count(
manager,
servers[0],
tester.ks,
counter_table,
predicate=predicate,
target=target_cnt,
timeout_s=RESIZE_TIMEOUT,
),
)
# Sanity: both tables should end up with the same tablet count.
assert main_after == counter_after, (
f"Tablet counts diverged: {ks}.{table}={main_after}, "
f"{ks}.{counter_table}={counter_after}"
)
if direction == "split":
logger.info(
"[SPLIT] converged: %s.%s %d -> %d, %s.%s -> %d (target %d)",
ks, table, current_main, main_after, ks, counter_table,
counter_after, target_cnt
)
assert main_after >= current_main, (
f"Tablet count expected to increase during split "
f"(was {current_main}, now {main_after})"
)
split_count += 1
else:
logger.info(
"[MERGE] converged: %s.%s %d -> %d, %s.%s -> %d (target %d)",
ks, table, current_main, main_after, ks, counter_table,
counter_after, target_cnt
)
assert main_after <= current_main, (
f"Tablet count expected to decrease during merge "
f"(was {current_main}, now {main_after})"
)
merge_count += 1
current_resize_count += 1
await asyncio.sleep(random.uniform(*pause_range))
return {
"steps_done": current_resize_count,
"seen_split": split_count,
"seen_merge": merge_count,
}
@pytest.mark.asyncio
@skip_mode("debug", "debug mode is too slow for this test")
async def test_multi_column_lwt_migrate_and_random_resizes(manager: ManagerClient):
cfg = {
"enable_tablets": True,
"tablet_load_stats_refresh_interval_in_seconds": 1,
"target-tablet-size-in-bytes": 1024 * 16,
}
properties = [
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"},
{"dc": "dc1", "rack": "r1"},
{"dc": "dc1", "rack": "r2"},
{"dc": "dc1", "rack": "r3"},
]
cmdline = [
'--logger-log-level', 'paxos=trace', '--smp=2',
]
servers = await manager.servers_add(6, config=cfg, property_file=properties, cmdline=cmdline)
async with new_test_keyspace(
manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} "
"AND tablets = {'initial': 1}",
) as ks:
stop_event_ = asyncio.Event()
table = "lwt_split_merge_table"
cnt_table = "lwt_split_merge_counters"
tester = BaseLWTTester(
manager,
ks,
table,
num_workers=DEFAULT_WORKERS,
num_keys=DEFAULT_NUM_KEYS,
use_counters=True,
counters_random_delta=True,
counters_max_delta=5,
counter_tbl=cnt_table,
)
await tester.create_schema()
await tester.initialize_rows()
await tester.start_workers(stop_event_)
try:
# PHASE: warmup
tester.set_phase(PHASE_WARMUP)
logger.info("LWT warmup: waiting for %d applied CAS", WARMUP_LWT_CNT)
await tester.wait_for_phase_ops(stop_event_, PHASE_WARMUP, WARMUP_LWT_CNT, timeout=180, poll=0.2)
logger.info("LWT warmup complete: %d ops", tester.get_phase_ops(PHASE_WARMUP))
# PHASE: resize + migrate
tester.set_phase(PHASE_RESIZE)
logger.info("Starting RESIZE (random powers-of-two) + %d migrations per table", NUM_MIGRATIONS)
resize_task = asyncio.create_task(
run_random_resizes(
stop_event_=stop_event_,
manager=manager,
servers=servers,
tester=tester,
ks=ks,
table=table,
target_steps=TARGET_RESIZE_COUNT,
counter_table=cnt_table,
)
)
migrate_task = asyncio.create_task(
tablet_migration_ops(
stop_event_,
manager, servers, tester,
num_ops=NUM_MIGRATIONS,
pause_range=(0.3, 1.0),
server_properties=properties,
table=table,
)
)
migrate_cnt_task = asyncio.create_task(
tablet_migration_ops(
stop_event_,
manager, servers, tester,
num_ops=NUM_MIGRATIONS,
pause_range=(0.3, 1.0),
server_properties=properties,
table=cnt_table
)
)
resize_stats = await resize_task
await asyncio.gather(migrate_task, migrate_cnt_task)
logger.info(
"Randomized resize stats: steps_done=%d, split=%d, merge=%d; LWT ops during resize=%d",
resize_stats["steps_done"], resize_stats["seen_split"], resize_stats["seen_merge"],
tester.get_phase_ops(PHASE_RESIZE),
)
assert resize_stats["steps_done"] >= 1, "Resize phase performed 0 steps"
assert tester.get_phase_ops(PHASE_RESIZE) > 0, "Expected LWT ops during RESIZE phase"
# PHASE: post
tester.set_phase(PHASE_POST)
logger.info("LWT post resize: waiting for %d applied CAS", POST_LWT_CNT)
await tester.wait_for_phase_ops(stop_event_, PHASE_POST, POST_LWT_CNT, timeout=180, poll=0.2)
logger.info("LWT post resize complete: %d ops", tester.get_phase_ops(PHASE_POST))
total_ops = sum(tester.phase_ops.values())
assert total_ops >= (WARMUP_LWT_CNT + POST_LWT_CNT), f"Too few total LWT ops: {total_ops}"
finally:
await tester.stop_workers()
await tester.verify_consistency()
logger.info("Combined LWT during random split/merge + migrations test completed successfully")


@@ -121,7 +121,7 @@ async def test_change_two(manager, random_tables, build_mode):
await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_start(servers[1].server_id)
-servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
+servers[1] = servers[1]._replace(ip_addr=s1_new_ip, rpc_address=s1_new_ip)
if build_mode != 'release':
s0_logs = await manager.server_open_log(servers[0].server_id)
await s0_logs.wait_for('crash-before-prev-ip-removed hit, killing the node')
@@ -132,7 +132,7 @@ async def test_change_two(manager, random_tables, build_mode):
await wait_proper_ips([servers[0], servers[1]])
await manager.server_start(servers[2].server_id)
-servers[2] = ServerInfo(servers[2].server_id, s2_new_ip, s2_new_ip, servers[2].datacenter, servers[2].rack)
+servers[2] = servers[2]._replace(ip_addr=s2_new_ip, rpc_address=s2_new_ip)
await reconnect_driver(manager)
await wait_proper_ips([servers[0], servers[1], servers[2]])


@@ -51,6 +51,9 @@ async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
coordinators_ids = await get_coordinator_host_ids(manager)
assert len(coordinators_ids) == 1, "Exactly 1 coordinator id should be found"
# Configure manager to ignore crashes caused by crash_coordinator_before_stream injection
manager.ignore_cores_log_patterns.append("crash_coordinator_before_stream: aborting")
# kill coordinator during decommission
logger.debug("Kill coordinator during decommission")
coordinator_host = await get_coordinator_host(manager)


@@ -48,7 +48,7 @@ async def test_no_removed_node_event_on_ip_change(manager: ManagerClient, caplog
with test_cluster.connect() as test_cql:
logger.info(f"starting the follower node {servers[1]}")
await manager.server_start(servers[1].server_id)
-servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
+servers[1] = servers[1]._replace(ip_addr=s1_new_ip, rpc_address=s1_new_ip)
logger.info("waiting for cql and hosts")
await wait_for_cql_and_get_hosts(test_cql, servers, time.time() + 30)


@@ -10,7 +10,7 @@ from cassandra.policies import FallthroughRetryPolicy
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier
-from test.pylib.util import wait_for_cql_and_get_hosts, unique_name
+from test.pylib.util import wait_for_cql_and_get_hosts, unique_name, wait_for
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count, TabletReplicas
from test.cluster.conftest import skip_mode
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace
@@ -1981,3 +1981,171 @@ async def test_timed_out_reader_after_cleanup(manager: ManagerClient):
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/26041
@pytest.mark.asyncio
@pytest.mark.parametrize("repair_before_split", [False, True])
@skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_incremental_repair_synchronization(manager: ManagerClient, repair_before_split: bool):
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
'--logger-log-level', 'compaction=debug',
]
servers = await manager.servers_add(2, cmdline=cmdline, config=cfg, auto_rack_dc="dc1")
cql = manager.get_cql()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
initial_tablets = 2
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
# insert data
pks = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
expected_tablet_count = 4 # expected tablet count post split
async def run_split_prepare():
await manager.api.enable_injection(servers[0].ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
# force split on the test table
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
await s0_log.wait_for('Finalizing resize decision for table', from_mark=s0_mark)
async def generate_repair_work():
insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
insert_stmt.consistency_level = ConsistencyLevel.ONE
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
pks = range(256, 512)
await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])
await manager.api.disable_injection(servers[0].ip_addr, "database_apply")
token = 'all'
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
if repair_before_split:
await generate_repair_work()
for server in servers:
await manager.api.enable_injection(server.ip_addr, "incremental_repair_prepare_wait", one_shot=True)
repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental'))
await s0_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s0_mark)
await s1_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s1_mark)
await run_split_prepare()
for server in servers:
await manager.api.message_injection(server.ip_addr, "incremental_repair_prepare_wait")
await repair_task
else:
await run_split_prepare()
await generate_repair_work()
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.disable_injection(servers[0].ip_addr, "tablet_resize_finalization_postpone")
async def finished_splitting():
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count >= expected_tablet_count or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)
await manager.server_stop(servers[0].server_id)
await manager.server_start(servers[0].server_id)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_intranode_synchronization(manager: ManagerClient):
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
'--logger-log-level', 'compaction=debug',
'--smp', '2',
]
servers = await manager.servers_add(1, cmdline=cmdline, config=cfg)
server = servers[0]
cql = manager.get_cql()
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
initial_tablets = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
# insert data
pks = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
await manager.api.flush_keyspace(server.ip_addr, ks)
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
host_id = await manager.get_host_id(server.server_id)
src_shard = replica[1]
# if tablet replica is at shard 0, move it to shard 1.
if src_shard == 0:
dst_shard = 1
await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)
await manager.api.enable_tablet_balancing(server.ip_addr)
await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False)
# force split on the test table
expected_tablet_count = initial_tablets * 2
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
# Check that shard 0 ACKed split.
mark, _ = await log.wait_for('Setting split ready sequence number to', from_mark=mark)
# Move tablet replica back to shard 0, where split was already ACKed.
src_shard = 1
dst_shard = 0
migration_task = asyncio.create_task(manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token))
mark, _ = await log.wait_for("Finished intra-node streaming of tablet", from_mark=mark)
await manager.api.stop_compaction(server.ip_addr, "SPLIT")
await migration_task
await manager.api.disable_injection(server.ip_addr, "tablet_resize_finalization_postpone")
async def finished_splitting():
tablet_count = await get_tablet_count(manager, server, ks, 'test')
return tablet_count >= expected_tablet_count or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)

View File

@@ -1,5 +1,8 @@
add_library(test-lib STATIC)
target_sources(test-lib
PUBLIC
boost_test_tree_lister.cc
boost_tree_lister_injector.cc
PRIVATE
cql_assertions.cc
dummy_sharder.cc


@@ -0,0 +1,422 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "test/lib/boost_test_tree_lister.hh"
#include <boost/algorithm/string/replace.hpp>
#include <fmt/ranges.h>
#include <memory>
#include <ranges>
namespace {
using label_info = internal::label_info;
using test_case_info = internal::test_case_info;
using test_suite_info = internal::test_suite_info;
using test_file_info = internal::test_file_info;
} // anonymous namespace
/// --------------------
///
/// Implementation notes
///
/// --------------------
///
/// The structure of Boost.Test's test tree consists solely
/// of nodes representing test suites and test cases. It ignores
/// information like, for instance, the name of the file those
/// entities reside in [1].
///
/// What's more, a test suite can span multiple files as long
/// as it has the same name [2].
///
/// We'd like to re-visualize the tree in a different manner:
/// have a forest, where each tree represents the internal structure
/// of a specific file. The non-leaf nodes represent test suites,
/// and the leaves -- test cases.
///
/// This type achieves that very goal (albeit in a bit ugly manner).
///
/// ---
///
/// Note that the implementation suffers from the same problems
/// Boost.Test itself does. For instance, when parametrizing tests
/// with `boost::unit_test::data`, the test will appear as a test suite,
/// while cases for each of the data instances -- as test cases.
/// There's no way to overcome that, so we're stuck with it.
///
/// -----------
///
/// Assumptions
///
/// -----------
///
/// We rely on the following assumptions:
///
/// 1. The tree traversal is performed pre-order. That's the case for
/// Boost.Test 1.89.0.
/// 2. If a test case TC belongs to a test suite TS (directly or indirectly),
/// the following execution order holds:
/// i. `test_suite_start(TS)`,
/// ii. `visit(TC)`,
/// iii. `test_suite_finish(TS)`.
/// 3. If test suite TS2 is nested within test suite TS1, the following
/// execution order holds:
/// i. `test_suite_start(TS1)`,
/// ii. `test_suite_start(TS2)`,
/// iii. `test_suite_finish(TS2)`,
/// iv. `test_suite_finish(TS1)`.
///
/// ----------
///
/// References
///
/// ----------
///
/// [1] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree.html
/// [2] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree/test_suite.html
///
/// ----------------------------
///
/// Example of high-level output
///
/// ----------------------------
///
/// Let's consider the following organization of tests.
///
/// TestFile1.cc:
/// - Suite A:
/// - Suite A1:
/// - Test A1.1 (labels: L1)
/// - Test A1.2
/// - Suite A2:
/// - Test A2.1
/// - Test A.1
/// - Suite B:
/// - Test B1
/// - Test B2 (labels: L2, L3)
/// - Test 1
///
/// TestFile2.cc:
/// - Suite A:
/// - Suite A3
/// - Test A3.1
/// - Test A.2
/// - Suite C:
/// - Test C.1
/// - Test 2
///
/// This structure will be translated into the following JSON (we're
/// omitting some details to make it cleaner and easier to read):
///
/// [
/// {
/// "file": "TestFile1.cc",
/// "content": {
/// "suites": [
/// {
/// "name": "A",
/// "suites": [
/// {
/// "name": "A1",
/// "suites": [],
/// "tests": [
/// {
/// "name": "Test A1.1",
/// "labels": "L1"
/// },
/// {
/// "name": "Test A1.2",
/// "labels": ""
/// }
/// ]
/// }
/// ],
/// "tests": [
/// {
/// "name": "Test A.1",
/// "labels": ""
/// }
/// ]
/// },
/// {
/// "name": "B",
/// "suites": [],
/// "tests": [
/// {
/// "name": "Test B1",
/// "labels": ""
/// },
/// {
/// "name": "Test B2",
/// "labels": "L2,L3"
/// }
/// ]
/// }
/// ],
/// "tests": [
/// {
/// "name": "Test 1",
/// "labels": ""
/// }
/// ]
/// }
/// },
/// {
/// "file": "TestFile2.cc",
/// "content": {
/// "suites": [
/// {
/// "name": "A",
/// "suites": [
/// {
/// "name": "A3",
/// "suites": [],
/// "tests": [
/// {
/// "name": "Test A3.1",
/// "labels": ""
/// }
/// ]
/// }
/// ],
/// "tests": [
/// {
/// "name": "Test A.2",
/// "labels": ""
/// }
/// ]
/// },
/// {
/// "name": "C",
/// "suites": [],
/// "tests": [
/// {
/// "name": "Test C.1",
/// "labels": ""
/// }
/// ]
/// }
/// ],
/// "tests": [
/// {
/// "name": "Test 2",
/// "labels": ""
/// }
/// ]
/// }
/// }
/// ]
///
/// Note that although Boost.Test treats Suite A in TestFile1.cc
/// and Suite A in TestFile2.cc as the SAME suite, we consider it
/// separately for each of the files it resides in.
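The bookkeeping described above (a stack of active suite names, with each test case attached to the current suite path inside its own file's tree) can be modeled in a few lines of Python. This is an illustrative sketch of the idea only, not the actual Boost.Test visitor API:

```python
class ForestBuilder:
    """Per-file test forest built from pre-order suite/case callbacks."""
    def __init__(self):
        self.files = {}          # filename -> {"suites": {name: subtree}, "tests": [...]}
        self.active_suites = []  # path from the root to the current suite

    def suite_start(self, name):
        self.active_suites.append(name)

    def suite_finish(self):
        self.active_suites.pop()

    def visit_case(self, filename, test_name):
        node = self.files.setdefault(filename, {"suites": {}, "tests": []})
        # Materialize the active suite path inside this file's tree only.
        for suite in self.active_suites:
            node = node["suites"].setdefault(suite, {"suites": {}, "tests": []})
        node["tests"].append(test_name)

# Suite "A" visited from two files ends up as a separate subtree in each file.
fb = ForestBuilder()
fb.suite_start("A")
fb.visit_case("TestFile1.cc", "Test A.1")
fb.visit_case("TestFile2.cc", "Test A.2")
fb.suite_finish()
```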
struct boost_test_tree_lister::impl {
public:
/// The final result we're building while traversing the test tree.
test_file_forest file_forest;
/// The path from the root to the current suite.
std::vector<std::string> active_suites;
public:
void process_test_case(const boost::unit_test::test_case& tc) {
const std::string_view filename = {tc.p_file_name.begin(), tc.p_file_name.end()};
test_file_info& test_file = get_file_info(filename);
std::string test_name = tc.p_name;
std::vector<label_info> labels = tc.p_labels.get();
test_case_info test_info {.name = std::move(test_name), .labels = std::move(labels)};
if (active_suites.empty()) {
test_file.free_tests.push_back(std::move(test_info));
} else {
test_suite_info& suite_info = get_active_suite(filename);
suite_info.tests.push_back(std::move(test_info));
}
}
bool test_suite_start(const boost::unit_test::test_suite& ts) {
// The suite is the master test suite, so let's ignore it
// because it doesn't represent any actual test suite.
if (ts.p_parent_id == boost::unit_test::INV_TEST_UNIT_ID) {
assert(active_suites.empty());
return true;
}
std::string suite_name = ts.p_name.value;
add_active_suite(std::move(suite_name));
return true;
}
void test_suite_finish(const boost::unit_test::test_suite& ts) {
// The suite is the master test suite, so let's ignore it
// because it doesn't represent any actual test suite.
if (ts.p_parent_id == boost::unit_test::INV_TEST_UNIT_ID) {
assert(active_suites.empty());
return;
}
drop_active_suite();
}
private:
test_file_info& get_file_info(std::string_view filename) {
auto& test_files = file_forest.test_files;
auto it = test_files.find(filename);
if (it == test_files.end()) {
std::tie(it, std::ignore) = test_files.emplace(filename, std::vector<test_suite_info>{});
}
return it->second;
}
void add_active_suite(std::string suite_name) {
active_suites.push_back(std::move(suite_name));
}
void drop_active_suite() {
assert(!active_suites.empty());
active_suites.pop_back();
}
test_suite_info& get_active_suite(std::string_view filename) {
assert(!active_suites.empty());
test_file_info& file_info = get_file_info(filename);
test_suite_info* last = &get_root_suite(file_info, active_suites[0]);
for (const auto& suite_name : active_suites | std::views::drop(1)) {
last = &get_subsuite(*last, suite_name);
}
return *last;
}
test_suite_info& get_root_suite(test_file_info& file_info, std::string_view suite_name) {
auto suite_it = std::ranges::find(file_info.suites, suite_name, &test_suite_info::name);
if (suite_it != file_info.suites.end()) {
return *suite_it;
}
test_suite_info suite_info {.name = std::string(suite_name)};
file_info.suites.push_back(std::move(suite_info));
return *file_info.suites.rbegin();
}
test_suite_info& get_subsuite(test_suite_info& parent, std::string_view suite_name) {
auto suite_it = std::ranges::find(parent.subsuites, suite_name, [] (auto&& suite_ptr) -> std::string_view {
return suite_ptr->name;
});
if (suite_it != parent.subsuites.end()) {
return **suite_it;
}
auto suite = std::make_unique<test_suite_info>(std::string(suite_name));
parent.subsuites.push_back(std::move(suite));
return **parent.subsuites.rbegin();
}
};
boost_test_tree_lister::boost_test_tree_lister() : _impl(std::make_unique<impl>()) {}
boost_test_tree_lister::~boost_test_tree_lister() noexcept = default;
const test_file_forest& boost_test_tree_lister::get_result() const {
return _impl->file_forest;
}
void boost_test_tree_lister::visit(const boost::unit_test::test_case& tc) {
return _impl->process_test_case(tc);
}
bool boost_test_tree_lister::test_suite_start(const boost::unit_test::test_suite& ts) {
return _impl->test_suite_start(ts);
}
void boost_test_tree_lister::test_suite_finish(const boost::unit_test::test_suite& ts) {
return _impl->test_suite_finish(ts);
}
// Replace every occurrence of a double quotation mark (`"`) with the string `\"`.
static std::string escape_quotation_marks(std::string_view str) {
const std::size_t double_quotation_count = std::ranges::count(str, '"');
std::string result(str.size() + double_quotation_count, '\\');
std::size_t offset = 0;
for (std::size_t i = 0; i < str.size(); ++i) {
if (str[i] == '"') {
result[i + offset] = '\\';
++offset;
}
result[i + offset] = str[i];
}
return result;
}
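The function above escapes in a single pass over a preallocated buffer (output length = input length + number of quotes). The same logic, ported to Python purely for illustration:

```python
def escape_quotation_marks(s: str) -> str:
    # Preallocate: one extra slot per double quote in the input.
    out = ['\\'] * (len(s) + s.count('"'))
    offset = 0
    for i, ch in enumerate(s):
        if ch == '"':
            out[i + offset] = '\\'  # write the escape character, then shift
            offset += 1
        out[i + offset] = ch
    return ''.join(out)
```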
auto fmt::formatter<internal::test_case_info>::format(
const internal::test_case_info& test_info,
fmt::format_context& ctx) const -> decltype(ctx.out())
{
// Sanity check. The names of tests are expected to consist only of alphanumeric characters.
assert(std::ranges::count(test_info.name, '"') == 0);
auto label_range = test_info.labels | std::views::transform(escape_quotation_marks);
return fmt::format_to(ctx.out(), R"({{"name":"{}","labels":"{}"}})",
test_info.name, fmt::join(label_range, ","));
}
auto fmt::formatter<internal::test_suite_info>::format(
const internal::test_suite_info& suite_info,
fmt::format_context& ctx) const -> decltype(ctx.out())
{
auto actual_suite_range = suite_info.subsuites | std::views::transform([] (auto&& ptr) -> const test_suite_info& {
return *ptr;
});
auto suite_range = fmt::join(actual_suite_range, ",");
auto test_range = fmt::join(suite_info.tests, ",");
return fmt::format_to(ctx.out(), R"({{"name":"{}","suites":[{}],"tests":[{}]}})",
suite_info.name, std::move(suite_range), std::move(test_range));
}
auto fmt::formatter<internal::test_file_info>::format(
const internal::test_file_info& file_info,
fmt::format_context& ctx) const -> decltype(ctx.out())
{
auto suite_range = fmt::join(file_info.suites, ",");
auto test_range = fmt::join(file_info.free_tests, ",");
return fmt::format_to(ctx.out(), R"({{"suites":[{}],"tests":[{}]}})",
std::move(suite_range), std::move(test_range));
}
auto fmt::formatter<internal::test_file_forest>::format(
const internal::test_file_forest& forest_info,
fmt::format_context& ctx) const -> decltype(ctx.out())
{
std::size_t files_left = forest_info.test_files.size();
fmt::format_to(ctx.out(), "[");
for (const auto& [file, content] : forest_info.test_files) {
fmt::format_to(ctx.out(), R"({{"file":"{}","content":{}}})",
file, content);
if (files_left > 1) {
fmt::format_to(ctx.out(), ",");
}
--files_left;
}
return fmt::format_to(ctx.out(), "]");
}

View File

@@ -0,0 +1,117 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <boost/test/tree/visitor.hpp>
#include <fmt/base.h>
#include <map>
#include <memory>
#include <string>
#include <vector>
namespace internal {
using label_info = std::string;
/// Type representing a single Boost test case.
struct test_case_info {
/// The name of the test case.
std::string name;
/// The labels the test was marked with.
std::vector<label_info> labels;
};
/// Type representing a single Boost test suite within a single file.
///
/// Note that a single suite can span multiple files (as of Boost.Test 1.89.0); see:
/// https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree/test_suite.html.
///
/// We depart from that convention and list suites from different files separately.
/// That doesn't change the fact that, from the perspective of Boost.Test, it is
/// still a single suite. In particular, a label attached to the suite applies
/// to it globally.
struct test_suite_info {
std::string name;
std::vector<std::unique_ptr<test_suite_info>> subsuites;
/// The tests belonging directly to this suite.
std::vector<test_case_info> tests;
};
struct test_file_info {
std::vector<test_suite_info> suites;
std::vector<test_case_info> free_tests;
};
struct test_file_forest {
std::map<std::string, test_file_info, std::less<>> test_files;
};
} // namespace internal
using test_file_forest = internal::test_file_forest;
/// Implementation of the `boost::unit_test::test_tree_visitor` that
/// produces a similar result to running a Boost.Test executable with
/// `--list_content=HRF` or `--list_content=DOT`. The difference is that
/// this type produces its output in JSON format.
///
/// The crucial difference between this implementation and the built-in
/// HRF and DOT ones is that the result obtained by a call to `get_result()`
/// (after the traversal has finished) is going to have a different structure.
///
/// The type `boost_test_tree_lister` will treat the same suite from different
/// files as separate ones, even if they share the name. Boost.Test would treat
/// them as the same one and group the results by suites. In other words,
/// this type groups results by (in order):
///
/// 1. File
/// 2. Suite(s)
/// 3. Test cases
class boost_test_tree_lister : public boost::unit_test::test_tree_visitor {
private:
struct impl;
private:
std::unique_ptr<impl> _impl;
public:
boost_test_tree_lister();
~boost_test_tree_lister() noexcept;
public:
const test_file_forest& get_result() const;
private:
virtual void visit(const boost::unit_test::test_case&) override;
virtual bool test_suite_start(const boost::unit_test::test_suite&) override;
virtual void test_suite_finish(const boost::unit_test::test_suite&) override;
};
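To make the grouping described above concrete, here is a hypothetical sketch (plain Python dicts; the file name is invented) of the file → suite → test shape that `get_result()` produces, together with a helper that counts tests recursively:

```python
# Hypothetical shape of the lister's result, modeled as plain dicts.
# Each file maps to its top-level suites (which may nest) and its
# free-standing tests, mirroring test_file_forest above.
forest = {
    "querier_cache_test.cc": {  # invented file name
        "suites": [
            {"name": "suite_a",
             "suites": [],
             "tests": [{"name": "test_x", "labels": ["slow"]}]},
        ],
        "tests": [{"name": "free_test", "labels": []}],
    },
}

def count_tests(file_info: dict) -> int:
    """Count tests in one file, recursing into nested suites."""
    def in_suite(s: dict) -> int:
        return len(s["tests"]) + sum(in_suite(sub) for sub in s["suites"])
    return len(file_info["tests"]) + sum(in_suite(s) for s in file_info["suites"])
```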
template <>
struct fmt::formatter<internal::test_case_info> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const internal::test_case_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
template <>
struct fmt::formatter<internal::test_suite_info> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const internal::test_suite_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
template <>
struct fmt::formatter<internal::test_file_info> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const internal::test_file_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
template <>
struct fmt::formatter<internal::test_file_forest> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const internal::test_file_forest&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

View File

@@ -0,0 +1,115 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "test/lib/boost_test_tree_lister.hh"
#include <boost/test/framework.hpp>
#include <boost/test/tree/traverse.hpp>
#include <boost/test/unit_test_suite.hpp>
#include <cstdlib>
#include <fmt/core.h>
#include <string_view>
#include <utility>
namespace {
/// Traverse the test tree and collect information about
/// its structure and the tests.
///
/// The output is in JSON format.
/// For more details, see the implementation of
/// `boost_test_tree_lister`.
void print_boost_tests() {
namespace but = boost::unit_test;
but::framework::finalize_setup_phase();
boost_test_tree_lister traverser;
but::traverse_test_tree(but::framework::master_test_suite().p_id, traverser, true);
fmt::print("{}", traverser.get_result());
}
/// --------
/// Examples
/// --------
///
/// # This will NOT list the tests because Boost.Test
/// # will interpret it as an argument to the framework.
/// $ ./path/to/my/test/exec --list_json_content
///
/// # This will NOT list the tests because Boost.Test requires
/// # that all non-Boost.Test arguments be provided AFTER
/// # a `--` sequence (cf. example below).
/// $ ./path/to/my/test/exec list_json_content
///
/// # This will NOT list the tests because
/// # the option simply doesn't match the expected one.
/// $ ./path/to/my/test/exec -- list_json_content
///
/// # This DOES work and DOES what we expect, i.e. it lists the tests.
/// $ ./path/to/my/test/exec -- --list_json_content
bool list_tests(int argc, char** argv) {
for (int i = 1; i < argc; ++i) {
std::string_view option = argv[i];
if (option == "--list_json_content") {
return true;
}
}
return false;
}
struct boost_tree_lister_injector {
boost_tree_lister_injector() {
const auto& master_suite = boost::unit_test::framework::master_test_suite();
/// The arguments here don't include Boost.Test-specific arguments.
/// Those present are the path to the binary and the options
/// intended for the user's own code.
///
/// --------
/// Examples
/// --------
/// $ ./path/to/my/test/exec my_custom_arg
/// Arguments: [<path>, "my_custom_arg"]
///
/// $ ./path/to/my/test/exec -- my_custom_arg
/// Arguments: [<path>, "my_custom_arg"]
///
/// $ ./path/to/my/test/exec --auto_start_dbg=0 -- my_custom_arg
/// Arguments: [<path>, "my_custom_arg"]
///
/// $ ./path/to/my/test/exec --auto_start_dbg=0 my_custom_arg
/// Arguments: [<path>, "my_custom_arg"]
///
/// ------------------------------------------
/// Interaction with some Boost.Test arguments
/// ------------------------------------------
///
/// Note, however, that some Boost.Test options may prevent us
/// from accessing this code. For instance, if the user runs
///
/// $ ./path/to/my/test/exec --list_content -- my_custom_arg
///
/// then Boost.Test will immediately move to its own code and not
/// execute this one (because it's only called by a global fixture).
auto&& [argc, argv] = std::make_pair(master_suite.argc, master_suite.argv);
if (list_tests(argc, argv)) {
print_boost_tests();
// At this point, it's impossible to prevent Boost.Test
// from executing the tests it collected. This is all
// we can do without writing significantly more code,
// and even then it might not be avoidable.
std::exit(0);
}
}
};
} // anonymous namespace
BOOST_GLOBAL_FIXTURE(boost_tree_lister_injector);

View File

@@ -91,7 +91,7 @@ public:
sstables::sstables_manager& get_sstables_manager() noexcept override {
return _sstables_manager;
}
sstables::shared_sstable make_sstable() const override {
sstables::shared_sstable make_sstable(sstables::sstable_state) const override {
return table().make_sstable();
}
sstables::sstable_writer_config configure_writer(sstring origin) const override {

View File

@@ -385,7 +385,7 @@ def test_repair_options_hosts_and_dcs_tablets(nodetool, datacenter, hosts):
[("--tablet-tokens", "1")],
[("--tablet-tokens", "-1,2")],
[("--tablet-tokens", "-1"), ("--tablet-tokens", "2")]])
def test_repair_options_hosts_tablets(nodetool, tokens):
def test_repair_options_tokens_tablets(nodetool, tokens):
_do_test_repair_options_tablets(nodetool, tokens=tokens)
def test_repair_all_with_vnode_keyspace(nodetool):

View File

@@ -623,7 +623,7 @@ Repair session 1
Repair session 1 finished
"""
def test_repair_keyspace(nodetool):
def test_repair_keyspace_failure(nodetool):
check_nodetool_fails_with(
nodetool,
("repair", "ks"),

View File

@@ -29,9 +29,9 @@ class KeyProvider(Enum):
class KeyProviderFactory:
"""Base class for provider factories"""
def __init__(self, key_provider : KeyProvider):
def __init__(self, key_provider : KeyProvider, tmpdir):
self.key_provider = key_provider
self.system_keyfile = None
self.system_key_location = os.path.join(tmpdir, "resources/system_keys")
async def __aenter__(self):
return self
@@ -50,7 +50,7 @@ class KeyProviderFactory:
def configuration_parameters(self) -> dict[str, str]:
"""scylla.conf entries for provider"""
return {}
return {"system_key_directory": self.system_key_location}
def additional_cf_options(self) -> dict[str, str]:
# pylint: disable=unused-argument
@@ -62,7 +62,7 @@ class KeyProviderFactory:
class LocalFileSystemKeyProviderFactory(KeyProviderFactory):
"""LocalFileSystemKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(LocalFileSystemKeyProviderFactory, self).__init__( KeyProvider.local)
super(LocalFileSystemKeyProviderFactory, self).__init__(KeyProvider.local, tmpdir)
self.secret_file = os.path.join(tmpdir, "test/node1/conf/data_encryption_keys")
def additional_cf_options(self) -> dict[str, str]:
@@ -72,8 +72,7 @@ class LocalFileSystemKeyProviderFactory(KeyProviderFactory):
class ReplicatedKeyProviderFactory(KeyProviderFactory):
"""ReplicatedKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(ReplicatedKeyProviderFactory, self).__init__( KeyProvider.replicated)
self.system_key_location = os.path.join(tmpdir, "resources/system_keys")
super(ReplicatedKeyProviderFactory, self).__init__(KeyProvider.replicated, tmpdir)
self.system_key_file_name = "system_key"
async def __aenter__(self):
@@ -88,17 +87,13 @@ class ReplicatedKeyProviderFactory(KeyProviderFactory):
raise RuntimeError(f'Could not generate system key: {stderr.decode()}')
return self
def configuration_parameters(self) -> dict[str, str]:
"""scylla.conf entries for provider"""
return super().configuration_parameters() | {"system_key_directory": self.system_key_location}
def additional_cf_options(self):
return super().additional_cf_options() | {"system_key": self.system_key_file_name}
class KmipKeyProviderFactory(KeyProviderFactory):
"""KmipKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(KmipKeyProviderFactory, self).__init__( KeyProvider.kmip)
super(KmipKeyProviderFactory, self).__init__(KeyProvider.kmip, tmpdir)
self.tmpdir = tmpdir
self.kmip_server_wrapper = None
self.kmip_host = "kmip_test"
@@ -178,7 +173,7 @@ class KmipKeyProviderFactory(KeyProviderFactory):
class KMSKeyProviderFactory(KeyProviderFactory):
"""KMSKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(KMSKeyProviderFactory, self).__init__( KeyProvider.kms)
super(KMSKeyProviderFactory, self).__init__(KeyProvider.kms, tmpdir)
self.tmpdir = tmpdir
self.master_key = "alias/Scylla-test"
self.kms_host = "kms_test"
@@ -260,7 +255,7 @@ class KMSKeyProviderFactory(KeyProviderFactory):
class AzureKeyProviderFactory(KeyProviderFactory):
"""AzureKeyProviderFactory proxy"""
def __init__(self, tmpdir):
super(AzureKeyProviderFactory, self).__init__( KeyProvider.azure)
super(AzureKeyProviderFactory, self).__init__(KeyProvider.azure, tmpdir)
self.tmpdir = tmpdir
self.azure_host = "azure_test"
self.azure_server = None

View File

@@ -22,12 +22,13 @@ class ServerInfo(NamedTuple):
rpc_address: IPAddress
datacenter: str
rack: str
pid: int
def __str__(self):
return f"Server({self.server_id}, {self.ip_addr}, {self.rpc_address}, {self.datacenter}, {self.rack})"
return f"Server({self.server_id}, {self.ip_addr}, {self.rpc_address}, {self.datacenter}, {self.rack}, {self.pid})"
def as_dict(self) -> dict[str, object]:
return {"server_id": self.server_id, "ip_addr": self.ip_addr, "rpc_address": self.rpc_address, "datacenter": self.datacenter, "rack": self.rack}
return {"server_id": self.server_id, "ip_addr": self.ip_addr, "rpc_address": self.rpc_address, "datacenter": self.datacenter, "rack": self.rack, "pid": self.pid}
def property_file(self) -> dict[str, str]:
return {"dc": self.datacenter, "rack": self.rack}
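Because `ServerInfo` is a `NamedTuple` whose fields match the server dicts exchanged with the manager, it can be constructed directly from them (as the `ServerInfo(**server_info)` refactor elsewhere in this change does). A minimal stand-in, with invented field values:

```python
from typing import NamedTuple

# Minimal stand-in for ServerInfo, assuming the fields shown above.
class ServerInfo(NamedTuple):
    server_id: int
    ip_addr: str
    rpc_address: str
    datacenter: str
    rack: str
    pid: int

    def as_dict(self) -> dict[str, object]:
        # NamedTuple already knows how to turn itself into a dict.
        return self._asdict()

# A server dict as the manager might return it (hypothetical values).
info = {"server_id": 1, "ip_addr": "127.0.0.1", "rpc_address": "127.0.0.1",
        "datacenter": "dc1", "rack": "r1", "pid": 4242}
server = ServerInfo(**info)
```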

View File

@@ -167,3 +167,48 @@ class ScyllaLogFile:
line = await self._run_in_executor(log_file.readline, loop=loop)
return matches
async def find_backtraces(self, from_mark: int | None = None) -> list[str]:
"""
Find and extract all backtraces from the log file.
Each backtrace starts with a line "Backtrace:" followed by lines that start with exactly 2 spaces.
If `from_mark` argument is given, the log is searched from that position, otherwise from the beginning.
Return a list of strings, where each string is a complete backtrace (all lines joined together).
"""
loop = asyncio.get_running_loop()
backtraces = []
with self.file.open(encoding="utf-8") as log_file:
if from_mark:
await self._run_in_executor(log_file.seek, from_mark, loop=loop)
line = await self._run_in_executor(log_file.readline, loop=loop)
while line:
if line.strip() == "Backtrace:":
# Found a backtrace, collect all lines that start with exactly 2 spaces
backtrace_lines = [line]
while True:
next_line = await self._run_in_executor(log_file.readline, loop=loop)
if not next_line:
# End of file
break
if next_line.startswith("  ") and not next_line.startswith("   "):
# Line starts with exactly 2 spaces (backtrace entry)
backtrace_lines.append(next_line)
else:
# End of backtrace
line = next_line
break
if backtrace_lines:
# Join all backtrace lines into a single string
backtraces.append(''.join(backtrace_lines))
# Continue from current line (already read in the inner loop)
continue
line = await self._run_in_executor(log_file.readline, loop=loop)
return backtraces
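The extraction rule above (a `Backtrace:` line followed by lines indented with exactly two spaces) can be sketched synchronously over a list of lines, without the executor plumbing:

```python
def extract_backtraces(lines):
    # A backtrace starts at a line whose stripped form is "Backtrace:"
    # and continues through lines indented with exactly two spaces.
    backtraces = []
    i = 0
    while i < len(lines):
        if lines[i].strip() == "Backtrace:":
            bt = [lines[i]]
            i += 1
            while i < len(lines) and lines[i].startswith("  ") \
                    and not lines[i].startswith("   "):
                bt.append(lines[i])
                i += 1
            backtraces.append("".join(bt))
        else:
            i += 1
    return backtraces
```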

View File

@@ -8,7 +8,9 @@
Provides helper methods to test cases.
Manages driver refresh when cluster is cycled.
"""
from collections import defaultdict
import pathlib
import re
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
@@ -64,6 +66,8 @@ class ManagerClient:
self.metrics = ScyllaMetricsClient()
self.thread_pool = ThreadPoolExecutor()
self.test_finished_event = asyncio.Event()
self.ignore_log_patterns = [] # patterns to ignore in server logs when checking for errors
self.ignore_cores_log_patterns = [] # patterns to ignore in server logs when checking for core files
@property
def client(self):
@@ -179,6 +183,82 @@ class ManagerClient:
logger.info("Cluster after test %s: %s", test_case_name, cluster_status)
return cluster_status
async def check_all_errors(self, check_all_errors=False) -> dict[ServerInfo, dict[str, Union[list[str], Path]]]:
errors = defaultdict(dict)
# find errors in logs
for server in await self.all_servers():
log_file = await self.server_open_log(server_id=server.server_id)
# check if we should ignore cores on this server
ignore_cores = []
if self.ignore_cores_log_patterns:
if matches := await log_file.grep("|".join(f"({p})" for p in set(self.ignore_cores_log_patterns))):
logger.debug(f"Will ignore cores on {server}. Found the following log messages: {matches}")
ignore_cores.append(server)
critical_error_pattern = r"Assertion.*failed|AddressSanitizer"
if server not in ignore_cores:
critical_error_pattern += "|Aborting on shard"
if found_critical := await log_file.grep(critical_error_pattern):
errors[server]["critical"] = [e[0] for e in found_critical]
# Find the backtraces for the critical errors
if found_backtraces := await log_file.find_backtraces():
errors[server]["backtraces"] = found_backtraces
if check_all_errors:
if found_errors := await log_file.grep_for_errors(distinct_errors=True):
if filtered_errors := await self.filter_errors(found_errors):
errors[server]["error"] = filtered_errors
# find core files
for server, cores in (await self.find_cores()).items():
errors[server]["cores"] = cores
# add log file path to the report for servers that had errors or cores
for server in await self.all_servers():
log_file = await self.server_open_log(server_id=server.server_id)
if server in errors:
errors[server]["log"] = log_file.file.name
return errors
async def filter_errors(self, errors: list[str]):
exclude_errors_pattern = re.compile("|".join(f"{p}" for p in {
*self.ignore_log_patterns,
*self.ignore_cores_log_patterns,
r"Compaction for .* deliberately stopped",
r"update compaction history failed:.*ignored",
# We may stop nodes that have not finished starting yet.
r"(Startup|start) failed:.*(seastar::sleep_aborted|raft::request_aborted)",
r"Timer callback failed: seastar::gate_closed_exception",
# Ignore expected RPC errors when nodes are stopped.
r"rpc - client .*(connection dropped|fail to connect)",
# We see benign RPC errors when nodes start/stop.
# If they cause system malfunction, it should be detected using higher-level tests.
r"rpc::unknown_verb_error",
r"raft_rpc - Failed to send",
r"raft_topology.*(seastar::broken_promise|rpc::closed_error)",
# Expected tablet migration stream failure where a node is stopped.
# Refs: https://github.com/scylladb/scylladb/issues/19640
r"Failed to handle STREAM_MUTATION_FRAGMENTS.*rpc::stream_closed",
# Expected Raft errors on decommission-abort or node restart with MV.
r"raft_topology - raft_topology_cmd.*failed with: raft::request_aborted",
}))
return [e for e in errors if not exclude_errors_pattern.search(e)]
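The exclusion logic above boils down to one alternation regex built from the ignore patterns; a minimal sketch with a hypothetical error list and a subset of the patterns shown above:

```python
import re

# Hypothetical subset of the ignore patterns listed above.
patterns = [
    r"rpc::unknown_verb_error",
    r"Timer callback failed: seastar::gate_closed_exception",
]
exclude = re.compile("|".join(patterns))

# Invented log lines: one benign (matches a pattern), one real error.
errors = [
    "ERROR rpc::unknown_verb_error on shard 0",
    "ERROR storage failure on shard 1",
]
# Keep only lines that no ignore pattern matches.
kept = [e for e in errors if not exclude.search(e)]
```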
async def find_cores(self) -> dict[ServerInfo, list[str]]:
"""Find core files on all servers"""
# find *.core files in current dir
cores = [str(core_file.absolute()) for core_file in pathlib.Path('.').glob('*.core')]
server_cores = dict()
# match core files to servers by pid
for server in await self.all_servers():
if found_cores := [core for core in cores if f".{server.pid}." in core]:
server_cores[server] = found_cores
return server_cores
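Core files are matched to servers purely by the pid appearing between dots in the file name; a sketch with hypothetical file names:

```python
# Invented core file names; matching relies on the pid appearing
# between dots, as in the f".{server.pid}." check above.
cores = ["scylla.12345.core", "scylla.99999.core"]

def cores_for_pid(pid: int, core_files):
    # A pid is a match only when surrounded by dots, so pid 123
    # does not accidentally match a file for pid 12345.
    return [c for c in core_files if f".{pid}." in c]
```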
async def gather_related_logs(self, failed_test_path_dir: Path, logs: Dict[str, Path]) -> None:
for server in await self.all_servers():
@@ -212,8 +292,7 @@ class ManagerClient:
except RuntimeError as exc:
raise Exception("Failed to get list of running servers") from exc
assert isinstance(server_info_list, list), "running_servers got unknown data type"
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
for info in server_info_list]
return [ServerInfo(*info) for info in server_info_list]
async def all_servers(self) -> list[ServerInfo]:
"""Get List of server info (id and IP address) of all servers"""
@@ -222,8 +301,7 @@ class ManagerClient:
except RuntimeError as exc:
raise Exception("Failed to get list of servers") from exc
assert isinstance(server_info_list, list), "all_servers got unknown data type"
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
for info in server_info_list]
return [ServerInfo(*info) for info in server_info_list]
async def starting_servers(self) -> list[ServerInfo]:
"""Get List of server info (id and IP address) of servers currently
@@ -236,8 +314,7 @@ class ManagerClient:
except RuntimeError as exc:
raise Exception("Failed to get list of starting servers") from exc
assert isinstance(server_info_list, list), "starting_servers got unknown data type"
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
for info in server_info_list]
return [ServerInfo(*info) for info in server_info_list]
async def mark_dirty(self) -> None:
"""Manually mark current cluster dirty.
@@ -276,6 +353,9 @@ class ManagerClient:
Replace CLI options and environment variables with `cmdline_options_override` and `append_env_override`
if provided.
"""
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
logger.debug("ManagerClient starting %s", server_id)
data = {
"expected_error": expected_error,
@@ -412,6 +492,9 @@ class ManagerClient:
expected_server_up_state: Optional[ServerUpState] = None,
connect_driver: bool = True) -> ServerInfo:
"""Add a new server"""
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
try:
data = self._create_server_add_data(
replace_cfg,
@@ -440,11 +523,7 @@ class ManagerClient:
except Exception as exc:
raise Exception("Failed to add server") from exc
try:
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
IPAddress(server_info["ip_addr"]),
IPAddress(server_info["rpc_address"]),
server_info["datacenter"],
server_info["rack"])
s_info = ServerInfo(**server_info)
except Exception as exc:
raise RuntimeError(f"server_add got invalid server data {server_info}") from exc
logger.debug("ManagerClient added %s", s_info)
@@ -473,6 +552,9 @@ class ManagerClient:
assert servers_num > 0, f"servers_add: cannot add {servers_num} servers, servers_num must be positive"
assert not (property_file and auto_rack_dc), "Either property_file or auto_rack_dc can be provided, but not both"
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
if auto_rack_dc:
property_file = [{"dc":auto_rack_dc, "rack":f"rack{i+1}"} for i in range(servers_num)]
@@ -489,11 +571,7 @@ class ManagerClient:
s_infos = list[ServerInfo]()
for server_info in server_infos:
try:
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
IPAddress(server_info["ip_addr"]),
IPAddress(server_info["rpc_address"]),
server_info["datacenter"],
server_info["rack"])
s_info = ServerInfo(**server_info)
s_infos.append(s_info)
except Exception as exc:
raise RuntimeError(f"servers_add got invalid server data {server_info}") from exc
@@ -512,6 +590,9 @@ class ManagerClient:
wait_removed_dead: bool = True,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT) -> None:
"""Invoke remove node Scylla REST API for a specified server"""
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
logger.debug("ManagerClient remove node %s on initiator %s", server_id, initiator_id)
# If we remove a node, we should wait until other nodes see it as dead
@@ -532,6 +613,9 @@ class ManagerClient:
expected_error: str | None = None,
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT) -> None:
"""Tell a node to decommission with Scylla REST API"""
if expected_error is not None:
self.ignore_log_patterns.append(re.escape(expected_error))
logger.debug("ManagerClient decommission %s", server_id)
data = {"expected_error": expected_error}
await self.client.put_json(f"/cluster/decommission-node/{server_id}", data,

View File

@@ -477,7 +477,8 @@ class ScyllaServer:
return "DEFAULT_RACK"
def server_info(self) -> ServerInfo:
return ServerInfo(self.server_id, self.ip_addr, self.rpc_address, self.datacenter, self.rack)
pid = self.cmd.pid if self.cmd else None
return ServerInfo(self.server_id, self.ip_addr, self.rpc_address, self.datacenter, self.rack, pid)
def change_rpc_address(self, rpc_address: IPAddress) -> None:
"""Change RPC IP address of the current server. Pre: the server is

View File

@@ -397,3 +397,80 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)
mark, _ = await log.wait_for(f"Detected tablet split for table {cf}, increasing from 1 to 2 tablets", from_mark=mark)
await assert_resize_task_info(table_id, lambda response: len(response) == 2 and all(r.resize_task_info is None for r in response))
# Verify that a new sstable produced by repair cannot be split if the disk utilization level is critical.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_repair_failure_on_split_rejection(manager: ManagerClient, volumes_factory: Callable) -> None:
cfg = {
'tablet_load_stats_refresh_interval_in_seconds': 1,
}
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=cfg) as servers:
cql, _ = await manager.get_ready_cql(servers)
workdir = await manager.server_get_workdir(servers[0].server_id)
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
logger.info("Create and populate test table")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 2}") as ks:
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
table = cf.split('.')[-1]
table_id = (await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = '{table}'"))[0].id
await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 64)])
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id)
async def run_split():
await manager.api.enable_injection(coord_serv.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
# force split on the test table
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 4}}")
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
async def generate_repair_work():
insert_stmt = cql.prepare(f"INSERT INTO {cf} (pk, t) VALUES (?, ?)")
insert_stmt.consistency_level = ConsistencyLevel.ONE
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
pks = range(256, 512)
await asyncio.gather(*[cql.run_async(insert_stmt, (k, f'{k}')) for k in pks])
await manager.api.disable_injection(servers[0].ip_addr, "database_apply")
await generate_repair_work()
await manager.api.enable_injection(servers[0].ip_addr, "maybe_split_new_sstable_wait", one_shot=True)
token = 'all'
repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, table, token))
# Emit split decision during repair.
await run_split()
await log.wait_for("maybe_split_new_sstable_wait: waiting", from_mark=mark)
await manager.api.disable_injection(coord_serv.ip_addr, "tablet_resize_finalization_postpone")
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
await manager.api.message_injection(servers[0].ip_addr, "maybe_split_new_sstable_wait")
# Expect repair to fail when splitting new sstables
await log.wait_for("Repair for tablet migration of .* failed", from_mark=mark)
await log.wait_for("Cannot split .* because manager has compaction disabled", from_mark=mark)
assert await log.grep(f"compaction .* Split {cf}", from_mark=mark) == []
logger.info("With the blob file removed, wait for the DB to drop below the critical disk utilization level")
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)
await repair_task
mark, _ = await log.wait_for(f"Detected tablet split for table {cf}", from_mark=mark)

View File

@@ -1,6 +1,10 @@
## Copyright 2025-present ScyllaDB
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# This file is generated by cargo-toml-template. Do not edit directly.
# To make changes, edit the template and regenerate with command:
# "$ cargo-toml-template > Cargo.toml".
[workspace]
members = ["crates/*"]
default-members = ["crates/validator"]
@@ -12,13 +16,16 @@ edition = "2024"
[workspace.dependencies]
anyhow = "1.0.97"
async-backtrace = "0.2.7"
futures = "0.3.31"
scylla = { version = "1.2.0", features = ["time-03"] }
tokio = { version = "1.44.1", features = ["full"] }
tracing = "0.1.41"
uuid = "1.16.0"
vector-search-validator-engine = { git = "https://github.com/scylladb/vector-store.git", rev = "3ee46a5" }
vector-search-validator-tests = { git = "https://github.com/scylladb/vector-store.git", rev = "3ee46a5" }
httpclient = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
vector-search-validator-engine = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
vector-search-validator-tests = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
vector-store = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
[patch.'https://github.com/scylladb/vector-store.git']
[patch.'https://github.com/scylladb/scylladb.git']
vector-search-validator-scylla = { path = "crates/validator-scylla" }

View File

@@ -8,6 +8,8 @@ namespace to separate it from the host environment. `vector-search-validator`
contains DNS server and all tests in one binary. It uses external scylla and
vector-store binaries.
## Running tests
The `test_validator.py::test_validator[test-case]` is the entry point for
running the tests. It is parametrized with name of the test case. Available
test cases are taken dynamically from the `vector-search-validator` binary.
@@ -37,6 +39,22 @@ $ pytest --mode=dev test/vector_search_validator/test_validator.py --filters fil
Logs are stored in
`testlog/{mode}/vector_search_validator/{test-case}-{run_id}/` directory.
Implementing new test cases on the Scylla repository side means adding a new
test in the `crates/validator-scylla` crate.
## Development of test cases
`vector-search-validator` (in short `validator`) is divided into multiple
crates:
- `validator` - the main crate that contains only the entry point
- `validator-scylla` - contains the implementation of the validator tests on
  the scylladb.git side. If you want to add or modify the tests implemented in
  scylladb.git, you will work in this crate.
- `vector-store.git/validator-engine` - contains the core logic of the
  validator: the overall test runner and the implementations of test actors
  (DNS server, Scylla cluster, vector-store cluster)
- `vector-store.git/validator-tests` - contains the core logic of the framework
  tests and provides base structures for tests and actor interfaces. In the
  future we should check whether it can be merged with the `validator-engine`
  crate.
- `vector-store.git/validator-vector-store` - contains the implementation of
  the validator tests on the vector-store.git side. If you want to add or
  modify the tests implemented in vector-store.git, you will work in this
  crate.

View File

@@ -1,2 +1,2 @@
VECTOR_STORE_GIT=https://github.com/scylladb/vector-store.git
VECTOR_STORE_REV=3ee46a5
VECTOR_STORE_REV=d79ee80

View File

@@ -11,6 +11,10 @@ cat << EOF
# Copyright 2025-present ScyllaDB
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# This file is generated by cargo-toml-template. Do not edit directly.
# To make changes, edit the template and regenerate with command:
# "\$ cargo-toml-template > Cargo.toml".
[workspace]
members = ["crates/*"]
default-members = ["crates/validator"]
@@ -22,14 +26,17 @@ edition = "2024"
[workspace.dependencies]
anyhow = "1.0.97"
async-backtrace = "0.2.7"
futures = "0.3.31"
scylla = { version = "1.2.0", features = ["time-03"] }
tokio = { version = "1.44.1", features = ["full"] }
tracing = "0.1.41"
uuid = "1.16.0"
httpclient = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
vector-search-validator-engine = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
vector-search-validator-tests = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
vector-store = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
[patch.'$VECTOR_STORE_GIT']
[patch.'https://github.com/scylladb/scylladb.git']
vector-search-validator-scylla = { path = "crates/validator-scylla" }
EOF

View File

@@ -4,5 +4,11 @@ version = "0.1.0"
edition = "2024"
[dependencies]
async-backtrace.workspace = true
httpclient.workspace = true
scylla.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
vector-search-validator-tests.workspace = true
vector-store.workspace = true

View File

@@ -0,0 +1,302 @@
/*
* Copyright 2025-present ScyllaDB
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
use async_backtrace::framed;
use httpclient::HttpClient;
use scylla::client::session::Session;
use scylla::client::session_builder::SessionBuilder;
use scylla::response::query_result::QueryRowsResult;
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;
use tokio::time;
use tracing::info;
use uuid::Uuid;
use vector_search_validator_tests::DnsExt;
use vector_search_validator_tests::ScyllaClusterExt;
use vector_search_validator_tests::ScyllaNodeConfig;
use vector_search_validator_tests::TestActors;
use vector_search_validator_tests::VectorStoreClusterExt;
use vector_search_validator_tests::VectorStoreNodeConfig;
use vector_store::httproutes::IndexStatus;
use vector_store::IndexInfo;
pub(crate) const DEFAULT_TEST_TIMEOUT: Duration = Duration::from_secs(120);
pub(crate) const VS_NAMES: [&str; 3] = ["vs1", "vs2", "vs3"];
pub(crate) const VS_PORT: u16 = 6080;
pub(crate) const DB_OCTET_1: u8 = 1;
pub(crate) const DB_OCTET_2: u8 = 2;
pub(crate) const DB_OCTET_3: u8 = 3;
pub(crate) const VS_OCTET_1: u8 = 128;
pub(crate) const VS_OCTET_2: u8 = 129;
pub(crate) const VS_OCTET_3: u8 = 130;
#[framed]
pub(crate) async fn get_default_vs_urls(actors: &TestActors) -> Vec<String> {
let domain = actors.dns.domain().await;
VS_NAMES
.iter()
.map(|name| format!("http://{name}.{domain}:{VS_PORT}"))
.collect()
}
pub(crate) fn get_default_vs_ips(actors: &TestActors) -> Vec<Ipv4Addr> {
vec![
actors.services_subnet.ip(VS_OCTET_1),
actors.services_subnet.ip(VS_OCTET_2),
actors.services_subnet.ip(VS_OCTET_3),
]
}
pub(crate) fn get_default_db_ips(actors: &TestActors) -> Vec<Ipv4Addr> {
vec![
actors.services_subnet.ip(DB_OCTET_1),
actors.services_subnet.ip(DB_OCTET_2),
actors.services_subnet.ip(DB_OCTET_3),
]
}
#[framed]
pub(crate) async fn get_default_scylla_node_configs(actors: &TestActors) -> Vec<ScyllaNodeConfig> {
let default_vs_urls = get_default_vs_urls(actors).await;
get_default_db_ips(actors)
.iter()
.enumerate()
.map(|(i, &ip)| {
let mut vs_urls = default_vs_urls.clone();
ScyllaNodeConfig {
db_ip: ip,
primary_vs_uris: vec![vs_urls.remove(i)],
secondary_vs_uris: vs_urls,
}
})
.collect()
}
pub(crate) fn get_default_vs_node_configs(actors: &TestActors) -> Vec<VectorStoreNodeConfig> {
let db_ips = get_default_db_ips(actors);
get_default_vs_ips(actors)
.iter()
.zip(db_ips.iter())
.map(|(&vs_ip, &db_ip)| VectorStoreNodeConfig {
vs_ip,
db_ip,
envs: HashMap::new(),
})
.collect()
}
#[framed]
pub(crate) async fn init(actors: TestActors) {
info!("started");
let scylla_configs = get_default_scylla_node_configs(&actors).await;
let vs_configs = get_default_vs_node_configs(&actors);
init_with_config(actors, scylla_configs, vs_configs).await;
info!("finished");
}
#[framed]
pub(crate) async fn init_with_config(
actors: TestActors,
scylla_configs: Vec<ScyllaNodeConfig>,
vs_configs: Vec<VectorStoreNodeConfig>,
) {
let vs_ips = get_default_vs_ips(&actors);
for (name, ip) in VS_NAMES.iter().zip(vs_ips.iter()) {
actors.dns.upsert(name.to_string(), *ip).await;
}
actors.db.start(scylla_configs, None).await;
assert!(actors.db.wait_for_ready().await);
actors.vs.start(vs_configs).await;
assert!(actors.vs.wait_for_ready().await);
}
#[framed]
pub(crate) async fn cleanup(actors: TestActors) {
info!("started");
for name in VS_NAMES.iter() {
actors.dns.remove(name.to_string()).await;
}
actors.vs.stop().await;
actors.db.stop().await;
info!("finished");
}
#[framed]
pub(crate) async fn prepare_connection_with_custom_vs_ips(
actors: &TestActors,
vs_ips: Vec<Ipv4Addr>,
) -> (Arc<Session>, Vec<HttpClient>) {
let session = Arc::new(
SessionBuilder::new()
.known_node(actors.services_subnet.ip(DB_OCTET_1).to_string())
.build()
.await
.expect("failed to create session"),
);
let clients = vs_ips
.iter()
.map(|&ip| HttpClient::new((ip, VS_PORT).into()))
.collect();
(session, clients)
}
#[framed]
pub(crate) async fn wait_for<F, Fut>(mut condition: F, msg: &str, timeout: Duration)
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = bool>,
{
time::timeout(timeout, async {
while !condition().await {
time::sleep(Duration::from_millis(100)).await;
}
})
.await
.unwrap_or_else(|_| panic!("Timeout on: {msg}"))
}
#[framed]
pub(crate) async fn wait_for_value<F, Fut, T>(mut poll_fn: F, msg: &str, timeout: Duration) -> T
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Option<T>>,
{
time::timeout(timeout, async {
loop {
if let Some(value) = poll_fn().await {
return value;
}
time::sleep(Duration::from_millis(100)).await;
}
})
.await
.unwrap_or_else(|_| panic!("Timeout on: {msg}"))
}
#[framed]
pub(crate) async fn wait_for_index(
client: &HttpClient,
index: &IndexInfo,
) -> vector_store::httproutes::IndexStatusResponse {
wait_for_value(
|| async {
match client.index_status(&index.keyspace, &index.index).await {
Ok(resp) if resp.status == IndexStatus::Serving => Some(resp),
_ => None,
}
},
"Waiting for index to be SERVING",
Duration::from_secs(20),
)
.await
}
#[framed]
pub(crate) async fn get_query_results(query: String, session: &Session) -> QueryRowsResult {
session
.query_unpaged(query, ())
.await
.expect("failed to run query")
.into_rows_result()
.expect("failed to get rows")
}
#[framed]
pub(crate) async fn create_keyspace(session: &Session) -> String {
let keyspace = format!("ks_{}", Uuid::new_v4().simple());
// Create keyspace with replication factor of 3 for the 3-node cluster
session.query_unpaged(
format!("CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}"),
(),
).await.expect("failed to create a keyspace");
// Use keyspace
session
.use_keyspace(&keyspace, false)
.await
.expect("failed to use a keyspace");
keyspace
}
#[framed]
pub(crate) async fn create_table(
session: &Session,
columns: &str,
options: Option<&str>,
) -> String {
let table = format!("tbl_{}", Uuid::new_v4().simple());
let extra = if let Some(options) = options {
format!("WITH {options}")
} else {
String::new()
};
// Create table
session
.query_unpaged(format!("CREATE TABLE {table} ({columns}) {extra}"), ())
.await
.expect("failed to create a table");
table
}
#[framed]
pub(crate) async fn create_index(
session: &Session,
clients: &[HttpClient],
table: &str,
column: &str,
) -> IndexInfo {
let index = format!("idx_{}", Uuid::new_v4().simple());
// Create index
session
.query_unpaged(
format!("CREATE INDEX {index} ON {table}({column}) USING 'vector_index'"),
(),
)
.await
.expect("failed to create an index");
// Wait for the index to be created
wait_for(
|| async {
for client in clients.iter() {
if !client
.indexes()
.await
.iter()
.any(|idx| idx.index.to_string() == index)
{
return false;
}
}
true
},
"Waiting for the first index to be created",
Duration::from_secs(10),
)
.await;
clients
.first()
.expect("No vector store clients provided")
.indexes()
.await
.into_iter()
.find(|idx| idx.index.to_string() == index)
.expect("index not found")
}

View File

@@ -3,12 +3,13 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
use std::time::Duration;
use vector_search_validator_tests::common;
use vector_search_validator_tests::*;
use crate::common;
use async_backtrace::framed;
use vector_search_validator_tests::TestCase;
#[framed]
pub(crate) async fn new() -> TestCase {
let timeout = Duration::from_secs(30);
let timeout = common::DEFAULT_TEST_TIMEOUT;
TestCase::empty()
.with_init(timeout, common::init)
.with_cleanup(timeout, common::cleanup)

View File

@@ -0,0 +1,115 @@
/*
* Copyright 2025-present ScyllaDB
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
use crate::common;
use async_backtrace::framed;
use tracing::info;
use vector_search_validator_tests::ScyllaClusterExt;
use vector_search_validator_tests::ScyllaNodeConfig;
use vector_search_validator_tests::TestActors;
use vector_search_validator_tests::TestCase;
use vector_search_validator_tests::VectorStoreNodeConfig;
#[framed]
pub(crate) async fn new() -> TestCase {
let timeout = common::DEFAULT_TEST_TIMEOUT;
TestCase::empty()
.with_cleanup(timeout, common::cleanup)
.with_test(
"secondary_uri_works_correctly",
timeout,
test_secondary_uri_works_correctly,
)
}
#[framed]
async fn test_secondary_uri_works_correctly(actors: TestActors) {
info!("started");
let vs_urls = common::get_default_vs_urls(&actors).await;
let vs_url = &vs_urls[0];
let scylla_configs: Vec<ScyllaNodeConfig> = vec![
ScyllaNodeConfig {
db_ip: actors.services_subnet.ip(common::DB_OCTET_1),
primary_vs_uris: vec![vs_url.clone()],
secondary_vs_uris: vec![],
},
ScyllaNodeConfig {
db_ip: actors.services_subnet.ip(common::DB_OCTET_2),
primary_vs_uris: vec![],
secondary_vs_uris: vec![vs_url.clone()],
},
ScyllaNodeConfig {
db_ip: actors.services_subnet.ip(common::DB_OCTET_3),
primary_vs_uris: vec![],
secondary_vs_uris: vec![vs_url.clone()],
},
];
let vs_configs = vec![VectorStoreNodeConfig {
vs_ip: actors.services_subnet.ip(common::VS_OCTET_1),
db_ip: actors.services_subnet.ip(common::DB_OCTET_1),
envs: Default::default(),
}];
common::init_with_config(actors.clone(), scylla_configs, vs_configs).await;
let vs_ips = vec![actors.services_subnet.ip(common::VS_OCTET_1)];
let (session, clients) = common::prepare_connection_with_custom_vs_ips(&actors, vs_ips).await;
let keyspace = common::create_keyspace(&session).await;
let table =
common::create_table(&session, "pk INT PRIMARY KEY, v VECTOR<FLOAT, 3>", None).await;
// Insert vectors
for i in 0..100 {
let embedding = vec![i as f32, (i * 2) as f32, (i * 3) as f32];
session
.query_unpaged(
format!("INSERT INTO {table} (pk, v) VALUES (?, ?)"),
(i, &embedding),
)
.await
.expect("failed to insert data");
}
let index = common::create_index(&session, &clients, &table, "v").await;
for client in &clients {
let index_status = common::wait_for_index(client, &index).await;
assert_eq!(
index_status.count, 100,
"Expected 100 vectors to be indexed"
);
}
// Down the first node with primary URI
let first_node_ip = actors.services_subnet.ip(common::DB_OCTET_1);
info!("Bringing down node {first_node_ip}");
actors.db.down_node(first_node_ip).await;
// Should work via secondary URIs
let results = common::get_query_results(
format!("SELECT pk FROM {table} ORDER BY v ANN OF [0.0, 0.0, 0.0] LIMIT 10"),
&session,
)
.await;
let rows = results
.rows::<(i32,)>()
.expect("failed to get rows after node down");
assert!(
rows.rows_remaining() <= 10,
"Expected at most 10 results from ANN query after node down"
);
// Drop keyspace
session
.query_unpaged(format!("DROP KEYSPACE {keyspace}"), ())
.await
.expect("failed to drop a keyspace");
info!("finished");
}

View File

@@ -3,12 +3,19 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
mod common;
mod cql;
mod high_availability;
use async_backtrace::framed;
use vector_search_validator_tests::TestCase;
#[framed]
pub async fn test_cases() -> impl Iterator<Item = (String, TestCase)> {
vec![("cql", cql::new().await)]
.into_iter()
.map(|(name, test_case)| (name.to_string(), test_case))
vec![
("scylla_cql", cql::new().await),
("scylla_high_availability", high_availability::new().await),
]
.into_iter()
.map(|(name, test_case)| (name.to_string(), test_case))
}

View File

@@ -819,7 +819,7 @@ public:
virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
virtual reader_permit make_compaction_reader_permit() const override { return _permit; }
virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
virtual sstables::shared_sstable make_sstable() const override { return do_make_sstable(); }
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return do_make_sstable(); }
virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return do_configure_writer(std::move(origin)); }
virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }
@@ -909,7 +909,7 @@ void scrub_operation(schema_ptr schema, reader_permit permit, const std::vector<
auto compaction_descriptor = compaction::compaction_descriptor(std::move(sstables));
compaction_descriptor.options = compaction::compaction_type_options::make_scrub(scrub_mode, compaction::compaction_type_options::scrub::quarantine_invalid_sstables::no);
compaction_descriptor.creator = [&compaction_group_view] (shard_id) { return compaction_group_view.make_sstable(); };
compaction_descriptor.creator = [&compaction_group_view] (shard_id) { return compaction_group_view.make_sstable(sstables::sstable_state::normal); };
compaction_descriptor.replacer = [] (compaction::compaction_completion_desc) { };
auto compaction_data = compaction::compaction_data{};

View File

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-43-20251208
docker.io/scylladb/scylla-toolchain:fedora-43-20251217