Compare commits

...

29 Commits

Author SHA1 Message Date
Wojciech Mitros
2b20bd887b db/view: track range tombstones in update stream during view update building
The view update builder ignored range tombstone changes from the update
stream once all existing mutation fragments had been consumed. The old
code assumed such range tombstones 'remove nothing pre-existing, so
we can ignore it', but this failed to update _update_current_tombstone.
Consequently, when a range delete and an insert within that range
appeared in the same batch, the range tombstone was either not applied
to the inserted row or applied to a row outside the range it covered,
causing rows to incorrectly survive or be deleted in the materialized
view.

Fix by handling is_range_tombstone_change() fragments in the update-only
branch, updating _update_current_tombstone so subsequent clustering rows
correctly have the range tombstone applied to them.
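The core of the fix is bookkeeping: the builder must record a newly seen range tombstone even when no pre-existing fragments remain, so later rows in the same batch are checked against it. A minimal Python sketch of that idea (names and fragment shapes are illustrative, not Scylla's actual types):

```python
# Sketch: merge a sorted stream of update fragments into view updates,
# tracking the currently active range tombstone. Names are illustrative.

def build_view_updates(fragments):
    """fragments: ('tombstone', start, end, ts) or ('row', key, ts) tuples."""
    current_tombstone = None  # (start, end, ts) of the active range delete
    updates = []
    for frag in fragments:
        if frag[0] == 'tombstone':
            # The buggy code skipped this update when no pre-existing rows
            # remained; the fix records it so later rows in the batch see it.
            _, start, end, ts = frag
            current_tombstone = (start, end, ts)
        else:
            _, key, ts = frag
            covered = (current_tombstone is not None
                       and current_tombstone[0] <= key < current_tombstone[1]
                       and current_tombstone[2] > ts)
            updates.append((key, 'deleted' if covered else 'live'))
    return updates
```

With a range delete over [0, 10) followed by inserts in the same batch, a row inside the range is shadowed and a row outside it stays live.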

Fixes SCYLLADB-1555

Closes scylladb/scylladb#29483

(cherry picked from commit 6011cb8a4c)

Closes scylladb/scylladb#29569

Closes scylladb/scylladb#29643
2026-04-25 18:45:26 +03:00
Wojciech Mitros
6c51548fd6 test/cluster: fix flaky test_hints_consistency_during_replace
The test creates a sync point immediately after writing 100 rows
with CL=ANY, without waiting for pending hint writes to complete.

store_hint() is fire-and-forget: it submits do_store_hint() to a gate
and returns immediately. do_store_hint() updates _last_written_rp only
after writing to the commitlog. If create_sync_point() is called before
all do_store_hint() coroutines complete, the captured replay position
is stale, and await_sync_point() returns DONE before all hints are
replayed, leaving some rows missing.

Fix by waiting for the size_of_hints_in_progress metric to reach zero
before creating the sync point, ensuring all in-flight hint writes have
completed and _last_written_rp is up to date. This follows the same
pattern already used in test_sync_point.
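The waiting step can be sketched as a simple polling loop. This is an illustrative Python helper, not the actual test utility (which reads the size_of_hints_in_progress metric over the REST API):

```python
import time

def wait_for_zero(read_metric, timeout=30.0, poll=0.1):
    """Poll a metric until it reaches zero, so all in-flight hint writes
    have completed before the sync point is created. Illustrative helper."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if read_metric() == 0:
            return True
        time.sleep(poll)
    return False
```

Only after this returns True does the test call create_sync_point(), so the captured replay position covers every hint.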

Fixes: SCYLLADB-1709

Closes scylladb/scylladb#29623

(cherry picked from commit 7634d3f7d4)

Closes scylladb/scylladb#29632

Closes scylladb/scylladb#29641
2026-04-25 18:45:02 +03:00
Dario Mirovic
e114fc8175 test: use DROP KEYSPACE IF EXISTS in new_test_keyspace cleanup
The new_test_keyspace context manager in test/cluster/util.py uses
DROP KEYSPACE without IF EXISTS during cleanup. The Python driver
has a known bug (scylladb/python-driver#317) where connection pool
renewal after concurrent node bootstraps causes double statement
execution. The DROP succeeds server-side, but the response is lost
when the old pool is closed. The driver retries on the new pool, and
gets ConfigurationException message "Cannot drop non existing keyspace".

The CREATE KEYSPACE in create_new_test_keyspace already uses IF NOT
EXISTS as a workaround for the same driver bug. This patch applies
the same approach to fix DROP KEYSPACE.
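The reason IF EXISTS helps is that it turns the DDL into an idempotent operation, so a driver-level replay of a statement whose response was lost becomes a harmless no-op. A toy Python model of the behavior (the in-memory keyspace set is purely illustrative):

```python
# Sketch: a driver retry replays DDL; only the IF EXISTS form is idempotent.
keyspaces = {"ks1"}

def drop_keyspace(name, if_exists=False):
    if name not in keyspaces:
        if if_exists:
            return  # no-op, matching DROP KEYSPACE IF EXISTS
        raise RuntimeError("Cannot drop non existing keyspace")
    keyspaces.discard(name)

drop_keyspace("ks1", if_exists=True)   # first (response-lost) execution
drop_keyspace("ks1", if_exists=True)   # driver's replay is now a no-op
```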

Fixes SCYLLADB-1538

Closes scylladb/scylladb#29487

(cherry picked from commit 40740104ab)

Closes scylladb/scylladb#29568

Closes scylladb/scylladb#29644
2026-04-25 18:44:39 +03:00
Botond Dénes
dfd2507f0d test/cluster/test_incremental_repair: fix flaky do_tablet_incremental_repair_and_ops
The log grep in get_sst_status searched from the beginning of the log
(no from_mark), so the second-repair assertions were checking cumulative
counts across both repairs rather than counts for the second repair alone.

The expected values (sst_add==2, sst_mark==2) relied on this cumulative
behaviour: 1 from the first repair + 1 from the second = 2. This works
when the second repair encounters exactly one unrepaired sstable, but
fails whenever the second repair sees two.

The second repair can see two unrepaired sstables when the 100 keys
inserted before it (via asyncio.gather) trigger a background auto-flush
before take_storage_snapshot runs. take_storage_snapshot always flushes
the memtable itself, so if an auto-flush already split the batch into two
sstables on disk, the second repair's snapshot contains both and logs
"Added sst" twice, making the cumulative count 3 instead of 2.

Fix: take a log mark per-server before each repair call and pass it to
get_sst_status so each check counts only the entries produced by that
repair. The expected values become 1/0/1 and 1/1/1 respectively,
independent of how many sstables happened to exist beforehand.

get_sst_status gains an optional from_mark parameter (default None)
which preserves existing call sites that intentionally grep from the
start of the log.
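The mark-based counting can be sketched as indexing into the log: a mark taken before an operation lets later greps count only that operation's entries. Illustrative Python, not the real get_sst_status helper:

```python
def get_sst_status(log_lines, pattern, from_mark=None):
    """Count matching log entries, optionally only after a saved mark
    (an index into the log). Names are illustrative."""
    start = from_mark if from_mark is not None else 0
    return sum(1 for line in log_lines[start:] if pattern in line)

log = ["Added sst 1"]                   # first repair
mark = len(log)                         # mark taken before the second repair
log += ["Added sst 2", "Added sst 3"]   # second repair adds two entries
```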

Fixes: SCYLLADB-1711

Closes scylladb/scylladb#29484

(cherry picked from commit d280517e27)

Closes scylladb/scylladb#29633

Closes scylladb/scylladb#29640
2026-04-25 18:43:47 +03:00
Botond Dénes
04e33d69da Merge 'service: tasks: return successful status if a table was dropped' from Aleksandra Martyniuk
tablet_virtual_task::wait throws if the table that a tablet operation
was working on is dropped.

Treat the tablet operation as successful if a table is dropped.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-494

Needs backport to all live releases

Closes scylladb/scylladb#28933

* github.com:scylladb/scylladb:
  test: add test_tablet_repair_wait_with_table_drop
  service: tasks: return successful status if a table was dropped

(cherry picked from commit 1e41db5948)

Closes scylladb/scylladb#29361
2026-04-24 18:06:09 +03:00
Marcin Maliszkiewicz
5468cd49da Merge 'ldap: fix double-free of LDAPMessage in poll_results()' from Andrzej Jackowski
In the unregistered-ID branch, ldap_msgfree() was called on a result
already owned by an RAII ldap_msg_ptr, causing a double-free on scope
exit. Remove the redundant manual free.

Fixes: SCYLLADB-1344

Backport: 2026.1, 2025.4, 2025.1 - it's a memory corruption, with a one-line fix, so better backport it everywhere.

Closes scylladb/scylladb#29302

* github.com:scylladb/scylladb:
  test: ldap: add regression test for double-free on unregistered message ID
  ldap: fix double-free of LDAPMessage in poll_results()

(cherry picked from commit 895fdb6d29)

Closes scylladb/scylladb#29393

Closes scylladb/scylladb#29454
2026-04-24 18:05:26 +03:00
Botond Dénes
52a3ed4312 Merge 'test: cluster: Deflake test_write_cl_any_to_dead_node_generates_hints' from Dawid Mędrek
Before these changes, we would send mutations to the node and
immediately query the metrics to see how many hints had been written.
However, that could lead to random failures of the test: even if the
mutations have finished executing, hints are stored asynchronously, so
we don't have a guarantee they have already been processed.

To prevent such failures, we rewrite the check: we will perform multiple
checks against the metrics until we have confirmed that the hints have
indeed been written or we hit the timeout.

We're generous with the timeout: we give the test 60 seconds. That
should be enough time to avoid flakiness even on super slow machines,
and if the test does fail, we will know something is really wrong.

As a bonus, we improve the test in general too: we explicitly express
the preconditions we rely on and bump the log level. If the test fails
in the future, it might be very difficult to debug without this
additional information.
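The deflaking pattern is the classic "eventually" loop: re-check the condition until it holds or a generous deadline passes. A hedged Python sketch (the real test polls Scylla metrics; this helper is illustrative):

```python
import time

def eventually(predicate, timeout=60.0, poll=0.5):
    """Re-check a condition until it holds or the timeout expires,
    instead of asserting on a single racy read."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll)
```

A single immediate metric read races with asynchronous hint storage; the loop converts that race into, at worst, a short wait.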

Fixes SCYLLADB-1133

Backport: The test is present on all supported branches. To avoid
          running into more failures, we should backport these changes
          to them.

Closes scylladb/scylladb#29191

* github.com:scylladb/scylladb:
  test: cluster: Increase log level in test_write_cl_any_to_dead_node_generates_hints
  test: cluster: Await all mutations concurrently in test_write_cl_any_to_dead_node_generates_hints
  test: cluster: Specify min_tablet_count in test_write_cl_any_to_dead_node_generates_hints
  test: cluster: Use new_test_table in test_write_cl_any_to_dead_node_generates_hints
  test: cluster: Introduce auxiliary function keyspace_has_tablets
  test: cluster: Deflake test_write_cl_any_to_dead_node_generates_hints

(cherry picked from commit ffd58ca1f0)

Closes scylladb/scylladb#29246

Closes scylladb/scylladb#29596
2026-04-24 18:04:37 +03:00
Tomasz Grabiec
816526842e test/alternator: stop concurrent-requests test when workers hit limit
`test_limit_concurrent_requests` could create far more tables than intended
because worker threads looped indefinitely and only the probe path terminated
the test. In practice, workers often hit `RequestLimitExceeded` first, but the
test kept running and creating tables, increasing memory pressure and causing
flakiness due to bad_alloc errors in logs.

Fix by replacing the old probe-driven termination with worker-driven
termination. Workers now run until any worker sees
`RequestLimitExceeded`.
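The worker-driven termination can be sketched with a shared stop flag: every worker loops until any worker observes the limit error. Illustrative Python using threads; RequestLimitExceeded here is a stand-in for the Alternator error:

```python
import threading

class RequestLimitExceeded(Exception):
    pass

def run_workers(make_request, n_workers=4):
    stop = threading.Event()

    def worker():
        while not stop.is_set():
            try:
                make_request()
            except RequestLimitExceeded:
                stop.set()  # first worker to hit the limit ends the test

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return stop.is_set()
```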

Fixes SCYLLADB-1181

Closes scylladb/scylladb#29270

(cherry picked from commit b355bb70c2)

Closes scylladb/scylladb#29292

Closes scylladb/scylladb#29610
2026-04-24 18:03:10 +03:00
Benny Halevy
73d7f9c1fe compaction_manager: fix use-after-free in postponed_compactions_reevaluation()
drain() signals the postponed_reevaluation condition variable to terminate
the postponed_compactions_reevaluation() coroutine but does not await its
completion. When enable() is called afterwards, it overwrites
_waiting_reevalution with a new coroutine, orphaning the old one. During
shutdown, really_do_stop() only awaits the latest coroutine via
_waiting_reevalution, leaving the orphaned coroutine still alive. After
sharded::stop() destroys the compaction_manager, the orphaned coroutine
resumes and reads freed memory (is_disabled() accesses _state).

Fix by introducing stop_postponed_compactions(), awaiting the reevaluation coroutine in
both drain() and stop() after signaling it, if postponed_compactions_reevaluation() is running.
It uses an std::optional<future<>> for _waiting_reevalution and std::exchange to leave
_waiting_reevalution disengaged when postponed_compactions_reevaluation() is not running.
This prevents a race between drain() and stop().

While at it, fix typo in _waiting_reevalution -> _waiting_reevaluation.
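The shape of the fix translates naturally to asyncio: hold the background fiber in an optional slot, and have the stop path exchange the slot for None before signaling and awaiting it, so a second concurrent caller finds the slot disengaged. A hedged sketch, not the actual Seastar code:

```python
import asyncio

class Manager:
    def __init__(self):
        self._stopping = asyncio.Event()
        self._waiting_reevaluation = None  # optional background task

    def enable(self):
        assert self._waiting_reevaluation is None, "already running"
        self._waiting_reevaluation = asyncio.ensure_future(self._loop())

    async def _loop(self):
        await self._stopping.wait()  # stands in for the reevaluation fiber

    async def stop_postponed(self):
        # std::exchange analogue: take the task and disengage the slot.
        task, self._waiting_reevaluation = self._waiting_reevaluation, None
        if task is None:
            return  # another caller already stopped it
        self._stopping.set()
        await task

async def main():
    m = Manager()
    m.enable()
    await m.stop_postponed()
    await m.stop_postponed()  # safe: slot already disengaged
    return m._waiting_reevaluation is None

result = asyncio.run(main())
```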

Fixes: SCYLLADB-1600
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#29443

(cherry picked from commit 05a00fe140)
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#29527

Closes scylladb/scylladb#29629
2026-04-24 18:02:31 +03:00
Ernest Zaslavsky
116a2f43ee sstables_loader: prevent use-after-free on table drop during streaming
sstables_loader::load_and_stream holds a replica::table& reference via
the sstable_streamer for the entire streaming operation.  If the table
is dropped concurrently (e.g. DROP TABLE or DROP KEYSPACE), the
reference becomes dangling and the next access crashes with SEGV.

This was observed in a longevity-50gb-12h-master test run where a
keyspace was dropped while load_and_stream was still streaming SSTables
from a previous batch.

Fix by acquiring a stream_in_progress() phaser guard in load_and_stream
before creating the streamer.  table::stop() calls
_pending_streams_phaser.close() which blocks until all outstanding
guards are released, keeping the table alive for the duration of the
streaming operation.
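The phaser pattern can be sketched in asyncio: the streaming operation holds a guard, and close() blocks until every outstanding guard is released. Illustrative Python; the real code uses Seastar's phased barrier:

```python
import asyncio
import contextlib

class Phaser:
    def __init__(self):
        self._count = 0
        self._idle = asyncio.Event()
        self._idle.set()

    @contextlib.asynccontextmanager
    async def guard(self):
        self._count += 1
        self._idle.clear()
        try:
            yield
        finally:
            self._count -= 1
            if self._count == 0:
                self._idle.set()

    async def close(self):
        await self._idle.wait()  # blocks until all guards are released

async def main():
    phaser = Phaser()
    events = []

    async def stream():
        async with phaser.guard():   # acquired before streaming starts
            await asyncio.sleep(0.01)
            events.append("stream done")

    task = asyncio.ensure_future(stream())
    await asyncio.sleep(0)           # let the streamer take its guard
    await phaser.close()             # table::stop() analogue: must wait
    events.append("table dropped")
    await task
    return events

events = asyncio.run(main())
```

The drop cannot complete while streaming is in flight, which is exactly the lifetime guarantee the fix needs.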

Fixes: SCYLLADB-1639

Closes scylladb/scylladb#29403

(cherry picked from commit e5e6608f20)

Closes scylladb/scylladb#29558

Closes scylladb/scylladb#29600
2026-04-24 10:33:51 +02:00
Emil Maskovsky
5d1a8b91cd encryption: cover system.raft table in system_info_encryption
Extend system_info_encryption to encrypt system.raft SSTables.
system.raft contains the Raft log, which may hold sensitive user data
(e.g. batched mutations), so it warrants the same treatment as
system.batchlog and system.paxos.

During upgrade, existing unencrypted system.raft SSTables remain
readable. Existing data is rewritten encrypted via compaction, or
immediately via nodetool upgradesstables -a.

Update the operator-facing system_info_encryption description to
mention system.raft and add a focused test that verifies the schema
extension is present on system.raft.

Fixes: CUSTOMER-268

Backport: 2026.1 - closes an encryption-at-rest coverage gap: system.raft may persist sensitive user-originated data unencrypted; backport to the current LTS.

Closes scylladb/scylladb#29242

(cherry picked from commit 91df3795fc)

Closes scylladb/scylladb#29526

Closes scylladb/scylladb#29582
2026-04-23 10:17:20 +02:00
Botond Dénes
c9ee67c85c Merge 'transport: improve memory accounting for big responses and slow network' from Marcin Maliszkiewicz
After obtaining the CQL response, check if its actual size exceeds the initially acquired memory permit. If so, acquire additional semaphore units and adopt them into the permit, ensuring accurate memory accounting for large responses.

Additionally, move the permit into a .then() continuation so that the semaphore units are kept alive until write_message finishes, preventing premature release of memory permit. This is especially important with slow networks and big responses when buffers can accumulate and deplete a node's memory.
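The accounting fix can be modeled with a permit that tops itself up once the real response size is known and is released only once, after the write completes. A toy Python model (all names illustrative):

```python
class MemorySemaphore:
    def __init__(self, units):
        self.available = units

    def acquire(self, units):
        assert units <= self.available, "would overcommit"
        self.available -= units
        return units

    def release(self, units):
        self.available += units

class Permit:
    def __init__(self, sem, units):
        self.sem = sem
        self.units = sem.acquire(units)

    def adopt(self, extra):
        self.units += self.sem.acquire(extra)  # top-up for a big response

    def release(self):
        self.sem.release(self.units)
        self.units = 0

sem = MemorySemaphore(1000)
permit = Permit(sem, 100)        # initial estimate
actual_size = 350                # response turned out larger
if actual_size > permit.units:
    permit.adopt(actual_size - permit.units)
# ... write_message(response) completes while the permit is alive ...
permit.release()                 # single release, after the write
```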

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1306
Related https://scylladb.atlassian.net/browse/SCYLLADB-740

Backport: all supported versions

Closes scylladb/scylladb#29288

* github.com:scylladb/scylladb:
  transport: add per-service-level pending response memory metric
  transport: hold memory permit until response write completes
  transport: account for response size exceeding initial memory estimate

(cherry picked from commit 86417d49de)

Closes scylladb/scylladb#29410

Closes scylladb/scylladb#29455
2026-04-21 12:38:40 +02:00
Piotr Dulikowski
04d8663052 Merge 'cql3: pin prepared cache entry in prepare() to avoid invalid weak handle race' from Alex Dathskovsky
query_processor::prepare() could race with prepared statement invalidation: after loading from the prepared cache, we converted the cached object to a checked weak pointer and then continued asynchronous work (including error-injection waitpoints). If invalidation happened in that window, the weak handle could no longer be promoted and the prepare path could fail nondeterministically.

This change keeps a strong cache entry reference alive across the whole critical section in prepare() by using a pinned cache accessor (get_pinned()), and only deriving the weak handle while the entry is pinned. This removes the lifetime gap without adding retry loops.

Test coverage was extended in test/cluster/test_prepare_race.py:

- reproduces the invalidation-during-prepare window with injection,
- verifies prepare completes successfully,
- then invalidates again and executes the same stale client prepared object,
- confirms the driver transparently re-requests/re-prepares and execution succeeds.

This change introduces:

- no behavior change for the normal prepare flow besides stronger lifetime guarantees,
- no new protocol semantics,
- no changes to the existing cache invalidation logic,
- explicit cluster-level regression coverage for both the race and the driver re-prepare path,
- a shift of the re-prepare operation towards the driver: the server returns an unprepared error the first time, and the driver has to re-prepare during the execution stage.
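Python's weakref module shows the same lifetime gap: a weak handle derived from a cache entry dies with the entry unless a strong ("pinned") reference is held across the asynchronous window. A hedged sketch, not Scylla's cache API:

```python
import weakref

class PreparedEntry:
    def __init__(self, query):
        self.query = query

cache = {}

def prepare(query):
    pinned = cache.setdefault(query, PreparedEntry(query))  # strong ref
    handle = weakref.ref(pinned)        # weak handle derived while pinned
    cache.pop(query, None)              # concurrent invalidation happens here
    # Without the pin, the entry could be gone and handle() would return
    # None; the pinned strong reference keeps promotion valid.
    assert handle() is not None
    return pinned

entry = prepare("SELECT * FROM t")
```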

Fixes: https://github.com/scylladb/scylladb/issues/27657

Backport to active branches recommended: No node crash, but user-visible PREPARE failures under rare schema-invalidation race; low-risk timeout-bounded retry improves robustness.

Closes scylladb/scylladb#28952

* github.com:scylladb/scylladb:
  transport/messages: hold pinned prepared entry in PREPARE result
  cql3: pin prepared cache entry in prepare() to avoid invalid weak handle race

(cherry picked from commit d9a277453e)

Closes scylladb/scylladb#29001

Closes scylladb/scylladb#29195
2026-04-20 12:59:53 +02:00
Jenkins Promoter
72cd145990 Update ScyllaDB version to: 2025.4.8 2026-04-17 01:18:47 +03:00
Botond Dénes
41e2c2d1c4 Merge 'tasks: do not fail the wait request if rpc fails' from Aleksandra Martyniuk
During decommission, we first mark a topology request as done, then shut
down a node and in the following steps we remove node from the topology.
Thus,  finished request does not imply that a node is removed from
the topology.

Due to that, in node_ops_virtual_task::wait, while gathering children
from the whole cluster, we may hit the connection exception - because
a node is still in topology, even though it is down.

Modify the get_children method to ignore the exception and warn
about the failure instead.

Keep token_metadata_ptr in get_children to prevent topology from changing.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-867

Needs backports to all versions

Closes scylladb/scylladb#29035

* github.com:scylladb/scylladb:
  tasks: fix indentation
  tasks: do not fail the wait request if rpc fails
  tasks: pass token_metadata_ptr to task_manager::virtual_task::impl::get_children

(cherry picked from commit 2e47fd9f56)

Closes scylladb/scylladb#29193
2026-04-16 21:57:08 +03:00
Pavel Emelyanov
2d1fdce790 object_storage_endpoint_param: Make it formattable for real
Currently the formatter converts it to json and then tries to emit into
the output context with the "...{{}}" format string. The intent was to
have the "...{<json text>}" output. However, the double curly brace in
format string means "print a curly brace", so the output of the above
formatting is "...{}", literally.

Fix by keeping a single curly brace. The "<json text>" thing will have
its own surrounding curly braces.
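Python's str.format uses the same brace-doubling convention as fmt, so the bug reproduces directly (the JSON text here is made up for illustration):

```python
json_text = '{"endpoint": "s3.example.com", "port": 443}'

# Buggy pattern: "{{}}" is an escaped literal brace pair, so the argument
# is silently dropped and the output ends with a literal "{}".
broken = "object_storage_endpoint_param: {{}}".format(json_text)

# Fixed pattern: a single "{}" placeholder; the JSON text brings its own
# surrounding curly braces.
fixed = "object_storage_endpoint_param: {}".format(json_text)
```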

Fixes #27718

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes #27687

(reworked version of commit a6618f2, the formatter is in db/config.cc)

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#27733
2026-04-16 21:54:23 +03:00
Pavel Emelyanov
1e0487bd57 table: Add formatter for group_id argument in tablet merge exception message
Fixes: SCYLLADB-1432

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#29143

(cherry picked from commit 78f5bab7cf)

Closes scylladb/scylladb#29412

Closes scylladb/scylladb#29453
2026-04-16 10:57:37 +03:00
Michał Chojnowski
18fc2eff31 test: add a missing reconnect_driver in test_sstable_compression_dictionaries_upgrade.py
Need to work around https://github.com/scylladb/python-driver/issues/295,
lest a CQL query fail spuriously after the cluster restart.

Fixes: SCYLLADB-1114

Closes scylladb/scylladb#29118

(cherry picked from commit 6b18d95dec)

Closes scylladb/scylladb#29146

Closes scylladb/scylladb#29366
2026-04-16 10:56:59 +03:00
Botond Dénes
340369a4d4 Merge 'Alternator: add per-table batch latency metrics and test coverage' from Amnon Heiman
This series fixes a metrics visibility gap in Alternator and adds regression coverage.

Until now, BatchGetItem and BatchWriteItem updated global latency histograms but did not consistently update per-table latency histograms. As a result, table-level latency dashboards could miss batch traffic.

It updates the batch read/write paths to compute request duration once and record it in both global and per-table latency metrics.

Add the missing tests, including a metric-agnostic helper and a dedicated per-table latency test that verifies latency counters increase for item and batch operations.

This change is metrics-only (no API/behavior change for requests) and improves observability consistency between global and per-table views.

Fixes #28721

**We assume the alternator per-table metrics exist, but the batch ones are not updated**

Closes scylladb/scylladb#28732

* github.com:scylladb/scylladb:
  test(alternator): add per-table latency coverage for item and batch ops
  alternator: track per-table latency for batch get/write operations

(cherry picked from commit 035aa90d4b)

Closes scylladb/scylladb#29067

Closes scylladb/scylladb#29365
2026-04-16 10:56:16 +03:00
Pavel Emelyanov
9041f70f34 s3: Don't rearm credential timers when credentials are not refreshed
The update_credentials_and_rearm() may get "empty" credentials from
_creds_provider_chain.get_aws_credentials() -- it doesn't throw, but
returns a default-initialized value. In that case expires_at will be
set to time_point::min; it's probably not a good idea to arm the
refresh timer with that, and an even worse idea to subtract 1h from it.
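The guard can be sketched with datetime: when the provider hands back the sentinel expiry of a default-initialized value, skip rearming instead of computing "expiry minus 1h" (which would underflow). Illustrative Python:

```python
from datetime import datetime, timedelta

EMPTY_EXPIRY = datetime.min  # stand-in for time_point::min on empty credentials

def next_refresh(expires_at):
    """Return when to rearm the refresh timer, or None when credentials
    came back default-initialized. Illustrative sketch of the guard."""
    if expires_at == EMPTY_EXPIRY:
        return None  # don't arm; subtracting 1h here would underflow
    return expires_at - timedelta(hours=1)
```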

Fixes #29056

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#29057

(cherry picked from commit 961fc9e041)

Closes scylladb/scylladb#29158

Closes scylladb/scylladb#29364
2026-04-16 10:55:25 +03:00
Nikos Dragazis
7ed772866e scylla_swap_setup: Remove Before=swap.target dependency from swap unit
When a Scylla node starts, the scylla-image-setup.service invokes the
`scylla_swap_setup` script to provision swap. This script allocates a
swap file and creates a swap systemd unit to delegate control to
systemd. By default, systemd injects a Before=swap.target dependency
into every swap unit, allowing other services to use swap.target to wait
for swap to be enabled.

On Azure, this doesn't work so well because we store the swap file on
the ephemeral disk [1] which has network dependencies (`_netdev` mount
option, configured by cloud-init [2]). This makes the swap.target
indirectly depend on the network, leading to dependency cycles such as:

swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target
-> network.target -> systemd-resolved.service -> tmp.mount -> swap.target

This patch breaks the cycle by removing the swap unit from swap.target
using DefaultDependencies=no. The swap unit will still be activated via
WantedBy=multi-user.target, just not during early boot.

Although this problem is specific to Azure, this patch applies the fix
to all clouds to keep the code simple.
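The resulting swap unit could look roughly like this (a hedged sketch; the unit actually generated by scylla_swap_setup may differ in paths and options):

```ini
# /etc/systemd/system/mnt-swapfile.swap (illustrative)
[Unit]
Description=swap file created by scylla_swap_setup
# Opt out of the implicit Before=swap.target dependency that creates the
# cycle through network-online.target on Azure.
DefaultDependencies=no

[Swap]
What=/mnt/swapfile

[Install]
# Still activated at boot, just not as part of early-boot swap.target.
WantedBy=multi-user.target
```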

Fixes #26519.
Fixes SCYLLADB-1257

[1] https://github.com/scylladb/scylla-machine-image/pull/426
[2] https://github.com/canonical/cloud-init/pull/1213#issuecomment-1026065501

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>

Closes scylladb/scylladb#28504

(cherry picked from commit 6d50e67bd2)

Closes scylladb/scylladb#29339

Closes scylladb/scylladb#29354
2026-04-16 10:53:52 +03:00
Andrzej Jackowski
7b97fe4a92 reader_concurrency_semaphore: fix leak workaround
`e4da0afb8d5491bf995cbd1d7a7efb966c79ac34` introduces a protection
against resources that are "made up" out of thin air in
`reader_concurrency_semaphore`. If there are more `_resources` than
the `_initial_resources`, it means there is a negative leak, and
`on_internal_error_noexcept` is called. In addition,
`_resources` is set to `std::max(_resources, _initial_resources)`.

However, the commit message of `e4da0afb8d5491bf995cbd1d7a7efb966c79ac34`
states the opposite: "The detection also clamps the
_resources to _initial_resources, to prevent any damage".

Before this commit, the protection mechanism didn't clamp `_resources`
to `_initial_resources`; instead it kept `_resources` high, possibly
growing indefinitely. This commit changes `std::max` to `std::min` so
the code behaves as intended.
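The difference between the two clamps is easy to see with plain numbers. A toy Python model of the leak check (values illustrative):

```python
# On a negative leak (more resources than we started with), clamp back
# down to the initial level. max() kept the inflated value; min() restores
# the invariant.
initial_resources = 100

def detect_and_clamp(resources):
    if resources > initial_resources:
        # on_internal_error_noexcept(...) would be reported here
        resources = min(resources, initial_resources)  # the fix (was: max)
    return resources
```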

Fixes: SCYLLADB-1014
Refs: SCYLLADB-163

Closes scylladb/scylladb#28982

(cherry picked from commit 9247dff8c2)

Closes scylladb/scylladb#28988

Closes scylladb/scylladb#29196
2026-04-15 11:53:42 +02:00
Jenkins Promoter
ba3b7360e0 Update pgo profiles - aarch64 2026-04-15 04:44:13 +03:00
Jenkins Promoter
06e9ecab9b Update pgo profiles - x86_64 2026-04-15 03:56:26 +03:00
Avi Kivity
c6d356e7cc Merge '[Backport 2025.4] vector_search: fix race condition on connection timeout' from Scylladb[bot]
vector_search: fix race condition on connection timeout

When a `with_connect` operation timed out, the underlying connection
attempt continued to run in the reactor. This could lead to a crash
if the connection was established/rejected after the client object had
already been destroyed. This issue was observed during the teardown
phase of an upcoming high-availability test case.

This commit fixes the race condition by ensuring the connection attempt
is properly canceled on timeout.

Additionally, the explicit TLS handshake previously forced during the
connection is now deferred to the first I/O operation, which is the
default and preferred behavior.

Fixes: SCYLLADB-832

Backports to 2026.1 and 2025.4 are required, as this issue also exists on those branches and is causing CI flakiness.

(cherry picked from commit 3107d9083e)

Parent PR: #29031

Closes scylladb/scylladb#29360

* github.com:scylladb/scylladb:
  vector_search: test: fix flaky test
  vector_search: fix race condition on connection timeout
2026-04-12 14:24:57 +03:00
Botond Dénes
30c2f03749 Merge 'cql3: fix null handling in data_value formatting' from Dario Mirovic
`data_value::to_parsable_string()` crashes with a null pointer dereference when called on a `null` data_value. Return `"null"` instead.

Added tests after the fix. Manually checked that tests fail without the fix.
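The guard is the standard null-check-first pattern when formatting a value as a parsable CQL literal. A hedged Python sketch, not the real to_parsable_string:

```python
def to_parsable_string(value):
    """Format a value as a parsable CQL-style literal; nulls become the
    literal "null" instead of being dereferenced. Illustrative helper."""
    if value is None:
        return "null"
    if isinstance(value, str):
        return "'{}'".format(value.replace("'", "''"))  # escape quotes
    return str(value)
```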

Fixes SCYLLADB-1350

This is a fix that prevents format crash. No known occurrence in production, but backport is desirable.

Closes scylladb/scylladb#29262

* github.com:scylladb/scylladb:
  test: boost: test null data value to_parsable_string
  cql3: fix null handling in data_value formatting

(cherry picked from commit 816f2bf163)

Closes scylladb/scylladb#29384

Closes scylladb/scylladb#29434
2026-04-12 14:23:23 +03:00
Pavel Emelyanov
17075bf3f9 Merge 'encryption: fix deadlock in encrypted_data_source::get()' from Ernest Zaslavsky
When encrypted_data_source::get() caches a trailing block in _next, the next call takes it directly — bypassing input_stream::read(), which checks _eof. It then calls input_stream::read_exactly() on the already-drained stream. Unlike read(), read_up_to(), and consume(), read_exactly() does not check _eof when the buffer is empty, so it calls _fd.get() on a source that already returned EOS.

In production this manifested as stuck encrypted SSTable component downloads during tablet restore: the underlying chunked_download_source hung forever on the post-EOS get(), causing 4 tablets to never complete. The stuck files were always block-aligned sizes (8k, 12k) where _next gets populated and the source is fully consumed in the same call.

Fix by checking _input.eof() before calling read_exactly(). When the stream already reached EOF, buf2 is known to be empty, so the call is skipped entirely.

A comprehensive test is added that uses a strict_memory_source which fails on post-EOS get(), reproducing the exact code path that caused the production deadlock.
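The deadlock-avoiding check can be modeled with a source that fails on a post-EOS read, like the strict_memory_source the test adds. Illustrative Python, not the actual stream classes:

```python
class StrictSource:
    """Fails on get() after EOS, like the test's strict_memory_source."""
    def __init__(self, data):
        self._data = data
        self._eos = False

    def get(self, n):
        if self._eos:
            raise RuntimeError("get() after EOS")
        chunk, self._data = self._data[:n], self._data[n:]
        if not chunk:
            self._eos = True
        return chunk

class Stream:
    def __init__(self, source):
        self._source = source
        self._eof = False

    def eof(self):
        return self._eof

    def read_exactly(self, n):
        # Like input_stream::read_exactly(), does not check eof first.
        out = b""
        while len(out) < n:
            chunk = self._source.get(n - len(out))
            if not chunk:
                self._eof = True
                break
            out += chunk
        return out

def read_next_block(stream, n):
    if stream.eof():        # the fix: check eof() before read_exactly()
        return b""
    return stream.read_exactly(n)

s = Stream(StrictSource(b"\x00" * 8))
blocks = [read_next_block(s, 8) for _ in range(3)]
```

The third read is skipped by the eof() guard; without it, the strict source would raise (and the real chunked_download_source hung forever).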

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-1128

Backport to 2025.3/4 and 2026.1 is needed since it fixes a bug that may bite us in production, to be on the safe side

Closes scylladb/scylladb#29110

* github.com:scylladb/scylladb:
  encryption: fix deadlock in encrypted_data_source::get()
  Fix formatting after previous patch
  Fix indentation after previous patch

(cherry picked from commit 3b9398dfc8)

Closes scylladb/scylladb#29198

Closes scylladb/scylladb#29359
2026-04-12 14:20:37 +03:00
Karol Nowacki
f5111bfc9b vector_search: test: fix flaky test
The test assumes that the sleep duration will be at least the value of
the sleep parameter. However, the actual sleep time can be slightly less
than requested (e.g., a 100ms sleep request might result in a 99ms
sleep).

This commit adjusts the test's time comparison to be more lenient,
preventing test flakiness.
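The lenient comparison can be sketched as allowing a small relative tolerance below the requested duration (the 5% value here is illustrative, not the test's actual threshold):

```python
import time

def assert_slept_at_least(requested, elapsed, tolerance=0.05):
    # Tolerate a sleep that wakes marginally early, e.g. 99ms for a
    # requested 100ms.
    assert elapsed >= requested * (1 - tolerance), (
        f"slept {elapsed}s, requested {requested}s")

start = time.monotonic()
time.sleep(0.1)
elapsed = time.monotonic() - start
assert_slept_at_least(0.1, elapsed)
```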
2026-04-09 13:41:23 +02:00
Karol Nowacki
6b5de6394b vector_search: fix race condition on connection timeout
When a `with_connect` operation timed out, the underlying connection
attempt continued to run in the reactor. This could lead to a crash
if the connection was established/rejected after the client object had
already been destroyed. This issue was observed during the teardown
phase of an upcoming high-availability test case.

This commit fixes the race condition by ensuring the connection attempt
is properly canceled on timeout.

Additionally, the explicit TLS handshake previously forced during the
connection is now deferred to the first I/O operation, which is the
default and preferred behavior.
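In asyncio terms, this is what wait_for gives for free: when the timeout fires, the pending connection attempt is cancelled rather than left running to complete against an already-destroyed client. A hedged sketch with a stand-in for the hanging connect:

```python
import asyncio

async def connect_slowly():
    await asyncio.sleep(10)  # stands in for a hanging connection attempt
    return "connected"

async def main():
    task = asyncio.ensure_future(connect_slowly())
    try:
        # On timeout, wait_for cancels the task and waits for the
        # cancellation to complete before raising.
        await asyncio.wait_for(task, timeout=0.05)
    except asyncio.TimeoutError:
        pass
    return task.cancelled()

cancelled = asyncio.run(main())
```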

Fixes: SCYLLADB-832
2026-04-09 13:19:34 +02:00
49 changed files with 870 additions and 172 deletions

View File

@@ -78,7 +78,7 @@ fi
 # Default scylla product/version tags
 PRODUCT=scylla
-VERSION=2025.4.7
+VERSION=2025.4.8
 if test -f version
 then

View File

@@ -3331,7 +3331,11 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
     if (should_add_wcu) {
         rjson::add(ret, "ConsumedCapacity", std::move(consumed_capacity));
     }
-    _stats.api_operations.batch_write_item_latency.mark(std::chrono::steady_clock::now() - start_time);
+    auto duration = std::chrono::steady_clock::now() - start_time;
+    _stats.api_operations.batch_write_item_latency.mark(duration);
+    for (const auto& w : per_table_wcu) {
+        w.first->api_operations.batch_write_item_latency.mark(duration);
+    }
     co_return rjson::print(std::move(ret));
 }
@@ -4815,7 +4819,12 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
     if (!some_succeeded && eptr) {
         co_await coroutine::return_exception_ptr(std::move(eptr));
     }
-    _stats.api_operations.batch_get_item_latency.mark(std::chrono::steady_clock::now() - start_time);
+    auto duration = std::chrono::steady_clock::now() - start_time;
+    _stats.api_operations.batch_get_item_latency.mark(duration);
+    for (const table_requests& rs : requests) {
+        lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
+        per_table_stats->api_operations.batch_get_item_latency.mark(duration);
+    }
     if (is_big(response)) {
         co_return make_streamed(std::move(response));
     } else {

View File

@@ -1119,7 +1119,10 @@ void compaction_manager::enable() {
     _compaction_submission_timer.cancel();
     _compaction_submission_timer.arm_periodic(periodic_compaction_submission_interval());
-    _waiting_reevalution = postponed_compactions_reevaluation();
+    if (_waiting_reevaluation) {
+        on_internal_error(cmlog, "postponed compactions reevaluation is already running when enabling compaction manager");
+    }
+    _waiting_reevaluation.emplace(postponed_compactions_reevaluation());
     cmlog.info("Enabled");
 }
@@ -1167,6 +1170,16 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept {
     _postponed_reevaluation.signal();
 }
+future<> compaction_manager::stop_postponed_compactions() noexcept {
+    auto waiting_reevaluation = std::exchange(_waiting_reevaluation, std::nullopt);
+    if (!waiting_reevaluation) {
+        return make_ready_future();
+    }
+    // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
+    reevaluate_postponed_compactions();
+    return std::move(*waiting_reevaluation);
+}
 void compaction_manager::postpone_compaction_for_table(compaction_group_view* t) {
     _postponed.insert(t);
 }
@@ -1250,8 +1263,7 @@ future<> compaction_manager::drain() {
     _compaction_submission_timer.cancel();
     // Stop ongoing compactions, if the request has not been sent already and wait for them to stop.
     co_await stop_ongoing_compactions("drain");
-    // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
-    reevaluate_postponed_compactions();
+    co_await stop_postponed_compactions();
     cmlog.info("Drained");
 }
@@ -1289,8 +1301,7 @@ future<> compaction_manager::really_do_stop() noexcept {
     if (!_tasks.empty()) {
         on_fatal_internal_error(cmlog, format("{} tasks still exist after being stopped", _tasks.size()));
     }
-    reevaluate_postponed_compactions();
-    co_await std::move(_waiting_reevalution);
+    co_await stop_postponed_compactions();
     co_await _sys_ks.close();
     _weight_tracker.clear();
     _compaction_submission_timer.cancel();

View File

@@ -124,7 +124,7 @@ private:
     // a sstable from being compacted twice.
     std::unordered_set<sstables::shared_sstable> _compacting_sstables;
-    future<> _waiting_reevalution = make_ready_future<>();
+    std::optional<future<>> _waiting_reevaluation;
     condition_variable _postponed_reevaluation;
     // tables that wait for compaction but had its submission postponed due to ongoing compaction.
     std::unordered_set<compaction::compaction_group_view*> _postponed;
@@ -232,6 +232,7 @@ private:
     future<> postponed_compactions_reevaluation();
     void reevaluate_postponed_compactions() noexcept;
+    future<> stop_postponed_compactions() noexcept;
     // Postpone compaction for a table that couldn't be executed due to ongoing
     // similar-sized compaction.
    void postpone_compaction_for_table(compaction::compaction_group_view* t);

View File

@@ -105,6 +105,7 @@ public:
static const std::chrono::minutes entry_expiry;
using key_type = prepared_cache_key_type;
using pinned_value_type = cache_value_ptr;
using value_type = checked_weak_ptr;
using statement_is_too_big = typename cache_type::entry_is_too_big;
@@ -116,9 +117,14 @@ public:
: _cache(size, entry_expiry, logger)
{}
template <typename LoadFunc>
future<pinned_value_type> get_pinned(const key_type& key, LoadFunc&& load) {
return _cache.get_ptr(key.key(), [load = std::forward<LoadFunc>(load)] (const cache_key_type&) { return load(); });
}
template <typename LoadFunc>
future<value_type> get(const key_type& key, LoadFunc&& load) {
return _cache.get_ptr(key.key(), [load = std::forward<LoadFunc>(load)] (const cache_key_type&) { return load(); }).then([] (cache_value_ptr v_ptr) {
return get_pinned(key, std::forward<LoadFunc>(load)).then([] (cache_value_ptr v_ptr) {
return make_ready_future<value_type>((*v_ptr)->checked_weak_from_this());
});
}

View File

@@ -682,7 +682,7 @@ future<::shared_ptr<cql_transport::messages::result_message::prepared>>
query_processor::prepare(sstring query_string, const service::client_state& client_state, cql3::dialect d) {
try {
auto key = compute_id(query_string, client_state.get_raw_keyspace(), d);
auto prep_ptr = co_await _prepared_cache.get(key, [this, &query_string, &client_state, d] {
auto prep_entry = co_await _prepared_cache.get_pinned(key, [this, &query_string, &client_state, d] {
auto prepared = get_statement(query_string, client_state, d);
prepared->calculate_metadata_id();
auto bound_terms = prepared->statement->get_bound_terms();
@@ -696,13 +696,13 @@ query_processor::prepare(sstring query_string, const service::client_state& clie
return make_ready_future<std::unique_ptr<statements::prepared_statement>>(std::move(prepared));
});
const auto& warnings = prep_ptr->warnings;
const auto msg = ::make_shared<result_message::prepared::cql>(prepared_cache_key_type::cql_id(key), std::move(prep_ptr),
co_await utils::get_local_injector().inject(
"query_processor_prepare_wait_after_cache_get",
utils::wait_for_message(std::chrono::seconds(60)));
auto msg = ::make_shared<result_message::prepared::cql>(prepared_cache_key_type::cql_id(key), std::move(prep_entry),
client_state.is_protocol_extension_set(cql_transport::cql_protocol_extension::LWT_ADD_METADATA_MARK));
for (const auto& w : warnings) {
msg->add_warning(w);
}
co_return ::shared_ptr<cql_transport::messages::result_message::prepared>(std::move(msg));
co_return std::move(msg);
} catch(typename prepared_statements_cache::statement_is_too_big&) {
throw prepared_statement_is_too_big(query_string);
}

View File

@@ -1672,7 +1672,7 @@ auto fmt::formatter<db::error_injection_at_startup>::format(const db::error_inje
auto fmt::formatter<db::object_storage_endpoint_param>::format(const db::object_storage_endpoint_param& e, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "object_storage_endpoint_param{{}}", e.to_json_string());
return fmt::format_to(ctx.out(), "object_storage_endpoint_param{}", e.to_json_string());
}
namespace utils {

View File

@@ -1594,9 +1594,10 @@ future<stop_iteration> view_update_builder::on_results() {
return should_stop_updates() ? stop() : advance_existings();
}
// If we have updates and it's a range tombstone, it removes nothing pre-existing, so we can ignore it
if (_update && !_update->is_end_of_partition()) {
if (_update->is_clustering_row()) {
if (_update->is_range_tombstone_change()) {
_update_current_tombstone = _update->as_range_tombstone_change().tombstone();
} else if (_update->is_clustering_row()) {
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
});
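An illustrative sketch of the fix (a simplified fragment model, not Scylla's types): a range-tombstone-change fragment in the update stream must update the running tombstone so that clustering rows following it inside the range are covered.

```python
# "rtc" fragments carry a tombstone timestamp; "row" fragments carry
# (key, write timestamp). A row survives only if it was written after
# the strongest tombstone covering it.
def surviving_rows(fragments, partition_tombstone=0):
    current_tombstone = 0  # plays the role of _update_current_tombstone
    out = []
    for kind, value in fragments:
        if kind == "rtc":            # range tombstone change: track it
            current_tombstone = value
        elif kind == "row":
            key, ts = value
            if ts > max(partition_tombstone, current_tombstone):
                out.append(key)
    return out
```

With fragments `[("row", ("a", 5)), ("rtc", 10), ("row", ("b", 5)), ("rtc", 0), ("row", ("c", 5))]` only `a` and `c` survive; ignoring the `rtc` fragments, as the old branch did, would also keep `b`.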

View File

@@ -9,6 +9,7 @@
import os
import sys
import shlex
import argparse
import psutil
from pathlib import Path
@@ -103,16 +104,41 @@ if __name__ == '__main__':
run('dd if=/dev/zero of={} bs=1M count={}'.format(swapfile, swapsize_mb), shell=True, check=True)
swapfile.chmod(0o600)
run('mkswap -f {}'.format(swapfile), shell=True, check=True)
mount_point = find_mount_point(swap_directory)
mount_unit = out(f'systemd-escape -p --suffix=mount {shlex.quote(str(mount_point))}')
# Add DefaultDependencies=no to the swap unit to avoid getting the default
# Before=swap.target dependency. We apply this to all clouds, but the
# requirement came from Azure:
#
# On Azure, the swap directory is on the Azure ephemeral disk (mounted on /mnt).
# However, cloud-init makes this mount (i.e., the mnt.mount unit) depend on
# the network (After=network-online.target). By extension, this means that
# the swap unit depends on the network. If we didn't use DefaultDependencies=no,
# then the swap unit would be part of the swap.target which other services
# assume to be a local boot target, so we would end up with dependency cycles
# such as:
#
# swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target -> network.target -> systemd-resolved.service -> tmp.mount -> swap.target
#
# By removing the automatic Before=swap.target, the swap unit is no longer
# part of swap.target, avoiding such cycles. The swap will still be
# activated via WantedBy=multi-user.target.
unit_data = '''
[Unit]
Description=swapfile
DefaultDependencies=no
After={}
Conflicts=umount.target
Before=umount.target
[Swap]
What={}
[Install]
WantedBy=multi-user.target
'''[1:-1].format(swapfile)
'''[1:-1].format(mount_unit, swapfile)
with swapunit.open('w') as f:
f.write(unit_data)
systemd_unit.reload()

View File

@@ -727,7 +727,12 @@ public:
// now we need one page more to be able to save one for next lap
auto fill_size = align_up(buf1.size(), block_size) + block_size - buf1.size();
auto buf2 = co_await _input.read_exactly(fill_size);
// If the underlying stream is already at EOF (e.g. buf1 came from
// cached _next while the previous read_exactly drained the source),
// skip the read_exactly call — it would return empty anyway.
auto buf2 = _input.eof()
? temporary_buffer<char>()
: co_await _input.read_exactly(fill_size);
temporary_buffer<char> output(buf1.size() + buf2.size());
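A minimal sketch of the guard (hypothetical stream API, not seastar's): when the underlying stream has already reported EOF, skip the extra read entirely, since issuing it would trigger another `get()` on a drained source.

```python
class StrictStream:
    # Serves chunks, then EOF; raises if read past EOF, like a download
    # source that would hang forever in that situation.
    def __init__(self, chunks):
        self._chunks = list(chunks)
        self.eof = False
    def read_exactly(self, n):
        if self.eof:
            raise RuntimeError("read after EOF")
        out = b""
        while len(out) < n and self._chunks:
            out += self._chunks.pop(0)
        if not self._chunks:
            self.eof = True
        return out[:n]

def fill_block(stream, buf1, fill_size):
    # the fix: consult eof before reading more
    buf2 = b"" if stream.eof else stream.read_exactly(fill_size)
    return buf1 + buf2
```

Without the `stream.eof` check, `fill_block` on an already-drained stream would raise (or, in production, hang) instead of returning the cached buffer.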

View File

@@ -1045,7 +1045,7 @@ future<seastar::shared_ptr<encryption_context>> register_extensions(const db::co
// Since we are in pre-init phase, this should be safe.
co_await smp::invoke_on_all([&opts, &exts] () mutable {
auto& f = exts.schema_extensions().at(encryption_attribute);
for (auto& s : { db::system_keyspace::paxos(), db::system_keyspace::batchlog(), db::system_keyspace::dicts() }) {
for (auto& s : { db::system_keyspace::paxos(), db::system_keyspace::batchlog(), db::system_keyspace::dicts(), db::system_keyspace::raft() }) {
exts.add_extension_to_schema(s, encryption_attribute, f(*opts));
}
});

View File

@@ -38,7 +38,7 @@ This directory should have 700 permissions and belong to the scylla user)foo")
R"foo(System information encryption settings
If enabled, system tables that may contain sensitive information (system.batchlog,
system.paxos), hints files and commit logs are encrypted with the
system.paxos, system.raft), hints files and commit logs are encrypted with the
encryption settings below.
When enabling system table encryption on a node with existing data, run

View File

@@ -437,7 +437,6 @@ void ldap_connection::poll_results() {
const auto found = _msgid_to_promise.find(id);
if (found == _msgid_to_promise.end()) {
mylog.error("poll_results: got valid result for unregistered id {}, dropping it", id);
ldap_msgfree(result);
} else {
found->second.set_value(std::move(result_ptr));
_msgid_to_promise.erase(found);

View File

@@ -101,7 +101,7 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status_help
.entity = "",
.progress_units = "",
.progress = tasks::task_manager::task::progress{},
.children = started ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{}
.children = started ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{}
};
}

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:a0222f3a92cdaa1af5290e74fd86fb664db0f4d254af4c7abeab6c44ae991ecf
size 6300272
oid sha256:9e5a3737655db66bc8c963154c38462d3efcd444ff2efa2623e0ab15e49207f7
size 6297128

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:aee13f1b8ce82992e0a5929c62baa3af28d2d0381c188d515538eb5664766903
size 6311280
oid sha256:71512bffe908b40a4a0d8fd89a6ee48e7c7e7093f88e3a59ece3de31a72bffa1
size 6302908

View File

@@ -1023,8 +1023,8 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
on_internal_error_noexcept(rcslog,
format("reader_concurrency_semaphore::signal(): semaphore {} detected resource leak, available {} exceeds initial {}", _name,
_resources, _initial_resources));
_resources.count = std::max(_resources.count, _initial_resources.count);
_resources.memory = std::max(_resources.memory, _initial_resources.memory);
_resources.count = std::min(_resources.count, _initial_resources.count);
_resources.memory = std::min(_resources.memory, _initial_resources.memory);
}
maybe_wake_execution_loop();
}
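Why the fix uses `min()`: this error path runs when the available resources exceed the initial capacity (a leak), so the repair must clamp the value down to the initial one. With `max()`, the old code kept the leaked value. A one-line illustration:

```python
initial_count = 100
leaked_count = 103  # detected leak: available exceeds initial

assert max(leaked_count, initial_count) == 103  # old code: no-op, leak persists
assert min(leaked_count, initial_count) == 100  # fixed: clamped back to initial
```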

View File

@@ -2957,7 +2957,7 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator:
auto it = _storage_groups.find(group_id);
if (it == _storage_groups.end()) {
throw std::runtime_error(format("Unable to find sibling tablet of id for table {}", group_id, table_id));
throw std::runtime_error(format("Unable to find sibling tablet of id {} for table {}", group_id, table_id));
}
auto& sg = it->second;
sg->for_each_compaction_group([&new_sg, new_range, new_tid, group_id] (const compaction_group_ptr& cg) {
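The bug above is a missing placeholder. Python's `str.format`, like runtime-checked fmt formatting, silently drops surplus positional arguments, so the one-placeholder message ran fine but printed the group id in the table id's slot:

```python
group_id, table_id = 7, 42
# old: one placeholder, two arguments; the second is silently dropped
old = "Unable to find sibling tablet of id for table {}".format(group_id, table_id)
# fixed: both ids appear where the message says they do
new = "Unable to find sibling tablet of id {} for table {}".format(group_id, table_id)

assert old == "Unable to find sibling tablet of id for table 7"
assert new == "Unable to find sibling tablet of id 7 for table 42"
```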

View File

@@ -454,7 +454,7 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
auto ps_ptr = qp.get_prepared(cache_key);
if (!ps_ptr) {
const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
ps_ptr = std::move(msg_ptr->get_prepared());
ps_ptr = msg_ptr->get_prepared();
if (!ps_ptr) {
on_internal_error(paxos_state::logger, "prepared statement is null");
}

View File

@@ -140,14 +140,19 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
auto task_type = hint.get_task_type();
auto tablet_id_opt = tablet_id_provided(task_type) ? std::make_optional(hint.get_tablet_id()) : std::nullopt;
size_t tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
const auto& tablets = _ss.get_token_metadata().tablets();
size_t tablet_count = tablets.has_tablet_map(table) ? tablets.get_tablet_map(table).tablet_count() : 0;
auto res = co_await get_status_helper(id, std::move(hint));
if (!res) {
co_return std::nullopt;
}
tasks::tmlogger.info("tablet_virtual_task: wait until tablet operation is finished");
co_await utils::get_local_injector().inject("tablet_virtual_task_wait", utils::wait_for_message(60s));
co_await _ss._topology_state_machine.event.wait([&] {
if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) {
return true;
}
auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table);
if (is_resize_task(task_type)) { // Resize task.
return tmap.resize_task_info().tablet_task_id.uuid() != id.uuid();
@@ -161,6 +166,10 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
});
res->status.state = tasks::task_manager::task_state::done; // Failed repair task is retried.
if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) {
res->status.end_time = db_clock::now();
co_return res->status;
}
if (is_migration_task(task_type)) {
auto& replicas = _ss.get_token_metadata().tablets().get_tablet_map(table).get_tablet_info(tablet_id_opt.value()).replicas;
auto migration_failed = std::all_of(replicas.begin(), replicas.end(), [&] (const auto& replica) { return res->pending_replica.has_value() && replica != res->pending_replica.value(); });
@@ -168,9 +177,9 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
} else if (is_resize_task(task_type)) {
auto new_tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
res->status.state = new_tablet_count == tablet_count ? tasks::task_manager::task_state::suspended : tasks::task_manager::task_state::done;
res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{};
res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{};
} else {
res->status.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper()));
res->status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
}
res->status.end_time = db_clock::now(); // FIXME: Get precise end time.
co_return res->status;
@@ -243,7 +252,15 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
status_helper res;
auto table = hint.get_table_id();
auto task_type = hint.get_task_type();
auto schema = _ss._db.local().get_tables_metadata().get_table(table).schema();
auto table_ptr = _ss._db.local().get_tables_metadata().get_table_if_exists(table);
if (!table_ptr) {
co_return tasks::task_status {
.task_id = id,
.kind = tasks::task_kind::cluster,
.is_abortable = co_await is_abortable(std::move(hint)),
};
}
auto schema = table_ptr->schema();
res.status = {
.task_id = id,
.kind = tasks::task_kind::cluster,
@@ -276,7 +293,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
}
return make_ready_future();
});
res.status.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper()));
res.status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
} else if (is_migration_task(task_type)) { // Migration task.
auto tablet_id = hint.get_tablet_id();
res.pending_replica = tmap.get_tablet_transition_info(tablet_id)->pending_replica;
@@ -290,7 +307,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
if (task_info.tablet_task_id.uuid() == id.uuid()) {
update_status(task_info, res.status, sched_nr);
res.status.state = tasks::task_manager::task_state::running;
res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{};
res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{};
co_return res;
}
}

View File

@@ -18,6 +18,13 @@ class service_permit {
friend service_permit empty_service_permit();
public:
size_t count() const { return _permit ? _permit->count() : 0; };
// Merge additional semaphore units into this permit.
// Used to grow the permit after the actual resource cost is known.
void adopt(seastar::semaphore_units<>&& units) {
if (_permit) {
_permit->adopt(std::move(units));
}
}
};
inline service_permit make_service_permit(seastar::semaphore_units<>&& permit) {
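A hedged sketch of the idea behind `adopt()`: a permit taken with a small upfront cost can absorb extra semaphore units once the real cost of the request is known, so everything is released together when the permit goes away.

```python
class ServicePermit:
    # illustrative stand-in, not seastar's semaphore_units
    def __init__(self, units=0):
        self._units = units
    def count(self):
        return self._units
    def adopt(self, units):
        # merge additional units into this permit
        self._units += units
```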

View File

@@ -28,6 +28,7 @@
#include "locator/abstract_replication_strategy.hh"
#include "message/messaging_service.hh"
#include "service/storage_service.hh"
#include "utils/error_injection.hh"
#include <cfloat>
#include <algorithm>
@@ -406,6 +407,9 @@ future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::
size_t nr_sst_current = 0;
while (!sstables.empty()) {
co_await utils::get_local_injector().inject("load_and_stream_before_streaming_batch",
utils::wait_for_message(60s));
const size_t batch_sst_nr = std::min(16uz, sstables.size());
auto sst_processed = sstables
| std::views::reverse
@@ -576,6 +580,16 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
// throughout its lifetime.
auto erm = co_await await_topology_quiesced_and_get_erm(table_id);
// Obtain a phaser guard to prevent the table from being destroyed
// while streaming is in progress. table::stop() calls
// _pending_streams_phaser.close() which blocks until all outstanding
// stream_in_progress() guards are released, so holding this guard
// keeps the table alive for the entire streaming operation.
// find_column_family throws no_such_column_family if the table was
// already dropped before we got here.
auto& tbl = _db.local().find_column_family(table_id);
auto stream_guard = tbl.stream_in_progress();
auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
_messaging, _db.local(), table_id, std::move(erm), std::move(sstables),
primary, unlink_sstables(unlink), scope);

View File

@@ -74,7 +74,7 @@ future<bool> table_helper::try_prepare(bool fallback, cql3::query_processor& qp,
auto& stmt = fallback ? _insert_cql_fallback.value() : _insert_cql;
try {
shared_ptr<cql_transport::messages::result_message::prepared> msg_ptr = co_await qp.prepare(stmt, qs.get_client_state(), dialect);
_prepared_stmt = std::move(msg_ptr->get_prepared());
_prepared_stmt = msg_ptr->get_prepared();
shared_ptr<cql3::cql_statement> cql_stmt = _prepared_stmt->statement;
_insert_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(cql_stmt);
_is_fallback_stmt = fallback;

View File

@@ -400,7 +400,7 @@ task_manager::virtual_task::impl::impl(module_ptr module) noexcept
: _module(std::move(module))
{}
future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, std::function<bool(locator::host_id)> is_host_alive) {
future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr) {
auto ms = module->get_task_manager()._messaging;
if (!ms) {
auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id);
@@ -417,19 +417,18 @@ future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::g
tmlogger.info("tasks_vt_get_children: waiting");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{10});
});
co_return co_await map_reduce(nodes, [ms, parent_id, is_host_alive = std::move(is_host_alive)] (auto host_id) -> future<utils::chunked_vector<task_identity>> {
if (is_host_alive(host_id)) {
return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) {
return resp | std::views::transform([host_id] (auto id) {
return task_identity{
.host_id = host_id,
.task_id = id
};
}) | std::ranges::to<utils::chunked_vector<task_identity>>();
});
} else {
return make_ready_future<utils::chunked_vector<task_identity>>();
}
co_return co_await map_reduce(nodes, [ms, parent_id] (auto host_id) -> future<utils::chunked_vector<task_identity>> {
return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) {
return resp | std::views::transform([host_id] (auto id) {
return task_identity{
.host_id = host_id,
.task_id = id
};
}) | std::ranges::to<utils::chunked_vector<task_identity>>();
}).handle_exception_type([host_id, parent_id] (const rpc::closed_error& ex) {
tmlogger.warn("Failed to get children of virtual task with id={} from node {}: {}", parent_id, host_id, ex);
return utils::chunked_vector<task_identity>{};
});
}, utils::chunked_vector<task_identity>{}, [] (auto a, auto&& b) {
std::move(b.begin(), b.end(), std::back_inserter(a));
return a;
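The reworked map-reduce no longer pre-filters by liveness; instead, a per-node RPC failure collapses to an empty contribution. An asyncio sketch of that shape (hypothetical names, not the tasks RPC API):

```python
import asyncio

class ClosedError(Exception):
    # stand-in for rpc::closed_error: the peer went down mid-query
    pass

async def children_from(host):
    if host == "dead":
        raise ClosedError("connection closed")
    return [f"{host}:task1"]

async def get_children(hosts):
    # a failed node contributes nothing instead of failing the whole
    # map-reduce, mirroring handle_exception_type above
    async def one(host):
        try:
            return await children_from(host)
        except ClosedError:
            return []
    parts = await asyncio.gather(*(one(h) for h in hosts))
    return [task for part in parts for task in part]
```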

View File

@@ -19,6 +19,7 @@
#include "db_clock.hh"
#include "utils/log.hh"
#include "locator/host_id.hh"
#include "locator/token_metadata_fwd.hh"
#include "schema/schema_fwd.hh"
#include "tasks/types.hh"
#include "utils/chunked_vector.hh"
@@ -281,7 +282,7 @@ public:
impl& operator=(impl&&) = delete;
virtual ~impl() = default;
protected:
static future<utils::chunked_vector<task_identity>> get_children(module_ptr module, task_id parent_id, std::function<bool(locator::host_id)> is_host_alive);
static future<utils::chunked_vector<task_identity>> get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr);
public:
virtual task_group get_group() const noexcept = 0;
// Returns std::nullopt if an operation with task_id isn't tracked by this virtual_task.

View File

@@ -372,14 +372,17 @@ def test_streams_operations(test_table_s, dynamodbstreams, metrics):
# to update latencies for one kind of operation (#17616, and compare #9406),
# and to do that checking that ..._count increases for that op is enough.
@contextmanager
def check_sets_latency(metrics, operation_names):
def check_sets_latency_by_metric(metrics, operation_names, metric_name):
the_metrics = get_metrics(metrics)
saved_latency_count = { x: get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': x}, the_metrics) for x in operation_names }
saved_latency_count = { x: get_metric(metrics, f'{metric_name}_count', {'op': x}, the_metrics) for x in operation_names }
yield
the_metrics = get_metrics(metrics)
for op in operation_names:
# The total "count" on all shards should strictly increase
assert saved_latency_count[op] < get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': op}, the_metrics)
assert saved_latency_count[op] < get_metric(metrics, f'{metric_name}_count', {'op': op}, the_metrics)
def check_sets_latency(metrics, operation_names):
return check_sets_latency_by_metric(metrics, operation_names, 'scylla_alternator_op_latency')
# Test latency metrics for PutItem, GetItem, DeleteItem, UpdateItem.
# We can't check what exactly the latency is - just that it gets updated.
@@ -395,6 +398,18 @@ def test_item_latency(test_table_s, metrics):
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
def test_item_latency_per_table(test_table_s, metrics):
with check_sets_latency_by_metric(metrics, ['DeleteItem', 'GetItem', 'PutItem', 'UpdateItem', 'BatchWriteItem', 'BatchGetItem'], 'scylla_alternator_table_op_latency'):
p = random_string()
test_table_s.put_item(Item={'p': p})
test_table_s.get_item(Key={'p': p})
test_table_s.delete_item(Key={'p': p})
test_table_s.update_item(Key={'p': p})
test_table_s.meta.client.batch_write_item(RequestItems = {
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
# Test latency metrics for GetRecords. Other Streams-related operations -
# ListStreams, DescribeStream, and GetShardIterator, have an operation
# count (tested above) but do NOT currently have a latency histogram.

View File

@@ -25,6 +25,7 @@
#include "test/lib/exception_utils.hh"
#include "utils/io-wrappers.hh"
using namespace encryption;
static tmpdir dir;
@@ -595,6 +596,123 @@ SEASTAR_TEST_CASE(test_encrypted_data_source_simple) {
co_await test_random_data_source(sizes);
}
// Reproduces the production deadlock where encrypted SSTable component downloads
// got stuck during restore. The encrypted_data_source::get() caches a block in
// _next, then on the next call bypasses input_stream::read()'s _eof check and
// calls input_stream::read_exactly() — which does NOT check _eof when _buf is
// empty. This causes a second get() on the underlying source after EOS.
//
// In production the underlying source was chunked_download_source whose get()
// hung forever. Here we simulate it with a strict source that fails the test.
//
// The fix belongs in seastar's input_stream::read_exactly(): check _eof before
// calling _fd.get(), consistent with read(), read_up_to(), and consume().
static future<> test_encrypted_source_copy(size_t plaintext_size) {
testlog.info("test_encrypted_source_copy: plaintext_size={}", plaintext_size);
key_info info{"AES/CBC", 256};
auto k = ::make_shared<symmetric_key>(info);
// Step 1: Encrypt the plaintext into memory buffers
auto plaintext = generate_random<char>(plaintext_size);
std::vector<temporary_buffer<char>> encrypted_bufs;
{
data_sink sink(make_encrypted_sink(create_memory_sink(encrypted_bufs), k));
co_await sink.put(plaintext.clone());
co_await sink.close();
}
// Flatten encrypted buffers into a single contiguous buffer
size_t encrypted_total = 0;
for (const auto& b : encrypted_bufs) {
encrypted_total += b.size();
}
temporary_buffer<char> encrypted(encrypted_total);
size_t pos = 0;
for (const auto& b : encrypted_bufs) {
std::copy(b.begin(), b.end(), encrypted.get_write() + pos);
pos += b.size();
}
// Step 2: Create a data source from the encrypted data that fails on
// post-EOS get() — simulating a source like chunked_download_source
// that would hang forever in this situation.
// A simple data source that serves data from a temporary_buffer in
// chunks of at most chunk_size, then returns EOF. It also asserts
// that get() is never called after EOF — simulating a source like
// chunked_download_source that would hang forever in that case.
class strict_memory_source final : public data_source_impl {
temporary_buffer<char> _data;
size_t _chunk_size;
bool _eof = false;
public:
strict_memory_source(temporary_buffer<char> data, size_t chunk_size)
: _data(std::move(data))
, _chunk_size(chunk_size) {}
future<temporary_buffer<char>> get() override {
BOOST_REQUIRE_MESSAGE(!_eof,
"get() called on source after it already returned EOS — "
"this is the production deadlock: read_exactly() does not "
"check _eof before calling _fd.get()");
if (_data.empty()) {
_eof = true;
co_return temporary_buffer<char>{};
}
auto n = std::min(_chunk_size, _data.size());
auto result = _data.share(0, n);
_data.trim_front(n);
co_return result;
}
};
// Step 3: Wrap in encrypted_data_source and drain via consume() —
// the exact code path used by seastar::copy() which is what
// sstables_loader_helpers::download_sstable() calls.
// Try multiple chunk sizes to hit different alignment scenarios.
for (size_t chunk_size : {1ul, 7ul, 4096ul, 8192ul, encrypted_total, encrypted_total + 1}) {
if (chunk_size == 0) continue;
auto src = data_source(make_encrypted_source(
data_source(std::make_unique<strict_memory_source>(encrypted.clone(), chunk_size)), k));
auto in = input_stream<char>(std::move(src));
// consume() is what seastar::copy() uses internally. It calls
// encrypted_data_source::get() via _fd.get() until EOF.
size_t total_decrypted = 0;
co_await in.consume([&total_decrypted](temporary_buffer<char> buf) {
total_decrypted += buf.size();
return make_ready_future<consumption_result<char>>(continue_consuming{});
});
co_await in.close();
BOOST_REQUIRE_EQUAL(total_decrypted, plaintext_size);
}
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_8k) {
co_await test_encrypted_source_copy(8192);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_4k) {
co_await test_encrypted_source_copy(4096);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_small) {
co_await test_encrypted_source_copy(100);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_12k) {
co_await test_encrypted_source_copy(12288);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_unaligned) {
co_await test_encrypted_source_copy(8193);
}
SEASTAR_TEST_CASE(test_encrypted_source_copy_1byte) {
co_await test_encrypted_source_copy(1);
}
SEASTAR_TEST_CASE(test_encrypted_data_source_fuzzy) {
std::mt19937_64 rand_gen(std::random_device{}());

View File

@@ -38,6 +38,7 @@
#include "test/lib/proc_utils.hh"
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/system_keyspace.hh"
#include "db/commitlog/commitlog.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "init.hh"
@@ -794,6 +795,39 @@ static auto make_commitlog_config(const test_provider_args& args, const std::uno
return std::make_tuple(cfg, ext);
}
SEASTAR_TEST_CASE(test_system_info_encryption_includes_raft_tables) {
tmpdir tmp;
auto sysdir = tmp.path() / "system_keys";
auto syskey = sysdir / "system" / "system_table_keytab";
auto yaml = fmt::format("system_key_directory: {}", sysdir.string());
co_await create_key_file(syskey, { { "AES/CBC/PKCSPadding", 128 }});
test_provider_args args{
.tmp = tmp,
.extra_yaml = yaml,
};
auto [cfg, ext] = make_commitlog_config(args, {});
co_await do_with_cql_env_thread(
[](cql_test_env& env) {
auto check_has_encryption = [&](schema_ptr s) {
auto it = s->extensions().find("scylla_encryption_options");
BOOST_REQUIRE_MESSAGE(it != s->extensions().end(),
fmt::format("Expected encryption extension on {}.{}",
s->ks_name(), s->cf_name()));
BOOST_REQUIRE_MESSAGE(!it->second->is_placeholder(),
fmt::format("Encryption extension on {}.{} "
"should not be a placeholder",
s->ks_name(), s->cf_name()));
};
check_has_encryption(db::system_keyspace::raft());
},
cfg, {}, cql_test_init_configurables{ *ext });
}
static future<> test_encrypted_commitlog(const test_provider_args& args, std::unordered_map<std::string, std::string> scopts = {}) {
fs::path clback = args.tmp.path() / "commitlog_back";

View File

@@ -57,6 +57,20 @@ BOOST_AUTO_TEST_CASE(test_null_is_not_empty) {
BOOST_REQUIRE(empty != null);
}
BOOST_AUTO_TEST_CASE(test_null_data_value_to_parsable_string) {
auto null_utf8 = data_value::make_null(utf8_type);
BOOST_REQUIRE_EQUAL(null_utf8.to_parsable_string(), "null");
auto null_int = data_value::make_null(int32_type);
BOOST_REQUIRE_EQUAL(null_int.to_parsable_string(), "null");
auto null_list = data_value::make_null(list_type_impl::get_instance(int32_type, true));
BOOST_REQUIRE_EQUAL(null_list.to_parsable_string(), "null");
auto null_map = data_value::make_null(map_type_impl::get_instance(utf8_type, int32_type, true));
BOOST_REQUIRE_EQUAL(null_map.to_parsable_string(), "null");
}
BOOST_AUTO_TEST_CASE(test_bytes_type_string_conversions) {
BOOST_REQUIRE(bytes_type->equal(bytes_type->from_string("616263646566"), bytes_type->decompose(data_value(bytes{"abcdef"}))));
}

View File

@@ -9,6 +9,7 @@ import os
import random
import string
import tempfile
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from pprint import pformat
@@ -476,24 +477,27 @@ class TesterAlternator(BaseAlternator):
extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1}
self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config)
node1 = self.cluster.nodelist()[0]
create_tables_threads = []
for tables_num in range(concurrent_requests_limit * 5):
create_tables_threads.append(self.run_create_table_thread())
stop_workers = threading.Event()
@retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request")
def wait_for_create_table_request_failure():
try:
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
except Exception as error:
if "RequestLimitExceeded" in error.args[0]:
return
raise
raise ConcurrencyLimitNotExceededError
def run_create_table_until_limited() -> None:
while not stop_workers.is_set():
try:
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
except Exception as error: # noqa: BLE001
if "RequestLimitExceeded" in str(error):
stop_workers.set()
return
raise
wait_for_create_table_request_failure()
with ThreadPoolExecutor(max_workers=concurrent_requests_limit * 5) as executor:
create_table_futures = [executor.submit(run_create_table_until_limited) for _ in range(concurrent_requests_limit * 5)]
for thread in create_tables_threads:
thread.join()
if not stop_workers.wait(timeout=30):
raise ConcurrencyLimitNotExceededError
stop_workers.set()
for future in create_table_futures:
future.result(timeout=60)
@staticmethod
def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):

View File
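The rewritten test above replaces joinable threads with a pool of workers that hammer create-table requests until any one of them observes the limit error, coordinated through a shared `threading.Event`. A minimal standalone sketch of that pattern (the `FakeServer` and its semaphore are illustrative stand-ins, not the Alternator API):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class RequestLimitExceeded(Exception):
    pass


class FakeServer:
    # Toy stand-in for the server: rejects a call when more than
    # `limit` requests are in flight concurrently.
    def __init__(self, limit):
        self._sem = threading.Semaphore(limit)

    def create_table(self):
        if not self._sem.acquire(blocking=False):
            raise RequestLimitExceeded("RequestLimitExceeded")
        time.sleep(0.01)  # hold the slot briefly so callers pile up
        self._sem.release()


def run_until_limited(server, stop: threading.Event):
    # Each worker loops until any worker observes the limit error,
    # then signals all the others to stop.
    while not stop.is_set():
        try:
            server.create_table()
        except RequestLimitExceeded:
            stop.set()
            return


stop = threading.Event()
server = FakeServer(limit=2)
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(run_until_limited, server, stop) for _ in range(10)]
    assert stop.wait(timeout=30), "concurrency limit never exceeded"
    for f in futures:
        f.result(timeout=60)  # re-raise any unexpected worker exception
```

Unlike the old `@retrying` loop, no worker raises on its own timeout; the single `stop.wait(timeout=30)` is the only place the test decides the limit was never hit.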

@@ -255,27 +255,3 @@ async def test_node_ops_task_wait(manager: ManagerClient):
await decommission_task
await waiting_task
@pytest.mark.asyncio
async def test_get_children(manager: ManagerClient):
module_name = "node_ops"
tm = TaskManagerClient(manager.api)
servers = [await manager.server_add(cmdline=cmdline) for _ in range(2)]
injection = "tasks_vt_get_children"
handler = await inject_error_one_shot(manager.api, servers[0].ip_addr, injection)
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
bootstrap_task = [task for task in await tm.list_tasks(servers[0].ip_addr, module_name) if task.kind == "cluster"][0]
async def _decommission():
await log.wait_for('tasks_vt_get_children: waiting', from_mark=mark)
await manager.decommission_node(servers[1].server_id)
await handler.message()
async def _get_status():
await tm.get_task_status(servers[0].ip_addr, bootstrap_task.task_id)
await asyncio.gather(*(_decommission(), _get_status()))

View File

@@ -96,6 +96,50 @@ async def test_tablet_repair_task(manager: ManagerClient):
await asyncio.gather(repair_task(), check_and_abort_repair_task(manager, tm, servers, module_name, ks))
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_wait_with_table_drop(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
injection = "tablet_virtual_task_wait"
cmdline = [
'--logger-log-level', 'debug_error_injection=debug',
]
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, cmdline=cmdline)
assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
token = -1
await enable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=ks)
task = repair_tasks[0]
assert task.scope == "table"
assert task.keyspace == ks
assert task.table == "test"
assert task.state in ["created", "running"]
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
await enable_injection(manager, [servers[0]], injection)
async def wait_for_task():
status_wait = await tm.wait_for_task(servers[0].ip_addr, task.task_id)
assert status_wait.state == "done"
async def drop_table():
await log.wait_for(f'"{injection}"', from_mark=mark)
await disable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
await manager.get_cql().run_async(f"DROP TABLE {ks}.test")
await manager.api.message_injection(servers[0].ip_addr, injection)
await asyncio.gather(wait_for_task(), drop_table())
await disable_injection(manager, servers, injection)
async def check_repair_task_list(tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str):
def get_task_with_id(repair_tasks, task_id):
tasks_with_id1 = [task for task in repair_tasks if task.task_id == task_id]

View File

@@ -17,10 +17,10 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
from test.pylib.tablets import get_tablet_replicas
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import wait_for
from test.pylib.util import gather_safely, wait_for
from test.cluster.conftest import skip_mode
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
logger = logging.getLogger(__name__)
@@ -52,28 +52,42 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
@pytest.mark.asyncio
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
node_count = 2
servers = await manager.servers_add(node_count)
cmdline = ["--logger-log-level", "hints_manager=trace"]
servers = await manager.servers_add(node_count, cmdline=cmdline)
async def wait_for_hints_written(min_hint_count: int, timeout: int):
async def aux():
hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
if hints_written >= min_hint_count:
return True
return None
assert await wait_for(aux, time.time() + timeout)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
table = f"{ks}.t"
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
uses_tablets = await keyspace_has_tablets(manager, ks)
# If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
# Otherwise, it could happen that all mutations would target servers[0] only, which would
# ultimately lead to a test failure here. We rely on the assumption that mutations will be
# distributed more or less uniformly!
extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
await manager.server_stop_gracefully(servers[1].server_id)
await manager.server_stop_gracefully(servers[1].server_id)
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ANY
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
for i in range(100):
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
# Some of the inserts will be targeted to the dead node.
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
# Verify hints are written
hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
assert hints_after > hints_before
# Verify hints are written
await wait_for_hints_written(hints_before + 1, timeout=60)
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
# For dropping the keyspace
await manager.server_start(servers[1].server_id)
@pytest.mark.asyncio
async def test_limited_concurrency_of_writes(manager: ManagerClient):
@@ -269,6 +283,18 @@ async def test_hints_consistency_during_replace(manager: ManagerClient):
# Write 100 rows with CL=ANY. Some of the rows will only be stored as hints because of RF=1
for i in range(100):
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i + 1})", consistency_level=ConsistencyLevel.ANY))
# Hint writes are fire-and-forget (store_hint() submits do_store_hint()
# asynchronously via a gate). Wait for all pending hint writes to complete
# before creating the sync point, otherwise it may capture a stale
# replay position and miss some hints.
async def no_pending_hint_writes():
size = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "size_of_hints_in_progress")
if size == 0:
return True
return None
await wait_for(no_pending_hint_writes, time.time() + 30)
sync_point = await create_sync_point(manager.api.client, servers[0].ip_addr)
await manager.server_add(replace_cfg=ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = True))

View File
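Both helpers above (`wait_for_hints_written` and `no_pending_hint_writes`) follow the same shape: poll a metric until a predicate holds or a deadline passes. The real tests use `test.pylib.util.wait_for`; this is a minimal illustrative version of that polling loop, not Scylla's implementation:

```python
import asyncio
import time


async def wait_for(predicate, deadline, period=0.1):
    # Poll an async `predicate` (returns a truthy value on success,
    # None otherwise) until it succeeds or `deadline` (an absolute
    # time.time() value) passes.
    while True:
        value = await predicate()
        if value is not None:
            return value
        if time.time() > deadline:
            return None
        await asyncio.sleep(period)


async def demo():
    state = {"written": 0}

    async def write_hints():
        # Simulated fire-and-forget hint writes completing over time.
        for _ in range(3):
            await asyncio.sleep(0.05)
            state["written"] += 1

    async def enough_hints():
        return True if state["written"] >= 3 else None

    task = asyncio.create_task(write_hints())
    result = await wait_for(enough_hints, time.time() + 5)
    await task
    return result


result = asyncio.run(demo())
assert result is True
```

Returning `None` from the predicate (rather than `False`) lets the helper distinguish "not yet" from a legitimate falsy result.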

@@ -44,10 +44,10 @@ def get_sstables(workdir, ks, table):
sstables = glob.glob(base_pattern)
return sstables
async def get_sst_status(run, log):
sst_add = await log.grep(rf'.*Added sst.*for incremental repair')
sst_skip = await log.grep(rf'.*Skipped adding sst.*for incremental repair')
sst_mark = await log.grep(rf'.*Marking.*for incremental repair')
async def get_sst_status(run, log, from_mark=None):
sst_add = await log.grep(rf'.*Added sst.*for incremental repair', from_mark=from_mark)
sst_skip = await log.grep(rf'.*Skipped adding sst.*for incremental repair', from_mark=from_mark)
sst_mark = await log.grep(rf'.*Marking.*for incremental repair', from_mark=from_mark)
logging.info(f'{run=}: {sst_add=} {sst_skip=} {sst_mark=}');
logging.info(f'{run=}: {len(sst_add)=} {len(sst_skip)=} {len(sst_mark)=}');
return sst_add, sst_skip, sst_mark
@@ -331,10 +331,11 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
token = -1
marks = [await log.mark() for log in logs]
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# 1 add 0 skip 1 mark
for log in logs:
sst_add, sst_skip, sst_mark = await get_sst_status("First", log)
for log, mark in zip(logs, marks):
sst_add, sst_skip, sst_mark = await get_sst_status("First", log, from_mark=mark)
assert len(sst_add) == 1
assert len(sst_skip) == 0
assert len(sst_mark) == 1
@@ -357,13 +358,14 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
else:
assert False # Wrong ops
marks = [await log.mark() for log in logs]
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# 1 add 1 skip 1 mark
for log in logs:
sst_add, sst_skip, sst_mark = await get_sst_status("Second", log)
assert len(sst_add) == 2
assert len(sst_mark) == 2
for log, mark in zip(logs, marks):
sst_add, sst_skip, sst_mark = await get_sst_status("Second", log, from_mark=mark)
assert len(sst_add) == 1
assert len(sst_mark) == 1
assert len(sst_skip) == 1
@pytest.mark.asyncio

View File

@@ -0,0 +1,65 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import pytest
from test.cluster.util import new_test_keyspace, new_test_table
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode")
async def test_prepare_fails_if_cached_statement_is_invalidated_mid_prepare(manager: ManagerClient):
server = await manager.server_add()
cql = manager.get_cql()
log = await manager.server_open_log(server.server_id)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};") as ks:
async with new_test_table(manager, ks, "pk int PRIMARY KEY") as table:
query = f"SELECT * FROM {table} WHERE pk = ?"
loop = asyncio.get_running_loop()
await cql.run_async(f"INSERT INTO {table} (pk) VALUES (7)")
await cql.run_async(f"INSERT INTO {table} (pk) VALUES (8)")
handler = await inject_error_one_shot(manager.api, server.ip_addr, "query_processor_prepare_wait_after_cache_get")
mark = await log.mark()
prepare_future = loop.run_in_executor(None, lambda: cql.prepare(query))
await log.wait_for("query_processor_prepare_wait_after_cache_get: waiting for message", from_mark=mark, timeout=60)
# Trigger table schema update (metadata-only) to invalidate prepared statements while PREPARE is paused.
await cql.run_async(f"ALTER TABLE {table} WITH comment = 'invalidate-prepared-race'")
await handler.message()
done, _ = await asyncio.wait({prepare_future}, timeout=15)
if not done:
pytest.fail("Timed out waiting for PREPARE to complete after signaling injection")
result = done.pop().result()
print(f"PREPARE succeeded as expected: {result!r}")
rows = cql.execute(result, [7])
row = rows.one()
assert row is not None and row.pk == 7
# Invalidate prepared statements again, then execute the same prepared object.
# The driver should transparently re-prepare and re-request execution.
await cql.run_async(f"ALTER TABLE {table} WITH comment = 'invalidate-prepared-race-again'")
reprepare_handler = await inject_error_one_shot(manager.api, server.ip_addr, "query_processor_prepare_wait_after_cache_get")
reprepare_mark = await log.mark()
execute_future = loop.run_in_executor(None, lambda: cql.execute(result, [8]))
await log.wait_for("query_processor_prepare_wait_after_cache_get: waiting for message", from_mark=reprepare_mark, timeout=60)
await reprepare_handler.message()
execute_done, _ = await asyncio.wait({execute_future}, timeout=15)
if not execute_done:
pytest.fail("Timed out waiting for driver execute to finish after re-prepare signaling")
retried_rows = execute_done.pop().result()
retried_row = retried_rows.one()
assert retried_row is not None and retried_row.pk == 8

View File

@@ -15,6 +15,7 @@ from test.pylib.manager_client import ManagerClient, ServerInfo
from test.pylib.rest_client import read_barrier, HTTPError
from test.pylib.scylla_cluster import ScyllaVersionDescription
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
from test.cluster.util import reconnect_driver
from cassandra.cluster import ConsistencyLevel
from cassandra.policies import FallthroughRetryPolicy
from cassandra.protocol import ServerError
@@ -162,6 +163,7 @@ async def test_upgrade_and_rollback(manager: ManagerClient, scylla_2025_1: Scyll
)
logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature")
cql = await reconnect_driver(manager)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await asyncio.gather(*(wait_for_feature("SSTABLE_COMPRESSION_DICTS", cql, h, time.time() + 60) for h in hosts))

View File

@@ -1,8 +1,16 @@
import asyncio
import logging
import os
import shutil
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
from test.pylib.util import unique_name
import pytest
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_drop_table_during_streaming_receiver_side(manager: ManagerClient):
servers = [await manager.server_add(config={
@@ -24,3 +32,146 @@ async def test_drop_table_during_flush(manager: ManagerClient):
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_drop_table_during_load_and_stream(manager: ManagerClient):
"""Verify that dropping a table while load_and_stream is in progress
does not crash. The stream_in_progress() phaser guard acquired in
sstables_loader::load_and_stream keeps the table object alive until
streaming completes, so table::stop() blocks until the guard is
released — preventing a use-after-free on the replica::table&
reference held by the streamer.
Uses the 'load_and_stream_before_streaming_batch' error injection
to pause load_and_stream inside the streaming loop (after the
streamer is created and holds a replica::table& reference), then
issues DROP TABLE concurrently and verifies both operations complete
gracefully.
A single node is sufficient: load_and_stream streams SSTables to
the natural replicas (the local node in this case) via RPC.
"""
server = await manager.server_add()
cql = manager.get_cql()
ks = unique_name("ks_")
cf = "test"
await cql.run_async(
f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}")
try:
await cql.run_async(f"CREATE TABLE {ks}.{cf} (pk int PRIMARY KEY, c int)")
# Insert data and flush to create SSTables on disk
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{cf} (pk, c) VALUES ({k}, {k})") for k in range(64)])
await manager.api.flush_keyspace(server.ip_addr, ks)
# Take snapshot so we have SSTables to copy into upload dir
snap_name = unique_name("snap_")
await manager.api.take_snapshot(server.ip_addr, ks, snap_name)
# Copy snapshot SSTables into the upload directory.
# load_and_stream will read these and stream to replicas.
workdir = await manager.server_get_workdir(server.server_id)
cf_dir = os.listdir(f"{workdir}/data/{ks}")[0]
cf_path = os.path.join(f"{workdir}/data/{ks}", cf_dir)
upload_dir = os.path.join(cf_path, "upload")
os.makedirs(upload_dir, exist_ok=True)
snapshots_dir = os.path.join(cf_path, "snapshots", snap_name)
exclude_list = ["manifest.json", "schema.cql"]
for item in os.listdir(snapshots_dir):
if item not in exclude_list:
shutil.copy2(os.path.join(snapshots_dir, item), os.path.join(upload_dir, item))
# Enable injection that pauses load_and_stream inside the streaming
# loop, after the streamer is created with a replica::table& reference.
# one_shot=True: the test only needs one pause to demonstrate the race;
# after firing once per shard the injection disables itself, avoiding
# blocking on subsequent batches.
await manager.api.enable_injection(server.ip_addr, "load_and_stream_before_streaming_batch", one_shot=True)
server_log = await manager.server_open_log(server.server_id)
log_mark = await server_log.mark()
# Start load_and_stream in the background — it will pause at the injection.
refresh_task = asyncio.create_task(
manager.api.load_new_sstables(server.ip_addr, ks, cf, load_and_stream=True))
# Wait until at least one shard hits the injection point
await server_log.wait_for("load_and_stream_before_streaming_batch: waiting for message", from_mark=log_mark)
logger.info("load_and_stream paused at injection point")
# Drop the table while streaming is paused. With the stream_in_progress
# guard the DROP will block until the guard is released.
drop_task = asyncio.ensure_future(cql.run_async(f"DROP TABLE {ks}.{cf}"))
# Give the DROP a moment to be submitted and reach the server.
# A log-based wait would be more robust but there is no dedicated log
# message for "DROP blocked on phaser"; the sleep is acceptable here.
await asyncio.sleep(1)
# Release the injection — streaming resumes.
# With the fix, the phaser guard keeps the table object alive and
# streaming completes (or fails gracefully). Without the fix,
# the table is already destroyed and the node crashes
# (use-after-free).
await manager.api.message_injection(server.ip_addr, "load_and_stream_before_streaming_batch")
logger.info("Released injection, waiting for load_and_stream to complete")
# Wait for both operations with a timeout — if the node crashed the
# REST call / CQL query will never return.
refresh_error = None
try:
await asyncio.wait_for(refresh_task, timeout=30)
logger.info("load_and_stream completed")
except asyncio.TimeoutError:
refresh_error = "load_and_stream timed out — node likely crashed"
logger.info(refresh_error)
except Exception as e:
refresh_error = str(e)
logger.info(f"load_and_stream finished with error: {e}")
drop_error = None
try:
await asyncio.wait_for(drop_task, timeout=30)
logger.info("DROP TABLE completed")
except asyncio.TimeoutError:
drop_error = "DROP TABLE timed out"
logger.info(drop_error)
except Exception as e:
drop_error = str(e)
logger.info(f"DROP TABLE finished with error: {e}")
# The critical assertion: the node must still be alive.
# Without the stream_in_progress() guard, the table is destroyed
# while streaming holds a dangling reference, causing a crash
# (SEGV or ASAN heap-use-after-free).
crash_matches = await server_log.grep(
r"Segmentation fault|AddressSanitizer|heap-use-after-free|ABORTING",
from_mark=log_mark)
assert not crash_matches, \
"Node crashed during load_and_stream — " \
"stream_in_progress() guard is needed to keep the table alive"
# DROP TABLE must complete.
assert not drop_error, f"DROP TABLE failed unexpectedly: {drop_error}"
# load_and_stream may fail with a "column family not found" error:
# database::drop_table() removes the table from metadata (so
# find_column_family() fails) before cleanup_drop_table_on_all_shards()
# awaits the phaser. When streaming resumes and opens an RPC channel,
# the receiver-side handler calls find_column_family() which throws.
# This is the expected graceful failure — the important thing is
# no crash (checked above).
if refresh_error:
assert "Can't find a column family" in refresh_error, \
f"load_and_stream failed with unexpected error: {refresh_error}"
finally:
# Clean up keyspace if it still exists
try:
await cql.run_async(f"DROP KEYSPACE IF EXISTS {ks}")
except Exception:
pass

View File
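The `stream_in_progress()` phaser guard described in the docstring above is a C++ (Seastar) construct; its lifetime contract can be sketched as a Python asyncio analogy, where `close()` blocks until every held guard is released, so teardown cannot outrun in-flight streaming (this `Gate` class is illustrative, not Scylla's API):

```python
import asyncio


class Gate:
    # Minimal analogue of a phaser/gate: close() waits until every
    # held guard has been released, keeping the owner alive meanwhile.
    def __init__(self):
        self._count = 0
        self._closed = False
        self._drained = asyncio.Event()
        self._drained.set()

    def enter(self):
        if self._closed:
            raise RuntimeError("gate closed")
        self._count += 1
        self._drained.clear()

    def leave(self):
        self._count -= 1
        if self._count == 0:
            self._drained.set()

    async def close(self):
        self._closed = True
        await self._drained.wait()


async def demo():
    gate = Gate()
    events = []

    async def stream():
        gate.enter()  # guard acquired before streaming starts
        try:
            await asyncio.sleep(0.05)  # streaming batch in flight
            events.append("stream done")
        finally:
            gate.leave()

    async def drop_table():
        await gate.close()  # blocks until the streaming guard is released
        events.append("table destroyed")

    await asyncio.gather(stream(), drop_table())
    return events


order = asyncio.run(demo())
assert order == ["stream done", "table destroyed"]
```

Without the guard, `drop_table()` would not wait, and the streamer would keep using a destroyed table — the use-after-free the test reproduces.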

@@ -534,7 +534,9 @@ async def new_test_keyspace(manager: ManagerClient, opts, host=None):
logger.info(f"Error happened while using keyspace '{keyspace}', the keyspace is left in place for investigation")
raise
else:
await manager.get_cql().run_async("DROP KEYSPACE " + keyspace, host=host)
# Use DROP KEYSPACE IF EXISTS as a workaround for
# https://github.com/scylladb/python-driver/issues/317
await manager.get_cql().run_async("DROP KEYSPACE IF EXISTS " + keyspace, host=host)
previously_used_table_names = []
@asynccontextmanager
@@ -579,6 +581,17 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
"""
Checks whether the given keyspace uses tablets.
Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
"""
cql = manager.get_cql()
rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
rows = list(rows_iter)
return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
async def get_raft_log_size(cql, host) -> int:
query = "select count(\"index\") from system.raft"
return (await cql.run_async(query, host=host))[0][0]

View File

@@ -1497,6 +1497,42 @@ def test_views_with_future_tombstones(cql, test_keyspace):
assert [] == list(cql.execute(f'select * from {table}'))
assert [] == list(cql.execute(f'select * from {mv}'))
# Test that a range delete in the same batch as an insert correctly covers
# rows within the deleted range in the materialized view and that it doesn't
# cover rows outside the deleted range. The view update builder must track
# range tombstone changes from the update stream so that all range tombstones
# are applied to the clustering rows that they cover.
# Without this, an inserted row within the range incorrectly survives in the
# view or is incorrectly deleted.
# Reproduces SCYLLADB-1555.
def test_mv_range_delete_and_insert_in_same_batch(cql, test_keyspace):
# Case 1: Insert within the range-deleted interval. The range tombstone
# should shadow the insert, leaving both base and view empty.
with new_test_table(cql, test_keyspace,
'p int, c int, v int, w int, primary key (p, c)') as table:
with new_materialized_view(cql, table, '*', 'v, p, c',
'v is not null and p is not null and c is not null') as mv:
cql.execute(f"BEGIN BATCH "
f"DELETE FROM {table} WHERE p = 1 AND c >= 1 AND c <= 3; "
f"INSERT INTO {table} (p, c, v) VALUES (1, 3, 3); "
f"APPLY BATCH")
assert [] == list(cql.execute(f"SELECT * FROM {table}"))
assert [] == list(cql.execute(f"SELECT * FROM {mv}"))
# Case 2: A pre-existing row within the range, and an insert outside it.
# The range delete should remove the existing row, but the new row at c=4
# falls outside the range and should survive in both base and view.
with new_test_table(cql, test_keyspace,
'p int, c int, v int, w int, primary key (p, c)') as table:
with new_materialized_view(cql, table, '*', 'v, p, c',
'v is not null and p is not null and c is not null') as mv:
cql.execute(f"INSERT INTO {table} (p, c, v) VALUES (1, 2, 1)")
cql.execute(f"BEGIN BATCH "
f"DELETE FROM {table} WHERE p = 1 AND c >= 1 AND c <= 3; "
f"INSERT INTO {table} (p, c, v) VALUES (1, 4, 3); "
f"APPLY BATCH")
assert [] != list(cql.execute(f"SELECT * FROM {table}"))
assert [] != list(cql.execute(f"SELECT * FROM {mv}"))
# Test view representation in system.* tables
def test_view_in_system_tables(cql, test_keyspace):
with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int") as base:

View File

@@ -235,7 +235,7 @@ SEASTAR_THREAD_TEST_CASE(multiple_outstanding_operations_on_failing_connection)
mylog.trace("multiple_outstanding_operations_on_failing_connection");
with_ldap_connection(local_fail_inject_address, [] (ldap_connection& c) {
mylog.trace("multiple_outstanding_operations_on_failing_connection: invoking bind");
bind(c).handle_exception(&ignore).get();;
bind(c).handle_exception(&ignore).get();
std::vector<future<ldap_msg_ptr>> results_base;
for (size_t i = 0; i < 10; ++i) {
@@ -291,3 +291,31 @@ SEASTAR_THREAD_TEST_CASE(severed_connection_yields_exceptional_future) {
}
});
}
// Requires ASAN or valgrind to reliably detect the double-free.
SEASTAR_THREAD_TEST_CASE(unregistered_msgid_double_free) {
set_defbase();
with_ldap_connection(local_ldap_address, [] (ldap_connection& c) {
const auto bind_res = bind(c).get();
BOOST_REQUIRE_EQUAL(LDAP_RES_BIND, ldap_msgtype(bind_res.get()));
// Bypass the public API to send a search without registering its
// message ID, so poll_results() hits the unregistered-ID branch.
int msgid = -1;
const int rc = ldap_search_ext(c.get_ldap(), const_cast<char*>(base_dn), LDAP_SCOPE_SUBTREE,
/*filter=*/nullptr,
/*attrs=*/nullptr,
/*attrsonly=*/0,
/*serverctrls=*/nullptr,
/*clientctrls=*/nullptr,
/*timeout=*/nullptr,
/*sizelimit=*/0, &msgid);
BOOST_REQUIRE_EQUAL(LDAP_SUCCESS, rc);
BOOST_REQUIRE_NE(-1, msgid);
// A public-API search forces poll_results() to process the
// unregistered response before returning.
const auto dummy = search(c, base_dn).get();
BOOST_REQUIRE(dummy.get());
});
}

View File

@@ -154,11 +154,6 @@ struct unreachable_socket {
conn.shutdown_output();
co_await conn.wait_input_shutdown();
}
// There is currently no effective way to abort an ongoing connect in Seastar.
// Timing out the connect with with_timeout leaves the coroutine pending in the reactor.
// To prevent resource leaks, we close the unreachable socket and sleep,
// allowing the pending connect coroutines to fail and release their resources.
co_await seastar::sleep(3s);
}
};

View File

@@ -920,18 +920,20 @@ SEASTAR_TEST_CASE(vector_store_client_updates_backoff_max_time_from_read_request
// Verify backoff timing between status check connections.
// Skip the first connection (ANN request) and analyze status check intervals.
// Allow small tolerance for timer imprecision: measured intervals can be slightly shorter than the programmed sleep duration.
constexpr auto TIMER_TOLERANCE = std::chrono::milliseconds(10);
auto duration_between_1st_and_2nd_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
unavail_s->connections().at(2).timestamp - unavail_s->connections().at(1).timestamp);
BOOST_CHECK_GE(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(100));
BOOST_CHECK_GE(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(100) - TIMER_TOLERANCE);
BOOST_CHECK_LT(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(200));
auto duration_between_2nd_and_3rd_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
unavail_s->connections().at(3).timestamp - unavail_s->connections().at(2).timestamp);
// Max backoff time reached at 200ms, so subsequent status checks use fixed 200ms intervals.
BOOST_CHECK_GE(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(200)); // 200ms = 100ms * 2
BOOST_CHECK_GE(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(200) - TIMER_TOLERANCE); // 200ms = 100ms * 2
BOOST_CHECK_LT(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(400));
auto duration_between_3rd_and_4th_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
unavail_s->connections().at(4).timestamp - unavail_s->connections().at(3).timestamp);
BOOST_CHECK_GE(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(200));
BOOST_CHECK_GE(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(200) - TIMER_TOLERANCE);
BOOST_CHECK_LT(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(400));
},
cfg)

View File
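The intervals the test above checks (roughly 100ms, then 200ms and holding) come from a doubling backoff capped at a maximum; a sketch of that schedule, with the tolerance check applied the same way the assertions do (names here are illustrative):

```python
def backoff_schedule(initial_ms, max_ms, n):
    # Doubling backoff capped at max_ms: 100, 200, 200, ... for (100, 200).
    delays, d = [], initial_ms
    for _ in range(n):
        delays.append(d)
        d = min(d * 2, max_ms)
    return delays


TIMER_TOLERANCE_MS = 10

schedule = backoff_schedule(100, 200, 4)
assert schedule == [100, 200, 200, 200]

# A measured interval slightly shorter than the programmed sleep
# still passes, mirroring the test's "- TIMER_TOLERANCE" bound.
measured = 195
assert measured >= schedule[1] - TIMER_TOLERANCE_MS
```

Comparing against `programmed - tolerance` rather than the exact value is what makes the test robust to timer imprecision without loosening the upper bound.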

@@ -67,14 +67,17 @@ void result_message::visitor_base::visit(const result_message::exception& ex) {
ex.throw_me();
}
result_message::prepared::prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt)
: _prepared(std::move(prepared))
result_message::prepared::prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt)
: _prepared_entry(std::move(prepared_entry))
, _metadata(
_prepared->bound_names,
_prepared->partition_key_bind_indices,
support_lwt_opt ? _prepared->statement->is_conditional() : false)
, _result_metadata{extract_result_metadata(_prepared->statement)}
(*_prepared_entry)->bound_names,
(*_prepared_entry)->partition_key_bind_indices,
support_lwt_opt ? (*_prepared_entry)->statement->is_conditional() : false)
, _result_metadata{extract_result_metadata((*_prepared_entry)->statement)}
{
for (const auto& w : (*_prepared_entry)->warnings){
add_warning(w);
}
}
::shared_ptr<const cql3::metadata> result_message::prepared::extract_result_metadata(::shared_ptr<cql3::cql_statement> statement) {

View File

@@ -13,6 +13,7 @@
#include <concepts>
#include "cql3/result_set.hh"
#include "cql3/prepared_statements_cache.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/query_options.hh"
@@ -30,14 +31,14 @@ namespace messages {
class result_message::prepared : public result_message {
private:
cql3::statements::prepared_statement::checked_weak_ptr _prepared;
cql3::prepared_statements_cache::pinned_value_type _prepared_entry;
cql3::prepared_metadata _metadata;
::shared_ptr<const cql3::metadata> _result_metadata;
protected:
prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt);
prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt);
public:
cql3::statements::prepared_statement::checked_weak_ptr& get_prepared() {
return _prepared;
cql3::statements::prepared_statement::checked_weak_ptr get_prepared() {
return (*_prepared_entry)->checked_weak_from_this();
}
const cql3::prepared_metadata& metadata() const {
@@ -49,7 +50,7 @@ public:
}
cql3::cql_metadata_id_type get_metadata_id() const {
return _prepared->get_metadata_id();
return (*_prepared_entry)->get_metadata_id();
}
class cql;
@@ -166,8 +167,8 @@ std::ostream& operator<<(std::ostream& os, const result_message::set_keyspace& m
class result_message::prepared::cql : public result_message::prepared {
bytes _id;
public:
cql(const bytes& id, cql3::statements::prepared_statement::checked_weak_ptr p, bool support_lwt_opt)
: result_message::prepared(std::move(p), support_lwt_opt)
cql(const bytes& id, cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt)
: result_message::prepared(std::move(prepared_entry), support_lwt_opt)
, _id{id}
{ }

View File
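The change above replaces a checked weak pointer with a pinned cache entry, so the result message itself keeps the cached prepared statement alive for as long as it holds the pin. A Python analogy of that pin-on-read pattern (the `Cache`/`Entry` names are illustrative, not Scylla's `prepared_statements_cache` API):

```python
class Entry:
    def __init__(self, value):
        self.value = value
        self.pins = 0


class Cache:
    def __init__(self):
        self._entries = {}

    def put(self, key, value):
        self._entries[key] = Entry(value)

    def pin(self, key):
        # Returns a handle that blocks eviction while held.
        entry = self._entries[key]
        entry.pins += 1
        return entry

    def unpin(self, entry):
        entry.pins -= 1

    def evict(self, key):
        # Eviction is refused while any reader still pins the entry.
        if self._entries[key].pins > 0:
            return False
        del self._entries[key]
        return True


cache = Cache()
cache.put("stmt", "SELECT ...")
handle = cache.pin("stmt")
assert cache.evict("stmt") is False  # pinned: survives invalidation
cache.unpin(handle)
assert cache.evict("stmt") is True   # unpinned: can be evicted
```

A weak pointer, by contrast, only detects that the entry vanished; a pin prevents it from vanishing, which is what lets `get_prepared()` hand out a fresh `checked_weak_ptr` derived from a still-live entry.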

@@ -243,6 +243,12 @@ void cql_sg_stats::register_metrics()
);
}
transport_metrics.emplace_back(
sm::make_gauge("cql_pending_response_memory", [this] { return _pending_response_memory; },
sm::description("Holds the total memory in bytes consumed by responses waiting to be sent."),
{{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
);
new_metrics.add_group("transport", std::move(transport_metrics));
_metrics = std::exchange(new_metrics, {});
}
@@ -831,6 +837,8 @@ future<> cql_server::connection::process_request() {
future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
try {
auto& sg_stats = _server.get_cql_sg_stats();
size_t pending_response_size = 0;
if (response_f.failed()) {
const auto message = format("request processing failed, error [{}]", response_f.get_exception());
clogger.error("{}: {}", _client_state.get_remote_address(), message);
@@ -838,9 +846,22 @@ future<> cql_server::connection::process_request() {
message,
tracing::trace_state_ptr()));
} else {
write_response(response_f.get(), std::move(mem_permit), _compression);
auto response = response_f.get();
// Account for response body size exceeding the initial estimate.
auto resp_size = response->size();
auto permit_size = mem_permit.count();
if (resp_size > permit_size) {
auto extra = resp_size - permit_size;
auto extra_units = consume_units(_server._memory_available, extra);
mem_permit.adopt(std::move(extra_units));
}
pending_response_size = resp_size;
sg_stats._pending_response_memory += pending_response_size;
write_response(std::move(response), _compression);
}
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
_ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] {
sg_stats._pending_response_memory -= pending_response_size;
});
} catch (...) {
clogger.error("{}: request processing failed: {}",
_client_state.get_remote_address(), std::current_exception());
@@ -1754,9 +1775,9 @@ cql_server::connection::make_schema_change_event(const event::schema_change& eve
return response;
}
void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit, cql_compression compression)
void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, cql_compression compression)
{
_ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response), permit = std::move(permit)] () mutable {
_ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response)] () mutable {
utils::result_with_exception_ptr<scattered_message<char>> message = response->make_message(_version, compression);
if (!message) [[unlikely]] {
return make_exception_future<>(std::move(message).assume_error());


@@ -138,6 +138,10 @@ struct cql_sg_stats {
request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) { return _cql_requests_stats[static_cast<uint8_t>(op)]; }
void register_metrics();
void rename_metrics();
// Track total memory consumed by responses waiting to be sent.
// Incremented when a response is queued, decremented when the write completes.
int64_t _pending_response_memory = 0;
private:
bool _use_metrics = false;
seastar::metrics::metric_groups _metrics;
@@ -229,8 +233,11 @@ public:
service::endpoint_lifecycle_subscriber* get_lifecycle_listener() const noexcept;
service::migration_listener* get_migration_listener() const noexcept;
qos::qos_configuration_change_subscriber* get_qos_configuration_listener() const noexcept;
cql_sg_stats& get_cql_sg_stats() {
return scheduling_group_get_specific<cql_sg_stats>(_stats_key);
}
cql_sg_stats::request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) {
return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
return get_cql_sg_stats().get_cql_opcode_stats(op);
}
future<utils::chunked_vector<client_data>> get_client_data();
@@ -358,7 +365,7 @@ private:
process_on_shard(shard_id shard, uint16_t stream, fragmented_temporary_buffer::istream is, service::client_state& cs,
tracing::trace_state_ptr trace_state, cql3::dialect dialect, cql3::computed_function_values&& cached_vals, Process process_fn);
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none);
void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, cql_compression compression = cql_compression::none);
friend event_notifier;
};
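
The pending-response-memory accounting introduced above (bump the gauge when a response is queued, adopt extra permit units when the body outgrows the initial estimate, drop the gauge once the write completes) can be sketched with simplified stand-ins. `sg_stats`, `mem_permit`, and `pending_response_guard` here are hypothetical reductions of `cql_sg_stats` and the service permit, not the real Scylla types:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical reductions of cql_sg_stats and the memory permit; the real
// types live in the CQL transport layer, these only model the accounting.
struct sg_stats {
    int64_t pending_response_memory = 0;
};

struct mem_permit {
    size_t units = 0;                       // memory reserved up front
    void adopt(size_t extra) { units += extra; }
};

// RAII guard: bumps the gauge when the response is queued and drops it when
// the write completes (guard destruction), like the finally() continuation.
class pending_response_guard {
    sg_stats& _stats;
    size_t _size;
public:
    pending_response_guard(sg_stats& stats, mem_permit& permit, size_t response_size)
        : _stats(stats), _size(response_size) {
        // Account for the response body exceeding the initial estimate.
        if (response_size > permit.units) {
            permit.adopt(response_size - permit.units);
        }
        _stats.pending_response_memory += _size;
    }
    ~pending_response_guard() { _stats.pending_response_memory -= _size; }
};
```

The guard pairs the increment and decrement in one object, so the gauge cannot leak even if the response path unwinds early.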


@@ -3778,6 +3778,10 @@ data_value::data_value(empty_type_representation e) : data_value(make_new(empty_
}
sstring data_value::to_parsable_string() const {
if (is_null()) {
return "null";
}
// For some reason trying to do it using fmt::format refuses to compile
// auto to_parsable_str_transform = std::views::transform([](const data_value& dv) -> sstring {
// return dv.to_parsable_string();


@@ -145,8 +145,10 @@ shared_ptr<client> client::make(std::string endpoint, endpoint_config_ptr cfg, s
future<> client::update_credentials_and_rearm() {
_credentials = co_await _creds_provider_chain.get_aws_credentials();
_creds_invalidation_timer.rearm(_credentials.expires_at);
_creds_update_timer.rearm(_credentials.expires_at - 1h);
if (_credentials) {
_creds_invalidation_timer.rearm(_credentials.expires_at);
_creds_update_timer.rearm(_credentials.expires_at - 1h);
}
}
future<> client::authorize(http::request& req) {
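
The fix above guards the timer rearm on the credentials actually being present. A minimal sketch of that control flow, assuming stand-in types (`fake_timer` and `aws_credentials` here are illustrative, not the real AWS client types):

```cpp
#include <cassert>
#include <chrono>
#include <optional>

using clock_type = std::chrono::steady_clock;

// Stand-in timer that just records its next expiry.
struct fake_timer {
    std::optional<clock_type::time_point> armed_at;
    void rearm(clock_type::time_point tp) { armed_at = tp; }
};

// Stand-in credentials object; the provider chain may return an empty one.
struct aws_credentials {
    clock_type::time_point expires_at{};
    bool valid = false;
    explicit operator bool() const { return valid; }
};

// Mirrors the fixed update_credentials_and_rearm(): only rearm the
// invalidation/update timers when the provider returned valid credentials.
void update_credentials_and_rearm(const aws_credentials& creds,
                                  fake_timer& invalidation, fake_timer& update) {
    using namespace std::chrono_literals;
    if (creds) {
        invalidation.rearm(creds.expires_at);
        update.rearm(creds.expires_at - 1h);  // refresh an hour before expiry
    }
}
```

Without the check, empty credentials would rearm both timers with a default (already expired) time point.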


@@ -8,6 +8,7 @@
#include "client.hh"
#include "utils.hh"
#include "utils/composite_abort_source.hh"
#include "utils/exceptions.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/rjson.hh"
@@ -18,6 +19,7 @@
#include <seastar/coroutine/as_future.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/with_timeout.hh>
#include <seastar/core/abort_on_expiry.hh>
#include <chrono>
#include <fmt/format.h>
#include <netinet/tcp.h>
@@ -28,6 +30,39 @@ using namespace std::chrono_literals;
namespace vector_search {
namespace {
bool is_ip_address(const sstring& host) {
return net::inet_address::parse_numerical(host).has_value();
}
future<connected_socket> connect_with_as(socket_address addr, shared_ptr<tls::certificate_credentials> creds, sstring host, abort_source& as) {
as.check();
auto sock = make_socket();
auto sub = as.subscribe([&sock]() noexcept {
sock.shutdown();
});
auto f = co_await coroutine::as_future(sock.connect(addr));
if (as.abort_requested()) {
f.ignore_ready_future();
throw abort_requested_exception();
}
auto cs = co_await std::move(f);
if (creds) {
tls::tls_options opts;
if (!is_ip_address(host)) {
opts.server_name = host;
}
auto tls_cs = co_await tls::wrap_client(creds, std::move(cs), std::move(opts));
co_return tls_cs;
}
co_return cs;
}
bool is_request_aborted(std::exception_ptr& err) {
return try_catch<abort_requested_exception>(err) != nullptr;
}
class client_connection_factory : public http::experimental::connection_factory {
client::endpoint_type _endpoint;
shared_ptr<tls::certificate_credentials> _creds;
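
The shape of `connect_with_as()` above is a general pattern: subscribe a socket shutdown to the abort source so an abort unblocks the pending connect, then prefer the abort over whatever the connect returned. A synchronous sketch with stand-in types (the real code awaits a Seastar future; `abort_source` and `socket_stub` here are simplified stand-ins):

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <utility>
#include <vector>

struct abort_requested_exception : std::runtime_error {
    abort_requested_exception() : std::runtime_error("aborted") {}
};

// Stand-in for seastar::abort_source: callbacks run when abort is requested.
struct abort_source {
    bool aborted = false;
    std::vector<std::function<void()>> subs;
    void subscribe(std::function<void()> cb) { subs.push_back(std::move(cb)); }
    void request_abort() {
        aborted = true;
        for (auto& cb : subs) cb();
    }
    void check() const { if (aborted) throw abort_requested_exception(); }
};

struct socket_stub {
    bool shut_down = false;
    void shutdown() { shut_down = true; }
};

// connect_fn stands in for the blocking connect; the real code awaits it.
bool connect_with_abort(abort_source& as, socket_stub& sock,
                        std::function<bool()> connect_fn) {
    as.check();                                  // fail fast if already aborted
    as.subscribe([&sock] { sock.shutdown(); });  // abort unblocks the connect
    bool connected = connect_fn();
    if (as.aborted) {
        // Even if connect() technically completed, report the abort.
        throw abort_requested_exception();
    }
    return connected;
}
```

Checking `aborted` after the connect resolves matters: the shutdown races with completion, so a connect that "succeeded" during an abort must still be reported as aborted.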
@@ -41,27 +76,35 @@ public:
}
future<connected_socket> make([[maybe_unused]] abort_source* as) override {
auto deadline = std::chrono::steady_clock::now() + timeout();
auto socket = co_await with_timeout(deadline, connect());
auto t = timeout();
auto socket = co_await connect(t, as);
socket.set_nodelay(true);
socket.set_keepalive_parameters(get_keepalive_parameters(timeout()));
socket.set_keepalive_parameters(get_keepalive_parameters(t));
socket.set_keepalive(true);
unsigned int timeout_ms = timeout().count();
unsigned int timeout_ms = t.count();
socket.set_sockopt(IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_ms, sizeof(timeout_ms));
co_return socket;
}
private:
future<connected_socket> connect() {
auto addr = socket_address(_endpoint.ip, _endpoint.port);
if (_creds) {
auto socket = co_await tls::connect(_creds, addr, tls::tls_options{.server_name = _endpoint.host});
// tls::connect() only performs the TCP handshake — the TLS handshake is deferred until the first I/O operation.
// Force the TLS handshake to happen here so that the connection timeout applies to it.
co_await tls::check_session_is_resumed(socket);
co_return socket;
future<connected_socket> connect(std::chrono::milliseconds timeout, abort_source* as) {
abort_on_expiry timeout_as(seastar::lowres_clock::now() + timeout);
utils::composite_abort_source composite_as;
composite_as.add(timeout_as.abort_source());
if (as) {
composite_as.add(*as);
}
co_return co_await seastar::connect(addr, {}, transport::TCP);
auto f = co_await coroutine::as_future(
connect_with_as(socket_address(_endpoint.ip, _endpoint.port), _creds, _endpoint.host, composite_as.abort_source()));
if (f.failed()) {
auto err = f.get_exception();
// When the connection abort was triggered by our own deadline, rethrow as timed_out_error.
if (is_request_aborted(err) && timeout_as.abort_source().abort_requested()) {
co_await coroutine::return_exception(timed_out_error{});
}
co_await coroutine::return_exception_ptr(std::move(err));
}
co_return co_await std::move(f);
}
std::chrono::milliseconds timeout() const {
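
The `connect()` rewrite above combines two abort sources (the deadline's and the caller's) and, on failure, distinguishes which one fired so a deadline abort surfaces as `timed_out_error`. A sketch of that composition and error mapping, with `_stub` types standing in for the real `utils::composite_abort_source` and Seastar error types:

```cpp
#include <cassert>
#include <stdexcept>
#include <vector>

// Simplified stand-in for seastar::abort_source.
struct abort_source_stub {
    bool aborted = false;
    void request_abort() { aborted = true; }
    bool abort_requested() const { return aborted; }
};

struct timed_out_error : std::runtime_error {
    timed_out_error() : std::runtime_error("timed out") {}
};
struct abort_requested_error : std::runtime_error {
    abort_requested_error() : std::runtime_error("aborted") {}
};

// A composite source reports an abort if any member source does, letting one
// operation honor both a deadline and a caller-supplied abort_source.
struct composite_abort_source_stub {
    std::vector<const abort_source_stub*> sources;
    void add(const abort_source_stub& as) { sources.push_back(&as); }
    bool abort_requested() const {
        for (auto* s : sources) {
            if (s->abort_requested()) return true;
        }
        return false;
    }
};

// Mirrors the error mapping in connect(): an abort that originated from our
// own deadline is surfaced as timed_out_error, any other abort as-is.
void rethrow_mapped(bool op_aborted, const abort_source_stub& timeout_as) {
    if (op_aborted && timeout_as.abort_requested()) {
        throw timed_out_error{};
    }
    if (op_aborted) {
        throw abort_requested_error{};
    }
}
```

Checking the timeout source specifically (not the composite) is what makes the mapping precise: a caller-initiated abort keeps its original error type.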
@@ -84,10 +127,6 @@ bool is_server_problem(std::exception_ptr& err) {
return is_server_unavailable(err) || try_catch<tls::verification_error>(err) != nullptr || try_catch<timed_out_error>(err) != nullptr;
}
bool is_request_aborted(std::exception_ptr& err) {
return try_catch<abort_requested_exception>(err) != nullptr;
}
future<client::request_error> map_err(std::exception_ptr& err) {
if (is_server_problem(err)) {
co_return service_unavailable_error{};