Compare commits

38 Commits

Author SHA1 Message Date
Yaniv Michael Kaul
3d3bd1d239 replica: set_skip_when_empty() for rare error-path metrics
Add .set_skip_when_empty() to four metrics in replica/database.cc that
are only incremented on very rare error paths and are almost always zero:

- database::dropped_view_updates: view updates dropped due to overload.
  NOTE: this metric appears to never be incremented in the current
  codebase and may be a candidate for removal.
- database::multishard_query_failed_reader_stops: documented as a 'hard
  badness counter' that should always be zero. NOTE: no increment site
  was found in the current codebase; may be a candidate for removal.
- database::multishard_query_failed_reader_saves: documented as a 'hard
  badness counter' that should always be zero.
- database::total_writes_rejected_due_to_out_of_space_prevention: only
  fires when disk utilization is critical and user table writes are
  disabled, a very rare operational state.

These metrics create unnecessary reporting overhead when they are
perpetually zero. set_skip_when_empty() suppresses them from metrics
output until they become non-zero.

AI-Assisted: yes
Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
2026-04-06 14:41:49 +03:00
Avi Kivity
b4f652b7c1 test: fix flaky test_create_ks_auth by removing bad retry timeout
get_session() was passing timeout=0.1 to patient_exclusive_cql_connection
and patient_cql_connection, leaving only 0.1 seconds for the retry loop
in retry_till_success(). Since each connection attempt can take up to 5
seconds (connect_timeout=5), the retry loop effectively got only one
attempt with no chance to retry on transient NoHostAvailable errors.

Use the default timeout=30 seconds, consistent with all other callers.
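
The effect of the too-small timeout can be illustrated with a minimal sketch. This is not the real retry_till_success(); the helper names, the fake clock, and the use of ConnectionError as a stand-in for NoHostAvailable are assumptions made for illustration only.

```python
import time


def retry_till_success_sketch(attempt, timeout, clock=time.monotonic):
    """Hypothetical retry loop: call attempt() until it succeeds or
    `timeout` seconds have passed. Returns (result, number_of_tries)."""
    deadline = clock() + timeout
    tries = 0
    while True:
        tries += 1
        try:
            return attempt(), tries
        except ConnectionError:
            # Out of budget: give up and surface the last error.
            if clock() >= deadline:
                raise


class FakeClock:
    """Deterministic clock so the sketch does not really sleep."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


def make_attempt(clock, failures, cost=5.0):
    """Build an attempt that takes `cost` fake seconds (like
    connect_timeout=5) and fails `failures` times before succeeding."""
    state = {"left": failures}

    def attempt():
        clock.now += cost
        if state["left"] > 0:
            state["left"] -= 1
            raise ConnectionError("stand-in for a transient NoHostAvailable")
        return "session"

    return attempt
```

With a 0.1 second budget, one failed 5-second attempt already exceeds the deadline, so the error propagates without a retry. With a 30 second budget, the same transient failure is retried and the second attempt succeeds.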

Fixes: SCYLLADB-1373

Closes scylladb/scylladb#29332
2026-04-05 19:13:15 +03:00
Jenkins Promoter
ab4a2cdde2 Update pgo profiles - aarch64
2026-04-05 16:58:02 +03:00
Jenkins Promoter
b97cf0083c Update pgo profiles - x86_64
2026-04-05 16:00:15 +03:00
Nikos Dragazis
6d50e67bd2 scylla_swap_setup: Remove Before=swap.target dependency from swap unit
When a Scylla node starts, the scylla-image-setup.service invokes the
`scylla_swap_setup` script to provision swap. This script allocates a
swap file and creates a swap systemd unit to delegate control to
systemd. By default, systemd injects a Before=swap.target dependency
into every swap unit, allowing other services to use swap.target to wait
for swap to be enabled.

On Azure, this doesn't work so well because we store the swap file on
the ephemeral disk [1] which has network dependencies (`_netdev` mount
option, configured by cloud-init [2]). This makes the swap.target
indirectly depend on the network, leading to dependency cycles such as:

swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target
-> network.target -> systemd-resolved.service -> tmp.mount -> swap.target

This patch breaks the cycle by removing the swap unit from swap.target
using DefaultDependencies=no. The swap unit will still be activated via
WantedBy=multi-user.target, just not during early boot.

Although this problem is specific to Azure, this patch applies the fix
to all clouds to keep the code simple.

Fixes #26519.
Fixes SCYLLADB-1257

[1] https://github.com/scylladb/scylla-machine-image/pull/426
[2] https://github.com/canonical/cloud-init/pull/1213#issuecomment-1026065501

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>

Closes scylladb/scylladb#28504
2026-04-05 15:07:50 +03:00
Tomasz Grabiec
74542be5aa test: pylib: Ignore exceptions in wait_for()
ManagerClient::get_ready_cql() calls server_sees_others(), which waits
for servers to see each other as alive in gossip. If one of the
servers is still early in boot, RESTful API call to
"gossiper/endpoint/live" may fail. It throws an exception, which
currently terminates the wait_for() and propagates up, failing the test.

Fix this by ignoring errors when polling inside wait_for(). In case of
timeout, we log the last exception. This should fix the problem not
only in this case, but for all uses of wait_for().

Example output:

```
pred = <function ManagerClient.server_sees_others.<locals>._sees_min_others at 0x7f022af9a140>
deadline = 1775218828.9172852, period = 1.0, before_retry = None
backoff_factor = 1.5, max_period = 1.0, label = None

    async def wait_for(
            pred: Callable[[], Awaitable[Optional[T]]],
            deadline: float,
            period: float = 0.1,
            before_retry: Optional[Callable[[], Any]] = None,
            backoff_factor: float = 1.5,
            max_period: float = 1.0,
            label: Optional[str] = None) -> T:
        tag = label or getattr(pred, '__name__', 'unlabeled')
        start = time.time()
        retries = 0
        last_exception: Exception | None = None
        while True:
            elapsed = time.time() - start
            if time.time() >= deadline:
                timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
                if last_exception is not None:
                    timeout_msg += (
                        f"; last exception: {type(last_exception).__name__}: {last_exception}"
                    )
                    raise AssertionError(timeout_msg) from last_exception
                raise AssertionError(timeout_msg)

            try:
>               res = await pred()

test/pylib/util.py:80:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    async def _sees_min_others():
>       raise Exception("asd")
E       Exception: asd

test/pylib/manager_client.py:802: Exception

The above exception was the direct cause of the following exception:

manager = <test.pylib.manager_client.ManagerClient object at 0x7f022af7e7b0>

    @pytest.mark.asyncio
    async def test_auth_after_reset(manager: ManagerClient) -> None:
        servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1")
>       cql, _ = await manager.get_ready_cql(servers)

test/cluster/auth_cluster/test_auth_after_reset.py:33:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test/pylib/manager_client.py:137: in get_ready_cql
    await self.servers_see_each_other(servers)
test/pylib/manager_client.py:820: in servers_see_each_other
    await asyncio.gather(*others)
test/pylib/manager_client.py:806: in server_sees_others
    await wait_for(_sees_min_others, time() + interval, period=.5)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

pred = <function ManagerClient.server_sees_others.<locals>._sees_min_others at 0x7f022af9a140>
deadline = 1775218828.9172852, period = 1.0, before_retry = None
backoff_factor = 1.5, max_period = 1.0, label = None

    async def wait_for(
            pred: Callable[[], Awaitable[Optional[T]]],
            deadline: float,
            period: float = 0.1,
            before_retry: Optional[Callable[[], Any]] = None,
            backoff_factor: float = 1.5,
            max_period: float = 1.0,
            label: Optional[str] = None) -> T:
        tag = label or getattr(pred, '__name__', 'unlabeled')
        start = time.time()
        retries = 0
        last_exception: Exception | None = None
        while True:
            elapsed = time.time() - start
            if time.time() >= deadline:
                timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
                if last_exception is not None:
                    timeout_msg += (
                        f"; last exception: {type(last_exception).__name__}: {last_exception}"
                    )
>                   raise AssertionError(timeout_msg) from last_exception
E                   AssertionError: wait_for(_sees_min_others) timed out after 45.30s (46 retries); last exception: Exception: asd

test/pylib/util.py:76: AssertionError
```

Fixes a failure observed in test_auth_after_reset:

```
manager = <test.pylib.manager_client.ManagerClient object at 0x7fb3740e1630>

    @pytest.mark.asyncio
    async def test_auth_after_reset(manager: ManagerClient) -> None:
        servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1")
        cql, _ = await manager.get_ready_cql(servers)
        await cql.run_async("ALTER ROLE cassandra WITH PASSWORD = 'forgotten_pwd'")

        logging.info("Stopping cluster")
        await asyncio.gather(*[manager.server_stop_gracefully(server.server_id) for server in servers])

        logging.info("Deleting sstables")
        for table in ["roles", "role_members", "role_attributes", "role_permissions"]:
            await asyncio.gather(*[manager.server_wipe_sstables(server.server_id, "system", table) for server in servers])

        logging.info("Starting cluster")
        # Don't try connect to the servers yet, with deleted superuser it will be possible only after
        # quorum is reached.
        await asyncio.gather(*[manager.server_start(server.server_id, connect_driver=False) for server in servers])

        logging.info("Waiting for CQL connection")
        await repeat_until_success(lambda: manager.driver_connect(auth_provider=PlainTextAuthProvider(username="cassandra", password="cassandra")))
>       await manager.get_ready_cql(servers)

test/cluster/auth_cluster/test_auth_after_reset.py:50:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test/pylib/manager_client.py:137: in get_ready_cql
    await self.servers_see_each_other(servers)
test/pylib/manager_client.py:819: in servers_see_each_other
    await asyncio.gather(*others)
test/pylib/manager_client.py:805: in server_sees_others
    await wait_for(_sees_min_others, time() + interval, period=.5)
test/pylib/util.py:71: in wait_for
    res = await pred()
test/pylib/manager_client.py:802: in _sees_min_others
    alive_nodes = await self.api.get_alive_endpoints(server_ip)
test/pylib/rest_client.py:243: in get_alive_endpoints
    data = await self.client.get_json(f"/gossiper/endpoint/live", host=node_ip)
test/pylib/rest_client.py:99: in get_json
    ret = await self._fetch("GET", resource_uri, response_type = "json", host = host,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <test.pylib.rest_client.TCPRESTClient object at 0x7fb2404a0650>
method = 'GET', resource = '/gossiper/endpoint/live', response_type = 'json'
host = '127.15.252.8', port = 10000, params = None, json = None, timeout = None
allow_failed = False

    async def _fetch(self, method: str, resource: str, response_type: Optional[str] = None,
                     host: Optional[str] = None, port: Optional[int] = None,
                     params: Optional[Mapping[str, str]] = None,
                     json: Optional[Mapping] = None, timeout: Optional[float] = None, allow_failed: bool = False) -> Any:
        # Can raise exception. See https://docs.aiohttp.org/en/latest/web_exceptions.html
        assert method in ["GET", "POST", "PUT", "DELETE"], f"Invalid HTTP request method {method}"
        assert response_type is None or response_type in ["text", "json"], \
                f"Invalid response type requested {response_type} (expected 'text' or 'json')"
        # Build the URI
        port = port if port else self.default_port if hasattr(self, "default_port") else None
        port_str = f":{port}" if port else ""
        assert host is not None or hasattr(self, "default_host"), "_fetch: missing host for " \
                "{method} {resource}"
        host_str = host if host is not None else self.default_host
        uri = self.uri_scheme + "://" + host_str + port_str + resource
        logging.debug(f"RESTClient fetching {method} {uri}")

        client_timeout = ClientTimeout(total = timeout if timeout is not None else 300)
        async with request(method, uri,
                           connector = self.connector if hasattr(self, "connector") else None,
                           params = params, json = json, timeout = client_timeout) as resp:
            if allow_failed:
                return await resp.json()
            if resp.status != 200:
                text = await resp.text()
>               raise HTTPError(uri, resp.status, params, json, text)
E               test.pylib.rest_client.HTTPError: HTTP error 404, uri: http://127.15.252.8:10000/gossiper/endpoint/live, params: None, json: None, body:
E               {"message": "Not found", "code": 404}

test/pylib/rest_client.py:77: HTTPError
```

Fixes: SCYLLADB-1367

Closes scylladb/scylladb#29323
2026-04-05 13:52:26 +03:00
Andrzej Jackowski
8c0920202b test: protect populate_range in row_cache_test from bad_alloc
When test_exception_safety_of_update_from_memtable was converted from
manual fail_after()/catch to with_allocation_failures() in 74db08165d,
the populate_range() call ended up inside the failure injection scope
without a scoped_critical_alloc_section guard. The other two tests
converted in the same commit (test_exception_safety_of_transitioning...
and test_exception_safety_of_partition_scan) were correctly guarded.

Without the guard, the allocation failure injector can sometimes
target an allocation point inside the cleanup path of populate_range().
In a rare corner case, this triggers a bad_alloc in a noexcept context
(reader_concurrency_semaphore::stop()), causing std::terminate.

Fixes SCYLLADB-1346

Closes scylladb/scylladb#29321
2026-04-04 21:13:26 +03:00
Botond Dénes
2c22d69793 Merge 'Pytest: fix variable handling in GSServer (mock) and ensure docker service logs go to test log as well' from Calle Wilund
Fixes: SCYLLADB-1106

* Small fix in scylla_cluster - remove debug print
* Fix GSServer::unpublish so it does not raise an exception if publish was not called beforehand
* Improve dockerized_server so mock server logs echo to the test log to help diagnose CI failures (because we don't collect log files from mocks etc, and in any case correlation will be much easier).

No backport needed.

Closes scylladb/scylladb#29112

* github.com:scylladb/scylladb:
  dockerized_service: Convert log reader to pipes and push to test log
  test::cluster::conftest::GSServer: Fix unpublish for when publish was not called
  scylla_cluster: Use thread safe future signalling
  scylla_cluster: Remove left-over debug printout
2026-04-03 06:38:05 +03:00
Raphael S. Carvalho
b6ebbbf036 test/cluster/test_tablets2: Fix test_split_stopped_on_shutdown race with stale log messages
The test was failing because the call to:

    await log.wait_for('Stopping.*ongoing compactions')

was missing the 'from_mark=log_mark' argument. The log mark was updated
(line: log_mark = await log.mark()) immediately after detecting
'splitting_mutation_writer_switch_wait: waiting', and just before
launching the shutdown task. However, the wait_for call on the following
line was scanning from the beginning of the log, not from that mark.

As a result, the search immediately matched old 'Stopping N tasks for N
ongoing compactions for table system.X due to table removal' messages
emitted during initial server bootstrap (for system.large_partitions,
system.large_rows, system.large_cells), rather than waiting for the
shutdown to actually stop the user-table split compaction.

This caused the test to prematurely send the message to the
'splitting_mutation_writer_switch_wait' injection. The split compaction
was unblocked before the shutdown had aborted it, so it completed
successfully. Since the split succeeded, 'Failed to complete splitting
of table' was never logged.

Meanwhile, 'storage_service_drain_wait' was blocking do_drain() waiting
for a message. With the split already done, the test was stuck waiting
for the expected failure log that would never come (600s timeout). At
the same time, after 60s the 'storage_service_drain_wait' injection
timed out internally, triggering on_internal_error() which -- with
--abort-on-internal-error=1 -- crashed the server (exit code -6).

Fix: pass from_mark=log_mark to the wait_for('Stopping.*ongoing
compactions') call so it only matches messages that appear after the
shutdown has started, ensuring the test correctly synchronizes with the
shutdown aborting the user-table split compaction before releasing the
injection.
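
The difference between scanning from the start of the log and scanning from a mark can be sketched as follows. This is a simplified, synchronous stand-in; the class and method names are hypothetical and do not reproduce the test framework's asynchronous log API.

```python
import re


class LogSketch:
    """Hypothetical in-memory log with a mark/search interface."""

    def __init__(self):
        self.lines = []

    def mark(self):
        # A mark is simply the index of the next line to be written.
        return len(self.lines)

    def find(self, pattern, from_mark=0):
        """Return the index of the first matching line at or after
        from_mark, or None if nothing matches yet."""
        rx = re.compile(pattern)
        for i in range(from_mark, len(self.lines)):
            if rx.search(self.lines[i]):
                return i
        return None


log = LogSketch()
# Message emitted during bootstrap, long before the shutdown under test.
log.lines.append("Stopping 1 tasks for 1 ongoing compactions for table "
                 "system.large_rows due to table removal")
log.lines.append("splitting_mutation_writer_switch_wait: waiting")
log_mark = log.mark()
# Message emitted by the shutdown the test actually wants to observe.
log.lines.append("Stopping 1 tasks for 1 ongoing compactions for table ks.t")

stale = log.find("Stopping.*ongoing compactions")
fresh = log.find("Stopping.*ongoing compactions", from_mark=log_mark)
```

Without the mark, the search matches the bootstrap-time line; with the mark, it matches only the line written after the shutdown began.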

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1319.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Closes scylladb/scylladb#29311
2026-04-03 06:28:51 +03:00
Andrei Chekun
6526a78334 test.py: fix nodetool mock server port collision
Replace the random port selection with an OS-assigned port. We open
a temporary TCP socket, bind it to (ip, 0) with SO_REUSEADDR, read back
the port number the OS selected, then close the socket before launching
rest_api_mock.py.
Add reuse_address=True and reuse_port=True to TCPSite in rest_api_mock.py
so the server itself can also reclaim a TIME_WAIT port if needed.
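
A minimal sketch of the port selection described above, assuming IPv4 and a hypothetical helper name (the actual test.py code may differ):

```python
import socket


def pick_free_port(ip="127.0.0.1"):
    """Ask the OS for a currently unused TCP port on `ip`.

    Binding to port 0 lets the kernel choose the port. The socket is
    closed on return, so the caller must start the real server promptly;
    a small window remains in which another process could take the port.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((ip, 0))
        return s.getsockname()[1]
```

The returned number would then be passed to the mock server's command line, replacing a randomly chosen port that might already be in use.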

Fixes: SCYLLADB-1275

Closes scylladb/scylladb#29314
2026-04-02 16:24:07 +02:00
Botond Dénes
eb78498e07 test: fix flaky test_timeout_is_applied_on_lookup by using eventually_true
On slow/overloaded CI machines the lowres_clock timer may not have
fired after the fixed 2x sleep, causing the assertion on
get_abort_exception() to fail. Replace the fixed sleep with
sleep(1x) + eventually_true() which retries with exponential backoff,
matching the pattern already used in test_time_based_cache_eviction.

Fixes: SCYLLADB-1311

Closes scylladb/scylladb#29299
2026-04-01 18:20:11 +03:00
Robert Bindar
e7527392c4 test: close clients if cluster teardown throws
Make sure the driver is stopped even if cluster teardown throws,
to avoid stale driver connections entering infinite reconnect
loops that exhaust CPU resources.

Fixes: SCYLLADB-1189

Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>

Closes scylladb/scylladb#29230
2026-04-01 17:22:19 +03:00
Tomasz Grabiec
2ec47a8a21 tests: address_map_test: Fix flakiness in debug mode due to task reordering
Debug mode shuffles task position in the queue. So the following is possible:
 1) shard 1 calls manual_clock::advance(). This expires timers on shard 1 and queues a background smp call to shard 0 which will expire timers there
 2) the smp::submit_to(0, ...) from shard 1 called by the test submits the call
 3) shard 0 creates tasks for both calls, but (2) is run first, and preempts the reactor
 4) shard 1 sees the completion, completes m_svc.invoke_on(1, ..)
 5) shard 0 inserts the completion from (4) before task from (1)
 6) the check on shard 0: m.find(id1) fails because the timer is not expired yet

To fix that, wait for timer expiration on shard 0, so that the test
doesn't depend on task execution order.

Note: I was not able to reproduce the problem locally using test.py --mode
debug --repeat 1000.

It happens in Jenkins very rarely, which is expected, as the scenario that
leads to it is quite unlikely.

Fixes SCYLLADB-1265

Closes scylladb/scylladb#29290
2026-04-01 17:17:35 +03:00
Aleksandra Martyniuk
4d4ce074bb test: node_ops_tasks_tree: reconnect driver after topology changes
The test exercises all five node operations (bootstrap, replace, rebuild,
removenode, decommission) and by the end only one node out of four
remains alive. The CQL driver session, however, still holds stale
references to the dead hosts in its connection pool and load-balancing
policy state.

When the new_test_keyspace context manager exits and attempts
DROP KEYSPACE, the driver routes the query to the dead hosts first,
gets ConnectionShutdown from each, and throws NoHostAvailable before
ever trying the single live node.

Fix by calling driver_connect() after the decommission step, which
closes the old session and creates a fresh one connected only to the
servers the test manager reports as running.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1313.

Closes scylladb/scylladb#29306
2026-04-01 17:13:11 +03:00
Botond Dénes
0351756b15 Merge 'test: fix fuzzy_test timeout in release mode' from Piotr Smaron
The multishard_query_test/fuzzy_test was timing out (SIGKILL after
15 minutes) in release mode CI.

In release mode the test generates up to 64 partitions with up to
1000 clustering rows and 1000 range tombstones each.  With deeply
nested randomly-generated types (e.g. frozen<map<varint,
frozen<map<frozen<tuple<...>>>>>>), this volume of data can exceed
the 15-minute CI timeout.

Reduce the release-mode clustering-row and range-tombstone
distributions from 0-1000 to 0-200.  This caps the worst case at
~12,800 rows -- still 2x the devel-mode maximum (0-100) and
sufficient to exercise multi-partition paged scanning with many
pages.
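
The worst-case figure follows directly from the numbers quoted above; a quick check (variable names are illustrative only):

```python
# Upper bounds taken from the description above.
partitions = 64
old_max_rows_per_partition = 1000
new_max_rows_per_partition = 200

# Worst case: every partition draws the maximum number of clustering rows.
old_worst_case_rows = partitions * old_max_rows_per_partition
new_worst_case_rows = partitions * new_max_rows_per_partition
```

This gives 64,000 rows before the change and 12,800 after it, a fivefold reduction in the worst case.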

Fixes: SCYLLADB-1270

No need to backport for now, only appeared on master.

Closes scylladb/scylladb#29293

* github.com:scylladb/scylladb:
  test: clean up fuzzy_test_config and add comments
  test: fix fuzzy_test timeout in release mode
2026-04-01 11:50:15 +03:00
Andrei Chekun
18f41dcd71 test.py: introduce new scheduler for choosing job count
This commit improves how test.py chooses the default number of
parallel jobs.
This update keeps the logic of deriving the number of jobs from memory and CPU limits
but simplifies the heuristic so that it is smoother and easier to reason about.
This avoids discontinuities such as neighboring machine sizes producing
unexpectedly different job counts, and behaves more predictably on asymmetric
machines where CPU and RAM do not scale together.

Compared to the current threshold-based version, this approach:
- avoids hard jumps around memory cutoffs
- avoids bucketed debug scaling based on CPU count
- keeps CPU and memory as separate constraints and combines them in one place
- avoids double-penalizing debug mode
- is easier to tune later by adjusting a few constants instead of rewriting branching logic

Closes scylladb/scylladb#28904
2026-04-01 11:11:15 +03:00
Avi Kivity
d438e35cdd test/cluster: fix race in test_insert_failure_standalone audit log query
get_audit_partitions_for_operation() returns None when no audit log
rows are found. In _test_insert_failure_doesnt_report_success_assign_nodes,
this None is passed to set(), causing TypeError: 'NoneType' object is
not iterable.

The audit log entry may not yet be visible immediately after executing
the INSERT, so use wait_for() from test.pylib.util with exponential
backoff to poll until the entry appears. Import it as wait_for_async
to avoid shadowing the existing wait_for from test.cluster.dtest.dtest_class,
which has a different signature (timeout vs deadline).
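
The polling pattern can be sketched as below. This is a simplified stand-in for wait_for() from test.pylib.util, not its actual implementation; the default values and the fake predicate are assumptions for illustration.

```python
import asyncio
import time


async def wait_for_sketch(pred, deadline, period=0.01,
                          backoff_factor=1.5, max_period=0.1):
    """Poll pred() until it returns something other than None.

    `deadline` is an absolute time.time() value, not a duration.
    The sleep between polls grows by backoff_factor up to max_period.
    """
    while True:
        if time.time() >= deadline:
            raise AssertionError("wait_for_sketch timed out")
        res = await pred()
        if res is not None:
            return res
        await asyncio.sleep(period)
        period = min(period * backoff_factor, max_period)


def make_audit_query(ready_after):
    """Fake query: returns None until called `ready_after` times,
    mimicking an audit entry that is not yet visible."""
    calls = {"n": 0}

    async def query():
        calls["n"] += 1
        return None if calls["n"] < ready_after else ["partition-1"]

    return query, calls
```

Calling set() on the first, empty result would raise TypeError because None is not iterable; polling until a non-None value arrives avoids that.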

Fixes SCYLLADB-1330

Closes scylladb/scylladb#29289
2026-04-01 10:59:02 +03:00
Botond Dénes
2d2ff4fbda sstables: use chunked_managed_vector for promoted indexes in partition_index_page
Switch _promoted_indexes storage in partition_index_page from
managed_vector to chunked_managed_vector to avoid large contiguous
allocations.

Avoid allocation failure (or crashes with --abort-on-internal-error)
when large partitions have enough promoted index entries to trigger a
large allocation with managed_vector.

Fixes: SCYLLADB-1315

Closes scylladb/scylladb#29283
2026-03-31 18:43:57 +03:00
Piotr Smaron
2ce409dca0 test: clean up fuzzy_test_config and add comments
Remove the unused timeout field from fuzzy_test_config.  It was
declared, initialized per build mode, and logged, but never actually
enforced anywhere.

Document the intentionally small max_size (1024 bytes) passed to
read_partitions_with_paged_scan in run_fuzzy_test_scan: it forces
many pages per scan to stress the paging and result-merging logic.
2026-03-31 17:13:26 +02:00
Piotr Smaron
df2924b2a3 test: fix fuzzy_test timeout in release mode
The multishard_query_test/fuzzy_test was timing out (SIGKILL after
15 minutes) in release mode CI.

In release mode the test generates up to 64 partitions with up to
1000 clustering rows and 1000 range tombstones each.  With deeply
nested randomly-generated types (e.g. frozen<map<varint,
frozen<map<frozen<tuple<...>>>>>>), this volume of data can exceed
the 15-minute CI timeout.

Reduce the release-mode clustering-row and range-tombstone
distributions from 0-1000 to 0-200.  This caps the worst case at
~12,800 rows -- still 2x the devel-mode maximum (0-100) and
sufficient to exercise multi-partition paged scanning with many
pages.

Fixes: SCYLLADB-1270
2026-03-31 17:13:06 +02:00
Piotr Szymaniak
6d8ec8a0c0 alternator: fix flaky test_update_condition_unused_entries_short_circuit
The test was flaky because it stopped dc2_node immediately after an
LWT write, before cross-DC replication could complete. The LWT commit
uses LOCAL_QUORUM, which only guarantees persistence in the
coordinator's DC. Replication to the remote DC is async background
work, and CAS mutations don't store hints. Stopping dc2_node could
drop in-flight RPCs, leaving DC1 without the mutation.

Fix by polling both live DC1 nodes after the write to confirm
cross-DC replication completed before stopping dc2_node. Both nodes
must have the data so that the later ConsistentRead=True
(LOCAL_QUORUM) read on restarted node1 is guaranteed to succeed.

Fixes SCYLLADB-1267

Closes scylladb/scylladb#29287
2026-03-31 16:50:51 +03:00
Dawid Mędrek
f040f1b703 Merge 'raft: remake the read barrier optimization' from Patryk Jędrzejczak
The approach taken in 1ae2ae50a6 turned
out to be incorrect. The Raft member requesting a read barrier could
incorrectly advance its commit_idx and break linearizability. We revert that
commit in this PR.

We also remake the read barrier optimization with a completely new approach.
We make the leader replicate to the non-voting requester of a read barrier if
its `commit_idx` is behind.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-998

No backport: the issue is present only in master.

Closes scylladb/scylladb#29216

* github.com:scylladb/scylladb:
  raft: speed up read barrier requested by non-voters
  Revert "raft: read_barrier: update local commit_idx to read_idx when it's safe"
2026-03-31 15:11:56 +02:00
Avi Kivity
216d39883a Merge 'test: audit: fix audit test syslog race' from Dario Mirovic
Fix two independent race conditions in the syslog audit test that cause intermittent `assert 2 <= 1` failures in `assert_entries_were_added`.

**Datagram ordering race:**
`UnixSockerListener` used `ThreadingUnixDatagramServer`, where each datagram spawns a new thread. The notification barrier in `get_lines()` assumes FIFO handling, but the notification thread can win the lock before an audit entry thread, so `clear_audit_logs()` misses entries that arrive moments later. Fix: switch to sequential `UnixDatagramServer`.

**Config reload race:**
The live-update path used `wait_for_config` (REST API poll on shard 0) which can return before `broadcast_to_all_shards()` completes. Fix: wait for `"completed re-reading configuration file"` in the server log after each SIGHUP, which guarantees all shards have the new config.

Fixes SCYLLADB-1277

This is a CI improvement for the latest code. No need for backport.

Closes scylladb/scylladb#29282

* github.com:scylladb/scylladb:
  test: cluster: wait for full config reload in audit live-update path
  test: cluster: fix syslog listener datagram ordering race
2026-03-31 13:53:01 +03:00
Tomasz Grabiec
b355bb70c2 dtest/alternator: stop concurrent-requests test when workers hit limit
`test_limit_concurrent_requests` could create far more tables than intended
because worker threads looped indefinitely and only the probe path terminated
the test. In practice, workers often hit `RequestLimitExceeded` first, but the
test kept running and creating tables, increasing memory pressure and causing
flakiness due to bad_alloc errors in logs.

Fix by replacing the old probe-driven termination with worker-driven
termination. Workers now run until any worker sees
`RequestLimitExceeded`.
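
Worker-driven termination of this kind can be sketched with a shared event. The exception class, function names, and the fake create_table callable here are hypothetical stand-ins, not the dtest code.

```python
import threading


class RequestLimitExceeded(Exception):
    """Stand-in for the error returned when the server rejects a request."""


def run_workers(create_table, n_workers=4):
    """Run workers that create tables until any one of them is rejected."""
    stop = threading.Event()
    created = []
    lock = threading.Lock()

    def worker():
        while not stop.is_set():
            try:
                name = create_table()
            except RequestLimitExceeded:
                # The first rejection seen by any worker stops all of them.
                stop.set()
                return
            with lock:
                created.append(name)

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return created


def make_limited_create_table(limit):
    """Fake server call that accepts `limit` requests, then rejects."""
    counter = {"n": 0}
    lock = threading.Lock()

    def create_table():
        with lock:
            counter["n"] += 1
            n = counter["n"]
        if n > limit:
            raise RequestLimitExceeded()
        return f"table_{n}"

    return create_table
```

Because every worker checks the event before each request, the number of tables created is bounded by what the server accepted before the first rejection, rather than growing until a separate probe notices the limit.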

Fixes SCYLLADB-1181

Closes scylladb/scylladb#29270
2026-03-31 13:35:50 +03:00
Patryk Jędrzejczak
b9f82f6f23 raft_group0: join_group0: fix join hang when node joins group 0 before post_server_start
A joining node hung forever if the topology coordinator added it to the
group 0 configuration before the node reached `post_server_start`. In
that case, `server->get_configuration().contains(my_id)` returned true
and the node broke out of the join loop early, skipping
`post_server_start`. `_join_node_group0_started` was therefore never set,
so the node's `join_node_response` RPC handler blocked indefinitely.
Meanwhile the topology coordinator's `respond_to_joining_node` call
(which has no timeout) hung forever waiting for the reply that never came.

Fix by only taking the early-break path when not starting as a follower
(i.e. when the node is the discovery leader or is restarting). A joining
node must always reach `post_server_start`.

We also provide a regression test. It takes 6s in dev mode.

Fixes SCYLLADB-959

Closes scylladb/scylladb#29266
2026-03-31 12:33:56 +02:00
Dario Mirovic
0cb63fb669 test: cluster: wait for full config reload in audit live-update path
_apply_config_to_running_servers used wait_for_config (REST API poll)
to confirm live config updates. The REST API reads from shard 0 only,
so it can return before broadcast_to_all_shards() completes — other
shards may still have stale audit config, generating unexpected entries.
Additionally, server_remove_config_option for absent keys sent separate
SIGHUPs before server_update_config, and the single wait_for_config at
the end could match a completion from an earlier SIGHUP.

Wait for "completed re-reading configuration file" in the server log
after each SIGHUP-producing operation. This message is logged only
after both read_config() and broadcast_to_all_shards() finish,
guaranteeing all shards have the new config. Each operation gets its
own mark+wait so no stale completion is matched.

Fixes SCYLLADB-1277
2026-03-31 02:27:11 +02:00
Dario Mirovic
1d623196eb test: cluster: fix syslog listener datagram ordering race
UnixSockerListener used ThreadingUnixDatagramServer, which spawns a
new thread per datagram. The notification barrier in get_lines() relies
on all prior datagrams being handled before the notification. With
threading, the notification handler can win the lock before an audit
entry handler, so get_lines() returns before the entry is appended.
clear_audit_logs() then clears an incomplete buffer, and the late
entry leaks into the next test's before/after diff.

Switch to sequential UnixDatagramServer. The server thread now handles
datagrams in kernel FIFO order, so the notification is always processed
after all preceding audit entries.

Refs SCYLLADB-1277
2026-03-31 02:27:11 +02:00
Patryk Jędrzejczak
ba54b2272b raft: speed up read barrier requested by non-voters
We achieve this by making the leader replicate to the non-voting requester
of a read barrier if its commit_idx is behind.

There are some corner cases where the new `replicate_to(*opt_progress, true);`
call will be a no-op, while the corresponding call in `tick_leader()` would
result in sending the AppendEntries RPC to the follower. These cases are:
- `progress.state == follower_progress::state::PROBE && progress.probe_sent`,
- `progress.state == follower_progress::state::PIPELINE
  && progress.in_flight == follower_progress::max_in_flight`.
We could try to improve the optimization by including some of the cases above,
but it would only complicate the code without noticeable benefits (at least
for group0).

Note: this is the second attempt for this optimization. The first approach
turned out to be incorrect and was reverted in the previous commit. The
performance improvement is the same as in the previous case.
2026-03-30 15:56:24 +02:00
Patryk Jędrzejczak
4913acd742 Revert "raft: read_barrier: update local commit_idx to read_idx when it's safe"
This reverts commit 1ae2ae50a6.

The reverted change turned out to be incorrect. The Raft member requesting
a read barrier could incorrectly advance its commit_idx and break
linearizability. More details in
https://scylladb.atlassian.net/browse/SCYLLADB-998?focusedCommentId=42935
2026-03-30 15:56:24 +02:00
Andrzej Jackowski
ab43420d30 test: use exclusive driver connection in test_limited_concurrency_of_writes
Use get_cql_exclusive(node1) so the driver only connects to node1 and
never attempts to contact the stopped node2. The test was flaky because
the driver received `Host has been marked down or removed` from node2.

Fixes: SCYLLADB-1227

Closes scylladb/scylladb#29268
2026-03-30 11:50:44 +02:00
Botond Dénes
068a7894aa test/cluster: fix flaky test_cleanup_stop by using asyncio.sleep
The test was using time.sleep(1) (a blocking call) to wait after
scheduling the stop_compaction task, intending to let it register on
the server before releasing the sstable_cleanup_wait injection point.

However, time.sleep() blocks the asyncio event loop entirely, so the
asyncio.create_task(stop_compaction) task never gets to run during the
sleep. After the sleep, the directly-awaited message_injection() runs
first, releasing the injection point before stop_compaction is even
sent. By the time stop_compaction reaches Scylla, the cleanup has
already completed successfully -- no exception is raised and the test
fails.

Fix by replacing time.sleep(1) with await asyncio.sleep(1), which
yields control to the event loop and allows the stop_compaction task
to actually send its HTTP request before message_injection is called.

Fixes: SCYLLADB-834

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Closes scylladb/scylladb#29202
2026-03-30 11:40:47 +03:00
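
The difference between the two sleeps can be reproduced in isolation. The sketch below is not the test itself; `run` and `background` are hypothetical names, with `background` standing in for the scheduled `stop_compaction` task.

```python
import asyncio
import time


async def run(blocking: bool) -> list[str]:
    events: list[str] = []

    async def background() -> None:
        # Stand-in for the task scheduled with asyncio.create_task().
        events.append("background ran")

    task = asyncio.create_task(background())
    if blocking:
        time.sleep(0.05)           # blocks the event loop; the task cannot start
    else:
        await asyncio.sleep(0.05)  # yields to the loop; the task runs meanwhile
    events.append("after sleep")
    await task
    return events
```

With `blocking=True` the background task only runs once the caller reaches its first `await`, which is after the sleep. With `blocking=False` it runs during the sleep, which is the behaviour the fix relies on.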
Nadav Har'El
d32fe72252 Merge 'alternator: check concurrency limit before memory acquisition' from Łukasz Paszkowski
Fix the ordering of the concurrency limit check in the Alternator HTTP server so it happens before memory acquisition, and reduce test pressure to avoid LSA exhaustion on the memory-constrained test node.

The patch moves the concurrency check to right after the content-length early-out, before any memory acquisition or I/O. The check was originally placed before memory acquisition but was inadvertently moved after it during a refactoring. This allowed unlimited requests to pile up consuming memory, reading bodies, verifying signatures, and decompressing — all before being rejected. Restores the original ordering and mirrors the CQL transport (`transport/server.cc`).

Lowers `concurrent_requests_limit` from 5 to 3 and the thread multiplier from 5 to 2 (6 threads instead of 25). This is still sufficient to reliably trigger RequestLimitExceeded, while keeping flush pressure within what 512MB per shard can sustain.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1248
Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1181

The test started to fail quite recently. It affects master only. No backport is needed. We might want to consider backporting a commit moving the concurrency check earlier.

Closes scylladb/scylladb#29272

* github.com:scylladb/scylladb:
  test: reduce concurrent-request-limit test pressure to avoid LSA exhaustion
  alternator: check concurrency limit before memory acquisition
2026-03-29 11:08:28 +03:00
Łukasz Paszkowski
b8e3ef0c64 test: reduce concurrent-request-limit test pressure to avoid LSA exhaustion
The test_limit_concurrent_requests dtest uses concurrent CreateTable
requests to verify Alternator's concurrency limiting.  Each admitted
CreateTable triggers Raft consensus, schema mutations, and memtable
flushes—all of which consume LSA memory.  On the 1 GB test node
(2 SMP × 512 MB), the original settings (limit=5, 25 threads) created
enough flush pressure to exhaust the LSA emergency reserve, producing
logalloc::bad_alloc errors in the node log.  The test was always
marginal under these settings and became flaky as new system tables
increased baseline LSA usage over time.

Lower concurrent_requests_limit from 5 to 3 and the thread multiplier
from 5 to 2 (6 threads total).  This is still well above the limit and
sufficient to reliably trigger RequestLimitExceeded, while keeping flush
pressure within what 512 MB per shard can sustain.
2026-03-28 20:40:33 +01:00
Łukasz Paszkowski
a86928caa1 alternator: check concurrency limit before memory acquisition
The concurrency limit check in the Alternator server was positioned after
memory acquisition (get_units), request body reading (read_entire_stream),
signature verification, and decompression. This allowed unlimited requests
to pile up consuming memory before being rejected, exhausting LSA memory
and causing logalloc::bad_alloc errors that cascade into Raft applier
and topology coordinator failures, breaking subsequent operations.

Without this fix, test_limit_concurrent_requests on a 1GB node produces
50 logalloc::bad_alloc errors and cascading failures: reads from
system.scylla_local fail, the Raft applier fiber stops, the topology
coordinator stops, and all subsequent CreateTable operations fail with
InternalServerError (500). With this fix, the cascade is eliminated --
admitted requests may still cause LSA pressure on a memory-constrained
node, but the server remains functional.

Move the concurrency check to right after the content-length early-out,
before any memory acquisition or I/O. This mirrors the CQL transport
which correctly checks concurrency before memory acquisition
(transport/server.cc).

The concurrency check was originally added in 1b8c946ad7 (Sep 2020)
*before* memory acquisition, which at the time lived inside with_gate
(after the concurrency gate). The ordering was inverted by f41dac2a3a
(Mar 2021, "avoid large contiguous allocation for request body"), which
moved get_units() earlier in the function to reserve memory before
reading the newly-introduced content stream -- but inadvertently also
moved it before the concurrency check. c3593462a4 (Mar 2025) further
worsened the situation by adding a 16MB fallback reservation for
requests without Content-Length and ungzip/deflate decompression steps
-- all before the concurrency check -- greatly increasing the memory
consumed by requests that would ultimately be rejected.
2026-03-28 20:40:33 +01:00
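
The effect of the ordering can be sketched with a toy asyncio server. This is an illustration under simplifying assumptions, not the Alternator code: `ToyServer`, `RequestLimitExceeded`, and `demo` are hypothetical, and a plain counter stands in for the memory semaphore.

```python
import asyncio


class RequestLimitExceeded(Exception):
    pass


class ToyServer:
    def __init__(self, max_concurrent: int) -> None:
        self.max_concurrent = max_concurrent
        self.pending = 0
        self.requests_shed = 0
        self.memory_in_use = 0
        self.peak_memory = 0

    async def handle(self, body_size: int) -> str:
        # 1. Concurrency check first: rejected requests never reserve memory.
        if self.pending >= self.max_concurrent:
            self.requests_shed += 1
            raise RequestLimitExceeded(f"too many in-flight requests: {self.pending}")
        self.pending += 1
        try:
            # 2. Only admitted requests reserve memory and read the body.
            self.memory_in_use += body_size
            self.peak_memory = max(self.peak_memory, self.memory_in_use)
            try:
                await asyncio.sleep(0.01)  # stand-in for body read and processing
                return "ok"
            finally:
                self.memory_in_use -= body_size
        finally:
            self.pending -= 1


async def demo(limit: int, clients: int, body_size: int) -> tuple[int, int, int]:
    server = ToyServer(limit)
    results = await asyncio.gather(
        *(server.handle(body_size) for _ in range(clients)),
        return_exceptions=True,
    )
    admitted = sum(1 for r in results if r == "ok")
    return admitted, server.requests_shed, server.peak_memory
```

With a limit of 3 and 6 concurrent clients, peak memory in this sketch is bounded by the three admitted requests. If the reservation happened before the check, all six requests would hold memory at the same time before three of them were rejected.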
Calle Wilund
f1b3bff4a5 dockerized_service: Convert log reader to pipes and push to test log
Refs: SCYLLADB-1106

Ensures that any stderr logs from mock services are echoed to the test log,
regardless of the log file we write, to help debug failed CI runs.
2026-03-24 12:35:42 +01:00
Calle Wilund
38aaed1ed4 test::cluster::conftest::GSServer: Fix unpublish for when publish was not called
Use checked dict access to check the set vars.

Fixes: SCYLLADB-1106
2026-03-24 12:33:56 +01:00
Calle Wilund
b382f3593c scylla_cluster: Use thread safe future signalling 2026-03-24 12:33:56 +01:00
Calle Wilund
b36dc80835 scylla_cluster: Remove left-over debug printout 2026-03-23 11:07:59 +01:00
37 changed files with 478 additions and 626 deletions

View File

@@ -699,6 +699,17 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// for such a size.
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
}
// Check the concurrency limit early, before acquiring memory and
// reading the request body, to avoid piling up memory from excess
// requests that will be rejected anyway. This mirrors the CQL
// transport which also checks concurrency before memory acquisition
// (transport/server.cc).
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
// JSON parsing can allocate up to roughly 2x the size of the raw
// document, + a couple of bytes for maintenance.
// If the Content-Length of the request is not available, we assume
@@ -760,12 +771,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
_executor._stats.unsupported_operations++;
co_return api_error::unknown_operation(fmt::format("Unsupported operation {}", op));
}
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
executor::client_state client_state(service::client_state::external_tag(),
_auth_service, &_sl_controller, _timeout_config.current_values(), req->get_client_address());
if (!username.empty()) {

View File

@@ -143,15 +143,6 @@ public:
return value_type();
}
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
cache_value_ptr vp = _cache.find(key.key());
if (!vp) {
return false;
}
(*vp)->update_result_metadata_id(std::move(metadata_id));
return true;
}
template <typename Pred>
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
void remove_if(Pred&& pred) {

View File

@@ -260,10 +260,6 @@ public:
return _prepared_cache.find(key);
}
bool update_prepared_result_metadata_id(const prepared_cache_key_type& key, cql_metadata_id_type metadata_id) {
return _prepared_cache.update_result_metadata_id(key, std::move(metadata_id));
}
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared(

View File

@@ -52,7 +52,6 @@ public:
std::vector<sstring> warnings;
private:
cql_metadata_id_type _metadata_id;
bool _result_metadata_is_empty;
public:
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
@@ -72,15 +71,6 @@ public:
void calculate_metadata_id();
cql_metadata_id_type get_metadata_id() const;
bool result_metadata_is_empty() const {
return _result_metadata_is_empty;
}
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
_metadata_id = std::move(metadata_id);
_result_metadata_is_empty = false;
}
};
}

View File

@@ -49,7 +49,6 @@ prepared_statement::prepared_statement(
, partition_key_bind_indices(std::move(partition_key_bind_indices))
, warnings(std::move(warnings))
, _metadata_id(bytes{})
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
{
statement->set_audit_info(std::move(audit_info));
}

View File

@@ -9,6 +9,7 @@
import os
import sys
import shlex
import argparse
import psutil
from pathlib import Path
@@ -103,16 +104,41 @@ if __name__ == '__main__':
run('dd if=/dev/zero of={} bs=1M count={}'.format(swapfile, swapsize_mb), shell=True, check=True)
swapfile.chmod(0o600)
run('mkswap -f {}'.format(swapfile), shell=True, check=True)
mount_point = find_mount_point(swap_directory)
mount_unit = out(f'systemd-escape -p --suffix=mount {shlex.quote(str(mount_point))}')
# Add DefaultDependencies=no to the swap unit to avoid getting the default
# Before=swap.target dependency. We apply this to all clouds, but the
# requirement came from Azure:
#
# On Azure, the swap directory is on the Azure ephemeral disk (mounted on /mnt).
# However, cloud-init makes this mount (i.e., the mnt.mount unit) depend on
# the network (After=network-online.target). By extension, this means that
# the swap unit depends on the network. If we didn't use DefaultDependencies=no,
# then the swap unit would be part of the swap.target which other services
# assume to be a local boot target, so we would end up with dependency cycles
# such as:
#
# swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target -> network.target -> systemd-resolved.service -> tmp.mount -> swap.target
#
# By removing the automatic Before=swap.target, the swap unit is no longer
# part of swap.target, avoiding such cycles. The swap will still be
# activated via WantedBy=multi-user.target.
unit_data = '''
[Unit]
Description=swapfile
DefaultDependencies=no
After={}
Conflicts=umount.target
Before=umount.target
[Swap]
What={}
[Install]
WantedBy=multi-user.target
'''[1:-1].format(swapfile)
'''[1:-1].format(mount_unit, swapfile)
with swapunit.open('w') as f:
f.write(unit_data)
systemd_unit.reload()

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:e59fe56eac435fd03c2f0d7dfc11c6998d7c0750e1851535575497dd13d96015
size 6505524
oid sha256:54662978b9ce4a6e25790b1b0a5099e6063173ffa95a399a6287cf474376ed09
size 6595952

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
size 6598648
oid sha256:0cf44ea1fb2ae20de45d26fe8095054e60cb8700cddcb2fd79ef79705484b18a
size 6603780

View File

@@ -1098,7 +1098,8 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
// Make sure that only a leader or a node that is part of the config can request read barrier
// Nodes outside of the config may never get the data, so they will not be able to read it.
if (requester != _my_id && leader_state().tracker.find(requester) == nullptr) {
follower_progress* opt_progress = leader_state().tracker.find(requester);
if (requester != _my_id && opt_progress == nullptr) {
throw std::runtime_error(fmt::format("Read barrier requested by a node outside of the configuration {}", requester));
}
@@ -1109,19 +1110,23 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
return {};
}
// Optimization for read barriers requested on non-voters. A non-voter doesn't receive the read_quorum message, so
// it might update its commit index only after another leader tick, which would slow down wait_for_apply() at the
// end of the read barrier. Prevent that by replicating to the non-voting requester here.
if (requester != _my_id && opt_progress->commit_idx < _commit_idx && opt_progress->match_idx == _log.last_idx()
&& !opt_progress->can_vote) {
logger.trace("start_read_barrier[{}]: replicate to {} because follower commit_idx={} < commit_idx={}, "
"follower match_idx={} == last_idx={}, and follower can_vote={}",
_my_id, requester, opt_progress->commit_idx, _commit_idx, opt_progress->match_idx,
_log.last_idx(), opt_progress->can_vote);
replicate_to(*opt_progress, true);
}
read_id id = next_read_id();
logger.trace("start_read_barrier[{}] starting read barrier with id {}", _my_id, id);
return std::make_pair(id, _commit_idx);
}
void fsm::maybe_update_commit_idx_for_read(index_t read_idx) {
// read_idx from the leader might not be replicated to the local node yet.
const bool in_local_log = read_idx <= _log.last_idx();
if (in_local_log && log_term_for(read_idx) == get_current_term()) {
advance_commit_idx(read_idx);
}
}
void fsm::stop() {
if (is_leader()) {
// Become follower to stop accepting requests

View File

@@ -480,15 +480,6 @@ public:
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
// Update the commit index to the read index (a read barrier result from the leader) if the local entry with the
// read index belongs to the current term.
//
// Satisfying the condition above guarantees that the local log matches the current leader's log up to the read
// index (the Log Matching Property), so the current leader won't drop the local entry with the read index.
// Moreover, this entry has been committed by the leader, so future leaders also won't drop it (the Leader
// Completeness Property). Hence, updating the commit index is safe.
void maybe_update_commit_idx_for_read(index_t read_idx);
size_t in_memory_log_size() const {
return _log.in_memory_size();
}

View File

@@ -1571,7 +1571,6 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
co_return stop_iteration::no;
}
read_idx = std::get<index_t>(res);
_fsm->maybe_update_commit_idx_for_read(read_idx);
co_return stop_iteration::yes;
});

View File

@@ -635,8 +635,10 @@ database::setup_metrics() {
sm::description("Counts sstables that survived the clustering key filtering. "
"High value indicates that bloom filter is not very efficient and still have to access a lot of sstables to get data.")),
// NOTE: dropped_view_updates is registered as a metric but never incremented in the current
// codebase. Consider removing it entirely if it is confirmed dead.
sm::make_counter("dropped_view_updates", _cf_stats.dropped_view_updates,
sm::description("Counts the number of view updates that have been dropped due to cluster overload. "))(basic_level),
sm::description("Counts the number of view updates that have been dropped due to cluster overload. "))(basic_level).set_skip_when_empty(),
sm::make_counter("view_building_paused", _cf_stats.view_building_paused,
sm::description("Counts the number of times view building process was paused (e.g. due to node unavailability). ")),
@@ -655,7 +657,7 @@ database::setup_metrics() {
sm::description("Counts write operations which were rejected on the replica side because the per-partition limit was reached."))(basic_level),
sm::make_counter("total_writes_rejected_due_to_out_of_space_prevention", _stats->total_writes_rejected_due_to_out_of_space_prevention,
sm::description("Counts write operations which were rejected due to disabled user tables writes."))(basic_level),
sm::description("Counts write operations which were rejected due to disabled user tables writes."))(basic_level).set_skip_when_empty(),
sm::make_counter("total_reads_rate_limited", _stats->total_reads_rate_limited,
sm::description("Counts read operations which were rejected on the replica side because the per-partition limit was reached.")),
@@ -704,11 +706,13 @@ database::setup_metrics() {
sm::make_counter("multishard_query_unpopped_bytes", _stats->multishard_query_unpopped_bytes,
sm::description("The total number of bytes that were extracted from the shard reader but were unconsumed by the query and moved back into the reader.")),
// NOTE: multishard_query_failed_reader_stops appears to have no increment site in the
// current codebase. Consider removing it entirely if it is confirmed dead.
sm::make_counter("multishard_query_failed_reader_stops", _stats->multishard_query_failed_reader_stops,
sm::description("The number of times the stopping of a shard reader failed.")),
sm::description("The number of times the stopping of a shard reader failed.")).set_skip_when_empty(),
sm::make_counter("multishard_query_failed_reader_saves", _stats->multishard_query_failed_reader_saves,
sm::description("The number of times the saving of a shard reader failed.")),
sm::description("The number of times the saving of a shard reader failed.")).set_skip_when_empty(),
sm::make_total_operations("counter_cell_lock_acquisition", _cl_stats->lock_acquisitions,
sm::description("The number of acquired counter cell locks.")),

View File

@@ -538,6 +538,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
group0_id = g0_info.group0_id;
raft::server_address my_addr{my_id, {}};
bool starting_server_as_follower = false;
if (server == nullptr) {
// This is the first time discovery is run. Create and start a Raft server for group 0 on this node.
raft::configuration initial_configuration;
@@ -565,6 +566,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
// trigger an empty snapshot transfer.
nontrivial_snapshot = true;
} else {
starting_server_as_follower = true;
co_await handshaker->pre_server_start(g0_info);
}
@@ -591,7 +593,9 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
}
SCYLLA_ASSERT(server);
if (server->get_configuration().contains(my_id)) {
co_await utils::get_local_injector().inject("join_group0_pause_before_config_check",
utils::wait_for_message(std::chrono::minutes{5}));
if (!starting_server_as_follower && server->get_configuration().contains(my_id)) {
// True if we started a new group or completed a configuration change initiated earlier.
group0_log.info("server {} already in group 0 (id {}) as {}", my_id, group0_id,
server->get_configuration().can_vote(my_id)? "voter" : "non-voter");

View File

@@ -239,11 +239,9 @@ public:
// The i-th element corresponds to the i-th entry in _entries.
// Can be smaller than _entries. If _entries[i] doesn't have a matching element in _promoted_indexes then
// that entry doesn't have a promoted index.
// It's not chunked, because promoted index is present only when there are large partitions in the page,
// which also means the page will have typically only 1 entry due to summary:data_file size ratio.
// Kept separately to avoid paying for storage cost in pages where no entry has a promoted index,
// which is typical in workloads with small partitions.
managed_vector<promoted_index> _promoted_indexes;
lsa::chunked_managed_vector<promoted_index> _promoted_indexes;
public:
partition_index_page() = default;
partition_index_page(partition_index_page&&) noexcept = default;

73
test.py
View File

@@ -11,6 +11,7 @@ from __future__ import annotations
import argparse
import asyncio
import math
import shlex
import textwrap
from random import randint
@@ -73,6 +74,51 @@ PYTEST_RUNNER_DIRECTORIES = [
launch_time = time.monotonic()
class ThreadsCalculator:
"""
The ThreadsCalculator class calculates the number of jobs that can be run concurrently based on system
memory and CPU constraints. It allows resource reservation and configurable parameters for
flexible job scheduling in various modes, such as `debug`.
"""
def __init__(self,
modes: list[str],
min_system_memory_reserve: float = 5e9,
max_system_memory_reserve: float = 8e9,
system_memory_reserve_fraction = 16,
max_test_memory: float = 5e9,
test_memory_fraction: float = 8.0,
debug_test_memory_multiplier: float = 1.5,
debug_cpus_per_test_job=1.5,
non_debug_cpus_per_test_job: float =1.0,
non_debug_max_test_memory: float = 4e9
):
sys_mem = int(os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES"))
test_mem = min(sys_mem / test_memory_fraction, max_test_memory)
if "debug" in modes:
test_mem *= debug_test_memory_multiplier
system_memory_reserve = int(min(
max(sys_mem / system_memory_reserve_fraction, min_system_memory_reserve),
max_system_memory_reserve,
))
available_mem = max(0, sys_mem - system_memory_reserve)
is_debug = "debug" in modes
test_mem = min(
sys_mem / test_memory_fraction,
max_test_memory if is_debug else non_debug_max_test_memory,
)
if is_debug:
test_mem *= debug_test_memory_multiplier
self.cpus_per_test_job = (
debug_cpus_per_test_job if is_debug else non_debug_cpus_per_test_job
)
self.default_num_jobs_mem = max(1, int(available_mem // test_mem))
def get_number_of_threads(self, nr_cpus: int) -> int:
default_num_jobs_cpu = max(1, math.ceil(nr_cpus / self.cpus_per_test_job))
return min(self.default_num_jobs_mem, default_num_jobs_cpu)
class TabularConsoleOutput:
"""Print test progress to the console"""
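
To make the arithmetic in `ThreadsCalculator` easier to follow, here is a minimal standalone sketch that takes the system memory as a parameter instead of reading it from `os.sysconf`. The constants are the default arguments from the class above; `estimate_jobs` is a hypothetical helper and not part of test.py.

```python
import math


def estimate_jobs(sys_mem: int, nr_cpus: int, debug: bool) -> int:
    # System reserve: 1/16 of RAM, clamped to the range [5 GB, 8 GB].
    reserve = int(min(max(sys_mem / 16, 5e9), 8e9))
    available = max(0, sys_mem - reserve)
    # Per-test memory: 1/8 of RAM, capped at 5 GB (debug) or 4 GB (other modes).
    test_mem = min(sys_mem / 8.0, 5e9 if debug else 4e9)
    if debug:
        test_mem *= 1.5  # debug builds are assumed to need 1.5x the memory
    cpus_per_job = 1.5 if debug else 1.0
    jobs_mem = max(1, int(available // test_mem))
    jobs_cpu = max(1, math.ceil(nr_cpus / cpus_per_job))
    # The tighter of the memory and CPU bounds wins.
    return min(jobs_mem, jobs_cpu)
```

For a machine with 64 GB of RAM and 16 CPUs, the reserve is 5 GB, leaving 59 GB. In non-debug modes each job is budgeted 4 GB, so memory allows 14 jobs and the CPU bound allows 16, giving 14. In debug mode each job is budgeted 7.5 GB, so memory allows 7 jobs and the CPU bound allows 11, giving 7.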
@@ -273,6 +319,13 @@ def parse_cmd_line() -> argparse.Namespace:
if args.skip_patterns and args.k:
parser.error(palette.fail('arguments --skip and -k are mutually exclusive, please use only one of them'))
if not args.modes:
try:
args.modes = get_configured_modes()
except Exception:
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
raise
if not args.jobs:
if not args.cpus:
nr_cpus = multiprocessing.cpu_count()
@@ -280,19 +333,7 @@ def parse_cmd_line() -> argparse.Namespace:
nr_cpus = int(subprocess.check_output(
['taskset', '-c', args.cpus, 'python3', '-c',
'import os; print(len(os.sched_getaffinity(0)))']))
cpus_per_test_job = 1
sysmem = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
testmem = 6e9 if os.sysconf('SC_PAGE_SIZE') > 4096 else 2e9
default_num_jobs_mem = ((sysmem - 4e9) // testmem)
args.jobs = min(default_num_jobs_mem, nr_cpus // cpus_per_test_job)
if not args.modes:
try:
args.modes = get_configured_modes()
except Exception:
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
raise
args.jobs = ThreadsCalculator(args.modes).get_number_of_threads(nr_cpus)
if not args.coverage_modes and args.coverage:
args.coverage_modes = list(args.modes)
@@ -350,16 +391,12 @@ def run_pytest(options: argparse.Namespace) -> tuple[int, list[SimpleNamespace]]
if options.list_tests:
args.extend(['--collect-only', '--quiet', '--no-header'])
else:
threads = int(options.jobs)
# debug mode is very CPU and memory hungry, so we need to lower the number of threads to be able to finish tests
if 'debug' in options.modes:
threads = int(threads * 0.5)
args.extend([
"--log-level=DEBUG", # Capture logs
f'--junit-xml={junit_output_file}',
"-rf",
'--test-py-init',
f'-n{threads}',
f'-n{options.jobs}',
f'--tmpdir={temp_dir}',
f'--maxfail={options.max_failures}',
f'--alluredir={report_dir / f"allure_{HOST_ID}"}',

View File

@@ -18,7 +18,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/manual_clock.hh>
#include <seastar/util/later.hh>
#include <seastar/core/timer.hh>
#include <seastar/util/defer.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/util/alloc_failure_injector.hh>
@@ -290,12 +290,17 @@ SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
m.set_expiring(id1);
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
m.barrier().get();
promise<> shard0_timer_expired;
timer<manual_clock> shard0_timer([&shard0_timer_expired] {
shard0_timer_expired.set_value();
});
shard0_timer.arm(manual_clock::now() + expiration_time);
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
manual_clock::advance(expiration_time);
BOOST_CHECK(!m.find(id1));
return smp::submit_to(0, []{}); // Ensure shard 0 notices timer is expired.
}).get();
shard0_timer_expired.get_future().get();
BOOST_CHECK(!m.find(id1));
// Expiring entries are replicated

View File

@@ -1045,7 +1045,6 @@ validate_result_size(size_t i, schema_ptr schema, const utils::chunked_vector<mu
struct fuzzy_test_config {
uint32_t seed;
std::chrono::seconds timeout;
unsigned concurrency;
unsigned scans;
};
@@ -1077,6 +1076,9 @@ run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, sharded<replica::database>&
testlog.debug("[scan#{}]: seed={}, is_stateful={}, prange={}, ckranges={}", i, seed, is_stateful, partition_range,
partition_slice.default_row_ranges());
// Use a small max_size to force many pages per scan, stressing the
// paging and result-merging logic. With the large row limit here,
// the byte limit is typically the tighter bound.
const auto [results, npages] = read_partitions_with_paged_scan(db, schema, 1000, 1024, is_stateful, partition_range, partition_slice);
const auto expected_partitions = slice_partitions(*schema, mutations, partition_index_range, partition_slice);
@@ -1160,21 +1162,27 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
std::uniform_int_distribution<size_t>(0, 100), // range-tombstones
#else
// Keep these values moderate: with complex randomly-generated
// schemas (deeply nested frozen collections/UDTs), large row
// counts cause data generation and paged scanning to be very
// slow, leading to CI timeouts. The test's value comes from
// schema variety and paging correctness, not from sheer data
// volume.
std::uniform_int_distribution<size_t>(32, 64), // partitions
std::uniform_int_distribution<size_t>(0, 1000), // clustering-rows
std::uniform_int_distribution<size_t>(0, 1000), // range-tombstones
std::uniform_int_distribution<size_t>(0, 200), // clustering-rows
std::uniform_int_distribution<size_t>(0, 200), // range-tombstones
#endif
tests::default_timestamp_generator());
#if defined DEBUG
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{8}, 1, 1};
auto cfg = fuzzy_test_config{seed, 1, 1};
#elif defined DEVEL
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 2, 4};
auto cfg = fuzzy_test_config{seed, 2, 4};
#else
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 4, 8};
auto cfg = fuzzy_test_config{seed, 4, 8};
#endif
testlog.info("Running test workload with configuration: seed={}, timeout={}s, concurrency={}, scans={}", cfg.seed, cfg.timeout.count(),
testlog.info("Running test workload with configuration: seed={}, concurrency={}, scans={}", cfg.seed,
cfg.concurrency, cfg.scans);
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(tbl.schema), &compacted_frozen_mutations = tbl.compacted_frozen_mutations] {

View File

@@ -906,9 +906,13 @@ SEASTAR_THREAD_TEST_CASE(test_timeout_is_applied_on_lookup) {
BOOST_REQUIRE(entry.permit.timeout() == new_timeout);
BOOST_REQUIRE(!entry.permit.get_abort_exception());
sleep(ttl_timeout_test_timeout * 2).get();
// Don't waste time retrying before the timeout is up
sleep(ttl_timeout_test_timeout).get();
eventually_true([&entry] {
return bool(entry.permit.get_abort_exception());
});
BOOST_REQUIRE(entry.permit.get_abort_exception());
BOOST_REQUIRE_THROW(std::rethrow_exception(entry.permit.get_abort_exception()), seastar::named_semaphore_timed_out);
}

View File

@@ -2644,7 +2644,10 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
return rd;
};
populate_range(cache, population_range);
{
memory::scoped_critical_alloc_section dfg;
populate_range(cache, population_range);
}
auto rd1_v1 = assert_that(make_reader(population_range));
mutation_reader_opt snap;
auto close_snap = defer([&snap] {

View File

@@ -29,7 +29,6 @@
#include "test/lib/exception_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/test_utils.hh"
#include "transport/response.hh"
BOOST_AUTO_TEST_SUITE(schema_change_test)
@@ -702,16 +701,6 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
return cql3::metadata{columns_specification}.calculate_metadata_id();
}
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
columns_specification.reserve(columns.size());
for (const auto& column : columns) {
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
}
return columns_specification;
}
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
const auto c = std::make_pair("id", uuid_type);
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
@@ -762,39 +751,6 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
}
BOOST_AUTO_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
auto empty_metadata_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
auto stale_response_metadata_id = empty_metadata_id;
auto columns_specification = make_columns_specification({{"role", utf8_type}});
cql3::metadata rows_metadata(columns_specification);
auto rows_metadata_id = rows_metadata.calculate_metadata_id();
cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
resp.write(rows_metadata, cql_transport::cql_metadata_id_wrapper(
std::move(empty_metadata_id),
std::move(stale_response_metadata_id)), true);
auto body_stream = std::move(resp).extract_body();
auto body = body_stream.linearize();
const auto* ptr = reinterpret_cast<const char*>(body.begin());
const auto flags_mask = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
const auto flags = cql3::metadata::flag_enum_set::from_mask(flags_mask);
BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());
const auto column_count = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
BOOST_REQUIRE_EQUAL(column_count, 1);
const auto metadata_id_length = read_be<uint16_t>(ptr);
ptr += sizeof(uint16_t);
BOOST_REQUIRE_EQUAL(metadata_id_length, rows_metadata_id._metadata_id.size());
BOOST_REQUIRE(std::equal(rows_metadata_id._metadata_id.begin(), rows_metadata_id._metadata_id.end(),
reinterpret_cast<const bytes::value_type*>(ptr)));
}
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
auto compute_metadata_id_for_type = [&](

View File

@@ -1,328 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import dataclasses
import hashlib
import socket
import struct
import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.util import unique_name
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
# ---------------------------------------------------------------------------
# Minimal raw CQL v4 socket helpers with SCYLLA_USE_METADATA_ID extension.
#
# The standard Python driver never negotiates SCYLLA_USE_METADATA_ID and
# therefore never includes result_metadata_id in EXECUTE requests for
# protocol v4. In CQL v5 result_metadata_id exchange is mandatory and
# built into the wire format; until Scylla implements v5, this extension
# provides the same semantics on v4. The helpers below implement just
# enough of the CQL wire protocol to exercise the server-side prepared
# metadata promotion path introduced for v5 compatibility.
# ---------------------------------------------------------------------------
# CQL opcodes
_OP_STARTUP = 0x01
_OP_AUTH_RESPONSE = 0x0F
_OP_PREPARE = 0x09
_OP_EXECUTE = 0x0A
_OP_READY = 0x02
_OP_AUTHENTICATE = 0x03
_OP_RESULT = 0x08
_OP_AUTH_SUCCESS = 0x10
# RESULT kind codes
_RESULT_KIND_ROWS = 0x00000002
_RESULT_KIND_PREPARED = 0x00000004
# Rows metadata flags (bit positions in the uint32 flags field)
_META_NO_METADATA = 1 << 2
_META_METADATA_CHANGED = 1 << 3
# EXECUTE options flags (1-byte field in CQL v4)
_FLAG_SKIP_METADATA = 0x02
_FRAME_HEADER_SIZE = 9 # version(1)+flags(1)+stream(2)+opcode(1)+length(4)
_CQL_VERSION = "3.0.0"
_DEFAULT_CONSISTENCY = 0x0006 # LOCAL_QUORUM
def _pack_short(v: int) -> bytes:
return struct.pack(">H", v)
def _pack_int(v: int) -> bytes:
return struct.pack(">I", v)
def _short_bytes(b: bytes) -> bytes:
"""CQL [short bytes]: uint16 length prefix + payload."""
return _pack_short(len(b)) + b
def _long_string(s: str) -> bytes:
"""CQL [long string]: uint32 length prefix + UTF-8 bytes."""
b = s.encode()
return _pack_int(len(b)) + b
def _string_map(d: dict[str, str]) -> bytes:
"""CQL [string map]: uint16 count + (uint16-prefixed-string, uint16-prefixed-string)*."""
out = _pack_short(len(d))
for k, v in d.items():
out += _short_bytes(k.encode())
out += _short_bytes(v.encode())
return out
def _frame(opcode: int, body: bytes, stream: int) -> bytes:
"""Build a CQL v4 request frame."""
return struct.pack(">BBHBI", 0x04, 0x00, stream, opcode, len(body)) + body
def _recv_frame(sock: socket.socket) -> tuple[int, int, bytes]:
"""Read one CQL v4 response frame; return (stream, opcode, body)."""
header = b""
while len(header) < _FRAME_HEADER_SIZE:
chunk = sock.recv(_FRAME_HEADER_SIZE - len(header))
assert chunk, "Connection closed while reading frame header"
header += chunk
_version, _flags = struct.unpack(">BB", header[0:2])
stream = struct.unpack(">H", header[2:4])[0]
opcode = header[4]
length = struct.unpack(">I", header[5:9])[0]
body = b""
while len(body) < length:
chunk = sock.recv(length - len(body))
assert chunk, "Connection closed while reading frame body"
body += chunk
return stream, opcode, body
@dataclasses.dataclass
class ExecuteResult:
"""Parsed outcome of a ROWS EXECUTE response."""
metadata_changed: bool
no_metadata: bool
column_count: int
result_metadata_id: bytes | None
def _cql_connect(host: str, port: int, username: str, password: str) -> socket.socket:
"""
Open a raw TCP socket to *host*:*port* and perform the CQL v4 handshake,
negotiating the SCYLLA_USE_METADATA_ID extension so that result_metadata_id
is exchanged on the wire — identical to the mandatory CQL v5 behaviour.
"""
sock = socket.create_connection((host, port))
stream = 1
# STARTUP with SCYLLA_USE_METADATA_ID enables the v5-style metadata_id
# exchange for this v4 connection.
startup_opts = {"CQL_VERSION": _CQL_VERSION, "SCYLLA_USE_METADATA_ID": ""}
sock.sendall(_frame(_OP_STARTUP, _string_map(startup_opts), stream))
_, opcode, payload = _recv_frame(sock)
if opcode == _OP_READY:
return sock
assert opcode == _OP_AUTHENTICATE, (
f"Expected AUTHENTICATE(0x{_OP_AUTHENTICATE:02x}), got 0x{opcode:02x}"
)
# PlainText SASL token: NUL + username + NUL + password
creds = b"\x00" + username.encode() + b"\x00" + password.encode()
stream += 1
sock.sendall(_frame(_OP_AUTH_RESPONSE, _short_bytes(creds), stream))
_, auth_op, _ = _recv_frame(sock)
assert auth_op == _OP_AUTH_SUCCESS, f"Authentication failed: opcode=0x{auth_op:02x}"
return sock
def _cql_prepare(sock: socket.socket, stream: int, query: str) -> bytes:
"""PREPARE *query* and return the server-assigned query_id."""
sock.sendall(_frame(_OP_PREPARE, _long_string(query), stream))
_, opcode, payload = _recv_frame(sock)
assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"
pos = 0
kind = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
assert kind == _RESULT_KIND_PREPARED, f"Expected PREPARED kind, got {kind}"
id_len = struct.unpack(">H", payload[pos : pos + 2])[0]
pos += 2
return bytes(payload[pos : pos + id_len])
def _cql_execute_with_metadata_id(
sock: socket.socket,
stream: int,
query_id: bytes,
result_metadata_id: bytes,
consistency: int = _DEFAULT_CONSISTENCY,
) -> ExecuteResult:
"""
Send EXECUTE carrying *result_metadata_id* on the wire.
With SCYLLA_USE_METADATA_ID active the server reads result_metadata_id
immediately after query_id (before the options block), mirroring CQL v5
wire format. SKIP_METADATA is set so a normal response returns no column
specs; only the METADATA_CHANGED promotion path returns actual metadata.
"""
# options block: [consistency: uint16][flags: byte]
options = struct.pack(">HB", consistency, _FLAG_SKIP_METADATA)
body = _short_bytes(query_id) + _short_bytes(result_metadata_id) + options
sock.sendall(_frame(_OP_EXECUTE, body, stream))
_, opcode, payload = _recv_frame(sock)
assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"
pos = 0
kind = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
assert kind == _RESULT_KIND_ROWS, f"Expected ROWS kind, got {kind}"
meta_flags = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
column_count = struct.unpack(">I", payload[pos : pos + 4])[0]
pos += 4
metadata_changed = bool(meta_flags & _META_METADATA_CHANGED)
no_metadata = bool(meta_flags & _META_NO_METADATA)
response_metadata_id: bytes | None = None
if metadata_changed:
id_len = struct.unpack(">H", payload[pos : pos + 2])[0]
pos += 2
response_metadata_id = bytes(payload[pos : pos + id_len])
return ExecuteResult(
metadata_changed=metadata_changed,
no_metadata=no_metadata,
column_count=column_count,
result_metadata_id=response_metadata_id,
)
def _prepare_and_execute(
host: str, query: str, stale_metadata_id: bytes
) -> ExecuteResult:
"""
Open a raw socket connection (negotiating SCYLLA_USE_METADATA_ID), prepare
*query*, execute it with *stale_metadata_id*, and return the parsed result.
Intended to be called via ``asyncio.to_thread`` to avoid blocking the event loop.
"""
sock = _cql_connect(host, 9042, "cassandra", "cassandra")
try:
stream = 1
stream += 1
query_id = _cql_prepare(sock, stream, query)
stream += 1
return _cql_execute_with_metadata_id(sock, stream, query_id, stale_metadata_id)
finally:
sock.close()
@pytest.mark.asyncio
async def test_list_roles_of_prepared_metadata_promotion(
manager: ManagerClient,
) -> None:
"""Verify that the server promotes the prepared metadata_id for statements
whose PREPARE response carries empty result metadata (NO_METADATA).
``LIST ROLES OF <role>`` is such a statement: at PREPARE time the server
does not know the result set schema because the statement implementation
builds the metadata dynamically at execute time. The server therefore
returns the metadata_id of empty metadata in the PREPARE response.
When the client later sends EXECUTE with SKIP_METADATA and the stale
empty metadata_id, the server should detect the mismatch (the actual rows
have real metadata) and respond with a ``METADATA_CHANGED`` result that
carries the real metadata_id so the client can update its cache. This is
the behaviour mandated by CQL v5; on CQL v4 it is exercised via the
SCYLLA_USE_METADATA_ID Scylla protocol extension which enables the same
wire-level exchange.
"""
server = await manager.server_add(config=auth_config)
cql, _ = await manager.get_ready_cql([server])
role = "r" + unique_name()
await cql.run_async(f"CREATE ROLE {role}")
# Any non-empty bytes that differ from the real metadata_id serve as the
# "stale" cache entry the client would send after a PREPARE that returned
# empty metadata.
stale_metadata_id = hashlib.sha256(b"").digest()[:16]
result = await asyncio.to_thread(
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}", stale_metadata_id
)
assert result.metadata_changed, (
f"expected EXECUTE for LIST ROLES OF {role} to return METADATA_CHANGED "
f"after PREPARE returned an empty result_metadata_id"
)
assert not result.no_metadata, (
f"expected EXECUTE for LIST ROLES OF {role} to not have NO_METADATA flag "
f"when METADATA_CHANGED is set"
)
assert result.result_metadata_id is not None, (
f"expected EXECUTE for LIST ROLES OF {role} to return a result_metadata_id "
f"alongside METADATA_CHANGED"
)
@pytest.mark.asyncio
@pytest.mark.skip_mode(
mode="release", reason="error injection is disabled in release mode"
)
async def test_list_roles_of_prepared_metadata_promotion_suppressed_by_injection(
manager: ManagerClient,
) -> None:
"""Verify that enabling the ``skip_prepared_result_metadata_promotion`` and
``skip_rows_metadata_changed_response`` error injections suppresses the
metadata promotion, leaving the response with NO_METADATA and without
METADATA_CHANGED.
This is the negative/regression counterpart of
``test_list_roles_of_prepared_metadata_promotion``: it confirms that the
happy-path test is not a false positive by showing that the promotion can
be disabled, and that the injection point itself works correctly.
"""
server = await manager.server_add(config=auth_config)
cql, _ = await manager.get_ready_cql([server])
role = "r" + unique_name()
await cql.run_async(f"CREATE ROLE {role}")
stale_metadata_id = hashlib.sha256(b"").digest()[:16]
async with inject_error(
manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"
):
async with inject_error(
manager.api, server.ip_addr, "skip_rows_metadata_changed_response"
):
result = await asyncio.to_thread(
_prepare_and_execute,
server.ip_addr,
f"LIST ROLES OF {role}",
stale_metadata_id,
)
assert not result.metadata_changed, (
f"expected injected EXECUTE for LIST ROLES OF {role} to suppress "
f"METADATA_CHANGED, but the flag was set"
)
assert result.no_metadata, (
f"expected injected EXECUTE for LIST ROLES OF {role} to keep the "
f"stale NO_METADATA path, but no_metadata flag was not set"
)
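The raw-socket helpers in this file depend on two wire-level details: the 9-byte CQL v4 frame header and the ROWS metadata flag bits. A minimal offline sketch of both (no socket, no server), assuming the same header layout and bit positions the helpers above use. `build_frame`, `parse_header`, and `decode_rows_flags` are illustrative names and are not part of the test file.

```python
import struct

FRAME_HEADER_SIZE = 9  # version(1) + flags(1) + stream(2) + opcode(1) + length(4)
META_NO_METADATA = 1 << 2        # same bit the test file calls _META_NO_METADATA
META_METADATA_CHANGED = 1 << 3   # same bit the test file calls _META_METADATA_CHANGED


def build_frame(opcode: int, body: bytes, stream: int) -> bytes:
    """Pack a CQL v4 request frame: big-endian header followed by the body."""
    return struct.pack(">BBHBI", 0x04, 0x00, stream, opcode, len(body)) + body


def parse_header(header: bytes) -> tuple[int, int, int]:
    """Unpack a 9-byte frame header into (stream, opcode, body_length)."""
    _version, _flags, stream, opcode, length = struct.unpack(">BBHBI", header)
    return stream, opcode, length


def decode_rows_flags(flags: int) -> dict[str, bool]:
    """Report the two ROWS metadata flags the tests assert on."""
    return {
        "no_metadata": bool(flags & META_NO_METADATA),
        "metadata_changed": bool(flags & META_METADATA_CHANGED),
    }
```

Unpacking the whole header with one format string keeps the field offsets in a single place, which may be easier to audit than slicing `header[2:4]` and `header[5:9]` separately.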

View File

@@ -257,39 +257,44 @@ async def manager(request: pytest.FixtureRequest,
yield manager_client
# `request.node.stash` contains a report stored in `pytest_runtest_makereport` from where we can retrieve
# test failure.
report = request.node.stash[PHASE_REPORT_KEY]
failed = report.when == "call" and report.failed
# Check if the test has the check_nodes_for_errors marker
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
cluster_status = None
found_errors = {}
failed = False
failed_test_dir_path = None
if failed or found_errors:
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
str.maketrans('[]', '()'))
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
try:
report = request.node.stash[PHASE_REPORT_KEY]
failed = report.when == "call" and report.failed
if failed:
await manager_client.gather_related_logs(
failed_test_dir_path,
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
)
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
f.write(report.longreprtext)
if request.config.getoption('artifacts_dir_url') is not None:
# get the relative path to the tmpdir for the failed directory
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
urllib.parse.quote(dir_path_relative))
record_property("TEST_LOGS", full_url)
# Check if the test has the check_nodes_for_errors marker
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
cluster_status = await manager_client.after_test(test_case_name, not failed)
await manager_client.stop() # Stop client session and close driver after each test
if failed or found_errors:
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
# all related logs in one dir.
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
str.maketrans('[]', '()'))
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
if cluster_status["server_broken"] and not failed:
if failed:
await manager_client.gather_related_logs(
failed_test_dir_path,
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
)
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
f.write(report.longreprtext)
if request.config.getoption('artifacts_dir_url') is not None:
# get the relative path to the tmpdir for the failed directory
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
urllib.parse.quote(dir_path_relative))
record_property("TEST_LOGS", full_url)
cluster_status = await manager_client.after_test(test_case_name, not failed)
finally:
await manager_client.stop() # Stop client session and close driver after each test
if cluster_status is not None and cluster_status["server_broken"] and not failed:
failed = True
pytest.fail(
f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"

View File

@@ -10,6 +10,7 @@ import random
import string
import tempfile
import time
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from pprint import pformat
@@ -273,6 +274,30 @@ class TesterAlternator(BaseAlternator):
logger.info("Testing and validating an update query using key condition expression")
logger.info(f"ConditionExpression update of short circuit is: {conditional_update_short_circuit}")
dc2_table.update_item(**conditional_update_short_circuit)
# Wait for cross-DC replication to reach both live DC1 nodes
# before stopping dc2_node. The LWT commit uses LOCAL_QUORUM,
# which only guarantees DC2 persistence; replication to DC1 is
# async background work. Without this wait, stopping dc2_node
# can drop in-flight RPCs to DC1 while CAS mutations don't
# store hints. We must confirm both live DC1 replicas have the
# data so that the later ConsistentRead=True (LOCAL_QUORUM)
# read on restarted node1 is guaranteed to succeed.
# See https://scylladb.atlassian.net/browse/SCYLLADB-1267
dc1_live_nodes = [
node for node in self.cluster.nodelist()
if node.data_center == node1.data_center and node.server_id != node1.server_id
]
dc1_live_tables = [self.get_table(table_name=TABLE_NAME, node=node) for node in dc1_live_nodes]
wait_for(
lambda: all(
t.get_item(
Key={self._table_primary_key: new_pk_val}, ConsistentRead=False
).get("Item", {}).get("c") == 3
for t in dc1_live_tables
),
timeout=60,
text="Waiting for cross-DC replication of conditional update to both live DC1 nodes",
)
dc2_node.stop()
node1.start()
@@ -481,28 +506,33 @@ class TesterAlternator(BaseAlternator):
2) Issue Alternator 'heavy' requests concurrently (create-table)
3) wait for RequestLimitExceeded error response.
"""
concurrent_requests_limit = 5
# Keep the limit low to avoid exhausting LSA memory on the 1GB test node
# when multiple CreateTable requests (Raft + schema + flush) run concurrently.
concurrent_requests_limit = 3
extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1}
self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config)
node1 = self.cluster.nodelist()[0]
create_tables_threads = []
for tables_num in range(concurrent_requests_limit * 5):
create_tables_threads.append(self.run_create_table_thread())
stop_workers = threading.Event()
@retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request")
def wait_for_create_table_request_failure():
try:
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
except Exception as error:
if "RequestLimitExceeded" in error.args[0]:
return
raise
raise ConcurrencyLimitNotExceededError
def run_create_table_until_limited() -> None:
while not stop_workers.is_set():
try:
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
except Exception as error: # noqa: BLE001
if "RequestLimitExceeded" in str(error):
stop_workers.set()
return
raise
wait_for_create_table_request_failure()
with ThreadPoolExecutor(max_workers=concurrent_requests_limit * 5) as executor:
create_table_futures = [executor.submit(run_create_table_until_limited) for _ in range(concurrent_requests_limit * 5)]
for thread in create_tables_threads:
thread.join()
if not stop_workers.wait(timeout=30):
raise ConcurrencyLimitNotExceededError
stop_workers.set()
for future in create_table_futures:
future.result(timeout=60)
@staticmethod
def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):
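The reworked concurrency test replaces the retry decorator with worker threads that share a `threading.Event`. A minimal sketch of that pattern in isolation, under stated assumptions: `LimitExceeded` stands in for the server's RequestLimitExceeded error, and `run_until_limited` and its `request` callable are hypothetical names, not code from the test suite.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class LimitExceeded(Exception):
    """Hypothetical stand-in for a RequestLimitExceeded response."""


def run_until_limited(request, workers: int, timeout: float) -> bool:
    """Call `request` from `workers` threads until one of them is rejected.

    Returns True if some worker saw LimitExceeded within `timeout` seconds.
    """
    stop = threading.Event()

    def worker() -> None:
        while not stop.is_set():
            try:
                request()
            except LimitExceeded:
                stop.set()  # tell the other workers to wind down
                return

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(worker) for _ in range(workers)]
        limited = stop.wait(timeout=timeout)
        stop.set()  # release the workers even when the wait timed out
        for future in futures:
            future.result(timeout=timeout)  # re-raise unexpected worker errors
    return limited
```

The sketch sets the event before leaving the `with` block in every case, so the executor shutdown never waits on workers that are still looping.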

View File

@@ -1182,9 +1182,9 @@ class TestAuth(Tester):
def get_session(self, node_idx=0, user=None, password=None, exclusive=True):
node = self.cluster.nodelist()[node_idx]
if exclusive:
conn = self.patient_exclusive_cql_connection(node, user=user, password=password, timeout=0.1)
conn = self.patient_exclusive_cql_connection(node, user=user, password=password)
else:
conn = self.patient_cql_connection(node, user=user, password=password, timeout=0.1)
conn = self.patient_cql_connection(node, user=user, password=password)
return conn
def assert_permissions_listed(self, expected, session, query, include_superuser=False):

View File

@@ -199,7 +199,7 @@ class GSServer(GSFront):
def unpublish(self):
for k in self.vars:
v = self.oldvars[k]
v = self.oldvars.get(k)
if v:
os.environ[k] = v
elif os.environ.get(k):

View File

@@ -215,6 +215,11 @@ async def test_node_ops_tasks_tree(manager: ManagerClient):
servers, vt_ids = await check_remove_node_tasks_tree(manager, tm, module_name, servers, vt_ids)
servers, vt_ids = await check_decommission_tasks_tree(manager, tm, module_name, servers, vt_ids)
# Reconnect the driver after topology changes (replace, removenode,
# decommission) so that the new_test_keyspace cleanup can reach a
# live node for DROP KEYSPACE.
await manager.driver_connect()
@pytest.mark.asyncio
async def test_node_ops_tasks_ttl(manager: ManagerClient):
"""Test node ops virtual tasks' ttl."""

View File

@@ -17,6 +17,7 @@ import socket
import socketserver
import tempfile
import threading
import time
import uuid
from collections import namedtuple
from contextlib import contextmanager
@@ -33,9 +34,9 @@ from test.cluster.dtest.dtest_class import create_ks, wait_for
from test.cluster.dtest.tools.assertions import assert_invalid
from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel
from test.cluster.test_config import wait_for_config
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import read_barrier
from test.pylib.util import wait_for as wait_for_async
logger = logging.getLogger(__name__)
@@ -113,11 +114,10 @@ class AuditTester:
for k in AUTH_CONFIG:
await self.manager.server_remove_config_option(srv.server_id, k)
# Remove absent keys so the server reverts to compiled-in defaults.
for k in absent_keys:
await self.manager.server_remove_config_option(srv.server_id, k)
if needs_restart:
# Remove absent keys so the server reverts to compiled-in defaults.
for k in absent_keys:
await self.manager.server_remove_config_option(srv.server_id, k)
await self.manager.server_stop_gracefully(srv.server_id)
full_cfg = self._build_server_config(needed, enable_compact_storage, user)
await self.manager.server_update_config(srv.server_id, config_options=full_cfg)
@@ -127,10 +127,17 @@ class AuditTester:
# Server stays up — only push live-updatable keys.
live_cfg = {k: v for k, v in needed.items() if k in LIVE_AUDIT_KEYS}
live_cfg["enable_create_table_with_compact_storage"] = enable_compact_storage
log_file = await self.manager.server_open_log(srv.server_id)
# Each remove/update sends a SIGHUP. Wait for each one's
# "completed re-reading configuration file" before the next
# so we never match a stale message.
for k in absent_keys:
from_mark = await log_file.mark()
await self.manager.server_remove_config_option(srv.server_id, k)
await log_file.wait_for(r"completed re-reading configuration file", from_mark=from_mark, timeout=60)
from_mark = await log_file.mark()
await self.manager.server_update_config(srv.server_id, config_options=live_cfg)
for key in LIVE_AUDIT_KEYS:
if key in live_cfg:
await wait_for_config(self.manager, srv, key, live_cfg[key])
await log_file.wait_for(r"completed re-reading configuration file", from_mark=from_mark, timeout=60)
async def _start_fresh_servers(self, needed: dict[str, str],
enable_compact_storage: bool,
@@ -345,7 +352,7 @@ class UnixSockerListener:
elif data != "Initializing syslog audit backend.":
self.server.parent_instance.lines.append(data)
class UnixDatagramServer(socketserver.ThreadingUnixDatagramServer):
class UnixDatagramServer(socketserver.UnixDatagramServer):
def __init__(self, socket_path, handler, parent_instance, lock):
self.parent_instance = parent_instance
self.mutex = lock
@@ -1342,7 +1349,13 @@ class CQLAuditTester(AuditTester):
conn = await self.manager.get_cql_exclusive(srv)
stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1) VALUES (1000, 1000)", consistency_level=ConsistencyLevel.THREE)
conn.execute(stmt)
audit_node_ips = await self.get_audit_partitions_for_operation(session, stmt.query_string)
# The audit log entry may not be visible immediately after the
# insert, so retry with exponential backoff until it appears.
audit_node_ips = await wait_for_async(
lambda: self.get_audit_partitions_for_operation(session, stmt.query_string),
deadline=time.time() + 10,
period=0.05,
label=f"audit entry for node {index}")
node_to_audit_nodes[index] = set(audit_node_ips)
all_addresses = set(srv.ip_addr for srv in servers)

View File

@@ -0,0 +1,70 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import asyncio
import time
import pytest
from test.cluster.util import get_current_group0_config
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import read_barrier
from test.pylib.util import wait_for
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_bootstrap_with_quick_group0_join(manager: ManagerClient):
"""Regression test for https://scylladb.atlassian.net/browse/SCYLLADB-959.
The bug was that when the bootstrapping node joined group0 before reaching
post_server_start, it skipped post_server_start and thus hung forever.
The test simulates the scenario by starting the second node with the
join_group0_pause_before_config_check injection. Without the fix, the
startup times out.
"""
logger.info("Adding first server")
s1 = await manager.server_add()
logger.info("Adding second server with join_group0_pause_before_config_check enabled")
s2 = await manager.server_add(start=False, config={
'error_injections_at_startup': ['join_group0_pause_before_config_check']
})
logger.info(f"Starting {s2}")
start_task = asyncio.create_task(manager.server_start(s2.server_id))
s2_log = await manager.server_open_log(s2.server_id)
await s2_log.wait_for("join_group0_pause_before_config_check: waiting for message", timeout=60)
s1_host_id = await manager.get_host_id(s1.server_id)
s2_host_id = await manager.get_host_id(s2.server_id)
async def s2_in_group0_config_on_s1():
config = await get_current_group0_config(manager, s1)
ids = {m[0] for m in config}
assert s1_host_id in ids # sanity check
return True if s2_host_id in ids else None
# Note: we would like to wait for s2 to see itself in the group0 config, but we can't execute
# get_current_group0_config for s2, as s2 doesn't handle CQL requests at this point. As a workaround, we wait for s1
# to see s2 and then perform a read barrier on s2.
logger.info(f"Waiting for {s1} to see {s2} in the group0 config")
await wait_for(s2_in_group0_config_on_s1, deadline=time.time() + 60, period=0.1)
logger.info(f"Performing read barrier on {s2} to make sure it sees itself in the group0 config")
await read_barrier(manager.api, s2.ip_addr)
logger.info(f"Unblocking {s2}")
await manager.api.message_injection(s2.ip_addr, 'join_group0_pause_before_config_check')
logger.info(f"Waiting for {s2} to complete bootstrap")
await asyncio.wait_for(start_task, timeout=60)

View File

@@ -100,7 +100,7 @@ async def test_limited_concurrency_of_writes(manager: ManagerClient):
})
node2 = await manager.server_add()
cql = manager.get_cql()
cql = await manager.get_cql_exclusive(node1)
async with new_test_keyspace(manager, "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}") as ks:
table = f"{ks}.t"
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")

View File

@@ -11,7 +11,6 @@ from test.cluster.util import check_token_ring_and_group0_consistency, new_test_
import pytest
import asyncio
import logging
import time
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@@ -53,7 +52,7 @@ async def test_cleanup_stop(manager: ManagerClient):
await s0_log.wait_for('sstable_cleanup_wait: waiting', from_mark=s0_mark)
stop_cleanup = asyncio.create_task(manager.api.stop_compaction(servers[0].ip_addr, "CLEANUP"))
time.sleep(1)
await asyncio.sleep(1)
await manager.api.message_injection(servers[0].ip_addr, "sstable_cleanup_wait")
await stop_cleanup
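The switch from `time.sleep(1)` to `await asyncio.sleep(1)` matters because a blocking sleep freezes the event loop, so a task such as `stop_cleanup` cannot make progress during the pause. A minimal sketch that makes the difference observable; `ticks_during` is an illustrative helper, not part of the test suite.

```python
import asyncio
import time


async def ticks_during(pause) -> int:
    """Count how often a background task runs while `pause()` is in progress."""
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # let the ticker run once and park on its timer
    before = ticks
    result = pause()  # either blocks the loop or returns a coroutine
    if asyncio.iscoroutine(result):
        await result
    task.cancel()
    return ticks - before
```

Calling `asyncio.run(ticks_during(lambda: time.sleep(0.05)))` gives the ticker no chance to run during the pause, whereas passing `lambda: asyncio.sleep(0.05)` lets it tick while the caller waits.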

View File

@@ -2279,7 +2279,7 @@ async def test_split_stopped_on_shutdown(manager: ManagerClient):
shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))
await log.wait_for('Stopping.*ongoing compactions')
await log.wait_for('Stopping.*ongoing compactions', from_mark=log_mark)
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
await log.wait_for('storage_service_drain_wait: waiting', from_mark=log_mark)

View File

@@ -6,6 +6,7 @@
import os
import random
import socket
import subprocess
import sys
import time
@@ -59,7 +60,14 @@ async def server_address(request, testpy_test: None|Test):
ip = await testpy_test.suite.hosts.lease_host()
else:
ip = f"127.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
port = random.randint(10000, 65535)
# Ask the OS to pick a free port by binding to port 0. This avoids
# collisions with ports still in TIME_WAIT from a previous test module
# that used the same IP. SO_REUSEADDR is set on the probe socket so it
# can reclaim a TIME_WAIT port itself.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((ip, 0))
port = s.getsockname()[1]
yield ServerAddress(ip, port)
if testpy_test is not None:
await testpy_test.suite.hosts.release_host(ip)
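The port-selection change above lets the kernel choose the port instead of drawing a random number. A standalone sketch of the same probe, assuming a loopback address is available. `pick_free_port` is a hypothetical name, not a function in the fixture.

```python
import socket


def pick_free_port(ip: str = "127.0.0.1") -> int:
    """Ask the kernel for a TCP port on `ip` that is unused right now."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        probe.bind((ip, 0))  # port 0 means "any free port"
        return probe.getsockname()[1]
```

The probe socket is closed before the caller binds the real server, so another process could in principle take the port in between. The technique narrows the collision window compared with a random pick but does not eliminate it.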

View File

@@ -257,7 +257,7 @@ async def run_server(ip, port):
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, ip, port)
site = aiohttp.web.TCPSite(runner, ip, port, reuse_address=True, reuse_port=True)
await site.start()
try:

View File

@@ -4,14 +4,18 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import shutil
import itertools
import asyncio
import pathlib
import re
import os
import subprocess
from typing import Callable
logger = logging.getLogger("DockerizedServer")
class DockerizedServer:
"""class for running an external dockerized service image, typically mock server"""
# pylint: disable=too-many-instance-attributes
@@ -37,6 +41,7 @@ class DockerizedServer:
self.port = None
self.proc = None
self.service_port = port
self.echo_thread = None
async def start(self):
"""Starts docker image on a random port"""
@@ -45,77 +50,107 @@ class DockerizedServer:
if exe is not None)).resolve()
sid = f"{os.getpid()}-{DockerizedServer.newid()}"
name = f'{self.logfilenamebase}-{sid}'
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
self.logfile = logfilename.open("wb")
docker_args = self.docker_args(self.host, self.service_port)
image_args = self.image_args(self.host, self.service_port)
args = [exe, "run", "--name", name, "--rm" ]
if self.service_port is None:
args = args + ["-P"]
else:
args = args + ["-p", str(self.service_port)]
args = args + docker_args + [self.image] + image_args
# This seems weird, using the blocking IO subprocess.
# However, we want to use a pipe reader so we can push the
# output into the test log (because we are bad at propagating
# log files etc from CI)
# But the pipe reader needs to read until EOF, otherwise the
# docker process will eventually hang. So we can't await a
# coroutine.
# We _can_, sort of, use pool.create_task(...) to send a coro
# to the background, and use a signal for waiting, like here,
# thus ensuring the coro runs forever, sort of... However,
# this currently breaks, probably due to some part of the
# machinery/tests that is not fully async, causing us to not
# process the log, and thus hang/fail.
# The solution is to make the process synced, and use a
# background thread (execution pool) for the processing.
# This way we know the pipe reader will not suddenly get
# blocked at inconvenient times.
proc = subprocess.Popen(args, stderr=subprocess.PIPE)
loop = asyncio.get_running_loop()
ready_fut = loop.create_future()
def process_io():
f = ready_fut
try:
while True:
data = proc.stderr.readline()
if not data:
if f:
loop.call_soon_threadsafe(f.set_exception, RuntimeError("Log EOF"))
logger.debug("EOF received")
break
line = data.decode()
self.logfile.write(data)
logger.debug(line)
if f and self.is_success_line(line, self.service_port):
logger.info('Got start message: %s', line)
loop.call_soon_threadsafe(f.set_result, True)
f = None
if f and self.is_failure_line(line, self.service_port):
logger.info('Got fail message: %s', line)
loop.call_soon_threadsafe(f.set_result, False)
f = None
except Exception as e:
logger.error("Exception in log processing: %s", e)
if f:
loop.call_soon_threadsafe(f.set_exception, e)
self.echo_thread = loop.run_in_executor(None, process_io)
ok = await ready_fut
if not ok:
self.logfile.close()
proc.kill()
proc.wait()
raise RuntimeError("Could not parse expected launch message from container")
check_proc = await asyncio.create_subprocess_exec(exe
, *["container", "port", name]
, stdout=asyncio.subprocess.PIPE
)
while True:
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
self.logfile = logfilename.open("wb")
data = await check_proc.stdout.readline()
if not data:
break
s = data.decode()
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
if m:
self.port = int(m.group(1))
docker_args = self.docker_args(self.host, self.service_port)
image_args = self.image_args(self.host, self.service_port)
args = ["run", "--name", name, "--rm" ]
if self.service_port is None:
args = args + ["-P"]
else:
args = args + ["-p", str(self.service_port)]
args = args + docker_args + [self.image] + image_args
proc = await asyncio.create_subprocess_exec(exe, *args, stderr=self.logfile)
failed = False
# In any sane world we would just pipe stderr to a pipe and launch a background
# task to just readline from there to both check the start message as well as
# add it to the log (preferably via logger).
# This works fine when doing this in a standalone python script.
# However, for some reason, when run in a pytest fixture, the pipe will fill up,
# without our reader waking up and doing anything, and for any test longer than very
# short, we will fill the stderr buffer and hang.
# I cannot figure out how to get around this, so we work around it
# instead by directing stderr to a log file, and simply repeatedly
# try to read the info from this file until we are happy.
async with asyncio.timeout(120):
done = False
while not done and not failed:
with logfilename.open("r") as f:
for line in f:
if self.is_success_line(line, self.service_port):
print(f'Got start message: {line}')
done = True
break
if self.is_failure_line(line, self.service_port):
print(f'Got fail message: {line}')
failed = True
break
if failed:
self.logfile.close()
await proc.wait()
continue
check_proc = await asyncio.create_subprocess_exec(exe
, *["container", "port", name]
, stdout=asyncio.subprocess.PIPE
)
while True:
data = await check_proc.stdout.readline()
if not data:
break
s = data.decode()
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
if m:
self.port = int(m.group(1))
await check_proc.wait()
if not self.port:
proc.kill()
raise RuntimeError("Could not query port from container")
self.proc = proc
break
await check_proc.wait()
if not self.port:
proc.kill()
proc.wait()
raise RuntimeError("Could not query port from container")
self.proc = proc
async def stop(self):
"""Stops docker image"""
if self.proc:
logger.debug("Stopping docker process")
self.proc.terminate()
await self.proc.wait()
self.proc.wait()
self.proc = None
if self.echo_thread:
logger.debug("Waiting for IO thread")
await self.echo_thread
self.echo_thread = None
if self.logfile:
logger.debug("Closing log file")
self.logfile.close()
self.logfile = None
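
The hand-off described in the comment at the top of this hunk (a synchronous `subprocess.Popen`, a reader running in the default executor, and `loop.call_soon_threadsafe` to resolve the future) can be sketched in isolation. This is a minimal sketch and not the code in this change: `wait_for_ready`, `CHILD`, and the marker strings are hypothetical names used only for illustration, and the child process is a stand-in for the container.

```python
import asyncio
import subprocess
import sys


async def wait_for_ready(args, marker):
    """Start a child process; return True once `marker` shows up on its stderr."""
    proc = subprocess.Popen(args, stderr=subprocess.PIPE)
    loop = asyncio.get_running_loop()
    ready = loop.create_future()

    def resolve(value):
        # Runs on the event loop thread, so touching the future is safe here.
        if not ready.done():
            ready.set_result(value)

    def pump():
        # Runs on a worker thread: blocking reads cannot stall the event loop,
        # and the pipe keeps being drained even after the marker was seen.
        pending = True
        with proc.stderr:
            for raw in proc.stderr:
                line = raw.decode(errors="replace")
                if pending and marker in line:
                    loop.call_soon_threadsafe(resolve, True)
                    pending = False
        if pending:
            # EOF without the marker: report failure instead of hanging.
            loop.call_soon_threadsafe(resolve, False)

    reader = loop.run_in_executor(None, pump)
    ok = await ready
    await reader  # the child exits, the pipe hits EOF, pump() returns
    proc.wait()
    return ok


# Hypothetical stand-in for the container: prints two lines to stderr and exits.
CHILD = [
    sys.executable,
    "-c",
    "import sys; print('booting', file=sys.stderr); "
    "print('server ready', file=sys.stderr)",
]
```

Calling `asyncio.run(wait_for_ready(CHILD, "server ready"))` exercises the success path. A marker that is never printed exercises the EOF path, which resolves the future with `False` rather than leaving the caller waiting.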

@@ -747,6 +747,8 @@ class ScyllaServer:
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
self.notify_socket.bind(str(self.notify_socket_path))
self._received_serving = False
loop = asyncio.get_running_loop()
def poll_status(s: socket.socket, f: asyncio.Future, logger: Union[logging.Logger, logging.LoggerAdapter]):
# Try to read all available messages from the socket
while True:
@@ -756,7 +758,7 @@ class ScyllaServer:
message = data.decode('utf-8', errors='replace')
if 'STATUS=serving' in message:
logger.debug("Received sd_notify 'serving' message")
f.set_result(True)
loop.call_soon_threadsafe(f.set_result, True)
return
if 'STATUS=entering maintenance mode' in message:
logger.debug("Receive sd_notify 'entering maintenance mode'")
@@ -766,9 +768,9 @@ class ScyllaServer:
except Exception as e:
logger.debug("Error reading from notify socket: %s", e)
break
f.set_result(False)
loop.call_soon_threadsafe(f.set_result, False)
self.serving_signal = asyncio.get_running_loop().create_future()
self.serving_signal = loop.create_future()
t = threading.Thread(target=poll_status, args=[self.notify_socket, self.serving_signal, self.logger], daemon=True)
t.start()
@@ -892,7 +894,6 @@ class ScyllaServer:
return
await report_error("the node startup failed, but the log file doesn't contain the expected error")
await report_error("failed to start the node")
self.logger.info("Wait me %s expect %s is %s", self.server_id, expected_server_up_state, server_up_state)
if await self.try_get_host_id(api):
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED

@@ -64,11 +64,25 @@ async def wait_for(
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
last_exception: Exception | None = None
while True:
elapsed = time.time() - start
assert time.time() < deadline, \
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
res = await pred()
if time.time() >= deadline:
timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
if last_exception is not None:
timeout_msg += (
f"; last exception: {type(last_exception).__name__}: {last_exception}"
)
raise AssertionError(timeout_msg) from last_exception
raise AssertionError(timeout_msg)
try:
res = await pred()
last_exception = None
except Exception as exc:
res = None
last_exception = exc
if res is not None:
if retries > 0:
logger.debug(f"wait_for({tag}) completed "
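
The behaviour this hunk appears to introduce (an exception raised by the predicate is remembered instead of propagating, and is chained onto the timeout error) can be sketched as a standalone function. This is a simplified sketch under stated assumptions, not the helper from the change: `wait_for_sketch`, `make_flaky`, and the fixed `period` are hypothetical, and `deadline` is assumed to be an absolute `time.time()` value as in the hunk.

```python
import asyncio
import time


async def wait_for_sketch(pred, deadline, period=0.01):
    """Poll `pred` until it returns non-None or `deadline` (absolute time) passes."""
    start = time.time()
    retries = 0
    last_exception = None
    while True:
        elapsed = time.time() - start
        if time.time() >= deadline:
            msg = f"timed out after {elapsed:.2f}s ({retries} retries)"
            if last_exception is not None:
                msg += f"; last exception: {type(last_exception).__name__}: {last_exception}"
                # Chain the cause so the traceback shows why the predicate kept failing.
                raise AssertionError(msg) from last_exception
            raise AssertionError(msg)
        try:
            res = await pred()
            last_exception = None
        except Exception as exc:
            # Treat a raising predicate like "not ready yet", but remember why.
            res = None
            last_exception = exc
        if res is not None:
            return res
        retries += 1
        await asyncio.sleep(period)


def make_flaky(failures, value):
    """Hypothetical predicate factory: raise `failures` times, then return `value`."""
    state = {"left": failures}

    async def pred():
        if state["left"] > 0:
            state["left"] -= 1
            raise ConnectionError("node down")
        return value

    return pred
```

With `make_flaky(2, 42)` the sketch retries past the two failures and returns the value. With a predicate that never stops raising, the resulting `AssertionError` carries the last `ConnectionError` both in its message and as `__cause__`.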

@@ -69,7 +69,6 @@
#include "message/messaging_service.hh"
#include "idl/forward_cql.dist.hh"
#include "utils/bit_cast.hh"
#include "utils/error_injection.hh"
#include "utils/labels.hh"
#include "utils/result.hh"
#include "utils/reusable_buffer.hh"
@@ -1634,26 +1633,13 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
}
tracing::trace(trace_state, "Processing a statement");
auto cache_key_for_metadata = cache_key;
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, prepared, std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id), &qp, cache_key = std::move(cache_key_for_metadata), prepared = std::move(prepared)] (auto msg) mutable {
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
if (msg->move_to_shard()) {
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
} else {
if (prepared->result_metadata_is_empty()
&& metadata_id.has_request_metadata_id()
&& !utils::get_local_injector().enter("skip_prepared_result_metadata_promotion")) {
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
auto rows_metadata_id = rows->rs().get_metadata().calculate_metadata_id();
clogger.debug("prepared result metadata promotion: request_metadata_id_present={}, calculated_rows_metadata_id_size={}",
metadata_id.has_request_metadata_id(), rows_metadata_id._metadata_id.size());
qp.local().update_prepared_result_metadata_id(cache_key, rows_metadata_id);
auto request_metadata_id = metadata_id.get_request_metadata_id();
metadata_id = cql_metadata_id_wrapper(std::move(request_metadata_id), std::move(rows_metadata_id));
}
}
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
}
@@ -2521,16 +2507,9 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
no_metadata, skip_rows_metadata_changed_response);
if (!skip_rows_metadata_changed_response) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
}
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
}
}