23 Commits

Author SHA1 Message Date
Robert Bindar
c575bbf1e8 test_refresh_deletes_uploaded_sstables should wait for sstables to get deleted
SSTable unlinking is async, so in some cases it may happen that
the upload dir is not empty immediately after refresh is done.
This patch adjusts test_refresh_deletes_uploaded_sstables so
it waits with a timeout till the upload dir becomes empty
instead of just assuming the API will sync on sstables being
gone.

Fixes SCYLLADB-1190

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>

Closes scylladb/scylladb#29215
2026-03-26 08:43:14 +03:00
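A minimal sketch of the "wait with a timeout until the upload dir becomes empty" idea described above. The helper name `wait_until_dir_empty` and its parameters are hypothetical and are not taken from the patch; the real test presumably uses the suite's own waiting helpers.

```python
import os
import tempfile
import time


def wait_until_dir_empty(path: str, timeout: float = 30.0, period: float = 0.1) -> bool:
    """Poll `path` until it has no entries.

    Returns True once the directory is empty, or False if `timeout`
    seconds pass first. Hypothetical helper, for illustration only.
    """
    deadline = time.monotonic() + timeout
    while True:
        if not os.listdir(path):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(period)


if __name__ == "__main__":
    upload_dir = tempfile.mkdtemp()
    # An empty directory satisfies the condition on the first poll.
    print(wait_until_dir_empty(upload_dir, timeout=1.0))
```

Polling with a deadline keeps the test tolerant of asynchronous unlinking while still failing if the files never go away.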
Marcin Maliszkiewicz
7fdd650009 Merge 'test: audit: clean up test helper class naming' from Dario Mirovic
Remove unused `pytest.mark.single_node` marker from `TestCQLAudit`.

Rename `TestCQLAudit` to `CQLAuditTester` to reflect that it is a test helper, not a test class. This avoids accidental pytest collection and the subsequent warning about `__init__`.

Logs before the fixes:
```
test/cluster/test_audit.py:514: 14 warnings
  /home/dario/dev/scylladb/test/cluster/test_audit.py:514: PytestCollectionWarning: cannot collect test class 'TestCQLAudit' because it has a __init__ constructor (from: cluster/test_audit.py)
    @pytest.mark.single_node
```

Fixes SCYLLADB-1237

This is an addition to the latest master code. No backport needed.

Closes scylladb/scylladb#29237

* github.com:scylladb/scylladb:
  test: audit: rename TestCQLAudit to CQLAuditTester
  test: audit: remove unused pytest.mark.single_node
2026-03-25 15:30:16 +01:00
Dario Mirovic
552a2d0995 test: audit: rename TestCQLAudit to CQLAuditTester
pytest tries to collect tests for execution in several ways.
One is to pick all classes that start with 'Test'. Those classes
must not have a custom '__init__' constructor. TestCQLAudit does.

TestCQLAudit after migration from test/cluster/dtest is not a test
class anymore, but rather a helper class. There are two ways to fix
this:
1. Add __test__ = False to the TestCQLAudit class
2. Rename it to not start with 'Test'

Option 2 feels better because the new name itself does not convey
the wrong message about its role.

Fixes SCYLLADB-1237
2026-03-25 13:21:08 +01:00
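The two options above can be sketched as follows. This assumes pytest's default settings, where classes whose names start with `Test` are collection candidates and `__test__ = False` is the documented opt-out attribute. The class names and `collected_by_name_rule` are hypothetical; the function is a simplified approximation of the rule, not pytest's actual implementation.

```python
class TestHelperWithInit:
    """Name starts with 'Test' and defines __init__: pytest would warn about it."""

    def __init__(self, manager):
        self.manager = manager


class TestHelperOptedOut:
    """Option 1: keep the name but opt out of collection explicitly."""

    __test__ = False

    def __init__(self, manager):
        self.manager = manager


class HelperRenamed:
    """Option 2: a name that no longer starts with 'Test'."""

    def __init__(self, manager):
        self.manager = manager


def collected_by_name_rule(cls) -> bool:
    """Simplified approximation of the default class-name rule (an assumption)."""
    return cls.__name__.startswith("Test") and getattr(cls, "__test__", True)


if __name__ == "__main__":
    for cls in (TestHelperWithInit, TestHelperOptedOut, HelperRenamed):
        print(cls.__name__, collected_by_name_rule(cls))
```

Renaming has the side benefit that the class name no longer suggests it is a test.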
Dario Mirovic
73de865ca3 test: audit: remove unused pytest.mark.single_node
Remove unused pytest.mark.single_node in TestCQLAudit class.
This is a leftover from audit tests migration from
test/cluster/dtest to test/cluster.

Refs SCYLLADB-1237
2026-03-25 13:18:37 +01:00
Marcin Maliszkiewicz
f988ec18cb test/lib: fix port in-use detection in start_docker_service
Previously, the result of when_all was discarded. when_all stores
exceptions in the returned futures rather than throwing, so the outer
catch(in_use&) could never trigger. Now we capture the when_all result
and inspect each future individually to properly detect in_use from
either stream.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1216

Closes scylladb/scylladb#29219
2026-03-25 11:45:53 +02:00
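The pitfall above, where exceptions are stored in the results rather than thrown, has a close analogue in Python's `asyncio.gather(..., return_exceptions=True)`. The sketch below is that analogue, not the Seastar code from the patch; `InUse`, `read_stream` and `port_in_use` are hypothetical names.

```python
import asyncio


class InUse(Exception):
    """Hypothetical stand-in for the in_use exception."""


async def read_stream(name: str, fail: bool) -> str:
    # Simulates scanning one output stream of the service.
    if fail:
        raise InUse(f"{name}: port already in use")
    return f"{name}: ok"


async def port_in_use() -> bool:
    # With return_exceptions=True nothing is raised here; exceptions come
    # back as result values, so a surrounding try/except would never fire.
    results = await asyncio.gather(
        read_stream("stdout", fail=False),
        read_stream("stderr", fail=True),
        return_exceptions=True,
    )
    # Inspect each result individually, as the fix does for each future.
    return any(isinstance(r, InUse) for r in results)


if __name__ == "__main__":
    print(asyncio.run(port_in_use()))
```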
Artsiom Mishuta
cd1679934c test/pylib: use exponential backoff in wait_for()
Change wait_for() defaults from period=1s/no backoff to period=0.1s
with 1.5x backoff capped at 1.0s. This catches fast conditions in
100ms instead of 1000ms, benefiting ~100 call sites automatically.

Add completion logging with elapsed time and iteration count.

Tested locally with test/cluster/test_fencing.py::test_fence_hints (dev mode),
log output:

  wait_for(at_least_one_hint_failed) completed in 0.83s (4 iterations)
  wait_for(exactly_one_hint_sent) completed in 1.34s (5 iterations)

Fixes SCYLLADB-738

Closes scylladb/scylladb#29173
2026-03-24 23:49:49 +02:00
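A rough sketch of the polling scheme described above: start at 0.1s, multiply the period by 1.5 after each miss, cap it at 1.0s. The name `wait_for_sketch`, the `backoff` and `max_period` parameter names, and the returned iteration count are assumptions for illustration. Only the convention that the predicate returns None to keep waiting and that the caller passes an absolute deadline is taken from the call site in this change set.

```python
import asyncio
import time


async def wait_for_sketch(pred, deadline, period=0.1, backoff=1.5, max_period=1.0):
    """Call `pred` until it returns a non-None value or `deadline` passes.

    Returns (result, iterations). Illustrative only; not the real wait_for().
    """
    iterations = 0
    while True:
        iterations += 1
        result = await pred()
        if result is not None:
            return result, iterations
        remaining = deadline - time.time()
        if remaining <= 0:
            raise TimeoutError(f"condition not met after {iterations} iterations")
        # Never sleep past the deadline.
        await asyncio.sleep(min(period, remaining))
        # Grow the period geometrically, up to the cap.
        period = min(period * backoff, max_period)


if __name__ == "__main__":
    calls = {"n": 0}

    async def ready_on_third_call():
        calls["n"] += 1
        return True if calls["n"] >= 3 else None

    print(asyncio.run(wait_for_sketch(ready_on_third_call, time.time() + 5)))
```

With these defaults the first sleeps are 0.1s, 0.15s, 0.225s and so on, so a condition that becomes true quickly is noticed well before a full second has passed.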
Botond Dénes
d52fbf7ada Merge 'test: cluster: Deflake test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces' from Dawid Mędrek
The test was flaky. The scenario looked like this:

1. Stop server 1.
2. Set its rf_rack_valid_keyspaces configuration option to true.
3. Create an RF-rack-invalid keyspace.
4. Start server 1 and expect a failure during start-up.

It was wrong. We cannot predict when the Raft mutation corresponding to
the newly created keyspace will arrive at the node or when it will be
processed. If the check of the RF-rack-valid keyspaces we perform at
start-up was done before that, it won't include the keyspace. This will
lead to a test failure.

Unfortunately, it's not feasible to perform a read barrier during
start-up. What's more, although it would help the test, it wouldn't be
useful otherwise. Because of that, we simply fix the test, at least for
now.

The new scenario looks like this:

1. Disable the rf_rack_valid_keyspaces configuration option on server 1.
2. Start the server.
3. Create an RF-rack-invalid keyspace.
4. Perform a read barrier on server 1. This will ensure that it has
   observed all Raft mutations, and we won't run into the same problem.
5. Stop the node.
6. Set its rf_rack_valid_keyspaces configuration option to true.
7. Try to start the node and observe a failure.

This will make the test perform consistently.

---

I ran the test (in dev mode, on my local machine) three times before
these changes, and three times with them. I include the time results
below.

Before:
```
real    0m47.570s
user    0m41.631s
sys     0m8.634s

real    0m50.495s
user    0m42.499s
sys     0m8.607s

real    0m50.375s
user    0m41.832s
sys     0m8.789s
```

After:
```
real    0m50.509s
user    0m43.535s
sys     0m9.715s

real    0m50.857s
user    0m44.185s
sys     0m9.811s

real    0m50.873s
user    0m44.289s
sys     0m9.737s
```

Fixes SCYLLADB-1137

Backport: The test is present on all supported branches, and so we
          should backport these changes to them.

Closes scylladb/scylladb#29218

* github.com:scylladb/scylladb:
  test: cluster: Deflake test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces
  test: cluster: Mark test with @pytest.mark.asyncio in test_multidc.py
2026-03-24 21:09:19 +02:00
Patryk Jędrzejczak
141aa2d696 Merge 'test/cluster/test_incremental_repair.py: fix typo + enable compaction DEBUG logs' from Botond Dénes
This PR contains two small improvements to `test_incremental_repair.py`
motivated by the sporadic failure of
`test_tablet_incremental_repair_and_scrubsstables_abort`.

The test fails with `assert 3 == 2` on `len(sst_add)` in the second
repair round. The extra SSTable has `repaired_at=0`, meaning scrub
unexpectedly produced more unrepaired SSTables than anticipated. Since
scrub (and compaction in general) logs at DEBUG level and the test did
not enable debug logging, the existing logs do not contain enough
information to determine the root cause.

**Commit 1** fixes a long-standing typo in the helper function name
(`preapre` -> `prepare`).

**Commit 2** enables `compaction=debug` for the Scylla nodes started by
`do_tablet_incremental_repair_and_ops`, which covers all
`test_tablet_incremental_repair_and_*` variants. This will capture full
compaction/scrub activity on the next reproduction, making the failure
diagnosable.

Refs: SCYLLADB-1086

Backport: test improvement, no backport

Closes scylladb/scylladb#29175

* https://github.com/scylladb/scylladb:
  test/cluster/test_incremental_repair.py: enable compaction DEBUG logs in do_tablet_incremental_repair_and_ops
  test/cluster/test_incremental_repair.py: fix typo preapre -> prepare
2026-03-24 16:27:01 +01:00
Ernest Zaslavsky
c670183be8 cmake: fix precompiled header (PCH) creation
Two issues prevented the precompiled header from compiling
successfully when using CMake directly (rather than the
configure.py + ninja build system):

a) Propagate build flags to Rust binding targets reusing the
   PCH. The wasmtime_bindings and inc targets reuse the PCH
   from scylla-precompiled-header, which is compiled with
   Seastar's flags (including sanitizer flags in
   Debug/Sanitize modes). Without matching compile options,
   the compiler rejects the PCH due to flag mismatch (e.g.,
   -fsanitize=address). Link these targets against
   Seastar::seastar to inherit the required compile options.

Closes scylladb/scylladb#28941
2026-03-24 15:53:40 +02:00
Dawid Mędrek
e639dcda0b test: cluster: Deflake test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces
The test was flaky. The scenario looked like this:

1. Stop server 1.
2. Set its rf_rack_valid_keyspaces configuration option to true.
3. Create an RF-rack-invalid keyspace.
4. Start server 1 and expect a failure during start-up.

It was wrong. We cannot predict when the Raft mutation corresponding to
the newly created keyspace will arrive at the node or when it will be
processed. If the check of the RF-rack-valid keyspaces we perform at
start-up was done before that, it won't include the keyspace. This will
lead to a test failure.

Unfortunately, it's not feasible to perform a read barrier during
start-up. What's more, although it would help the test, it wouldn't be
useful otherwise. Because of that, we simply fix the test, at least for
now.

The new scenario looks like this:

1. Disable the rf_rack_valid_keyspaces configuration option on server 1.
2. Start the server.
3. Create an RF-rack-invalid keyspace.
4. Perform a read barrier on server 1. This will ensure that it has
   observed all Raft mutations, and we won't run into the same problem.
5. Stop the node.
6. Set its rf_rack_valid_keyspaces configuration option to true.
7. Try to start the node and observe a failure.

This will make the test perform consistently.

---

I ran the test (in dev mode, on my local machine) three times before
these changes, and three times with them. I include the time results
below.

Before:
```
real    0m47.570s
user    0m41.631s
sys     0m8.634s

real    0m50.495s
user    0m42.499s
sys     0m8.607s

real    0m50.375s
user    0m41.832s
sys     0m8.789s
```

After:
```
real    0m50.509s
user    0m43.535s
sys     0m9.715s

real    0m50.857s
user    0m44.185s
sys     0m9.811s

real    0m50.873s
user    0m44.289s
sys     0m9.737s
```

Fixes SCYLLADB-1137
2026-03-24 14:27:36 +01:00
Patryk Jędrzejczak
503a6e2d7e locator: everywhere_replication_strategy: fix sanity_check_read_replicas when read_new is true
ERMs created in `calculate_vnode_effective_replication_map` have RF computed based
on the old token metadata during a topology change. The reading replicas, however,
are computed based on the new token metadata (`target_token_metadata`) when
`read_new` is true. That can create a mismatch for EverywhereStrategy during some
topology changes - RF can be equal to the number of reading replicas +-1. During
bootstrap, this can cause the
`everywhere_replication_strategy::sanity_check_read_replicas` check to fail in
debug mode.

We fix the check in this commit by allowing one more reading replica when
`read_new` is true.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1147

Closes scylladb/scylladb#29150
2026-03-24 13:43:39 +01:00
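The relaxed check can be restated in a few lines of Python. This is a paraphrase of the C++ change in this series, with plain integers in place of the ERM and replica-set types; the function signature is an assumption made for illustration.

```python
from typing import Optional


def sanity_check_read_replicas(num_read_replicas: int,
                               replication_factor: int,
                               read_new: bool) -> Optional[str]:
    """Return an error message if there are too many read replicas, else None."""
    # During the 'read from new replicas' stage, RF comes from the old token
    # metadata while replicas come from the new one, so allow one extra replica.
    allowed = replication_factor + 1 if read_new else replication_factor
    if num_read_replicas > allowed:
        return (f"the number of replicas is {num_read_replicas}, "
                f"cannot be higher than {allowed}")
    return None


if __name__ == "__main__":
    print(sanity_check_read_replicas(4, 3, read_new=True))
    print(sanity_check_read_replicas(4, 3, read_new=False))
```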
Jenkins Promoter
0f02c0d6fa Update pgo profiles - x86_64 2026-03-24 14:11:38 +02:00
Dawid Mędrek
4fead4baae test: cluster: Mark test with @pytest.mark.asyncio in test_multidc.py
One of the tests,
test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces,
didn't have the marker. Let's add it now.
2026-03-24 12:52:00 +01:00
Botond Dénes
ffd58ca1f0 Merge 'test: cluster: Deflake test_write_cl_any_to_dead_node_generates_hints' from Dawid Mędrek
Before these changes, we would send mutations to the node and
immediately query the metrics to see how many hints had been written.
However, that could lead to random failures of the test: even if the
mutations have finished executing, hints are stored asynchronously, so
we don't have a guarantee they have already been processed.

To prevent such failures, we rewrite the check: we will perform multiple
checks against the metrics until we have confirmed that the hints have
indeed been written or we hit the timeout.

We're generous with the timeout: we give the test 60 seconds. That
should be enough time to avoid flakiness even on super slow machines,
and if the test does fail, we will know something is really wrong.

As a bonus, we improve the test in general too. We explicitly express
the preconditions we rely on, as well as bump the log level. If the
test fails in the future, it might be very difficult to debug it
without this additional information.

Fixes SCYLLADB-1133

Backport: The test is present on all supported branches. To avoid
          running into more failures, we should backport these changes
          to them.

Closes scylladb/scylladb#29191

* github.com:scylladb/scylladb:
  test: cluster: Increase log level in test_write_cl_any_to_dead_node_generates_hints
  test: cluster: Await all mutations concurrently in test_write_cl_any_to_dead_node_generates_hints
  test: cluster: Specify min_tablet_count in test_write_cl_any_to_dead_node_generates_hints
  test: cluster: Use new_test_table in test_write_cl_any_to_dead_node_generates_hints
  test: cluster: Introduce auxiliary function keyspace_has_tablets
  test: cluster: Deflake test_write_cl_any_to_dead_node_generates_hints
2026-03-24 13:39:56 +02:00
Andrei Chekun
f6fd3bbea0 test.py: reduce timeout for one test
Reduce the timeout for one test to 60 minutes. The longest test we had
so far was ~10-15 minutes. So reducing this timeout is pretty safe and
should help with hanging tests.

Closes scylladb/scylladb#29212
2026-03-24 12:50:10 +02:00
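For reference, the option keeps a string default together with `type=int`. argparse runs string defaults through the `type` conversion, so the parsed value is still an integer number of seconds. A standalone sketch follows; the parser here is a cut-down stand-in, not the real test.py parser.

```python
import argparse

# Cut-down stand-in for the test.py parser; only --timeout is reproduced.
parser = argparse.ArgumentParser()
parser.add_argument("--timeout", action="store", default="3600", type=int,
                    help="timeout value for single test execution")

# No arguments given: the string default is converted by type=int.
args = parser.parse_args([])
# An explicit value overrides the default.
override = parser.parse_args(["--timeout", "120"])

if __name__ == "__main__":
    print(args.timeout // 60, "minutes by default")
    print(override.timeout, "seconds when overridden")
```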
Dawid Mędrek
148217bed6 test: cluster: Increase log level in test_write_cl_any_to_dead_node_generates_hints
We increase the log level of `hints_manager` to TRACE in the test.
If it fails, it may be incredibly difficult to debug it without any
additional information.
2026-03-23 19:19:17 +01:00
Dawid Mędrek
2b472fe7fd test: cluster: Await all mutations concurrently in test_write_cl_any_to_dead_node_generates_hints 2026-03-23 19:19:17 +01:00
Dawid Mędrek
ae12c712ce test: cluster: Specify min_tablet_count in test_write_cl_any_to_dead_node_generates_hints
The test relies on the assumption that mutations will be distributed
more or less uniformly over the nodes. Although this should not happen
in practice, it is theoretically possible that only one tablet is
allocated for the table.

To clearly indicate this precondition, we explicitly set the property
`min_tablet_count` when creating the table. This way, we have a guarantee
that the table has multiple tablets. The load balancer should now take
care of distributing them over the nodes equally. Thanks to that,
`servers[1]` will have some tablets, and so it'll be the target for some
of the mutations we perform.
2026-03-23 19:19:14 +01:00
Dawid Mędrek
dd446aa442 test: cluster: Use new_test_table in test_write_cl_any_to_dead_node_generates_hints
The context manager is the de-facto standard in the test suite. It will
also give us a cleaner way to conditionally enable per-table
tablet options in the following commit.
2026-03-23 19:07:01 +01:00
Dawid Mędrek
dea79b09a9 test: cluster: Introduce auxiliary function keyspace_has_tablets
The function is adapted from its counterpart in the cqlpy test suite:
cqlpy/util.py::keyspace_has_tablets. We will use it in a commit in this
series to conditionally set tablet properties when creating a table.
It might also be useful in general.
2026-03-23 19:07:01 +01:00
Dawid Mędrek
3d04fd1d13 test: cluster: Deflake test_write_cl_any_to_dead_node_generates_hints
Before these changes, we would send mutations to the node and
immediately query the metrics to see how many hints had been written.
However, that could lead to random failures of the test: even if the
mutations have finished executing, hints are stored asynchronously, so
we don't have a guarantee they have already been processed.

To prevent such failures, we rewrite the check: we will perform multiple
checks against the metrics until we have confirmed that the hints have
indeed been written or we hit the timeout.

We're generous with the timeout: we give the test 60 seconds. That
should be enough time to avoid flakiness even on super slow machines,
and if the test does fail, we will know something is really wrong.

Fixes SCYLLADB-1133
2026-03-23 19:06:57 +01:00
Botond Dénes
f5438e0587 test/cluster/test_incremental_repair.py: enable compaction DEBUG logs in do_tablet_incremental_repair_and_ops
The test sporadically fails because scrub produces an unexpected number
of SSTables. Compaction logs are needed to diagnose why, but were not
captured since scrub runs at DEBUG level. Enable compaction=debug for
the servers started by do_tablet_incremental_repair_and_ops so the next
reproduction provides enough information to root-cause the issue.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-23 15:48:26 +02:00
Botond Dénes
f6ab576ed9 test/cluster/test_incremental_repair.py: fix typo preapre -> prepare
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-23 15:48:12 +02:00
13 changed files with 189 additions and 85 deletions

View File

@@ -42,7 +42,14 @@ void everywhere_replication_strategy::validate_options(const gms::feature_servic
 sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
     const auto replication_factor = erm.get_replication_factor();
-    if (read_replicas.size() > replication_factor) {
+    if (const auto& topo_info = erm.get_token_metadata().get_topology_change_info(); topo_info && topo_info->read_new) {
+        if (read_replicas.size() > replication_factor + 1) {
+            return seastar::format(
+                "everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, "
+                "cannot be higher than replication factor {} + 1 during the 'read from new replicas' stage of a topology change",
+                read_replicas.size(), replication_factor);
+        }
+    } else if (read_replicas.size() > replication_factor) {
         return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
     }
     return {};

View File

@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
-size 6502668
+oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
+size 6598648

View File

@@ -87,6 +87,11 @@ target_include_directories(wasmtime_bindings
 target_link_libraries(wasmtime_bindings
   INTERFACE Rust::rust_combined)
 if (Scylla_USE_PRECOMPILED_HEADER_USE)
+  # The PCH from scylla-precompiled-header is compiled with Seastar's compile
+  # flags, including sanitizer flags in Debug/Sanitize modes. Any target reusing
+  # this PCH must have matching compile options, otherwise the compiler rejects
+  # the PCH due to flag mismatch (e.g., -fsanitize=address).
+  target_link_libraries(wasmtime_bindings PRIVATE Seastar::seastar)
   target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
 endif()
@@ -108,5 +113,6 @@ target_include_directories(inc
 target_link_libraries(inc
   INTERFACE Rust::rust_combined)
 if (Scylla_USE_PRECOMPILED_HEADER_USE)
+  target_link_libraries(inc PRIVATE Seastar::seastar)
   target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
 endif()

View File

@@ -181,7 +181,7 @@ def parse_cmd_line() -> argparse.Namespace:
                         help="Run only tests for given build mode(s)")
     parser.add_argument('--repeat', action="store", default="1", type=int,
                         help="number of times to repeat test execution")
-    parser.add_argument('--timeout', action="store", default="24000", type=int,
+    parser.add_argument('--timeout', action="store", default="3600", type=int,
                         help="timeout value for single test execution")
     parser.add_argument('--session-timeout', action="store", default="24000", type=int,
                         help="timeout value for test.py/pytest session execution")

View File

@@ -511,8 +511,7 @@ class AuditBackendComposite(AuditBackend):
         return rows_dict
-@pytest.mark.single_node
-class TestCQLAudit(AuditTester):
+class CQLAuditTester(AuditTester):
     """
     Make sure CQL statements are audited
     """
@@ -1763,7 +1762,7 @@ class TestCQLAudit(AuditTester):
 async def test_audit_table_noauth(manager: ManagerClient):
     """Table backend, no auth, single node — groups all tests that share this config."""
-    t = TestCQLAudit(manager)
+    t = CQLAuditTester(manager)
     await t.test_using_non_existent_keyspace(AuditBackendTable)
     await t.test_audit_keyspace(AuditBackendTable)
     await t.test_audit_keyspace_extra_parameter(AuditBackendTable)
@@ -1787,7 +1786,7 @@ async def test_audit_table_noauth(manager: ManagerClient):
 async def test_audit_table_auth(manager: ManagerClient):
     """Table backend, auth enabled, single node."""
-    t = TestCQLAudit(manager)
+    t = CQLAuditTester(manager)
     await t.test_user_password_masking(AuditBackendTable)
     await t.test_negative_audit_records_auth()
     await t.test_negative_audit_records_admin()
@@ -1803,7 +1802,7 @@ async def test_audit_table_auth(manager: ManagerClient):
 async def test_audit_table_auth_multinode(manager: ManagerClient):
     """Table backend, auth enabled, multi-node (rf=3)."""
-    t = TestCQLAudit(manager)
+    t = CQLAuditTester(manager)
     await t.test_negative_audit_records_ddl()
@@ -1811,49 +1810,49 @@ async def test_audit_table_auth_multinode(manager: ManagerClient):
 async def test_audit_type_none_standalone(manager: ManagerClient):
     """audit=None — verify no auditing occurs."""
-    await TestCQLAudit(manager).test_audit_type_none()
+    await CQLAuditTester(manager).test_audit_type_none()
 async def test_audit_type_invalid_standalone(manager: ManagerClient):
     """audit=invalid — server should fail to start."""
-    await TestCQLAudit(manager).test_audit_type_invalid()
+    await CQLAuditTester(manager).test_audit_type_invalid()
 async def test_composite_audit_type_invalid_standalone(manager: ManagerClient):
     """audit=table,syslog,invalid — server should fail to start."""
-    await TestCQLAudit(manager).test_composite_audit_type_invalid()
+    await CQLAuditTester(manager).test_composite_audit_type_invalid()
 async def test_audit_empty_settings_standalone(manager: ManagerClient):
     """audit=none — verify no auditing occurs."""
-    await TestCQLAudit(manager).test_audit_empty_settings()
+    await CQLAuditTester(manager).test_audit_empty_settings()
 async def test_composite_audit_empty_settings_standalone(manager: ManagerClient):
     """audit=table,syslog,none — verify no auditing occurs."""
-    await TestCQLAudit(manager).test_composite_audit_empty_settings()
+    await CQLAuditTester(manager).test_composite_audit_empty_settings()
 async def test_audit_categories_invalid_standalone(manager: ManagerClient):
     """Invalid audit_categories — server should fail to start."""
-    await TestCQLAudit(manager).test_audit_categories_invalid()
+    await CQLAuditTester(manager).test_audit_categories_invalid()
 async def test_insert_failure_standalone(manager: ManagerClient):
     """7-node topology, audit=table, no auth — standalone due to unique topology."""
-    await TestCQLAudit(manager).test_insert_failure_doesnt_report_success()
+    await CQLAuditTester(manager).test_insert_failure_doesnt_report_success()
 async def test_service_level_statements_standalone(manager: ManagerClient):
     """audit=table, auth, cmdline=--smp 1 — standalone due to special cmdline."""
-    await TestCQLAudit(manager).test_service_level_statements()
+    await CQLAuditTester(manager).test_service_level_statements()
 # AuditBackendSyslog, no auth, rf=1
 async def test_audit_syslog_noauth(manager: ManagerClient):
     """Syslog backend, no auth, single node."""
-    t = TestCQLAudit(manager)
+    t = CQLAuditTester(manager)
     Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
     await t.test_using_non_existent_keyspace(Syslog)
     await t.test_audit_keyspace(Syslog)
@@ -1870,7 +1869,7 @@ async def test_audit_syslog_noauth(manager: ManagerClient):
 async def test_audit_syslog_auth(manager: ManagerClient):
     """Syslog backend, auth enabled, single node."""
-    t = TestCQLAudit(manager)
+    t = CQLAuditTester(manager)
     Syslog = functools.partial(AuditBackendSyslog, socket_path=syslog_socket_path)
     await t.test_user_password_masking(Syslog)
     await t.test_role_password_masking(Syslog)
@@ -1881,7 +1880,7 @@ async def test_audit_syslog_auth(manager: ManagerClient):
 async def test_audit_composite_noauth(manager: ManagerClient):
     """Composite backend (table+syslog), no auth, single node."""
-    t = TestCQLAudit(manager)
+    t = CQLAuditTester(manager)
     Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
     await t.test_using_non_existent_keyspace(Composite)
     await t.test_audit_keyspace(Composite)
@@ -1898,7 +1897,7 @@ async def test_audit_composite_noauth(manager: ManagerClient):
 async def test_audit_composite_auth(manager: ManagerClient):
     """Composite backend (table+syslog), auth enabled, single node."""
-    t = TestCQLAudit(manager)
+    t = CQLAuditTester(manager)
     Composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_path)
     await t.test_user_password_masking(Composite)
     await t.test_role_password_masking(Composite)
@@ -1910,29 +1909,29 @@ _composite = functools.partial(AuditBackendComposite, socket_path=syslog_socket_
 @pytest.mark.parametrize("helper_class,config_changer", [
-    pytest.param(AuditBackendTable, TestCQLAudit.AuditSighupConfigChanger, id="table-sighup"),
-    pytest.param(AuditBackendTable, TestCQLAudit.AuditCqlConfigChanger, id="table-cql"),
-    pytest.param(_syslog, TestCQLAudit.AuditSighupConfigChanger, id="syslog-sighup"),
-    pytest.param(_syslog, TestCQLAudit.AuditCqlConfigChanger, id="syslog-cql"),
-    pytest.param(_composite, TestCQLAudit.AuditSighupConfigChanger, id="composite-sighup"),
-    pytest.param(_composite, TestCQLAudit.AuditCqlConfigChanger, id="composite-cql"),
+    pytest.param(AuditBackendTable, CQLAuditTester.AuditSighupConfigChanger, id="table-sighup"),
+    pytest.param(AuditBackendTable, CQLAuditTester.AuditCqlConfigChanger, id="table-cql"),
+    pytest.param(_syslog, CQLAuditTester.AuditSighupConfigChanger, id="syslog-sighup"),
+    pytest.param(_syslog, CQLAuditTester.AuditCqlConfigChanger, id="syslog-cql"),
+    pytest.param(_composite, CQLAuditTester.AuditSighupConfigChanger, id="composite-sighup"),
+    pytest.param(_composite, CQLAuditTester.AuditCqlConfigChanger, id="composite-cql"),
 ])
 async def test_config_no_liveupdate(manager: ManagerClient, helper_class, config_changer):
     """Non-live audit config params (audit, audit_unix_socket_path, audit_syslog_write_buffer_size) must be unmodifiable."""
-    await TestCQLAudit(manager).test_config_no_liveupdate(helper_class, config_changer)
+    await CQLAuditTester(manager).test_config_no_liveupdate(helper_class, config_changer)
 @pytest.mark.parametrize("helper_class,config_changer", [
-    pytest.param(AuditBackendTable, TestCQLAudit.AuditSighupConfigChanger, id="table-sighup"),
-    pytest.param(AuditBackendTable, TestCQLAudit.AuditCqlConfigChanger, id="table-cql"),
-    pytest.param(_syslog, TestCQLAudit.AuditSighupConfigChanger, id="syslog-sighup"),
-    pytest.param(_syslog, TestCQLAudit.AuditCqlConfigChanger, id="syslog-cql"),
-    pytest.param(_composite, TestCQLAudit.AuditSighupConfigChanger, id="composite-sighup"),
-    pytest.param(_composite, TestCQLAudit.AuditCqlConfigChanger, id="composite-cql"),
+    pytest.param(AuditBackendTable, CQLAuditTester.AuditSighupConfigChanger, id="table-sighup"),
+    pytest.param(AuditBackendTable, CQLAuditTester.AuditCqlConfigChanger, id="table-cql"),
+    pytest.param(_syslog, CQLAuditTester.AuditSighupConfigChanger, id="syslog-sighup"),
+    pytest.param(_syslog, CQLAuditTester.AuditCqlConfigChanger, id="syslog-cql"),
+    pytest.param(_composite, CQLAuditTester.AuditSighupConfigChanger, id="composite-sighup"),
+    pytest.param(_composite, CQLAuditTester.AuditCqlConfigChanger, id="composite-cql"),
 ])
 async def test_config_liveupdate(manager: ManagerClient, helper_class, config_changer):
     """Live-updatable audit config params (categories, keyspaces, tables) must be modifiable at runtime."""
-    await TestCQLAudit(manager).test_config_liveupdate(helper_class, config_changer)
+    await CQLAuditTester(manager).test_config_liveupdate(helper_class, config_changer)
 @pytest.mark.parametrize("helper_class", [
@@ -1942,4 +1941,4 @@ async def test_config_liveupdate(manager: ManagerClient, helper_class, config_ch
 ])
 async def test_parallel_syslog_audit(manager: ManagerClient, helper_class):
     """Cluster must not fail when multiple queries are audited in parallel."""
-    await TestCQLAudit(manager).test_parallel_syslog_audit(helper_class)
+    await CQLAuditTester(manager).test_parallel_syslog_audit(helper_class)

View File

@@ -17,9 +17,9 @@ from test.pylib.manager_client import ManagerClient
 from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
 from test.pylib.tablets import get_tablet_replicas
 from test.pylib.scylla_cluster import ReplaceConfig
-from test.pylib.util import wait_for
+from test.pylib.util import gather_safely, wait_for
-from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
+from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
 logger = logging.getLogger(__name__)
@@ -51,28 +51,42 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
 @pytest.mark.asyncio
 async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
     node_count = 2
-    servers = await manager.servers_add(node_count)
+    cmdline = ["--logger-log-level", "hints_manager=trace"]
+    servers = await manager.servers_add(node_count, cmdline=cmdline)
+    async def wait_for_hints_written(min_hint_count: int, timeout: int):
+        async def aux():
+            hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
+            if hints_written >= min_hint_count:
+                return True
+            return None
+        assert await wait_for(aux, time.time() + timeout)
     cql = manager.get_cql()
     async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
-        table = f"{ks}.t"
-        await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
+        uses_tablets = await keyspace_has_tablets(manager, ks)
+        # If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
+        # Otherwise, it could happen that all mutations would target servers[0] only, which would
+        # ultimately lead to a test failure here. We rely on the assumption that mutations will be
+        # distributed more or less uniformly!
+        extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
+        async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
-        await manager.server_stop_gracefully(servers[1].server_id)
+            await manager.server_stop_gracefully(servers[1].server_id)
-        hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
+            hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
+            stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
+            stmt.consistency_level = ConsistencyLevel.ANY
-        # Some of the inserts will be targeted to the dead node.
-        # The coordinator doesn't have live targets to send the write to, but it should write a hint.
-        for i in range(100):
-            await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
+            # Some of the inserts will be targeted to the dead node.
+            # The coordinator doesn't have live targets to send the write to, but it should write a hint.
+            await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
-        # Verify hints are written
-        hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
-        assert hints_after > hints_before
+            # Verify hints are written
+            await wait_for_hints_written(hints_before + 1, timeout=60)
-        # For dropping the keyspace
-        await manager.server_start(servers[1].server_id)
+            # For dropping the keyspace
+            await manager.server_start(servers[1].server_id)
 @pytest.mark.asyncio
 async def test_limited_concurrency_of_writes(manager: ManagerClient):

View File

@@ -151,7 +151,7 @@ async def trigger_tablet_merge(manager, servers, logs):
     await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
     await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
-async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
+async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
     servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
     repaired_keys = set(range(0, nr_keys))
     unrepaired_keys = set()
@@ -164,7 +164,7 @@ async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdlin
 @pytest.mark.asyncio
 async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
-    servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
+    servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
     await insert_keys(cql, ks, 0, 100)
@@ -274,7 +274,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
 async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
     nr_keys = 100
-    servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
+    servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
     token = -1
     await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
@@ -335,7 +335,7 @@ async def test_tablet_incremental_repair_and_major(manager: ManagerClient):
 @pytest.mark.asyncio
 async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
     nr_keys = 100
-    servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
+    servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
# Disable autocompaction
for server in servers:
@@ -381,7 +381,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -442,7 +442,7 @@ async def test_tablet_incremental_repair_with_merge(manager: ManagerClient):
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
nr_keys = 100
cmdline = ["--hinted-handoff-enabled", "0"]
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
await manager.server_stop_gracefully(servers[1].server_id)
@@ -466,7 +466,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -507,7 +507,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
nr_keys = 100
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -541,7 +541,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
nr_keys = 100
# Make sure no data commit log replay after force server stop
cmdline = ['--enable-commitlog', '0']
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
# First repair
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
@@ -587,7 +587,7 @@ async def test_tablet_incremental_repair_merge_error_in_merge_completion_fiber(m
@pytest.mark.asyncio
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
token = -1
sstables_repaired_at = 0
@@ -632,7 +632,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
@pytest.mark.asyncio
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
servers, _, _, ks, _, _, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
time1 = 0
time2 = 0
@@ -820,7 +820,7 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
cmdline = ['--logger-log-level', 'repair=debug']
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await prepare_cluster_for_incremental_repair(manager, cmdline=cmdline)
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)

@@ -20,6 +20,7 @@ from cassandra.query import SimpleStatement
from test.pylib.async_cql import _wrap_future
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables, TextType, Column
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name
from test.cluster.conftest import cluster_con
@@ -403,6 +404,7 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
for task in [*valid_keyspaces, *invalid_keyspaces]:
_ = tg.create_task(task)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
"""
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
@@ -464,22 +466,50 @@ async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager:
for rfs, tablets in valid_keyspaces:
_ = tg.create_task(create_keyspace(rfs, tablets))
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
# Precondition: s1 has rf_rack_valid_keyspaces set to false.
# Postcondition: s1 still has rf_rack_valid_keyspaces set to false.
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
running_servers = await manager.running_servers()
should_start = s1.server_id not in [server.server_id for server in running_servers]
if should_start:
await manager.server_start(s1.server_id)
ks = await create_keyspace(rfs, True)
# We need to wait for the new schema to propagate.
# Otherwise, it's not clear when the mutation
# corresponding to the created keyspace will
# arrive at server 1.
# It could happen only after the node performs
# the check upon start-up, effectively leading
# to a successful start-up, which we don't want.
# For more context, see issue: SCYLLADB-1137.
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
_ = await manager.server_start(s1.server_id, expected_error=err)
await manager.server_start(s1.server_id, expected_error=err)
await cql.run_async(f"DROP KEYSPACE {ks}")
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "false")
# Test RF-rack-invalid keyspaces.
await try_fail([2, 0], "dc1", 2, 3)
await try_fail([3, 2], "dc2", 2, 1)
await try_fail([4, 1], "dc1", 4, 3)
_ = await manager.server_start(s1.server_id)
# We need to perform a read barrier on the node to make
# sure that it processes the last DROP KEYSPACE.
# Otherwise, the node could think the RF-rack-invalid
# keyspace still exists.
await manager.server_start(s1.server_id)
await read_barrier(manager.api, s1.ip_addr)
await manager.server_stop_gracefully(s1.server_id)
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
await manager.server_start(s1.server_id)
@pytest.mark.asyncio
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):

@@ -23,10 +23,25 @@ from test.cluster.object_store.conftest import format_tuples
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
from test.cluster.util import new_test_keyspace
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name
from test.pylib.util import unique_name, wait_for
logger = logging.getLogger(__name__)
async def wait_for_upload_dir_empty(upload_dir, timeout=30):
'''
Wait until the upload directory is empty with a timeout.
SSTable unlinking is asynchronous and in rare situations, it can happen
that not all sstables are deleted from the upload dir immediately after refresh is done.
'''
deadline = time.time() + timeout
async def check_empty():
files = os.listdir(upload_dir)
if not files:
return True
return None
await wait_for(check_empty, deadline, period=0.5)
class SSTablesOnLocalStorage:
def __init__(self):
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
@@ -153,7 +168,8 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
for s in servers:
cf_dir = dirs[s.server_id]["cf_dir"]
files = os.listdir(os.path.join(cf_dir, 'upload'))
assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
upload_dir = os.path.join(cf_dir, 'upload')
assert os.path.exists(upload_dir)
await wait_for_upload_dir_empty(upload_dir)
shutil.rmtree(tmpbackup)
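
The change above replaces a one-shot `assert files == []` with polling, because sstable unlinking is asynchronous. A minimal, self-contained sketch of the same idea follows. It does not use `test.pylib.util.wait_for`; the names `wait_until_dir_empty` and `demo`, the file name, and the timings are hypothetical and chosen only for illustration.

```python
import asyncio
import os
import tempfile
import time


async def wait_until_dir_empty(path: str, timeout: float = 5.0, period: float = 0.05) -> None:
    """Poll `path` until it contains no entries, or fail once `timeout` elapses."""
    deadline = time.time() + timeout
    while os.listdir(path):
        assert time.time() < deadline, f"{path} still not empty after {timeout}s"
        await asyncio.sleep(period)


async def demo() -> list:
    # Simulate an asynchronous unlink: the file disappears shortly after
    # the "refresh" has already returned.
    with tempfile.TemporaryDirectory() as upload_dir:
        leftover = os.path.join(upload_dir, "leftover.sst")
        open(leftover, "w").close()

        async def unlink_later():
            await asyncio.sleep(0.1)
            os.unlink(leftover)

        task = asyncio.create_task(unlink_later())
        await wait_until_dir_empty(upload_dir)
        await task
        return os.listdir(upload_dir)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

An immediate assertion in `demo` would fail here, since the file still exists when the check first runs; the polling loop tolerates the delay and still fails loudly if the directory never empties.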

@@ -196,7 +196,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
tombstone_mark = datetime.now(timezone.utc)
# test #2: the tombstones are not cleaned up when one node is down
with pytest.raises(AssertionError, match="Deadline exceeded"):
with pytest.raises(AssertionError, match="timed out"):
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
await verify_tombstone_gc(tombstone_mark, timeout=5)
@@ -249,7 +249,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
with pytest.raises(AssertionError, match="Deadline exceeded"):
with pytest.raises(AssertionError, match="timed out"):
await verify_tombstone_gc(tombstone_mark, timeout=5)
await manager.remove_node(servers[0].server_id, down_server.server_id)
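
The `match=` strings above change because the assertion message raised by `wait_for` changed. `pytest.raises(..., match=...)` applies the pattern with `re.search` to the string form of the exception, so the old pattern no longer matches the new text. A small sketch of that check, using only `re`; the label and numbers in the sample message are hypothetical, and only the message shape is taken from this diff:

```python
import re

# Shape of the new assertion message from wait_for; the label ("verify")
# and the numbers are made up for illustration.
NEW_MESSAGE = "wait_for(verify) timed out after 5.01s (12 retries)"
# Old assertion message, as it appears in the removed line of wait_for.
OLD_MESSAGE = "Deadline exceeded, failing test."


def matches(pattern: str, message: str) -> bool:
    """Mimic how pytest.raises(match=...) tests a pattern against a message."""
    return re.search(pattern, message) is not None


if __name__ == "__main__":
    print(matches("timed out", NEW_MESSAGE))          # new pattern, new message
    print(matches("Deadline exceeded", NEW_MESSAGE))  # old pattern, new message
```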

@@ -165,7 +165,7 @@ async def wait_for_cdc_generations_publishing(cql: Session, hosts: list[Host], d
unpublished_generations = topo_res[0].unpublished_cdc_generations
return unpublished_generations is None or len(unpublished_generations) == 0 or None
await wait_for(all_generations_published, deadline=deadline, period=1.0)
await wait_for(all_generations_published, deadline=deadline)
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
@@ -470,6 +470,17 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
"""
Checks whether the given keyspace uses tablets.
Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
"""
cql = manager.get_cql()
rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
rows = list(rows_iter)
return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
async def get_raft_log_size(cql, host) -> int:
query = "select count(\"index\") from system.raft"
return (await cql.run_async(query, host=host))[0][0]

@@ -271,10 +271,21 @@ future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_
// arbitrary timeout of 120s for the server to make some output. Very generous.
// but since we (maybe) run docker, and might need to pull image, this can take
// some time if we're unlucky.
co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
} catch (in_use&) {
retry = true;
p = std::current_exception();
auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
for (auto* f : {&f1, &f2}) {
if (f->failed()) {
try {
f->get();
} catch (in_use&) {
retry = true;
p = std::current_exception();
} catch (...) {
if (!p) {
p = std::current_exception();
}
}
}
}
} catch (...) {
p = std::current_exception();
}

@@ -56,15 +56,25 @@ def unique_name(unique_name_prefix = 'test_'):
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float,
period: float = 1,
period: float = 0.1,
before_retry: Optional[Callable[[], Any]] = None,
backoff_factor: float = 1,
max_period: float = None) -> T:
backoff_factor: float = 1.5,
max_period: float = 1.0,
label: Optional[str] = None) -> T:
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
while True:
assert(time.time() < deadline), "Deadline exceeded, failing test."
elapsed = time.time() - start
assert time.time() < deadline, \
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
res = await pred()
if res is not None:
if retries > 0:
logger.debug(f"wait_for({tag}) completed "
f"in {elapsed:.2f}s ({retries} retries)")
return res
retries += 1
await asyncio.sleep(period)
period *= backoff_factor
if max_period is not None:
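
The new defaults (`period=0.1`, `backoff_factor=1.5`, `max_period=1.0`) make `wait_for` poll quickly at first and then settle at one check per second. The sketch below computes the resulting sleep schedule. The function name `backoff_schedule` is hypothetical, and the clamping step is an assumption, since the hunk above is cut off right after `if max_period is not None:`.

```python
from typing import List, Optional


def backoff_schedule(n: int,
                     period: float = 0.1,
                     backoff_factor: float = 1.5,
                     max_period: Optional[float] = 1.0) -> List[float]:
    """Return the first `n` sleep intervals a wait_for-style loop would use."""
    sleeps = []
    for _ in range(n):
        sleeps.append(period)
        period *= backoff_factor
        # Assumed clamp: the body of this branch is not visible in the diff.
        if max_period is not None:
            period = min(period, max_period)
    return sleeps


if __name__ == "__main__":
    for i, s in enumerate(backoff_schedule(8)):
        print(f"retry {i}: sleep {s:.4f}s")
```

Under these assumptions the interval grows from 0.1s and reaches the 1.0s cap on the seventh sleep, so a predicate that becomes true quickly is noticed in well under a second, while long waits cost no more polling than the previous fixed 1s period.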
@@ -273,14 +283,14 @@ async def wait_for_view_v1(cql: Session, name: str, node_count: int, timeout: in
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline)
await wait_for(view_is_built, deadline, label=f"view_v1_{name}")
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
async def view_is_built():
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline)
await wait_for(view_is_built, deadline, label=f"view_{name}")
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):