Compare commits

...

45 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
5515d5fb7d test/alternator: fix delete_item_no_ts test, add LWT rejection tests for delete ops, simplify assertions, update docs
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-05 17:31:41 +00:00
copilot-swe-agent[bot]
328c263aed alternator: add custom timestamp support to DeleteItem and BatchWriteItem DeleteRequest
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-05 16:16:27 +00:00
copilot-swe-agent[bot]
a6d36a480d test/alternator: add explicit only_rmw_uses_lwt isolation to test_table_ts and test_table_ts_ss
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 22:17:17 +00:00
copilot-swe-agent[bot]
85354ae26a test/alternator: restore test_table_ts_lwt with always isolation; update docs
- Restore test_table_ts_lwt fixture with system:write_isolation=always to
  explicitly test that the timestamp attribute is rejected in LWT_ALWAYS mode
- Add test_timestamp_attribute_lwt_always_rejected which verifies that even
  a plain PutItem with a timestamp is rejected when always_use_lwt is set
- Keep test_timestamp_attribute_with_condition_rejected using test_table_ts
  (with the test runner's default only_rmw_uses_lwt isolation) to test
  that a ConditionExpression triggers LWT rejection
- Update docs: fix item 4 (non-numeric now rejected), improve Limitations
  section to clearly state always_use_lwt is incompatible with the feature
  and recommend system:write_isolation=only_rmw_uses_lwt

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 22:12:07 +00:00
copilot-swe-agent[bot]
786fa68faa test/alternator: address review comments on test_timestamp_attribute
- Use scope="module" instead of testpy_test_fixture_scope in fixtures
- Rename test_table_ts_sc to test_table_ts_ss (ss = string+string keys)
- Remove test_table_ts_lwt; use test_table_ts for LWT-rejection test
  (the test server runs with only_rmw_uses_lwt, so conditions trigger LWT)
- Add comment that fixtures make tests implicitly Scylla-only
- Change non-numeric timestamp attribute behavior: reject with
  ValidationException instead of silently storing (test + C++ implementation)
- Add test_timestamp_attribute_microseconds: verifies the timestamp unit
  is microseconds and tests interaction with default server timestamps
- Add import time for the new microseconds test

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 21:58:34 +00:00
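The new microseconds test mentioned above verifies that the timestamp unit is microseconds since the Unix epoch. A minimal Python sketch of producing such a value client-side (the helper names are illustrative, not part of Alternator):

```python
import time

def current_timestamp_us() -> int:
    # Microseconds since the Unix epoch, the unit the new test verifies
    # for the timestamp attribute configured via the table tag.
    return time.time_ns() // 1000

def ns_to_us(ns: int) -> int:
    # Truncate a nanosecond clock reading to microseconds.
    return ns // 1000
```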
copilot-swe-agent[bot]
a57f781852 test: refactor test_timestamp_attribute to use shared fixtures
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 21:38:52 +00:00
copilot-swe-agent[bot]
7f79b90e91 Fix timestamp_attribute: non-numeric handling, tag visibility, and clean up
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 20:35:48 +00:00
copilot-swe-agent[bot]
175b8a8a5e Add system:timestamp_attribute feature for custom write timestamps in Alternator
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 20:27:26 +00:00
copilot-swe-agent[bot]
c2ef8075ee Initial plan 2026-02-25 20:04:37 +00:00
Botond Dénes
9dff9752b4 Merge 'Fix regression in Alternator TTL with tablets and node going down' from Nadav Har'El
Recently we suffered a regression in how Alternator TTL behaves when a node goes down while tablets are used.

Usually, expiration of data in a particular tablet is handled by this tablet's "primary replica". However, if that node is down, we want another node to perform these expirations until the primary replica comes back online. We created a function `tablet_map::get_secondary_replica()` to select that "other node". We don't care too much what the "secondary replica" means, but we do care that it's different from the primary replica - if it's the same, the expiration of that tablet will never be done.

It turns out that recently, in commits 817fdad and d88036d, the implementation of get_primary_replica() changed without a corresponding change to get_secondary_replica(). After those changes, the two functions are mismatched, and sometimes return the same node for both primary and secondary replica.

Unfortunately, although we had a dtest for the handling of a dead node in Alternator TTL, it failed to reproduce this bug, so this regression was missed - nothing else besides Alternator TTL ever used the get_secondary_replica() function.

So in this series, in addition to fixing the bug, we add two tests that reproduce it (they fail before the fix and pass with it):

1. A unit test that checks that get_secondary_replica() always returns a different node from get_primary_replica()
2. A cluster test based on the original dtest, which does reproduce this bug in Alternator TTL where some of the data was never expired (but only failed in release build, for an unknown reason).

Fixes SCYLLADB-777.

Closes scylladb/scylladb#28771

* github.com:scylladb/scylladb:
  test: add unit test for tablet_map::get_secondary_replica()
  test, alternator: add test for TTL expiration with a node down
  locator: fix get_secondary_replica() to match get_primary_replica()
2026-02-25 10:13:55 +02:00
Andrei Chekun
1b92b140ee test.py: improve stdout output for boost test
The current way of checking the Boost test's stdout has a potential race
condition: pytest may try to read the file before it has actually been
flushed. This PR eliminates that possibility.

Closes scylladb/scylladb#28783
2026-02-25 00:50:25 +01:00
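One common way to eliminate this kind of read-before-flush race is to poll the file until content appears, with a deadline. A hedged Python sketch (the function and its defaults are illustrative, not the actual test.py change):

```python
import os
import time

def read_when_ready(path: str, timeout: float = 5.0, interval: float = 0.05) -> str:
    # Poll until the file exists and is non-empty, or the deadline passes,
    # instead of reading immediately and racing the writer's flush.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.exists(path) and os.path.getsize(path) > 0:
            with open(path) as f:
                return f.read()
        time.sleep(interval)
    raise TimeoutError(f"{path} still empty after {timeout}s")
```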
Ferenc Szili
f70ca9a406 load_stats: implement the optimized sum of tablet sizes
PR #28703 was merged into master but not with the latest version of the
changes. This patch is an incremental fix for this.

Currently, the elements of the tablet_sizes_per_shard vector are
incremented in separate shards. This is prone to false sharing of cache
lines, and ping-pong of memory, which leads to reduced performance.

In this patch, in order to avoid cache line collisions while updating
the sum of tablet sizes per shard, we align the counter to 64 bytes.

Fixes: SCYLLADB-678

Closes scylladb/scylladb#28757
2026-02-24 22:19:25 +01:00
Alex
5557770b59 test_mv_build_during_shutdown started two async CREATE MATERIALIZED VIEW operations and never awaited them (asyncio.gather(...) without await).
This PR adds an await for each of the tasks, waiting for the MV schema
to be added successfully before starting the server shutdown.
With this change we no longer get the shutdown races.

Closes scylladb/scylladb#28774
2026-02-24 17:25:05 +01:00
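The bug pattern described above, calling asyncio.gather(...) without awaiting it, can be contrasted with the fixed form in a small self-contained sketch (names are hypothetical, not the actual test code):

```python
import asyncio

async def create_view(name: str) -> str:
    await asyncio.sleep(0)  # stand-in for the CREATE MATERIALIZED VIEW round-trip
    return f"{name} created"

async def main() -> list[str]:
    # Buggy form: `asyncio.gather(create_view("v1"), create_view("v2"))`
    # without `await` never waits for the coroutines, so shutdown can
    # race the schema changes. Awaiting the gather ensures both finish
    # before anything else proceeds.
    return await asyncio.gather(create_view("v1"), create_view("v2"))

results = asyncio.run(main())
```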
Anna Stuchlik
64b1798513 doc: remove redundant Java-related information
This commit removes:
- Instructions to install scylla-jmx (and all references)
- The Java 11 requirement for Ubuntu.

Fixes https://github.com/scylladb/scylladb/issues/28249
Fixes https://github.com/scylladb/scylladb/issues/28252

Closes scylladb/scylladb#28254
2026-02-24 14:37:39 +01:00
Anna Stuchlik
e2333a57ad doc: remove the tablets limitation for Alternator
This commit removes the information that Alternator doesn't support tablets.
The limitation is no longer valid.

Fixes SCYLLADB-778

Closes scylladb/scylladb#28781
2026-02-24 14:24:30 +02:00
Andrzej Jackowski
cd4caed3d3 test: fix configuration of test_autoretrain_dict
`test_autoretrain_dict` sporadically fails because the default
compression algorithm was changed after the test was written.

`9ffa62a986815709d0a09c705d2d0caf64776249` was an attempt to fix it by
changing the compression configuration during node startup. However,
the configuration change had an incorrect YAML format and was
ignored by ScyllaDB. This commit fixes it.

Fixes: scylladb/scylladb#28204

Closes scylladb/scylladb#28746
2026-02-24 12:08:44 +01:00
Botond Dénes
067bb5f888 test/scylla_gdb: skip coroutine tests if coroutine frame is not found
For a while, we have seen coroutine related tests (those that use the
coroutine_task fixture) fail occasionally, because no coroutine frame is
found. Multiple attempts were made to make this problem self-diagnosing
and to dump enough information to debug it post-mortem, to no avail so
far. A lot of time was invested into this benign issue: see the long
discussion at https://github.com/scylladb/scylladb/issues/22501.

It is not known if the bug is in gdb, or the gdb script trying to find
the coroutine frame. In any case, both are only used for debugging, so
we can tolerate occasional failures -- we are forced to do so when
working with gdb anyway.
Instead of piling on more effort there, just skip these tests when the
problem occurs. This solves the CI flakiness.

Fixes: #22501

Closes scylladb/scylladb#28745
2026-02-24 10:12:03 +01:00
Marcin Maliszkiewicz
d5684b98c8 test: cluster: add continue-after-error to perf tool tests
Add --continue-after-error true to perf-cql-raw and perf-alternator
tests, and --stop-on-error false to perf-simple-query test, so that
tests don't abort on the first error.

Reason for this is that tests are flaky with example failure:
Perf test failed: std::runtime_error (server returned ERROR to EXECUTE)

When CPU is starved on CI we can return timeouts and/or other errors.

The change should make the tests more robust, at the expense of a
smaller test scope. But those tests were written mostly to test the
startup sequence, as it differs from Scylla's startup.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-759

Closes scylladb/scylladb#28767
2026-02-24 11:08:34 +02:00
Avi Kivity
0add130b9d lua: avoid undefined behavior when converting doubles to integers
Lua doesn't have separate integer and floating point numbers,
so we check if a number can fit in an integer and if so convert
it to an integer.

The conversion routine invokes undefined behavior (and even
acknowledges it!). More recent compilers changed their behavior
when casting infinities, breaking test_user_function_double_return
which tests this conversion.

Fix by tightening the conversion to not invoke undefined behavior.

Closes scylladb/scylladb#28503
2026-02-24 10:41:21 +02:00
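The fix tightens the conversion so the cast is only performed for finite, in-range, integral doubles. A Python sketch of the same guard logic (not the actual C++ implementation):

```python
import math

INT64_MIN = -2**63
INT64_MAX = 2**63 - 1

def double_to_int64(x: float):
    # Casting an out-of-range or non-finite double to int64 is undefined
    # behavior in C++; check everything up front and convert only when
    # the value is a finite, in-range integer.
    if not math.isfinite(x):
        return None
    if not (INT64_MIN <= x <= INT64_MAX):
        return None
    if x != int(x):
        return None  # not an integral value
    return int(x)
```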
Botond Dénes
1d5b8cc562 Merge 'Fix use after free in auth cache' from Marcin Maliszkiewicz
This patchset:
- ensures the loading semaphore is acquired in cross-shard callbacks
- fixes iterator invalidation problem when reloading all cached permissions

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-780
Backport: no, affected code not released yet

Closes scylladb/scylladb#28766

* github.com:scylladb/scylladb:
  auth: cache: fix permissions iterator invalidation in reload_all_permissions
  auth/cache: acquire _loading_sem in cross-shard callbacks
2026-02-24 10:35:46 +02:00
Pavel Emelyanov
5a5eb67144 vector_search/dns: Use newer seastar get_host_by_name API
hostent::addr_list is deprecated in favor of the address_entry::addr
field, which contains the very same addresses.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#28565
2026-02-23 21:28:43 +02:00
Pavel Emelyanov
6b02b50e3d Merge 'object_storage: add retryable machinery to object storage' from Ernest Zaslavsky
- add an overload to the rest http client to accept retry strategy instance as an argument
- remove hand rolled error handling from object storage client and replace with common machinery that supports handling and retrying when appropriate

No backport needed since it is only refactoring

Closes scylladb/scylladb#28161

* github.com:scylladb/scylladb:
  object_storage: add retryable machinery to object storage
  rest_client: add `simple_send` overload
2026-02-23 21:28:51 +03:00
Nadav Har'El
e463d528fe test: add unit test for tablet_map::get_secondary_replica()
This patch adds a unit test for tablet_map::get_secondary_replica().
It was never officially defined how the "primary" and "secondary"
replicas were chosen, and their implementation changed over time,
but the one invariant that this test verifies is that the secondary
replica and the primary replica must be a different node.

This test reproduces issue SCYLLADB-777, where we discovered that
the implementation of get_primary_replica() changed without a
corresponding change to get_secondary_replica(). So before the previous
patch, this test failed,
and after the previous patch - it passes.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2026-02-23 16:19:43 +02:00
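The invariant this unit test checks, that the secondary replica is always a different node from the primary, can be illustrated with a deliberately simplified model. The real tablet_map selection logic is different; everything below is hypothetical:

```python
def get_primary_replica(tablet_id: int, replicas: list[str]) -> str:
    # Hypothetical selection: pick the primary by tablet id.
    return replicas[tablet_id % len(replicas)]

def get_secondary_replica(tablet_id: int, replicas: list[str]) -> str:
    # The invariant under test: for clusters with more than one replica,
    # the secondary must never coincide with the primary. Here we simply
    # take the next replica in the list.
    assert len(replicas) > 1
    return replicas[(tablet_id + 1) % len(replicas)]
```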
Nadav Har'El
0c7f499750 test, alternator: add test for TTL expiration with a node down
We have many single-node functional tests for Alternator TTL in
test/alternator/test_ttl.py. This patch adds a multi-node test in
test/cluster/test_alternator.py. The new test verifies that:

 1. Even though Alternator TTL splits the work of scanning and expiring
    items between nodes, all the items get correctly expired.
 2. When one node is down, all the items still expire because the
    "secondary" owner of each token range takes over expiring the
    items in this range while the "primary" owner is down.

This new test is actually a port of a test we already had in dtest
(alternator_ttl_tests.py::test_multinode_expiration). This port is
faster and smaller than the original (fewer nodes, fewer rows), but it
still found a regression (SCYLLADB-777) that dtest missed - the new test
failed when running with tablets and in release build mode.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2026-02-23 16:19:43 +02:00
Nadav Har'El
9ab3d5b946 locator: fix get_secondary_replica() to match get_primary_replica()
The function tablet_map::get_secondary_replica() is used by Alternator
TTL to choose a node different from get_primary_replica(). Unfortunately,
recently (commits 817fdad and d88037d) the implementation of the latter
function changed, without changing the former. So this patch changes
the former to match.

The next two patches will have two tests that fail before this patch,
and pass with it:

1. A unit test that checks that get_secondary_replica() returns a
   different node than get_primary_replica().

2. An Alternator TTL test that checks that when a node is down,
   expirations still happen because the secondary replica takes over
   the primary replica's work.

Fixes SCYLLADB-777

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2026-02-23 16:19:30 +02:00
Botond Dénes
dcd8de86ee Merge 'docs: update a documentation of adding/removing DC and rebuilding a node' from Aleksandra Martyniuk
Describe a procedure to convert tablet keyspace replication factor
to rack list. Update the procedures of adding and removing a node
to consider tablet keyspaces.

Fixes: [SCYLLADB-398](https://scylladb.atlassian.net/browse/SCYLLADB-398)
Fixes: https://github.com/scylladb/scylladb/issues/28306.
Fixes: https://github.com/scylladb/scylladb/issues/28307.
Fixes: https://github.com/scylladb/scylladb/issues/28270.

Needs backport to all live branches as they all include tablets.

[SCYLLADB-398]: https://scylladb.atlassian.net/browse/SCYLLADB-398?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

Closes scylladb/scylladb#28521

* github.com:scylladb/scylladb:
  docs: update nodetool rebuild docs
  docs: update a procedure of decommissioning a DC
  docs: update a procedure of adding a DC
  docs: describe upgrade to enforce_rack_list option
  docs: describe conversion to rack-list RF
2026-02-23 16:15:16 +02:00
Andrei Chekun
6ae58c6fa6 test.py: move storage tests to cluster subdirectory
Move the storage test suite from test/storage/ to test/cluster/storage/
to consolidate related cluster-based tests. This removes the standalone
test/storage/suite.yaml, as the tests will use the cluster's test
configuration. Initially these tests were in cluster, but they were
moved outside in the first iteration in order to use unshare. Now that
they handle volumes in another way, without unshare, they belong in
cluster.

Closes scylladb/scylladb#28634
2026-02-23 16:14:15 +02:00
Marcin Maliszkiewicz
c5dc086baf Merge 'vector_search: return NaN for similarity_cosine with all-zero vectors' from Dawid Pawlik
ANN vector queries with all-zero vectors are allowed even on vector indexes whose similarity function is set to cosine.
When the rescoring option is enabled, those queries would fail because rescoring calls the `similarity_cosine` function underneath, causing an `InvalidRequest` exception, as all-zero vectors were not allowed (matching Cassandra's behaviour).

To eliminate the discrepancy, we want the all-zero vector `similarity_cosine` calls to pass but return NaN, as cosine similarity is mathematically undefined for zero vectors. We decided not to use arbitrary values, unlike USearch, for which the distance (not to be confused with similarity) is defined as cos(0, 0) = 0 and cos(0, x) = 1, while supporting the range of values [0, 2].
Converted to similarity, that would mean sim_cos(0, x) = 0.5, which has no mathematical justification for being more similar than, for example, vectors forming obtuse angles.
It's safe to assume that all-zero vectors shouldn't have any impact on cosine similarity, so we return NaN and eliminate them from the best results.

Adjusted the tests accordingly to check both Cassandra's and Scylla's behaviour.

Fixes: SCYLLADB-456

Backport to 2026.1 needed, as it fixes the bug for ANN vector queries using rescoring introduced there.

Closes scylladb/scylladb#28609

* github.com:scylladb/scylladb:
  test/vector_search: add reproducer for rescoring with zero vectors
  vector_search: return NaN for similarity_cosine with all-zero vectors
2026-02-23 13:10:44 +01:00
Aleksandra Martyniuk
9ccc95808f docs: update nodetool rebuild docs
Update nodetool rebuild docs to mention that the command does not
work for tablet keyspaces.

Fixes: https://github.com/scylladb/scylladb/issues/28270.
2026-02-23 12:45:01 +01:00
Aleksandra Martyniuk
e4c42acd8f docs: update a procedure of decommissioning a DC
Update a procedure of decommissioning a DC for tablet keyspaces.

Fixes: https://github.com/scylladb/scylladb/issues/28307.
2026-02-23 12:45:01 +01:00
Aleksandra Martyniuk
1c764cf6ea docs: update a procedure of adding a DC
Update a procedure of adding a DC for tablet keyspaces.

Fixes: https://github.com/scylladb/scylladb/issues/28306.
2026-02-23 12:45:01 +01:00
Aleksandra Martyniuk
e08ac60161 docs: describe upgrade to enforce_rack_list option 2026-02-23 12:44:57 +01:00
Aleksandra Martyniuk
eefe66b2b2 docs: describe conversion to rack-list RF
Fixes: SCYLLADB-398
2026-02-23 12:41:33 +01:00
Marcin Maliszkiewicz
54dca90e8c Merge 'test: move dtest/guardrails_test.py to test_guardrails.py' from Andrzej Jackowski
This patch series moves `test/cluster/dtest/guardrails_test.py`
to `test/cluster/test_guardrails.py`, and migrates it from `cluster/dtest/`
to `cluster/` framework.

There are two motivations for moving the test:
 - Execution time reduction (from 12s to 9s in 'dev' in my env)
 - Facilitate adding new tests to the `guardrails_test.py` file

No backport, `dtest/guardrails_test.py` is only on master

Closes scylladb/scylladb#28737

* github.com:scylladb/scylladb:
  test: move dtest/guardrails_test.py to test_guardrails.py
  test: prepare guardrails_test.py to be moved to test/cluster/
2026-02-23 12:34:43 +01:00
Marcin Maliszkiewicz
1293b94039 auth: cache: fix permissions iterator invalidation in reload_all_permissions
The inner loops in reload_all_permissions iterate role's permissions
and _anonymous_permissions maps across yield points. Concurrent
load_permissions calls (which don't hold _loading_sem) can emplace
into those same maps during a yield, potentially triggering a rehash
that invalidates the active iterator.

We want to avoid adding semaphore acquire in load_permissions
because it's on a common path (get_permissions).

Fixing by snapshotting the keys into a vector before iterating with
yields, so no long-lived map iterator is held across suspension
points.
2026-02-23 12:14:22 +01:00
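The same hazard exists in Python, where inserting into a dict while iterating it raises RuntimeError, much like a rehash invalidates the C++ map iterator here. Snapshotting the keys first, as this fix does, sidesteps it. A minimal sketch (names are illustrative):

```python
def reload_all(permissions: dict[str, int], reload_one) -> None:
    # Iterating `permissions` directly while reload_one() (which may
    # suspend and let concurrent work run) inserts into the same map
    # would raise "dictionary changed size during iteration" - the
    # Python analogue of the C++ iterator invalidation. Snapshot the
    # keys into a list first, then iterate the snapshot.
    for role in list(permissions.keys()):
        permissions[role] = reload_one(role)
```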
Piotr Dulikowski
a4c389413c Merge 'Hardens MV shutdown behavior by fixing lifecycle tracking for detached view-builder callbacks' from Alex Dathskovsky
This series hardens MV shutdown behavior by fixing lifecycle tracking for detached view-builder callbacks and aligning update handling with the same async dispatch style used by create/drop.

Patch 1 refactors on_update_view to use a dedicated coroutine dispatcher (dispatch_update_view), keeping update logic serialized under the existing view-builder lock and consistent with the callback architecture already used for create/drop paths.

Patch 2 adds explicit callback lifetime coordination in view_builder:

  - introduce a seastar::gate member
  - acquire _ops_gate.hold() when launching detached create/update/drop dispatch futures
  - keep the hold alive until each detached future resolves
  - close the gate during view_builder::drain() so shutdown waits for in-flight callback work before final teardown

Together, these changes reduce shutdown race exposure in MV event handling while preserving existing behavior for normal operation.

Testing:
  - pytest --test-py-init test/cluster/mv (47 passed, 7 skipped)

backport: not required, the issue only started happening in master

fixes: SCYLLADB-687

Closes scylladb/scylladb#28648

* github.com:scylladb/scylladb:
  db/view: gate detached view-builder callbacks during shutdown
  db:view: refactor on_update_view to use coroutine dispatcher
2026-02-23 11:28:37 +01:00
Marcin Maliszkiewicz
75d4bc26d3 auth/cache: acquire _loading_sem in cross-shard callbacks
distribute_role() modifies _roles on non-zero shards via
invoke_on_others() without holding _loading_sem. Similarly, load_all()'s
invoke_on_others() callback calls prune_all() without the semaphore.
When these run concurrently with reload_all_permissions(), which
iterates _roles across yield points, an insertion can trigger
absl::flat_hash_map::resize(), freeing the backing storage while
an iterator still references it.

Fix by acquiring _loading_sem on the target shard in both
distribute_role()'s and load_all()'s invoke_on_others callbacks,
serializing all _roles mutations with coroutines that iterate
the map.
2026-02-23 10:30:03 +01:00
Ernest Zaslavsky
321d4caf0c object_storage: add retryable machinery to object storage
remove hand rolled error handling from object storage client
and replace with common machinery that supports exception
handling and retrying when appropriate
2026-02-22 14:00:44 +02:00
Ernest Zaslavsky
24972da26d rest_client: add simple_send overload
add an overload to rest client `simple_send` to accept a retry_strategy for http's make_request
2026-02-22 14:00:44 +02:00
Andrzej Jackowski
eb5a564df2 test: move dtest/guardrails_test.py to test_guardrails.py
This commit moves `guardrails_test.py`, prepared in the previous
commit of this patch series, to `test/cluster/test_guardrails.py`.
It also cleans up `suite.yaml`.
2026-02-20 11:39:52 +01:00
Andrzej Jackowski
9df426d2ae test: prepare guardrails_test.py to be moved to test/cluster/
Disable `test/cluster/dtest/guardrails_test.py` in `suite.yaml` and
make it compatible with the `test/cluster/` framework. This will
allow moving this file from `test/cluster/dtest/` to `test/cluster/`
in the next commit of this patch series.

There are two motivations for moving the test:
 - Execution time reduction (from 12s to 9s in 'dev' in my env)
 - Facilitate adding new tests to the `guardrails_test.py` file
2026-02-20 11:39:43 +01:00
Alex
c44ad31d44 db/view: gate detached view-builder callbacks during shutdown
Detached migration callbacks (on_create_view, on_update_view, on_drop_view)
  can race with view_builder::drain() teardown.

  Add a lifetime gate to view_builder and wire callback launches through
  _ops_gate.hold() so each detached dispatch future is tracked until it
  completes (finally keeps the hold alive). During shutdown, drain()
  now waits for all tracked callback work with _ops_gate.close().

  This ensures drain does not proceed past callback lifetime while shutdown is in
  progress, and ignores only gate_closed_exception at callback entry as the
  expected shutdown path.
2026-02-18 11:56:41 +02:00
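The gate pattern above, tracking each detached callback so drain() can wait for all of them and ignoring new work once closed, has a close asyncio analogue. A hypothetical Python sketch, not the actual Seastar code:

```python
import asyncio

class ViewBuilder:
    def __init__(self) -> None:
        self._ops: set[asyncio.Task] = set()
        self._closed = False
        self.log: list[str] = []

    def on_update_view(self, name: str) -> None:
        # Detached dispatch: track the task so drain() can wait for it,
        # mirroring _ops_gate.hold() in the C++ code.
        if self._closed:
            return  # gate already closed: ignore, the expected shutdown path
        task = asyncio.ensure_future(self._dispatch(name))
        self._ops.add(task)
        task.add_done_callback(self._ops.discard)

    async def _dispatch(self, name: str) -> None:
        await asyncio.sleep(0)  # stand-in for the serialized update work
        self.log.append(name)

    async def drain(self) -> None:
        # Close the gate, then wait for all in-flight callback work,
        # mirroring _ops_gate.close() during shutdown.
        self._closed = True
        await asyncio.gather(*self._ops)

async def demo() -> list[str]:
    vb = ViewBuilder()
    vb.on_update_view("v1")
    vb.on_update_view("v2")
    await vb.drain()
    vb.on_update_view("v3")  # dropped: gate already closed
    return vb.log
```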
Alex
75e25493c1 db:view: refactor on_update_view to use coroutine dispatcher
on_update_view() currently runs its serialized logic inline via with_semaphore()
  from a detached callback path, while create/drop already use dedicated async
  dispatchers.

  Refactor update handling to follow the same pattern:

  - add dispatch_update_view(sstring ks_name, sstring view_name)
  - move update logic into that coroutine
  - acquire the existing view-builder lock via get_or_adopt_view_builder_lock()
  - keep existing behavior for missing base/view state
  - keep background invocation semantics from on_update_view()

  This aligns the update/create/drop flow, keeps async lifecycle handling consistent, and is a first step toward fixing the shutdown issue.
2026-02-15 18:50:32 +02:00
Dawid Pawlik
4e32502bb3 test/vector_search: add reproducer for rescoring with zero vectors
Add a reproducer for the SCYLLADB-456 issue: an exception on ANN vector
queries with rescoring and cosine similarity.
2026-02-11 13:41:09 +01:00
Dawid Pawlik
af0889d194 vector_search: return NaN for similarity_cosine with all-zero vectors
ANN vector queries with all-zero vectors are allowed even on vector
indexes whose similarity function is set to cosine.
When the rescoring option is enabled, those queries would fail because
rescoring calls the `similarity_cosine` function underneath, causing an
`InvalidRequest` exception, as all-zero vectors were not allowed
(matching Cassandra's behaviour).

To eliminate the discrepancy, we want the all-zero vector
`similarity_cosine` calls to pass but return NaN, as cosine similarity
is mathematically undefined for zero vectors.
We decided not to use arbitrary values, unlike USearch, for which the
distance (not to be confused with similarity) is defined as
cos(0, 0) = 0 and cos(0, x) = 1, while supporting the range of values
[0, 2].
Converted to similarity, that would mean sim_cos(0, x) = 0.5, which has
no mathematical justification for being more similar than, for example,
vectors forming obtuse angles.
It's safe to assume that all-zero vectors shouldn't have any impact on
cosine similarity, so we return NaN and eliminate them from the best
results.

Adjusted the tests accordingly to check both Cassandra's and Scylla's
behaviour.

Fixes: SCYLLADB-456
2026-02-11 12:31:47 +01:00
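The semantics chosen in this commit, NaN for all-zero vectors instead of an arbitrary value, can be sketched in a few lines of Python (illustrative, not Scylla's implementation):

```python
import math

def similarity_cosine(a: list[float], b: list[float]) -> float:
    # Cosine similarity is undefined when either vector has zero norm,
    # so return NaN instead of an arbitrary value, letting zero vectors
    # fall out of the "best" results naturally.
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    if na == 0.0 or nb == 0.0:
        return math.nan
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (na * nb)
```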
51 changed files with 1453 additions and 490 deletions


@@ -7,6 +7,7 @@
*/
#include <fmt/ranges.h>
#include <cstdlib>
#include <seastar/core/on_internal_error.hh>
#include "alternator/executor.hh"
#include "alternator/consumed_capacity.hh"
@@ -108,6 +109,16 @@ const sstring TABLE_CREATION_TIME_TAG_KEY("system:table_creation_time");
// configured by UpdateTimeToLive to be the expiration-time attribute for
// this table.
extern const sstring TTL_TAG_KEY("system:ttl_attribute");
// If this tag is present, it stores the name of an attribute whose numeric
// value (in microseconds since the Unix epoch) is used as the write timestamp
// for PutItem and UpdateItem operations. When the named attribute is present
// in a PutItem or UpdateItem request, its value is used as the timestamp of
// the write, and the attribute itself is NOT stored in the item. This allows
// users to control write ordering for last-write-wins semantics. Because LWT
// does not allow setting a custom write timestamp, operations using this
// feature are incompatible with conditions (which require LWT), and with
// the LWT_ALWAYS write isolation mode; such operations are rejected.
static const sstring TIMESTAMP_TAG_KEY("system:timestamp_attribute");
// This will be set to 1 in a case, where user DID NOT specify a range key.
// The way GSI / LSI is implemented by Alternator assumes user specified keys will come first
// in materialized view's key list. Then, if needed missing keys are added (current implementation
@@ -1337,13 +1348,14 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
// Alternator uses tags whose keys start with the "system:" prefix for
// internal purposes. Those should not be readable by ListTagsOfResource,
// nor writable with TagResource or UntagResource (see #24098).
// Only a few specific system tags, currently only "system:write_isolation"
// and "system:initial_tablets", are deliberately intended to be set and read
// by the user, so are not considered "internal".
// Only a few specific system tags, currently only "system:write_isolation",
// "system:initial_tablets", and "system:timestamp_attribute", are deliberately
// intended to be set and read by the user, so are not considered "internal".
static bool tag_key_is_internal(std::string_view tag_key) {
return tag_key.starts_with("system:")
&& tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY
&& tag_key != INITIAL_TABLETS_TAG_KEY;
&& tag_key != INITIAL_TABLETS_TAG_KEY
&& tag_key != TIMESTAMP_TAG_KEY;
}
enum class update_tags_action { add_tags, delete_tags };
@@ -2298,8 +2310,11 @@ public:
// After calling pk_from_json() and ck_from_json() to extract the pk and ck
// components of a key, and if that succeeded, call check_key() to further
// check that the key doesn't have any spurious components.
static void check_key(const rjson::value& key, const schema_ptr& schema) {
if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
// allow_extra_attribute: set to true when the key may contain one extra
// non-key attribute (e.g., the timestamp pseudo-attribute for DeleteItem).
static void check_key(const rjson::value& key, const schema_ptr& schema, bool allow_extra_attribute = false) {
const unsigned expected = (schema->clustering_key_size() == 0 ? 1 : 2) + (allow_extra_attribute ? 1 : 0);
if (key.MemberCount() != expected) {
throw api_error::validation("Given key attribute not in schema");
}
}
@@ -2346,6 +2361,57 @@ void validate_value(const rjson::value& v, const char* caller) {
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
// Convert a DynamoDB number (big_decimal) to an api::timestamp_type
// (microseconds since the Unix epoch). Fractional microseconds are truncated.
// Returns nullopt if the value is negative or zero.
static std::optional<api::timestamp_type> bigdecimal_to_timestamp(const big_decimal& bd) {
if (bd.unscaled_value() <= 0) {
return std::nullopt;
}
if (bd.scale() == 0) {
// Fast path: integer value, no decimal adjustment needed
return static_cast<api::timestamp_type>(bd.unscaled_value());
}
// General case: adjust for decimal scale.
// big_decimal stores value as unscaled_value * 10^(-scale).
// scale > 0 means divide by 10^scale (truncate fractional part).
// scale < 0 means multiply by 10^|scale| (add trailing zeros).
auto str = bd.unscaled_value().str();
if (bd.scale() > 0) {
int len = str.length();
if (len <= bd.scale()) {
return std::nullopt; // Number < 1
}
str = str.substr(0, len - bd.scale());
} else {
if (bd.scale() < -18) {
// Too large to represent as int64_t
return std::nullopt;
}
for (int i = 0; i < -bd.scale(); i++) {
str.push_back('0');
}
}
long long result = strtoll(str.c_str(), nullptr, 10);
if (result <= 0) {
return std::nullopt;
}
return static_cast<api::timestamp_type>(result);
}
// Try to extract a write timestamp from a DynamoDB-typed value.
// The value should be a number ({"N": "..."}), representing microseconds
// since the Unix epoch. Returns nullopt if the value is not a valid number
// or doesn't represent a valid timestamp.
static std::optional<api::timestamp_type> try_get_timestamp(const rjson::value& attr_value) {
std::optional<big_decimal> n = try_unwrap_number(attr_value);
if (!n) {
return std::nullopt;
}
return bigdecimal_to_timestamp(*n);
}
class put_or_delete_item {
private:
partition_key _pk;
@@ -2361,11 +2427,17 @@ private:
// that length can have different meaning depends on the operation but the
// the calculation of length in bytes to WCU is the same.
uint64_t _length_in_bytes = 0;
// If the table has a system:timestamp_attribute tag, and the named
// attribute was found in the item with a valid numeric value, this holds
// the extracted timestamp. The attribute is not added to _cells.
std::optional<api::timestamp_type> _custom_timestamp;
public:
struct delete_item {};
struct put_item {};
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item);
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes);
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item,
const std::optional<bytes>& timestamp_attribute = std::nullopt);
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
const std::optional<bytes>& timestamp_attribute = std::nullopt);
// put_or_delete_item doesn't keep a reference to schema (so it can be
// moved between shards for LWT) so it needs to be given again to build():
mutation build(schema_ptr schema, api::timestamp_type ts) const;
@@ -2380,11 +2452,32 @@ public:
bool is_put_item() noexcept {
return _cells.has_value();
}
// Returns the custom write timestamp extracted from the timestamp attribute,
// if any. If not set, the caller should use api::new_timestamp() instead.
std::optional<api::timestamp_type> custom_timestamp() const noexcept {
return _custom_timestamp;
}
};
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item)
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item, const std::optional<bytes>& timestamp_attribute)
: _pk(pk_from_json(key, schema)), _ck(ck_from_json(key, schema)) {
check_key(key, schema);
if (timestamp_attribute) {
// The timestamp attribute may be provided as a "pseudo-key": it is
// not a real key column, but can be included in the "Key" object to
// carry the custom write timestamp. If found, extract the timestamp
// and don't store it in the item.
const rjson::value* ts_val = rjson::find(key, to_string_view(*timestamp_attribute));
if (ts_val) {
if (auto t = try_get_timestamp(*ts_val)) {
_custom_timestamp = t;
} else {
throw api_error::validation(fmt::format(
"The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)",
to_string_view(*timestamp_attribute)));
}
}
}
check_key(key, schema, _custom_timestamp.has_value());
}
// find_attribute() checks whether the named attribute is stored in the
@@ -2471,7 +2564,8 @@ static inline void validate_value_if_index_key(
}
}
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes)
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
const std::optional<bytes>& timestamp_attribute)
: _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) {
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
@@ -2480,6 +2574,17 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
// If this is the timestamp attribute, it must be a valid numeric value
// (microseconds since epoch). Use it as the write timestamp and do not
// store it in the item data. Reject the write if the value is non-numeric.
if (timestamp_attribute && column_name == *timestamp_attribute) {
if (auto t = try_get_timestamp(it->value)) {
_custom_timestamp = t;
// The attribute is consumed as timestamp, not stored in _cells.
continue;
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*timestamp_attribute)));
}
_length_in_bytes += column_name.size();
if (!cdef) {
// This attribute may be a key column of one of the GSI or LSI,
@@ -2671,6 +2776,13 @@ rmw_operation::rmw_operation(service::storage_proxy& proxy, rjson::value&& reque
// _pk and _ck will be assigned later, by the subclass's constructor
// (each operation puts the key in a slightly different location in
// the request).
const auto tags_ptr = db::get_tags_of_table(_schema);
if (tags_ptr) {
auto it = tags_ptr->find(TIMESTAMP_TAG_KEY);
if (it != tags_ptr->end() && !it->second.empty()) {
_timestamp_attribute = to_bytes(it->second);
}
}
}
std::optional<mutation> rmw_operation::apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) {
@@ -2815,6 +2927,21 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
.alternator = true,
.alternator_streams_increased_compatibility = schema()->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
// If the operation uses a custom write timestamp (from the
// system:timestamp_attribute tag), LWT is incompatible because LWT
// requires the timestamp to be set by the Paxos protocol. Reject the
// operation if it would need to use LWT.
if (has_custom_timestamp()) {
bool would_use_lwt = _write_isolation == write_isolation::LWT_ALWAYS ||
(needs_read_before_write &&
_write_isolation != write_isolation::FORBID_RMW &&
_write_isolation != write_isolation::UNSAFE_RMW);
if (would_use_lwt) {
throw api_error::validation(
"Using the system:timestamp_attribute is not compatible with "
"conditional writes or the 'always' write isolation policy.");
}
}
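The LWT-compatibility check above can be sketched as a standalone predicate (hypothetical Python names mirroring the C++ `write_isolation` enum values):

```python
def rejects_custom_timestamp(write_isolation: str,
                             needs_read_before_write: bool) -> bool:
    # A custom write timestamp is rejected whenever the operation would go
    # through LWT, because Paxos assigns the timestamp in that path.
    if write_isolation == 'LWT_ALWAYS':
        return True
    # Under the remaining policies, only read-modify-write operations use
    # LWT, and only when neither forbid_rmw nor unsafe_rmw is in effect.
    return (needs_read_before_write and
            write_isolation not in ('FORBID_RMW', 'UNSAFE_RMW'))
```

So an unconditional PutItem under `only_rmw_uses_lwt` passes, while the same write with a ConditionExpression is rejected.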
if (needs_read_before_write) {
if (_write_isolation == write_isolation::FORBID_RMW) {
throw api_error::validation("Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
@@ -2913,7 +3040,8 @@ public:
put_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
: rmw_operation(proxy, std::move(request))
, _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{},
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name()))) {
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name())),
_timestamp_attribute) {
_pk = _mutation_builder.pk();
_ck = _mutation_builder.ck();
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
@@ -2945,6 +3073,9 @@ public:
check_needs_read_before_write(_condition_expression) ||
_returnvalues == returnvalues::ALL_OLD;
}
bool has_custom_timestamp() const noexcept override {
return _mutation_builder.custom_timestamp().has_value();
}
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
if (!verify_expected(_request, previous_item.get()) ||
!verify_condition_expression(_condition_expression, previous_item.get())) {
@@ -2962,7 +3093,10 @@ public:
} else {
_return_attributes = {};
}
return _mutation_builder.build(_schema, ts);
// Use the custom timestamp from the timestamp attribute if available,
// otherwise use the provided timestamp.
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
return _mutation_builder.build(_schema, effective_ts);
}
virtual ~put_item_operation() = default;
};
@@ -3014,7 +3148,7 @@ public:
parsed::condition_expression _condition_expression;
delete_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
: rmw_operation(proxy, std::move(request))
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}) {
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}, _timestamp_attribute) {
_pk = _mutation_builder.pk();
_ck = _mutation_builder.ck();
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
@@ -3045,6 +3179,9 @@ public:
check_needs_read_before_write(_condition_expression) ||
_returnvalues == returnvalues::ALL_OLD;
}
bool has_custom_timestamp() const noexcept override {
return _mutation_builder.custom_timestamp().has_value();
}
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
if (!verify_expected(_request, previous_item.get()) ||
!verify_condition_expression(_condition_expression, previous_item.get())) {
@@ -3065,7 +3202,10 @@ public:
if (_consumed_capacity._total_bytes == 0) {
_consumed_capacity._total_bytes = 1;
}
return _mutation_builder.build(_schema, ts);
// Use the custom timestamp from the timestamp attribute if available,
// otherwise use the provided timestamp.
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
return _mutation_builder.build(_schema, effective_ts);
}
virtual ~delete_item_operation() = default;
};
@@ -3252,10 +3392,13 @@ future<> executor::do_batch_write(
// Do a normal write, without LWT:
utils::chunked_vector<mutation> mutations;
mutations.reserve(mutation_builders.size());
api::timestamp_type now = api::new_timestamp();
api::timestamp_type default_ts = api::new_timestamp();
bool any_cdc_enabled = false;
for (auto& b : mutation_builders) {
mutations.push_back(b.second.build(b.first, now));
// Use custom timestamp from the timestamp attribute if available,
// otherwise use the default timestamp for all items in this batch.
api::timestamp_type ts = b.second.custom_timestamp().value_or(default_ts);
mutations.push_back(b.second.build(b.first, ts));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
@@ -3355,6 +3498,16 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
1, primary_key_hash{schema}, primary_key_equal{schema});
// Look up the timestamp attribute tag once per table (shared by all
// PutRequests and DeleteRequests for this table).
std::optional<bytes> ts_attr;
const auto tags_ptr = db::get_tags_of_table(schema);
if (tags_ptr) {
auto tag_it = tags_ptr->find(TIMESTAMP_TAG_KEY);
if (tag_it != tags_ptr->end() && !tag_it->second.empty()) {
ts_attr = to_bytes(tag_it->second);
}
}
for (auto& request : it->value.GetArray()) {
auto& r = get_single_member(request, "RequestItems element");
const auto r_name = rjson::to_string_view(r.name);
@@ -3363,7 +3516,8 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
validate_is_object(item, "Item in PutRequest");
auto&& put_item = put_or_delete_item(
item, schema, put_or_delete_item::put_item{},
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())));
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())),
ts_attr);
mutation_builders.emplace_back(schema, std::move(put_item));
auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck());
if (used_keys.contains(mut_key)) {
@@ -3374,7 +3528,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
const rjson::value& key = get_member(r.value, "Key", "DeleteRequest");
validate_is_object(key, "Key in DeleteRequest");
mutation_builders.emplace_back(schema, put_or_delete_item(
key, schema, put_or_delete_item::delete_item{}));
key, schema, put_or_delete_item::delete_item{}, ts_attr));
auto mut_key = std::make_pair(mutation_builders.back().second.pk(),
mutation_builders.back().second.ck());
if (used_keys.contains(mut_key)) {
@@ -3983,6 +4137,10 @@ public:
virtual ~update_item_operation() = default;
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override;
bool needs_read_before_write() const;
// Returns true if the timestamp attribute is being set in this update
// (via AttributeUpdates PUT or UpdateExpression SET). Used to detect
// whether a custom write timestamp will be used.
bool has_custom_timestamp() const noexcept;
private:
void delete_attribute(bytes&& column_name, const std::unique_ptr<rjson::value>& previous_item, const api::timestamp_type ts, deletable_row& row,
@@ -4117,6 +4275,44 @@ update_item_operation::needs_read_before_write() const {
(_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::UPDATED_NEW);
}
bool
update_item_operation::has_custom_timestamp() const noexcept {
if (!_timestamp_attribute) {
return false;
}
// Check if the timestamp attribute is being set via AttributeUpdates PUT
// with a valid numeric value.
if (_attribute_updates) {
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == ts_attr) {
const rjson::value* action = rjson::find(it->value, "Action");
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
// Only consider it a custom timestamp if the value is numeric
if (try_get_timestamp((it->value)["Value"])) {
return true;
}
}
break;
}
}
}
// Check if the timestamp attribute is being set via UpdateExpression SET.
// We can't check the actual value type without resolving the expression
// (which requires previous_item), so we conservatively return true if the
// attribute appears in a SET action; apply() throws a ValidationException
// if the value turns out to be non-numeric.
if (!_update_expression.empty()) {
std::string ts_attr(to_string_view(*_timestamp_attribute));
auto it = _update_expression.find(ts_attr);
if (it != _update_expression.end() && it->second.has_value()) {
const auto& action = it->second.get_value();
return std::holds_alternative<parsed::update_expression::action::set>(action._action);
}
}
return false;
}
// action_result() returns the result of applying an UpdateItem action -
// this result is either a JSON object or an unset optional which indicates
// the action was a deletion. The caller (update_item_operation::apply()
@@ -4392,6 +4588,17 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
}
std::string action = rjson::to_string((it->value)["Action"]);
// If this is the timestamp attribute being PUT, it must be a valid
// numeric value (microseconds since epoch). Use it as the write
// timestamp and skip storing it. Reject if the value is non-numeric.
if (_timestamp_attribute && column_name == *_timestamp_attribute && action == "PUT") {
if (it->value.HasMember("Value")) {
if (try_get_timestamp((it->value)["Value"])) {
continue;
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
}
}
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -4495,6 +4702,20 @@ inline void update_item_operation::apply_update_expression(const std::unique_ptr
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(fmt::format("UpdateItem cannot update key column {}", column_name));
}
// If this is the timestamp attribute being set via UpdateExpression SET,
// it must be a valid numeric value (microseconds since epoch). Use it as
// the write timestamp and skip storing it. Reject if non-numeric.
if (_timestamp_attribute && to_bytes(column_name) == *_timestamp_attribute &&
actions.second.has_value() &&
std::holds_alternative<parsed::update_expression::action::set>(actions.second.get_value()._action)) {
std::optional<rjson::value> result = action_result(actions.second.get_value(), previous_item.get());
if (result) {
if (try_get_timestamp(*result)) {
continue; // Skip - already used as timestamp
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
}
}
if (actions.second.has_value()) {
// An action on a top-level attribute column_name. The single
// action is actions.second.get_value(). We can simply invoke
@@ -4543,6 +4764,44 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
return {};
}
// If the table has a timestamp attribute, look for it in the update
// (AttributeUpdates PUT or UpdateExpression SET). If found with a valid
// numeric value, use it as the write timestamp instead of the provided ts.
api::timestamp_type effective_ts = ts;
if (_timestamp_attribute) {
bool found_ts = false;
if (_attribute_updates) {
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == ts_attr) {
const rjson::value* action = rjson::find(it->value, "Action");
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
if (auto t = try_get_timestamp((it->value)["Value"])) {
effective_ts = *t;
found_ts = true;
}
}
break;
}
}
}
if (!found_ts && !_update_expression.empty()) {
std::string ts_attr(to_string_view(*_timestamp_attribute));
auto it = _update_expression.find(ts_attr);
if (it != _update_expression.end() && it->second.has_value()) {
const auto& action = it->second.get_value();
if (std::holds_alternative<parsed::update_expression::action::set>(action._action)) {
std::optional<rjson::value> result = action_result(action, previous_item.get());
if (result) {
if (auto t = try_get_timestamp(*result)) {
effective_ts = *t;
}
}
}
}
}
}
// In the ReturnValues=ALL_NEW case, we make a copy of previous_item into
// _return_attributes and parts of it will be overwritten by the new
// updates (in do_update() and do_delete()). We need to make a copy and
@@ -4571,10 +4830,10 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
auto& row = m.partition().clustered_row(*_schema, _ck);
auto modified_attrs = attribute_collector();
if (!_update_expression.empty()) {
apply_update_expression(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
apply_update_expression(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
}
if (_attribute_updates) {
apply_attribute_updates(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
apply_attribute_updates(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
}
if (!modified_attrs.empty()) {
auto serialized_map = modified_attrs.to_mut().serialize(*attrs_type());
@@ -4585,7 +4844,7 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
// marker. An update with only DELETE operations must not add a row marker
// (this was issue #5862) but any other update, even an empty one, should.
if (any_updates || !any_deletes) {
row.apply(row_marker(ts));
row.apply(row_marker(effective_ts));
} else if (_returnvalues == returnvalues::ALL_NEW && !previous_item) {
// There was no pre-existing item, and we're not creating one, so
// don't report the new item in the returned Attributes.

View File

@@ -18,6 +18,7 @@
#include "executor.hh"
#include "tracing/trace_state.hh"
#include "keys/keys.hh"
#include "bytes.hh"
namespace alternator {
@@ -72,6 +73,11 @@ protected:
clustering_key _ck = clustering_key::make_empty();
write_isolation _write_isolation;
mutable wcu_consumed_capacity_counter _consumed_capacity;
// If the table has a "system:timestamp_attribute" tag, this holds the
// name of the attribute (converted to bytes) whose numeric value should
// be used as the write timestamp instead of the current time. The
// attribute itself is NOT stored in the item data.
std::optional<bytes> _timestamp_attribute;
// All RMW operations can have a ReturnValues parameter from the following
// choices. But note that only UpdateItem actually supports all of them:
enum class returnvalues {
@@ -113,6 +119,9 @@ public:
// Convert the above apply() into the signature needed by cas_request:
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override;
virtual ~rmw_operation() = default;
// Returns true if the operation will use a custom write timestamp (from the
// system:timestamp_attribute tag). Subclasses override this as needed.
virtual bool has_custom_timestamp() const noexcept { return false; }
const wcu_consumed_capacity_counter& consumed_capacity() const noexcept { return _consumed_capacity; }
schema_ptr schema() const { return _schema; }
const rjson::value& request() const { return _request; }

View File

@@ -767,7 +767,7 @@ static future<bool> scan_table(
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet); // throws if no secondary replica
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet, erm->get_topology()); // throws if no secondary replica
if (tablet_secondary_replica.host == my_host_id && tablet_secondary_replica.shard == this_shard_id()) {
if (!gossiper.is_alive(tablet_primary_replica.host)) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);

View File

@@ -110,15 +110,23 @@ future<> cache::prune(const resource& r) {
future<> cache::reload_all_permissions() noexcept {
SCYLLA_ASSERT(_permission_loader);
auto units = co_await get_units(_loading_sem, 1, _as);
auto copy_keys = [] (const std::unordered_map<resource, permission_set>& m) {
std::vector<resource> keys;
keys.reserve(m.size());
for (const auto& [res, _] : m) {
keys.push_back(res);
}
return keys;
};
const role_or_anonymous anon;
for (auto& [res, perms] : _anonymous_permissions) {
perms = co_await _permission_loader(anon, res);
for (const auto& res : copy_keys(_anonymous_permissions)) {
_anonymous_permissions[res] = co_await _permission_loader(anon, res);
}
for (auto& [role, entry] : _roles) {
auto& perms_cache = entry->cached_permissions;
auto r = role_or_anonymous(role);
for (auto& [res, perms] : perms_cache) {
perms = co_await _permission_loader(r, res);
for (const auto& res : copy_keys(perms_cache)) {
perms_cache[res] = co_await _permission_loader(r, res);
}
}
logger.debug("Reloaded auth cache with {} entries", _roles.size());
@@ -228,6 +236,7 @@ future<> cache::load_all() {
co_await distribute_role(name, role);
}
co_await container().invoke_on_others([this](cache& c) -> future<> {
auto units = co_await get_units(c._loading_sem, 1, c._as);
c._current_version = _current_version;
co_await c.prune_all();
});
@@ -287,10 +296,11 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
auto role_ptr = role.get();
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
co_await container().invoke_on_others([&name, role_ptr](cache& c) -> future<> {
auto units = co_await get_units(c._loading_sem, 1, c._as);
if (!role_ptr) {
c.remove_role(name);
return;
co_return;
}
auto role_copy = make_lw_shared<role_record>(*role_ptr);
c.add_role(name, std::move(role_copy));

View File

@@ -1192,6 +1192,7 @@ scylla_core = (['message/messaging_service.cc',
'utils/azure/identity/default_credentials.cc',
'utils/gcp/gcp_credentials.cc',
'utils/gcp/object_storage.cc',
'utils/gcp/object_storage_retry_strategy.cc',
'gms/version_generator.cc',
'gms/versioned_value.cc',
'gms/gossiper.cc',

View File

@@ -69,7 +69,7 @@ float compute_cosine_similarity(std::span<const float> v1, std::span<const float
}
if (squared_norm_a == 0 || squared_norm_b == 0) {
throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
return std::numeric_limits<float>::quiet_NaN();
}
// The cosine similarity is in the range [-1, 1].

View File

@@ -2308,6 +2308,7 @@ future<> view_builder::drain() {
vlogger.info("Draining view builder");
_as.request_abort();
co_await _mnotifier.unregister_listener(this);
co_await _ops_gate.close();
co_await _vug.drain();
co_await _sem.wait();
_sem.broken();
@@ -2742,30 +2743,48 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na
}
// Do it in the background, serialized and broadcast from shard 0.
static_cast<void>(dispatch_create_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
return dispatch_create_view(std::move(ks_name), std::move(view_name));
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to dispatch view creation {}.{}: {}", ks_name, view_name, ep);
}));
}
void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
future<> view_builder::dispatch_update_view(sstring ks_name, sstring view_name) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return;
co_return;
}
[[maybe_unused]] auto sem_units = co_await get_or_adopt_view_builder_lock(std::nullopt);
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
if (step_it == _base_to_build_step.end()) {
co_return; // In case all the views for this CF have finished building already.
}
auto status_it = std::ranges::find_if(step_it->second.build_status, [view] (const view_build_status& bs) {
return bs.view->id() == view->id();
});
if (status_it != step_it->second.build_status.end()) {
status_it->view = std::move(view);
}
}
void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
// Do it in the background, serialized.
(void)with_semaphore(_sem, view_builder_semaphore_units, [ks_name, view_name, this] {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
if (step_it == _base_to_build_step.end()) {
return;// In case all the views for this CF have finished building already.
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
return dispatch_update_view(std::move(ks_name), std::move(view_name));
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (const seastar::gate_closed_exception&) {
vlogger.warn("Ignoring gate_closed_exception during view update {}.{}", ks_name, view_name);
} catch (const seastar::broken_named_semaphore&) {
vlogger.warn("Ignoring broken_named_semaphore during view update {}.{}", ks_name, view_name);
} catch (const replica::no_such_column_family&) {
vlogger.warn("Ignoring no_such_column_family during view update {}.{}", ks_name, view_name);
}
auto status_it = std::ranges::find_if(step_it->second.build_status, [view] (const view_build_status& bs) {
return bs.view->id() == view->id();
});
if (status_it != step_it->second.build_status.end()) {
status_it->view = std::move(view);
}
}).handle_exception_type([] (replica::no_such_column_family&) { });
}));
}
future<> view_builder::dispatch_drop_view(sstring ks_name, sstring view_name) {
@@ -2827,7 +2846,9 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
}
// Do it in the background, serialized and broadcast from shard 0.
static_cast<void>(dispatch_drop_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
return dispatch_drop_view(std::move(ks_name), std::move(view_name));
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to dispatch view drop {}.{}: {}", ks_name, view_name, ep);
}));
}

View File

@@ -16,6 +16,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/sharded.hh>
@@ -190,6 +191,7 @@ class view_builder final : public service::migration_listener::only_view_notific
// Guard the whole startup routine with a semaphore so that it's not intercepted by
// `on_drop_view`, `on_create_view`, or `on_update_view` events.
seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}};
seastar::gate _ops_gate;
seastar::abort_source _as;
future<> _step_fiber = make_ready_future<>();
// Used to coordinate between shards the conclusion of the build process for a particular view.
@@ -284,6 +286,7 @@ private:
future<> mark_as_built(view_ptr);
void setup_metrics();
future<> dispatch_create_view(sstring ks_name, sstring view_name);
future<> dispatch_update_view(sstring ks_name, sstring view_name);
future<> dispatch_drop_view(sstring ks_name, sstring view_name);
future<> handle_seed_view_build_progress(const sstring& ks_name, const sstring& view_name);
future<> handle_create_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units);

View File

@@ -142,10 +142,6 @@ want modify a non-top-level attribute directly (e.g., a.b[3].c) need RMW:
Alternator implements such requests by reading the entire top-level
attribute a, modifying only a.b[3].c, and then writing back a.
Currently, Alternator doesn't use Tablets. That's because Alternator relies
on LWT (lightweight transactions), and LWT is not supported in keyspaces
with Tablets enabled.
```{eval-rst}
.. toctree::
:maxdepth: 2

View File

@@ -213,3 +213,71 @@ Alternator table, the following features will not work for this table:
* Enabling Streams with CreateTable or UpdateTable doesn't work
(results in an error).
See <https://github.com/scylladb/scylla/issues/23838>.
## Custom write timestamps
DynamoDB doesn't allow clients to set the write timestamp of updates. All
updates use the current server time as their timestamp, and ScyllaDB uses
these timestamps for last-write-wins conflict resolution when concurrent
writes reach different replicas.
ScyllaDB Alternator extends this with the `system:timestamp_attribute` tag,
which allows specifying a custom write timestamp for each PutItem,
UpdateItem, DeleteItem, or BatchWriteItem request. To use this feature:
1. Tag the table (at CreateTable time or using TagResource) with
`system:timestamp_attribute` set to the name of an attribute that will
hold the custom write timestamp.
2. When performing a PutItem or UpdateItem, include the named attribute
in the request with a numeric value. The value represents the write
timestamp in **microseconds since the Unix epoch** (this is the same
unit used internally by ScyllaDB for timestamps).
For a DeleteItem or a BatchWriteItem DeleteRequest, include the named
attribute in the `Key` parameter (it will be stripped from the key
before use).
3. The named attribute is **not stored** in the item data - it only
controls the write timestamp. If you also want to record the timestamp
as data, use a separate attribute for that purpose.
4. If the named attribute is absent, the write proceeds normally using the
current server time as the timestamp. If the named attribute is present
but has a non-numeric value, the write is rejected with a ValidationException.
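The same mechanism works for UpdateItem: the timestamp attribute can be set in the same expression that updates the data. A sketch, assuming the table is tagged with `system:timestamp_attribute` set to `write_ts` (only the request dictionary is shown; the commented-out call would issue it):

```python
from decimal import Decimal

# UpdateItem parameters: 'write_ts' is consumed as the write timestamp
# and is not stored in the item itself.
update_params = {
    'Key': {'pk': 'my_key'},
    'UpdateExpression': 'SET #d = :v, write_ts = :ts',
    'ExpressionAttributeNames': {'#d': 'data'},
    'ExpressionAttributeValues': {
        ':v': 'newer_value',
        ':ts': Decimal('1700000001000000'),  # microseconds since epoch
    },
}
# table.update_item(**update_params)
```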
### Limitations
- **Incompatible with conditions**: If the write includes a ConditionExpression
(or uses the `Expected` legacy condition), LWT is needed and the operation
is rejected with a ValidationException, because LWT requires the write
timestamp to be set by the Paxos protocol, not by the client.
- **Incompatible with `always` write isolation**: Tables using the `always`
(or `always_use_lwt`) write isolation policy cannot use the timestamp
attribute feature at all, because every write uses LWT in that mode.
When using `system:timestamp_attribute`, consider tagging the table with
`system:write_isolation=only_rmw_uses_lwt` (or `forbid_rmw`) so that
unconditional writes do not use LWT.
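The recommended combination can be expressed as a tag list passed at CreateTable time (or via TagResource); a minimal sketch, where `write_ts` is just an example attribute name:

```python
# Tag list combining the timestamp attribute with a non-LWT write isolation
# policy, so unconditional writes avoid LWT and can carry custom timestamps.
tags = [
    {'Key': 'system:timestamp_attribute', 'Value': 'write_ts'},
    {'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'},
]
```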
### Example use case
This feature is useful for ingesting data from multiple sources where each
record has a known logical timestamp. By setting the `system:timestamp_attribute`
tag, you can ensure that the record with the highest logical timestamp always
wins, regardless of ingestion order:
```python
# Create table with timestamp attribute
dynamodb.create_table(
    TableName='my_table',
    ...
    Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'write_ts'}]
)

# Write a record with a specific timestamp (in microseconds since epoch)
table.put_item(Item={
    'pk': 'my_key',
    'data': 'new_value',
    'write_ts': Decimal('1700000000000000'),  # Nov 14, 2023 in microseconds
})
```
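Deletes follow the same pattern, except that the timestamp attribute travels inside the `Key` parameter (see item 2 above) and is stripped before the key is used; a sketch reusing the names from the example:

```python
from decimal import Decimal

# Key for a DeleteItem: the 'write_ts' entry only controls the write
# timestamp of the deletion; it is not part of the actual key.
key = {
    'pk': 'my_key',
    'write_ts': Decimal('1700000000000001'),  # microseconds since epoch
}

# With a boto3 Table resource this would be:
#   table.delete_item(Key=key)
# For BatchWriteItem, each delete entry has the same shape:
delete_request = {'DeleteRequest': {'Key': key}}
```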

View File

@@ -187,6 +187,23 @@ You can create a keyspace with tablets enabled with the ``tablets = {'enabled':
the keyspace schema with ``tablets = { 'enabled': false }`` or
``tablets = { 'enabled': true }``.
.. _keyspace-rf-rack-valid-to-enforce-rack-list:
Enforcing Rack-List Replication for Tablet Keyspaces
------------------------------------------------------------------
``rf_rack_valid_keyspaces`` is a legacy option that ensures that all keyspaces with tablets enabled are
:term:`RF-rack-valid <RF-rack-valid keyspace>`.
Requiring every tablet keyspace to use the rack-list replication factor exclusively is enough to guarantee that the keyspace is
:term:`RF-rack-valid <RF-rack-valid keyspace>`. This approach imposes fewer restrictions and provides stronger guarantees than the
``rf_rack_valid_keyspaces`` option.
To enforce rack lists in tablet keyspaces, use the ``enforce_rack_list`` option. It can be set only if all tablet keyspaces use
rack lists. To ensure that, follow the procedure for :ref:`conversion to rack list replication factor <conversion-to-rack-list-rf>`.
Afterwards, restart all nodes in the cluster with ``enforce_rack_list`` enabled and ``rf_rack_valid_keyspaces`` disabled. Make
sure to avoid setting or updating the replication factor (with CREATE KEYSPACE or ALTER KEYSPACE) while nodes are being restarted.
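As a sketch, the restart described above corresponds to the following ``scylla.yaml`` settings (option names as used in this section; treat the exact file layout as an assumption):

.. code-block:: yaml

   enforce_rack_list: true
   rf_rack_valid_keyspaces: false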
.. _tablets-limitations:
Limitations and Unsupported Features

View File

@@ -200,8 +200,6 @@ for two cases. One is setting replication factor to 0, in which case the number
The other is when the numeric replication factor is equal to the current number of replicas
for a given datacenter, in which case the current rack list is preserved.
Altering from a numeric replication factor to a rack list is not supported yet.
Note that when ``ALTER`` ing keyspaces and supplying ``replication_factor``,
auto-expansion will only *add* new datacenters for safety, it will not alter
existing datacenters or remove any even if they are no longer in the cluster.
@@ -424,6 +422,21 @@ Altering from a rack list to a numeric replication factor is not supported.
Keyspaces which use rack lists are :term:`RF-rack-valid <RF-rack-valid keyspace>` if each rack in the rack list contains at least one node (excluding :doc:`zero-token nodes </architecture/zero-token-nodes>`).
.. _conversion-to-rack-list-rf:
Conversion to rack-list replication factor
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To migrate a keyspace from a numeric replication factor to a rack-list replication factor, provide the rack-list replication factor explicitly in the ALTER KEYSPACE statement. The number of racks in the list must equal the numeric replication factor. The conversion can be applied to any number of DCs at once, but a statement that converts the replication factor must not also increase or decrease it in any DC.
.. code-block:: cql
CREATE KEYSPACE Excelsior
WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 1} AND tablets = { 'enabled': true };
ALTER KEYSPACE Excelsior
WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : ['RAC1', 'RAC2', 'RAC3'], 'dc2' : ['RAC4']} AND tablets = { 'enabled': true };
.. _drop-keyspace-statement:
DROP KEYSPACE

View File

@@ -10,7 +10,6 @@ Install ScyllaDB |CURRENT_VERSION|
/getting-started/install-scylla/launch-on-azure
/getting-started/installation-common/scylla-web-installer
/getting-started/install-scylla/install-on-linux
/getting-started/installation-common/install-jmx
/getting-started/install-scylla/run-in-docker
/getting-started/installation-common/unified-installer
/getting-started/installation-common/air-gapped-install
@@ -24,9 +23,9 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
:id: "getting-started"
:class: my-panel
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on AWS </getting-started/install-scylla/launch-on-aws>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on GCP </getting-started/install-scylla/launch-on-gcp>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on Azure </getting-started/install-scylla/launch-on-azure>`
* :doc:`Launch ScyllaDB on AWS </getting-started/install-scylla/launch-on-aws>`
* :doc:`Launch ScyllaDB on GCP </getting-started/install-scylla/launch-on-gcp>`
* :doc:`Launch ScyllaDB on Azure </getting-started/install-scylla/launch-on-azure>`
.. panel-box::
@@ -35,8 +34,7 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
:class: my-panel
* :doc:`Install ScyllaDB with Web Installer (recommended) </getting-started/installation-common/scylla-web-installer>`
* :doc:`Install ScyllaDB |CURRENT_VERSION| Linux Packages </getting-started/install-scylla/install-on-linux>`
* :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
* :doc:`Install ScyllaDB Linux Packages </getting-started/install-scylla/install-on-linux>`
* :doc:`Install ScyllaDB Without root Privileges </getting-started/installation-common/unified-installer>`
* :doc:`Air-gapped Server Installation </getting-started/installation-common/air-gapped-install>`
* :doc:`ScyllaDB Developer Mode </getting-started/installation-common/dev-mod>`

View File

@@ -94,16 +94,6 @@ Install ScyllaDB
apt-get install scylla{,-server,-kernel-conf,-node-exporter,-conf,-python3,-cqlsh}=2025.3.1-0.20250907.2bbf3cf669bb-1
#. (Ubuntu only) Set Java 11.
.. code-block:: console
sudo apt-get update
sudo apt-get install -y openjdk-11-jre-headless
sudo update-java-alternatives --jre-headless -s java-1.11.0-openjdk-amd64
.. group-tab:: Centos/RHEL
#. Install the EPEL repository.
@@ -157,14 +147,6 @@ Install ScyllaDB
sudo yum install scylla-5.2.3
(Optional) Install scylla-jmx
-------------------------------
scylla-jmx is an optional package and is not installed by default.
If you need JMX server, see :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`.
.. include:: /getting-started/_common/setup-after-install.rst
Next Steps

View File

@@ -1,78 +0,0 @@
======================================
Install scylla-jmx Package
======================================
scylla-jmx is an optional package and is not installed by default.
If you need JMX server, you can still install it from scylla-jmx GitHub page.
.. tabs::
.. group-tab:: Debian/Ubuntu
#. Download .deb package from scylla-jmx page.
Access to https://github.com/scylladb/scylla-jmx, select latest
release from "releases", download a file end with ".deb".
#. (Optional) Transfer the downloaded package to the install node.
If the pc from which you downloaded the package is different from
the node where you install scylladb, you will need to transfer
the files to the node.
#. Install scylla-jmx package.
.. code-block:: console
sudo apt install -y ./scylla-jmx_<version>_all.deb
.. group-tab:: Centos/RHEL
#. Download .rpm package from scylla-jmx page.
Access to https://github.com/scylladb/scylla-jmx, select latest
release from "releases", download a file end with ".rpm".
#. (Optional) Transfer the downloaded package to the install node.
If the pc from which you downloaded the package is different from
the node where you install scylladb, you will need to transfer
the files to the node.
#. Install scylla-jmx package.
.. code-block:: console
sudo yum install -y ./scylla-jmx-<version>.noarch.rpm
.. group-tab:: Install without root privileges
#. Download .tar.gz package from scylla-jmx page.
Access to https://github.com/scylladb/scylla-jmx, select latest
release from "releases", download a file end with ".tar.gz".
#. (Optional) Transfer the downloaded package to the install node.
If the pc from which you downloaded the package is different from
the node where you install scylladb, you will need to transfer
the files to the node.
#. Install scylla-jmx package.
.. code:: console
tar xpf scylla-jmx-<version>.noarch.tar.gz
cd scylla-jmx
./install.sh --nonroot
Next Steps
-----------
* :doc:`Configure ScyllaDB </getting-started/system-configuration>`
* Manage your clusters with `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_
* Monitor your cluster and data with `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_
* Get familiar with ScyllaDBs :doc:`command line reference guide </operating-scylla/nodetool>`.
* Learn about ScyllaDB at `ScyllaDB University <https://university.scylladb.com/>`_

View File

@@ -49,11 +49,6 @@ Download and Install
./install.sh --nonroot --python3 ~/scylladb/python3/bin/python3
#. (Optional) Install scylla-jmx
scylla-jmx is an optional package and is not installed by default.
If you need JMX server, see :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`.
Configure and Run ScyllaDB
----------------------------

View File

@@ -25,4 +25,8 @@ For Example:
nodetool rebuild <source-dc-name>
The ``nodetool rebuild`` command works only for vnode keyspaces. For tablet keyspaces, use ``nodetool cluster repair`` instead.
See :doc:`Data Distribution with Tablets </architecture/tablets/>`.
.. include:: nodetool-index.rst

View File

@@ -155,7 +155,6 @@ Add New DC
UN 54.235.9.159 109.75 KB 256 ? 39798227-9f6f-4868-8193-08570856c09a RACK1
UN 54.146.228.25 128.33 KB 256 ? 7a4957a1-9590-4434-9746-9c8a6f796a0c RACK1
.. TODO possibly provide additional information WRT how ALTER works with tablets
#. When all nodes are up and running ``ALTER`` the following Keyspaces in the new nodes:
@@ -171,26 +170,68 @@ Add New DC
DESCRIBE KEYSPACE mykeyspace;
CREATE KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3};
CREATE KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3};
ALTER Command
.. code-block:: cql
ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
ALTER KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
ALTER KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
ALTER KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
ALTER KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
After
.. code-block:: cql
DESCRIBE KEYSPACE mykeyspace;
CREATE KEYSPACE mykeyspace WITH REPLICATION = {'class: 'NetworkTopologyStrategy', <exiting_dc>:3, <new_dc>: 3};
CREATE KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
CREATE KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
CREATE KEYSPACE mykeyspace WITH REPLICATION = {'class': 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
CREATE KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
CREATE KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
#. Run ``nodetool rebuild`` on each node in the new datacenter, specify the existing datacenter name in the rebuild command.
For tablet keyspaces, update the replication factor one by one:
.. code-block:: cql
DESCRIBE KEYSPACE mykeyspace2;
CREATE KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3} AND tablets = { 'enabled': true };
.. code-block:: cql
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 1} AND tablets = { 'enabled': true };
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 2} AND tablets = { 'enabled': true };
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3} AND tablets = { 'enabled': true };
.. note::
If the ``rf_rack_valid_keyspaces`` option is set, a tablet keyspace needs to use a rack-list replication factor so that a new DC (rack) can be added. See :ref:`the conversion procedure <conversion-to-rack-list-rf>`. In this case, to add a datacenter:
Before
.. code-block:: cql
DESCRIBE KEYSPACE mykeyspace3;
CREATE KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>']} AND tablets = { 'enabled': true };
Add all the nodes to the new datacenter and then alter the keyspace one by one:
.. code-block:: cql
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>']} AND tablets = { 'enabled': true };
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>']} AND tablets = { 'enabled': true };
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
After
.. code-block:: cql
DESCRIBE KEYSPACE mykeyspace3;
CREATE KEYSPACE mykeyspace3 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
Consider :ref:`upgrading rf_rack_valid_keyspaces option to enforce_rack_list option <keyspace-rf-rack-valid-to-enforce-rack-list>` to ensure all tablet keyspaces use rack lists.
#. If any vnode keyspace was altered, run ``nodetool rebuild`` on each node in the new datacenter, specifying the existing datacenter name in the rebuild command.
For example:
@@ -198,7 +239,7 @@ Add New DC
The rebuild ensures that the new nodes that were just added to the cluster will recognize the existing datacenters in the cluster.
#. Run a full cluster repair, using :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair>` on each node, or using `ScyllaDB Manager ad-hoc repair <https://manager.docs.scylladb.com/stable/repair>`_
#. If any vnode keyspace was altered, run a full cluster repair, using :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair>` on each node, or using `ScyllaDB Manager ad-hoc repair <https://manager.docs.scylladb.com/stable/repair>`_
#. If you are using ScyllaDB Monitoring, update the `monitoring stack <https://monitoring.docs.scylladb.com/stable/install/monitoring_stack.html#configure-scylla-nodes-from-files>`_ to monitor it. If you are using ScyllaDB Manager, make sure you install the `Manager Agent <https://manager.docs.scylladb.com/stable/install-scylla-manager-agent.html>`_ and Manager can access the new DC.

View File

@@ -40,12 +40,14 @@ Prerequisites
Procedure
---------
#. Run the ``nodetool repair -pr`` command on each node in the data-center that is going to be decommissioned. This will verify that all the data is in sync between the decommissioned data-center and the other data-centers in the cluster.
#. If there are vnode keyspaces in this DC, run the ``nodetool repair -pr`` command on each node in the data-center that is going to be decommissioned. This will verify that all the data is in sync between the decommissioned data-center and the other data-centers in the cluster.
For example:
If the ASIA-DC cluster is to be removed, then, run the ``nodetool repair -pr`` command on all the nodes in the ASIA-DC
#. If there are tablet keyspaces in this DC, run ``nodetool cluster repair`` on an arbitrary node. Running the repair ensures that any updates stored only on the about-to-be-decommissioned replicas are propagated to the other replicas before the replicas on the decommissioned datacenter are dropped.
#. ALTER every cluster KEYSPACE, so that the keyspaces will no longer replicate data to the decommissioned data-center.
For example:
@@ -73,6 +75,33 @@ Procedure
cqlsh> ALTER KEYSPACE nba WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 0, 'EUROPE-DC' : 3};
For tablet keyspaces, update the replication factor one by one:
.. code-block:: shell
cqlsh> DESCRIBE nba2
cqlsh> CREATE KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 2, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
.. code-block:: shell
cqlsh> ALTER KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 1, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
cqlsh> ALTER KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 0, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
.. note::
If the ``rf_rack_valid_keyspaces`` option is set, a tablet keyspace needs to use a rack-list replication factor so that the DC can be removed. See :ref:`the conversion procedure <conversion-to-rack-list-rf>`. In this case, to remove a datacenter:
.. code-block:: shell
cqlsh> DESCRIBE nba3
cqlsh> CREATE KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : ['RAC4', 'RAC5'], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
.. code-block:: shell
cqlsh> ALTER KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : ['RAC4'], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
cqlsh> ALTER KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : [], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
Consider :ref:`upgrading rf_rack_valid_keyspaces option to enforce_rack_list option <keyspace-rf-rack-valid-to-enforce-rack-list>` to ensure all tablet keyspaces use rack lists.
.. note::
If table audit is enabled, the ``audit`` keyspace is automatically created with ``NetworkTopologyStrategy``.

View File

@@ -199,9 +199,6 @@ You should take note of the current version in case you want to |ROLLBACK|_ the
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
If you need JMX server, see
:doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
and get new version.
Start the node
--------------

View File

@@ -283,17 +283,6 @@ concept CanHandleLuaTypes = requires(Func f) {
{ f(*static_cast<const lua_table*>(nullptr)) } -> std::same_as<lua_visit_ret_type<Func>>;
};
// This is used to test if a double fits in a long long, so
// we expect overflows. Prevent the sanitizer from complaining.
#ifdef __clang__
[[clang::no_sanitize("undefined")]]
#endif
static
long long
cast_to_long_long_allow_overflow(double v) {
return (long long)v;
}
template <typename Func>
requires CanHandleLuaTypes<Func>
static auto visit_lua_value(lua_State* l, int index, Func&& f) {
@@ -304,9 +293,10 @@ static auto visit_lua_value(lua_State* l, int index, Func&& f) {
auto operator()(const long long& v) { return f(utils::multiprecision_int(v)); }
auto operator()(const utils::multiprecision_int& v) { return f(v); }
auto operator()(const double& v) {
long long v2 = cast_to_long_long_allow_overflow(v);
if (v2 == v) {
return (*this)(v2);
auto min = double(std::numeric_limits<long long>::min());
auto max = double(std::numeric_limits<long long>::max());
if (min <= v && v <= max && std::trunc(v) == v) {
return (*this)((long long)v);
}
// FIXME: We could use frexp to produce a decimal instead of a double
return f(v);

View File

@@ -616,12 +616,16 @@ tablet_replica tablet_map::get_primary_replica(tablet_id id, const locator::topo
return maybe_get_primary_replica(id, replicas, topo, [&] (const auto& _) { return true; }).value();
}
tablet_replica tablet_map::get_secondary_replica(tablet_id id) const {
if (get_tablet_info(id).replicas.size() < 2) {
tablet_replica tablet_map::get_secondary_replica(tablet_id id, const locator::topology& topo) const {
const auto& orig_replicas = get_tablet_info(id).replicas;
if (orig_replicas.size() < 2) {
throw std::runtime_error(format("No secondary replica for tablet id {}", id));
}
const auto& replicas = get_tablet_info(id).replicas;
return replicas.at((size_t(id)+1) % replicas.size());
tablet_replica_set replicas = orig_replicas;
std::ranges::sort(replicas, tablet_replica_comparator(topo));
// This formula must match the one in get_primary_replica(),
// just with + 1.
return replicas.at((size_t(id) + size_t(id) / replicas.size() + 1) % replicas.size());
}
std::optional<tablet_replica> tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const {

View File

@@ -648,9 +648,10 @@ public:
/// Returns the primary replica for the tablet
tablet_replica get_primary_replica(tablet_id id, const locator::topology& topo) const;
/// Returns the secondary replica for the tablet, which is assumed to be directly following the primary replica in the replicas vector
/// Returns the secondary replica for the tablet: the replica that immediately follows the primary
/// replica in the topology-sorted replica list.
/// \throws std::runtime_error if the tablet has less than 2 replicas.
tablet_replica get_secondary_replica(tablet_id id) const;
tablet_replica get_secondary_replica(tablet_id id, const locator::topology& topo) const;
// Returns the replica that matches hosts and dcs filters for tablet_task_info.
std::optional<tablet_replica> maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const;

View File

@@ -7329,7 +7329,11 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
const locator::host_id this_host = _db.local().get_token_metadata().get_my_id();
std::vector<uint64_t> tablet_sizes_per_shard(smp::count);
// Align to 64 bytes to avoid cache line ping-pong when updating size in map_reduce0() below
struct alignas(64) aligned_tablet_size {
uint64_t size = 0;
};
std::vector<aligned_tablet_size> tablet_sizes_per_shard(smp::count);
// Each node combines a per-table load map from all of its shards and returns it to the coordinator.
// So if there are 1k nodes, there will be 1k RPCs in total.
@@ -7371,7 +7375,7 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
locator::combined_load_stats combined_ls { table->table_load_stats(tablet_filter) };
load_stats.tables.emplace(id, std::move(combined_ls.table_ls));
tablet_sizes_per_shard[this_shard_id()] += load_stats.tablet_stats[this_host].add_tablet_sizes(combined_ls.tablet_ls);
tablet_sizes_per_shard[this_shard_id()].size += load_stats.tablet_stats[this_host].add_tablet_sizes(combined_ls.tablet_ls);
co_await coroutine::maybe_yield();
}
@@ -7390,7 +7394,10 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
if (config_capacity != 0) {
tls.effective_capacity = config_capacity;
} else {
const uint64_t sum_tablet_sizes = std::reduce(tablet_sizes_per_shard.begin(), tablet_sizes_per_shard.end());
uint64_t sum_tablet_sizes = 0;
for (const auto& ts : tablet_sizes_per_shard) {
sum_tablet_sizes += ts.size;
}
tls.effective_capacity = si.available + sum_tablet_sizes;
}

View File

@@ -0,0 +1,356 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# Tests for the system:timestamp_attribute Scylla-specific feature.
# This feature allows users to control the write timestamp of PutItem,
# UpdateItem, DeleteItem, and BatchWriteItem operations by specifying an
# attribute name in the table's system:timestamp_attribute tag. When that
# attribute is present in the write request with a numeric value
# (microseconds since Unix epoch), it is used as the write timestamp. The
# attribute itself is not stored in the item data.
#
# This is a Scylla-specific feature and is not tested on DynamoDB.
import time
import pytest
from botocore.exceptions import ClientError
from decimal import Decimal
from .util import create_test_table, random_string
# A large timestamp in microseconds (far future, year ~2033)
LARGE_TS = Decimal('2000000000000000')
# A medium timestamp in microseconds (year ~2001)
MEDIUM_TS = Decimal('1000000000000000')
# A small timestamp in microseconds (year ~1970+)
SMALL_TS = Decimal('100000000000000')
# Fixtures for tables with the system:timestamp_attribute tag. The tables
# are created once per module and shared between all tests that use them,
# to avoid the overhead of creating and deleting tables for each test.
# Because system:timestamp_attribute is a Scylla-only feature, all tests
# using these fixtures are implicitly Scylla-only (via scylla_only parameter).
# A table with only a hash key and system:timestamp_attribute='ts' tag.
# We explicitly set write isolation to only_rmw_uses_lwt so the tests remain
# correct even if the server default changes to always_use_lwt in the future.
@pytest.fixture(scope="module")
def test_table_ts(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'}],
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])
yield table
table.delete()
# A table with hash (string) and range (string) keys and system:timestamp_attribute='ts' tag.
# We explicitly set write isolation to only_rmw_uses_lwt so the tests remain
# correct even if the server default changes to always_use_lwt in the future.
@pytest.fixture(scope="module")
def test_table_ts_ss(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'}],
KeySchema=[
{'AttributeName': 'p', 'KeyType': 'HASH'},
{'AttributeName': 'c', 'KeyType': 'RANGE'},
],
AttributeDefinitions=[
{'AttributeName': 'p', 'AttributeType': 'S'},
{'AttributeName': 'c', 'AttributeType': 'S'},
])
yield table
table.delete()
# A table with hash key, system:timestamp_attribute='ts' tag, and
# system:write_isolation='always' to test rejection in LWT_ALWAYS mode.
# In always_use_lwt mode, every write uses LWT, so the timestamp attribute
# feature cannot be used at all.
@pytest.fixture(scope="module")
def test_table_ts_lwt(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'always'}],
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])
yield table
table.delete()
# Test that PutItem with the timestamp attribute uses the given numeric
# value as the write timestamp, and the timestamp attribute is NOT stored
# in the item.
def test_timestamp_attribute_put_item_basic(test_table_ts):
p = random_string()
# Put an item with the timestamp attribute
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Read the item back
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# 'val' should be stored normally
assert item['val'] == 'hello'
# 'ts' (the timestamp attribute) should NOT be stored in the item
assert 'ts' not in item
# Test that PutItem respects the write timestamp ordering: a write with a
# larger timestamp should win over a write with a smaller timestamp,
# regardless of wall-clock order.
def test_timestamp_attribute_put_item_ordering(test_table_ts):
p = random_string()
# First, write item with a LARGE timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'large_ts', 'ts': LARGE_TS})
# Then write item with a SMALL timestamp (should lose since SMALL < LARGE)
test_table_ts.put_item(Item={'p': p, 'val': 'small_ts', 'ts': SMALL_TS})
# The item with the larger timestamp (val='large_ts') should win
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'large_ts'
# Now try to overwrite with a LARGER timestamp (should win)
test_table_ts.put_item(Item={'p': p, 'val': 'latest', 'ts': LARGE_TS + 1})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'latest'
# Test that UpdateItem with the timestamp attribute in AttributeUpdates
# uses the given numeric value as the write timestamp, and the timestamp
# attribute is NOT stored in the item.
def test_timestamp_attribute_update_item_attribute_updates(test_table_ts):
p = random_string()
# Use UpdateItem with AttributeUpdates, setting 'val' and 'ts'
test_table_ts.update_item(
Key={'p': p},
AttributeUpdates={
'val': {'Value': 'hello', 'Action': 'PUT'},
'ts': {'Value': LARGE_TS, 'Action': 'PUT'},
}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# 'ts' should NOT be stored in the item
assert 'ts' not in item
# Update with a smaller timestamp - should NOT overwrite
test_table_ts.update_item(
Key={'p': p},
AttributeUpdates={
'val': {'Value': 'overwritten', 'Action': 'PUT'},
'ts': {'Value': SMALL_TS, 'Action': 'PUT'},
}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# The item with the larger timestamp should still win
assert item['val'] == 'hello'
# Test that UpdateItem with the timestamp attribute in UpdateExpression
# uses the given numeric value as the write timestamp, and the timestamp
# attribute is NOT stored in the item.
def test_timestamp_attribute_update_item_update_expression(test_table_ts):
p = random_string()
# Use UpdateItem with UpdateExpression to set 'val' and 'ts'
test_table_ts.update_item(
Key={'p': p},
UpdateExpression='SET val = :v, ts = :t',
ExpressionAttributeValues={':v': 'hello', ':t': LARGE_TS}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# 'ts' should NOT be stored in the item
assert 'ts' not in item
# Update with a smaller timestamp - should NOT overwrite
test_table_ts.update_item(
Key={'p': p},
UpdateExpression='SET val = :v, ts = :t',
ExpressionAttributeValues={':v': 'overwritten', ':t': SMALL_TS}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# The item with the larger timestamp should still win
assert item['val'] == 'hello'
# Test that when the timestamp attribute is not present in the write request,
# the operation behaves normally (no custom timestamp is applied).
def test_timestamp_attribute_absent(test_table_ts):
p = random_string()
# Put item without the timestamp attribute
test_table_ts.put_item(Item={'p': p, 'val': 'hello'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# No 'ts' attribute expected either
assert 'ts' not in item
# Test that using a condition expression (which requires LWT) together with
# the timestamp attribute is rejected.
def test_timestamp_attribute_with_condition_rejected(test_table_ts):
p = random_string()
# Put an initial item (no timestamp attribute, so LWT is ok)
test_table_ts.put_item(Item={'p': p, 'val': 'initial'})
# Try to put with a ConditionExpression and a timestamp - should be rejected
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.put_item(
Item={'p': p, 'val': 'updated', 'ts': LARGE_TS},
ConditionExpression='attribute_exists(p)'
)
# Test that using the timestamp attribute with the 'always' write isolation
# policy is rejected, because in always_use_lwt mode every write uses LWT
# (including unconditional ones), which is incompatible with custom timestamps.
def test_timestamp_attribute_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
# Even a plain PutItem with a timestamp is rejected in LWT_ALWAYS mode
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Test that when the timestamp attribute has a non-numeric value, the write
# is rejected with a ValidationException.
def test_timestamp_attribute_non_numeric(test_table_ts):
p = random_string()
# Put item with the timestamp attribute as a string (non-numeric) - should fail
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': 'not_a_number'})
# Test that the timestamp attribute tag can be set on a table with a sort key.
def test_timestamp_attribute_with_range_key(test_table_ts_ss):
p = random_string()
c = random_string()
# Write with a large timestamp
test_table_ts_ss.put_item(Item={'p': p, 'c': c, 'val': 'large', 'ts': LARGE_TS})
# Write with a small timestamp (should lose)
test_table_ts_ss.put_item(Item={'p': p, 'c': c, 'val': 'small', 'ts': SMALL_TS})
item = test_table_ts_ss.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item']
assert item['val'] == 'large'
assert 'ts' not in item
# Test that the timestamp attribute value is interpreted in microseconds since
# the Unix epoch, and that writes with and without explicit timestamps interact
# correctly.
def test_timestamp_attribute_microseconds(test_table_ts):
# Get current time in microseconds from the Python client side.
now_us = int(time.time() * 1_000_000)
one_hour_us = 3600 * 1_000_000
# Part 1: write with the current time as the explicit timestamp, then
# overwrite without an explicit timestamp. The second write uses the
# server's current time (which is >= now_us), so it should win.
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'old', 'ts': Decimal(str(now_us))})
test_table_ts.put_item(Item={'p': p, 'val': 'new'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'new'
# Part 2: write with a timestamp one hour in the future, then overwrite
# without an explicit timestamp. The server's current time (≈ now_us) is
# much less than now_us + one_hour_us, so the first write should win.
p = random_string()
future_us = now_us + one_hour_us
test_table_ts.put_item(Item={'p': p, 'val': 'future', 'ts': Decimal(str(future_us))})
test_table_ts.put_item(Item={'p': p, 'val': 'now'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'future'
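The microsecond-epoch arithmetic the test relies on can be sketched standalone. The helper name `epoch_micros` is illustrative (it is not part of the test suite); the value is returned as a `Decimal` because boto3's default type serializer accepts `Decimal` but rejects `float` for DynamoDB numbers.

```python
import time
from decimal import Decimal

def epoch_micros(offset_seconds=0):
    # Current Unix time in microseconds, optionally shifted into the
    # future/past. Decimal, not float, so boto3 can serialize it as a
    # DynamoDB number. Illustrative helper, not from the test suite.
    return Decimal(int((time.time() + offset_seconds) * 1_000_000))

now = epoch_micros()
one_hour_ahead = epoch_micros(3600)
assert one_hour_ahead > now
# An hour is 3.6e9 microseconds; allow a little slack for the time
# elapsed between the two calls.
assert one_hour_ahead - now >= 3_599_000_000
```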
# Test that BatchWriteItem also respects the timestamp attribute.
def test_timestamp_attribute_batch_write(test_table_ts):
p = random_string()
# Write item via BatchWriteItem with a large timestamp
with test_table_ts.batch_writer() as batch:
batch.put_item(Item={'p': p, 'val': 'large_ts', 'ts': LARGE_TS})
# Write item via BatchWriteItem with a small timestamp (should lose)
with test_table_ts.batch_writer() as batch:
batch.put_item(Item={'p': p, 'val': 'small_ts', 'ts': SMALL_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'large_ts'
assert 'ts' not in item
# Test that DeleteItem respects the timestamp attribute: a delete with a
# smaller timestamp than the item's write timestamp should not take effect.
def test_timestamp_attribute_delete_item(test_table_ts):
p = random_string()
# Write an item with a large timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# Delete with a small timestamp - the delete should lose (item still exists)
test_table_ts.delete_item(Key={'p': p, 'ts': SMALL_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Delete with a large timestamp - the delete should win (item is removed)
test_table_ts.delete_item(Key={'p': p, 'ts': LARGE_TS + 1})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is None
# Test that DeleteItem without the timestamp attribute in the key behaves
# normally (no custom timestamp is applied).
def test_timestamp_attribute_delete_item_no_ts(test_table_ts):
p = random_string()
# Use SMALL_TS so the delete (which uses the current server time) wins.
# If we used LARGE_TS (far future), the delete without an explicit timestamp
# would use current time which is smaller than LARGE_TS and the delete would lose.
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': SMALL_TS})
# Delete without a timestamp attribute - should succeed normally
test_table_ts.delete_item(Key={'p': p})
assert 'Item' not in test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)
# Verify that an item written with a far-future timestamp is NOT deleted by
# a delete without an explicit timestamp (server time < LARGE_TS).
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
test_table_ts.delete_item(Key={'p': p})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Test that DeleteItem with a non-numeric timestamp attribute is rejected.
def test_timestamp_attribute_delete_item_non_numeric(test_table_ts):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.delete_item(Key={'p': p, 'ts': 'not_a_number'})
# Test that BatchWriteItem DeleteRequest also respects the timestamp attribute.
def test_timestamp_attribute_batch_delete(test_table_ts):
p = random_string()
# Write an item with a large timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Delete via BatchWriteItem with a small timestamp - delete should lose
test_table_ts.meta.client.batch_write_item(RequestItems={
test_table_ts.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': SMALL_TS}}}]
})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Delete via BatchWriteItem with a large timestamp - delete should win
test_table_ts.meta.client.batch_write_item(RequestItems={
test_table_ts.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': LARGE_TS + 1}}}]
})
assert 'Item' not in test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)
# Test that DeleteItem with a ConditionExpression and a custom timestamp is
# rejected, because conditional writes require LWT which is incompatible with
# custom timestamps.
def test_timestamp_attribute_delete_item_condition_rejected(test_table_ts):
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'hello'})
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.delete_item(
Key={'p': p, 'ts': SMALL_TS},
ConditionExpression='attribute_exists(p)'
)
# Test that DeleteItem with a custom timestamp is rejected when the table uses
# always_use_lwt isolation, because every write uses LWT in that mode.
def test_timestamp_attribute_delete_item_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.delete_item(Key={'p': p, 'ts': SMALL_TS})
# Test that BatchWriteItem PutRequest with a custom timestamp is rejected when
# the table uses always_use_lwt isolation.
def test_timestamp_attribute_batch_put_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.meta.client.batch_write_item(RequestItems={
test_table_ts_lwt.name: [{'PutRequest': {'Item': {'p': p, 'val': 'v', 'ts': SMALL_TS}}}]
})
# Test that BatchWriteItem DeleteRequest with a custom timestamp is rejected
# when the table uses always_use_lwt isolation.
def test_timestamp_attribute_batch_delete_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.meta.client.batch_write_item(RequestItems={
test_table_ts_lwt.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': SMALL_TS}}}]
})
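The semantics all of the tests above verify are last-write-wins resolution on per-write timestamps, with deletes acting as timestamped tombstones. A minimal single-cell sketch of that rule (illustrative only, not Scylla's implementation, and ignoring how Scylla breaks exact-timestamp ties):

```python
class LwwCell:
    # Minimal last-write-wins register: each write carries a timestamp,
    # the write with the largest timestamp wins, and a delete is just a
    # timestamped tombstone.
    def __init__(self):
        self.value, self.ts, self.deleted = None, -1, False

    def put(self, value, ts):
        if ts > self.ts:
            self.value, self.ts, self.deleted = value, ts, False

    def delete(self, ts):
        if ts > self.ts:
            self.value, self.ts, self.deleted = None, ts, True

    def get(self):
        return None if self.deleted else self.value

cell = LwwCell()
cell.put('hello', ts=2_000)
cell.put('overwritten', ts=1_000)   # smaller timestamp: write loses
assert cell.get() == 'hello'
cell.delete(ts=1_500)               # delete with smaller timestamp: loses
assert cell.get() == 'hello'
cell.delete(ts=2_001)               # delete with larger timestamp: wins
assert cell.get() is None
```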

View File

@@ -6096,4 +6096,83 @@ SEASTAR_THREAD_TEST_CASE(test_tablet_manual_repair_rf1_auto_repair_on) {
do_with_cql_env_thread(run_tablet_manual_repair_rf1, std::move(cfg_in)).get();
}
// Test for tablet_map::get_secondary_replica() and specifically how it
// relates to get_primary_replica().
// We never officially documented, given a list of replicas, which replica
// is to be considered the "primary" - it's not simply the first replica in
// the list but the first in some reshuffling of the list, whose details
// changed in commits like 817fdad and d88036d. So this patch doesn't

// enshrine what get_primary_replica() or get_secondary_replica() should
// return. It just verifies that get_secondary_replica() returns a *different*
// replica than get_primary_replica() if there are 2 or more replicas, or
// throws an error when there's just one replica.
// Reproduces SCYLLADB-777.
SEASTAR_THREAD_TEST_CASE(test_get_secondary_replica) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
locator::topology::config cfg = {
.this_endpoint = inet_address("127.0.0.1"),
.this_host_id = h1,
.local_dc_rack = endpoint_dc_rack::default_location,
};
auto topo = locator::topology(cfg);
topo.add_or_update_endpoint(h1, endpoint_dc_rack::default_location, node::state::normal);
topo.add_or_update_endpoint(h2, endpoint_dc_rack::default_location, node::state::normal);
topo.add_or_update_endpoint(h3, endpoint_dc_rack::default_location, node::state::normal);
// With 1 replica, get_secondary_replica should throw.
{
tablet_map tmap(1);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
}
});
BOOST_REQUIRE_THROW(tmap.get_secondary_replica(tid, topo), std::runtime_error);
}
// With 2 replicas, get_secondary_replica should return a different replica
// than get_primary_replica for every tablet.
{
tablet_map tmap(4);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 0},
}
});
}
for (auto tid : tmap.tablet_ids()) {
auto primary = tmap.get_primary_replica(tid, topo);
auto secondary = tmap.get_secondary_replica(tid, topo);
BOOST_REQUIRE(primary != secondary);
}
}
// With 3 replicas, same check.
{
tablet_map tmap(4);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 0},
tablet_replica {h3, 0},
}
});
}
for (auto tid : tmap.tablet_ids()) {
auto primary = tmap.get_primary_replica(tid, topo);
auto secondary = tmap.get_secondary_replica(tid, topo);
BOOST_REQUIRE(primary != secondary);
}
}
topo.clear_gently().get();
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -1,102 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import pytest
from cassandra.cluster import Session
from cassandra.protocol import ConfigurationException, InvalidRequest
from dtest_class import Tester
logger = logging.getLogger(__name__)
def create_ks_and_assert_warning(session, query, ks_name, key_warn_msg_words):
ret = session.execute_async(query)
_ = ret.result()
found = False
if len(key_warn_msg_words) > 0:
assert len(ret.warnings) >= 1, "Expected RF guardrail warning"
for warning in ret.warnings:
found = found or all(word in warning.lower() for word in key_warn_msg_words)
assert found, "Didn't match all required keywords"
session.execute(f"USE {ks_name}")
def assert_creating_ks_fails(session, query, ks_name):
with pytest.raises(ConfigurationException):
session.execute(query)
with pytest.raises(InvalidRequest):
session.execute(f"USE {ks_name}")
@pytest.mark.next_gating
class TestGuardrails(Tester):
def test_default_rf(self):
"""
As of now, the only RF guardrail enabled is a soft limit checking that RF >= 3. Not complying to this soft limit
results in a CQL being executed, but with a warning. Also, whatever the guardrails' values, RF = 0 is always OK.
"""
cluster = self.cluster
# FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
# we'll get a different error, so let's disable it for now. For more context, see issues:
# scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
cluster.set_configuration_options(values={"rf_rack_valid_keyspaces": False})
cluster.populate([1, 1, 1]).start(wait_other_notice=True)
session_dc1: Session = self.patient_cql_connection(cluster.nodelist()[0])
ks_name = "ks"
rf = {"dc1": 2, "dc2": 3, "dc3": 0}
query = "CREATE KEYSPACE %s WITH REPLICATION={%s}"
options = ", ".join(["'%s':%d" % (dc_value, rf_value) for dc_value, rf_value in rf.items()])
query = query % (ks_name, "'class':'NetworkTopologyStrategy', %s" % options)
create_ks_and_assert_warning(session_dc1, query, ks_name, ["warn", "min", "replication", "factor", "3", "dc1", "2"])
def test_all_rf_limits(self):
"""
There're 4 limits for RF: soft/hard min and soft/hard max limits. Breaking soft limits issues a warning,
breaking the hard limits prevents the query from being executed.
"""
cluster = self.cluster
MIN_FAIL_THRESHOLD = 2
MIN_WARN_THRESHOLD = 3
MAX_WARN_THRESHOLD = 4
MAX_FAIL_THRESHOLD = 5
# FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
# we'll get a different error, so let's disable it for now. For more context, see issues:
# scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
cluster.set_configuration_options(values={"rf_rack_valid_keyspaces": False})
cluster.set_configuration_options(
values={
"minimum_replication_factor_fail_threshold": MIN_FAIL_THRESHOLD, "minimum_replication_factor_warn_threshold": MIN_WARN_THRESHOLD, "maximum_replication_factor_warn_threshold": MAX_WARN_THRESHOLD,
"maximum_replication_factor_fail_threshold": MAX_FAIL_THRESHOLD
}
)
query = "CREATE KEYSPACE %s WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1': %s}"
cluster.populate([1]).start()
node = cluster.nodelist()[0]
session = self.patient_cql_connection(node)
def test_rf(rf):
ks_name = f"ks_{rf}"
if rf < MIN_FAIL_THRESHOLD or rf > MAX_FAIL_THRESHOLD:
assert_creating_ks_fails(session, query % (ks_name, rf), ks_name)
elif rf < MIN_WARN_THRESHOLD:
create_ks_and_assert_warning(session, query % (ks_name, rf), ks_name, ["warn", "min", "replication", "factor", str(MIN_WARN_THRESHOLD), "dc1", "2"])
elif rf > MAX_WARN_THRESHOLD:
create_ks_and_assert_warning(session, query % (ks_name, rf), ks_name, ["warn", "max", "replication", "factor", str(MAX_WARN_THRESHOLD), "dc1", "5"])
else:
create_ks_and_assert_warning(session, query % (ks_name, rf), ks_name, [])
for rf in range(MIN_FAIL_THRESHOLD - 1, MAX_FAIL_THRESHOLD + 1):
test_rf(rf)

View File

@@ -61,8 +61,8 @@ async def test_mv_build_during_shutdown(manager: ManagerClient):
# Start building two views. The first is delayed by the injection, and the second
# view build is queued, waiting on the view builder semaphore.
await manager.api.enable_injection(server.ip_addr, "delay_before_get_view_natural_endpoint", one_shot=True)
create_task1 = cql.run_async(f"CREATE materialized view {ks}.t_view1 AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
create_task2 = cql.run_async(f"CREATE materialized view {ks}.t_view2 AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
await cql.run_async(f"CREATE materialized view {ks}.t_view1 AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
await cql.run_async(f"CREATE materialized view {ks}.t_view2 AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
@@ -80,4 +80,4 @@ async def test_mv_build_during_shutdown(manager: ManagerClient):
# For dropping the keyspace
await manager.server_start(server.server_id)
await reconnect_driver(manager)
asyncio.gather(create_task1, create_task2)

View File

@@ -95,7 +95,8 @@ async def test_garbage_collect(manager: ManagerClient, object_storage):
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options']}
server = await manager.server_add(config=cfg)
cmd = ['--logger-log-level', 's3=trace:http=debug:gcp_storage=trace']
server = await manager.server_add(config=cfg, cmdline=cmd)
cql = manager.get_cql()

View File

@@ -15,8 +15,8 @@ from typing import Callable
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass
from test.cluster.conftest import PHASE_REPORT_KEY
from test.pylib.manager_client import ManagerClient
from test.cluster.conftest import *
from test.pylib.util import gather_safely

View File

@@ -19,7 +19,7 @@ from test.cluster.util import get_topology_coordinator, find_server_by_host_id,
from test.pylib.manager_client import ManagerClient
from test.pylib.tablets import get_tablet_count
from test.pylib.util import Host
from test.storage.conftest import space_limited_servers
from test.cluster.storage.conftest import space_limited_servers
logger = logging.getLogger(__name__)

View File

@@ -47,6 +47,5 @@ run_in_dev:
- dtest/commitlog_test
- dtest/cfid_test
- dtest/rebuild_test
- dtest/guardrails_test
run_in_debug:
- random_failures/test_random_failures

View File

@@ -183,6 +183,72 @@ async def test_alternator_ttl_scheduling_group(manager: ManagerClient):
table.delete()
@pytest.mark.parametrize("with_down_node", [False, True], ids=["all_nodes_up", "one_node_down"])
async def test_alternator_ttl_multinode_expiration(manager: ManagerClient, with_down_node):
"""When the cluster has multiple nodes, different nodes are responsible
for checking expiration in different token ranges - each node is
responsible for its "primary ranges". Let's check that this expiration
really does happen - for the entire token range - by writing many
partitions that will span the entire token range, and seeing that they
all expire. We don't check that nodes don't do more work than they
should - an inefficient implementation where every node scans the
entire data set will also pass this test.
When the test is run a second time with with_down_node=True, we verify
that TTL expiration works correctly even when one of the nodes is
brought down. This node's TTL scanner is responsible for scanning part
of the token range, so when this node is down, part of the data might
not get expired. At that point - other node(s) should take over
expiring data in that range - and this test verifies that this indeed
happens. Reproduces issue #9787 and SCYLLADB-777.
"""
servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
alternator = get_alternator(servers[0].ip_addr)
if with_down_node:
# Bring down one of the nodes. Everything we do below, like creating a
# table, reading and writing, should continue to work with one node
# down.
await manager.server_stop_gracefully(servers[2].server_id)
table = alternator.create_table(TableName=unique_table_name(),
BillingMode='PAY_PER_REQUEST',
KeySchema=[
{'AttributeName': 'p', 'KeyType': 'HASH' },
],
AttributeDefinitions=[
{'AttributeName': 'p', 'AttributeType': 'N' },
])
# Set the "expiration" column to mark item's expiration time
table.meta.client.update_time_to_live(TableName=table.name, TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True})
# Insert 50 rows, in different partitions, so the murmur3 hash maps them
# all over the token space so different nodes would be responsible for
# expiring them. All items are marked to expire 10 seconds in the past,
# so should all expire as soon as possible, during this test.
expiration = int(time.time()) - 10
with table.batch_writer() as batch:
for p in range(50):
batch.put_item({'p': p, 'expiration': expiration})
# Expect that after a short delay, all items in the table will have
# expired - so a scan should return no responses. This should happen
# even though one of the nodes is down and not doing its usual
# expiration-scanning work.
timeout = time.time() + 60
items = -1
while items != 0 and time.time() < timeout:
response = table.scan(ConsistentRead=True)
items = len(response['Items'])
# In theory (though probably not in practice in this test), a scan()
# can return zero items but have more pages - so we need to be more
# diligent and scan all pages to check it's completely empty.
while items == 0 and 'LastEvaluatedKey' in response:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], ConsistentRead=True)
items += len(response['Items'])
if items == 0:
break
time.sleep(0.1)
assert items == 0
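The page-walking loop in the test above can be factored into a small helper: DynamoDB's Scan returns at most one page per call, signals that more pages follow via `LastEvaluatedKey`, and resumes from that key when it is passed back as `ExclusiveStartKey`. The helper and the fake table below are illustrative sketches, not part of the test suite; the fake uses a plain page index where a real table would return a key map.

```python
def count_all_items(table):
    # Count items across all scan pages, following LastEvaluatedKey
    # until the table reports no further pages.
    response = table.scan(ConsistentRead=True)
    count = len(response['Items'])
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],
                              ConsistentRead=True)
        count += len(response['Items'])
    return count

class FakePagedTable:
    # Hypothetical stand-in for a boto3 Table resource, serving a fixed
    # list of pages and using the page index as the pagination token.
    def __init__(self, pages):
        self.pages = pages

    def scan(self, ExclusiveStartKey=None, ConsistentRead=True):
        i = ExclusiveStartKey or 0
        resp = {'Items': self.pages[i]}
        if i + 1 < len(self.pages):
            resp['LastEvaluatedKey'] = i + 1
        return resp

assert count_all_items(FakePagedTable([[1, 2], [3]])) == 3
assert count_all_items(FakePagedTable([[]])) == 0
```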
@pytest.mark.asyncio
async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
"""Test that if the "broadcast_rpc_address" of a node is set, the

View File

@@ -0,0 +1,99 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging
import pytest
from cassandra.protocol import ConfigurationException, InvalidRequest
from test.pylib.async_cql import _wrap_future
from test.pylib.manager_client import ManagerClient
from test.pylib.util import unique_name
logger = logging.getLogger(__name__)
async def create_ks_and_assert_warning(cql, query, ks_name, key_warn_msg_words):
# We have to use `Session::execute_async` here to be able to obtain `warnings`.
ret = cql.execute_async(query)
await _wrap_future(ret)
found = False
if len(key_warn_msg_words) > 0:
assert len(ret.warnings) >= 1, "Expected RF guardrail warning"
for warning in ret.warnings:
found = found or all(word in warning.lower() for word in key_warn_msg_words)
assert found, "Didn't match all required keywords"
await cql.run_async(f"USE {ks_name}")
async def assert_creating_ks_fails(cql, query, ks_name):
with pytest.raises(ConfigurationException):
await cql.run_async(query)
with pytest.raises(InvalidRequest):
await cql.run_async(f"USE {ks_name}")
@pytest.mark.asyncio
async def test_default_rf(manager: ManagerClient):
"""
As of now, the only RF guardrail enabled is a soft limit checking that RF >= 3. Not complying with this soft limit
results in the CQL query being executed, but with a warning. Also, whatever the guardrails' values, RF = 0 is always OK.
"""
# FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
# we'll get a different error, so let's disable it for now. For more context, see issues:
# scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
cfg = {"rf_rack_valid_keyspaces": False}
await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": "r1"})
await manager.server_add(config=cfg, property_file={"dc": "dc2", "rack": "r1"})
await manager.server_add(config=cfg, property_file={"dc": "dc3", "rack": "r1"})
cql = manager.get_cql()
ks_name = unique_name()
rf = {"dc1": 2, "dc2": 3, "dc3": 0}
options = ", ".join([f"'{dc}':{rf_val}" for dc, rf_val in rf.items()])
query = f"CREATE KEYSPACE {ks_name} WITH REPLICATION={{'class':'NetworkTopologyStrategy', {options}}}"
await create_ks_and_assert_warning(cql, query, ks_name, ["warn", "min", "replication", "factor", "3", "dc1", "2"])
@pytest.mark.asyncio
async def test_all_rf_limits(manager: ManagerClient):
"""
There are 4 limits for RF: soft/hard min and soft/hard max limits. Breaking soft limits issues a warning,
breaking the hard limits prevents the query from being executed.
"""
MIN_FAIL_THRESHOLD = 2
MIN_WARN_THRESHOLD = 3
MAX_WARN_THRESHOLD = 4
MAX_FAIL_THRESHOLD = 5
# FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
# we'll get a different error, so let's disable it for now. For more context, see issues:
# scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
cfg = {
"rf_rack_valid_keyspaces": False,
"minimum_replication_factor_fail_threshold": MIN_FAIL_THRESHOLD,
"minimum_replication_factor_warn_threshold": MIN_WARN_THRESHOLD,
"maximum_replication_factor_warn_threshold": MAX_WARN_THRESHOLD,
"maximum_replication_factor_fail_threshold": MAX_FAIL_THRESHOLD,
}
dc = "dc1"
await manager.server_add(config=cfg, property_file={"dc": dc, "rack": "r1"})
cql = manager.get_cql()
for rf in range(MIN_FAIL_THRESHOLD - 1, MAX_FAIL_THRESHOLD + 1):
ks_name = unique_name()
query = f"CREATE KEYSPACE {ks_name} WITH REPLICATION = {{'class': 'NetworkTopologyStrategy', '{dc}': {rf}}}"
if rf < MIN_FAIL_THRESHOLD or rf > MAX_FAIL_THRESHOLD:
await assert_creating_ks_fails(cql, query, ks_name)
elif rf < MIN_WARN_THRESHOLD:
await create_ks_and_assert_warning(cql, query, ks_name, ["warn", "min", "replication", "factor", str(MIN_WARN_THRESHOLD), dc, str(rf)])
elif rf > MAX_WARN_THRESHOLD:
await create_ks_and_assert_warning(cql, query, ks_name, ["warn", "max", "replication", "factor", str(MAX_WARN_THRESHOLD), dc, str(rf)])
else:
await create_ks_and_assert_warning(cql, query, ks_name, [])
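The branching in `test_all_rf_limits` amounts to classifying an RF value against the four thresholds: breaking a hard limit rejects the query, breaking a soft limit only warns. A standalone sketch (the function name and the `rf == 0` special case, which follows the "RF = 0 is always OK" note in `test_default_rf`, are illustrative assumptions, not the guardrail implementation):

```python
def classify_rf(rf, min_fail=2, min_warn=3, max_warn=4, max_fail=5):
    # Hard limits reject the query; soft limits only warn. RF = 0
    # (no replicas in this DC) is assumed always accepted, per the
    # docstring of test_default_rf.
    if rf == 0:
        return 'ok'
    if rf < min_fail or rf > max_fail:
        return 'fail'
    if rf < min_warn:
        return 'warn-min'
    if rf > max_warn:
        return 'warn-max'
    return 'ok'

# Same sweep as the test: one below the hard minimum up to one above
# the hard maximum.
assert [classify_rf(rf) for rf in range(1, 7)] == \
    ['fail', 'warn-min', 'ok', 'ok', 'warn-max', 'fail']
```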

View File

@@ -54,7 +54,7 @@ async def test_autoretrain_dict(manager: ManagerClient):
uncompressed_size = blob_size * n_blobs * rf
# Start with compressor without a dictionary
cfg = { "sstable_compression_user_table_options": "ZstdCompressor" }
cfg = { "sstable_compression_user_table_options": { 'sstable_compression': 'ZstdCompressor' } }
logger.info("Bootstrapping cluster")
servers = await manager.servers_add(2, cmdline=[

View File

@@ -34,7 +34,7 @@ def scylla_path(build_mode):
@pytest.mark.parametrize("mode", ["read"])
async def test_perf_simple_query(scylla_path, mode, tmp_path):
args = [scylla_path, "perf-simple-query", "--duration", "1", "--partitions", "1000"]
args = [scylla_path, "perf-simple-query", "--duration", "1", "--partitions", "1000", "--stop-on-error", "false"]
await run(args)
@@ -54,7 +54,8 @@ async def test_perf_cql_raw(scylla_path, tmp_path, workload):
"--smp", "2",
"--workdir", str(tmp_path),
"--developer-mode", "1",
"--partitions", "1000"
"--partitions", "1000",
"--continue-after-error", "true"
]
try:
await run(cmd)
@@ -81,7 +82,8 @@ async def test_perf_alternator(scylla_path, tmp_path, workload):
"--smp", "2",
"--workdir", str(tmp_path),
"--developer-mode", "1",
"--partitions", "1000"
"--partitions", "1000",
"--continue-after-error", "true"
]
try:
await run(cmd)
@@ -101,7 +103,8 @@ async def test_perf_cql_raw_remote(scylla_path, tmp_path, workload, manager):
"--duration", "1",
"--remote-host", host,
"--smp", "1",
"--partitions", "1000"
"--partitions", "1000",
"--continue-after-error", "true"
]
await run(client_cmd)
@@ -121,6 +124,7 @@ async def test_perf_alternator_remote(scylla_path, tmp_path, workload, manager):
"--duration", "1",
"--remote-host", host,
"--smp", "1",
"--partitions", "1000"
"--partitions", "1000",
"--continue-after-error", "true"
]
await run(client_cmd)

View File

@@ -5,7 +5,7 @@
import pytest
from .util import new_test_table, is_scylla
from cassandra.protocol import InvalidRequest
from math import sqrt, isclose
from math import sqrt, isclose, nan, isnan
###############################################################################
@@ -46,7 +46,7 @@ def compute_similarity(similarity_function, v1, v2):
norm_v = sqrt(sum(x**2 for x in v1))
norm_q = sqrt(sum(x**2 for x in v2))
if norm_v == 0 or norm_q == 0:
raise ValueError("Cosine similarity is not defined for zero vectors")
return nan
cosine = dot / (norm_v * norm_q)
return (1 + cosine) / 2
elif similarity_function == "euclidean":
@@ -247,13 +247,22 @@ def test_vector_similarity_with_zero_vectors(cql, table1, similarity_function):
def test_vector_similarity_cosine_with_zero_vectors(cql, table1):
zero = [0.0, 0.0, 0.0]
queries = [
f"SELECT pk, v1, similarity_cosine(v1, {zero}) FROM {table1}",
f"SELECT pk, v1, similarity_cosine({zero}, v1) FROM {table1}",
f"SELECT pk, v1, similarity_cosine({zero}, {zero}) FROM {table1}",
]
expected_error = "Function system.similarity_cosine doesn't support all-zero vectors"
with pytest.raises(InvalidRequest, match=expected_error):
cql.execute(f"SELECT pk, v1, similarity_cosine(v1, {zero}) FROM {table1}")
with pytest.raises(InvalidRequest, match=expected_error):
cql.execute(f"SELECT pk, v1, similarity_cosine({zero}, v1) FROM {table1}")
with pytest.raises(InvalidRequest, match=expected_error):
cql.execute(f"SELECT pk, v1, similarity_cosine({zero}, {zero}) FROM {table1}")
for query in queries:
# Scylla returns NaN for cosine similarity with zero vectors, while Cassandra throws an error.
# We allow for this difference, as we want rescoring to work with zero vectors as well.
if is_scylla(cql):
result = cql.execute(query)
for row in result:
assert isnan(row[2])
else:
with pytest.raises(InvalidRequest, match=expected_error):
cql.execute(query)
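The scaled cosine score this test exercises can be computed in isolation, matching the `compute_similarity` branch earlier in the file: the raw cosine in [-1, 1] is mapped to a score in [0, 1] via (1 + cosine) / 2, and an all-zero input yields NaN (the Scylla behavior the test accepts; Cassandra rejects the query instead). Illustrative reference code, not the server implementation.

```python
from math import sqrt, isnan, nan

def cosine_similarity_score(v1, v2):
    # Scaled cosine score in [0, 1]; NaN when either vector has zero
    # norm, since the cosine is undefined there.
    dot = sum(x * y for x, y in zip(v1, v2))
    norm_v = sqrt(sum(x * x for x in v1))
    norm_q = sqrt(sum(x * x for x in v2))
    if norm_v == 0 or norm_q == 0:
        return nan
    return (1 + dot / (norm_v * norm_q)) / 2

assert cosine_similarity_score([1, 0], [1, 0]) == 1.0   # identical vectors
assert cosine_similarity_score([1, 0], [-1, 0]) == 0.0  # opposite vectors
assert isnan(cosine_similarity_score([0, 0], [1, 0]))   # zero vector: NaN
```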
@pytest.mark.parametrize("similarity_function", similarity_functions)

View File

@@ -93,26 +93,27 @@ class ResourceGather(ABC):
env.update(os.environ)
else:
env = os.environ.copy()
p = subprocess.Popen(
args=args,
bufsize=1,
stdout=output_file.open(mode="w", encoding="utf-8"),
stderr=subprocess.STDOUT,
preexec_fn=self.put_process_to_cgroup,
close_fds=True,
cwd=cwd,
env=env,
text=True,
)
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
logger.critical(f"Process {args} timed out")
p.kill()
p.communicate()
except KeyboardInterrupt:
p.kill()
raise
with output_file.open(mode="w", encoding="utf-8") as output_handle:
p = subprocess.Popen(
args=args,
bufsize=1,
stdout=output_handle,
stderr=subprocess.STDOUT,
preexec_fn=self.put_process_to_cgroup,
close_fds=True,
cwd=cwd,
env=env,
text=True,
)
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
logger.critical(f"Process {args} timed out")
p.kill()
p.communicate()
except KeyboardInterrupt:
p.kill()
raise
return p
def make_cgroup(self) -> None:

View File

@@ -51,30 +51,3 @@ def get_coroutine():
name = resolve(vtable_addr)
if name and name.strip() == target:
print(f"coroutine_config={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
def coroutine_debug_config(tmpdir):
"""
Check if scylla_find agrees with find_vptrs, for debugging.
Execute GDB commands for coroutine debugging with detailed output.
This test fails sometimes, but rarely and unreliably.
We want to get a coredump from it the next time it fails.
Sending a SIGSEGV should induce that.
https://github.com/scylladb/scylladb/issues/22501
"""
target = 'service::topology_coordinator::run() [clone .resume]'
target_addr = int(gdb.parse_and_eval(f"&'{target}'"))
find_command = f"scylla find -a 0x{target_addr:x}"
gdb.write(f"Didn't find {target} (0x{target_addr:x}). Running '{find_command}'\n")
mem_range = get_seastar_memory_start_and_size()
gdb.execute(find_command)
gdb.write(f"Memory range: 0x{mem_range[0]:x} 0x{mem_range[1]:x}\n")
gdb.write("Found coroutines:\n")
for obj_addr, vtable_addr in find_vptrs():
name = resolve(vtable_addr)
if name and '.resume' in name.strip():
gdb.write(f"{name}\n")
core_filename = f"{tmpdir}/../scylla_gdb_coro_task-{uuid.uuid4()}.core"
gdb.execute(f"gcore {core_filename}")
raise gdb.error(f"No coroutine frames found with expected name. Dumped Scylla core to {core_filename}")

View File

@@ -47,24 +47,13 @@ def coroutine_task(gdb_cmd, scylla_server):
Finds a coroutine task, similar to the `task` fixture.
This fixture executes the `coroutine_config` script in GDB to locate a
specific coroutine task. If the task is not found, the `coroutine_debug_config`
debugging script is called which checks if scylla_find agrees with find_vptrs.
This debugging script then forces a coredump to capture additional
diagnostic information before the test is marked as failed.
Coredump is saved to `testlog/release/{scylla}`.
specific coroutine task.
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_coroutine()").stdout
match = re.search(r"coroutine_config=\s*(.*)", result)
if not match:
result = execute_gdb_command(
gdb_cmd,
full_command=f"python coroutine_debug_config('{scylla_server.workdir}')",
)
pytest.fail(
f"Failed to find coroutine task. Debugging logs have been collected\n"
f"Debugging code result: {result}\n"
)
# See https://github.com/scylladb/scylladb/issues/22501
pytest.skip("Failed to find coroutine task. Skipping test.")
return match.group(1).strip()
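The fixture above scrapes the address printed by the `get_coroutine()` GDB script out of the command's stdout. The extraction itself can be sketched in isolation (the helper name is ours, for illustration):

```python
import re

def extract_coroutine_addr(gdb_output):
    """Pull the address printed by the get_coroutine() GDB script.

    Returns the address string, or None when the coroutine task was
    not found (in the fixture, that path leads to a skip/debug run).
    """
    match = re.search(r"coroutine_config=\s*(.*)", gdb_output)
    return match.group(1).strip() if match else None

print(extract_coroutine_addr("noise\ncoroutine_config= 0x7f00dead\nmore"))  # 0x7f00dead
print(extract_coroutine_addr("nothing here"))  # None
```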

View File

@@ -1,10 +0,0 @@
type: Topology
pool_size: 4
cluster:
initial_size: 0
extra_scylla_config_options:
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
enable_user_defined_functions: False
rf_rack_valid_keyspaces: True
tablets_mode_for_new_keyspaces: enabled

View File

@@ -460,3 +460,38 @@ SEASTAR_TEST_CASE(no_nulls_in_rescored_results, *boost::unit_test::expected_fail
}));
}
}
// Reproducer for SCYLLADB-456
SEASTAR_TEST_CASE(rescoring_with_zerovector_query) {
for (const auto& params : test_data) {
auto server = co_await make_vs_mock_server();
co_await do_with_cql_env(
[&](cql_test_env& env) -> future<> {
configure(env.local_qp().vector_store_client()).with_dns({{"server.node", std::vector<std::string>{server->host()}}});
env.local_qp().vector_store_client().start_background_tasks();
co_await create_index_and_insert_data(env, params);
server->next_ann_response({http::reply::status_type::ok, R"({
"primary_keys": { "id": [4, 3, 2, 1] },
"distances": [0, 0, 0, 0]
})"});
// For cosine similarity the ANN vector query used to fail because the `similarity_cosine` function did not support zero vectors.
try {
auto msg = co_await env.execute_cql("SELECT id FROM ks.cf ORDER BY embedding ANN OF [0, 0] LIMIT 3;");
auto rms = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
BOOST_REQUIRE(rms);
const auto& rows = rms->rs().result_set().rows();
BOOST_REQUIRE_EQUAL(rows.size(), 3);
} catch (const std::exception& e) {
BOOST_FAIL(e.what());
}
},
make_config(format("http://server.node:{}", server->port())))
.finally(seastar::coroutine::lambda([&] -> future<> {
co_await server->stop();
}));
}
}
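The test above exercises the case the comment describes: a cosine-similarity ANN query with an all-zero query vector, which a naive `similarity_cosine` rejects because the norm in the denominator is zero. This is not Scylla's actual implementation, just a sketch of the problem and one conventional guard:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity with a guard for zero-magnitude vectors.

    A naive version divides by ||a|| * ||b|| and raises (or yields
    NaN) when either vector is all zeros; here we instead return a
    fixed 0.0 in that case.
    """
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    if na == 0.0 or nb == 0.0:
        return 0.0  # similarity is undefined for a zero vector; pick a fixed value
    return dot / (na * nb)

print(cosine_similarity([0, 0], [1, 2]))  # 0.0 instead of ZeroDivisionError
print(cosine_similarity([1, 0], [1, 0]))  # 1.0
```

Whether 0.0 is the right defined value for a zero vector is a policy choice; the point of the reproducer is only that the query must not fail outright.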

View File

@@ -66,6 +66,7 @@ target_sources(utils
azure/identity/default_credentials.cc
gcp/gcp_credentials.cc
gcp/object_storage.cc
gcp/object_storage_retry_strategy.cc
)
target_include_directories(utils
PUBLIC

View File

@@ -9,6 +9,7 @@
#include "object_storage.hh"
#include "gcp_credentials.hh"
#include "object_storage_retry_strategy.hh"
#include <algorithm>
#include <numeric>
@@ -20,12 +21,16 @@
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/units.hh>
#include <seastar/http/client.hh>
#include <seastar/util/short_streams.hh>
#include "utils/rest/client.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/error_injection.hh"
#include "utils/exceptions.hh"
#include "utils/http.hh"
#include "utils/http_client_error_processing.hh"
#include "utils/overloaded_functor.hh"
static logger gcp_storage("gcp_storage");
@@ -226,6 +231,7 @@ class utils::gcp::storage::client::impl {
seastar::semaphore& _limits;
seastar::http::experimental::client _client;
shared_ptr<seastar::tls::certificate_credentials> _certs;
future<> authorize(request_wrapper& req, const std::string& scope);
public:
impl(const utils::http::url_info&, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);
impl(std::string_view endpoint, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);
@@ -243,6 +249,13 @@ public:
future<> close();
};
future<> storage::client::impl::authorize(request_wrapper& req, const std::string& scope) {
if (_credentials) {
co_await _credentials->refresh(scope, &storage_scope_implies, _certs);
req.add_header(utils::gcp::AUTHORIZATION, format_bearer(_credentials->token));
}
}
utils::gcp::storage::client::impl::impl(const utils::http::url_info& url, std::optional<google_credentials> c, seastar::semaphore* memory, shared_ptr<seastar::tls::certificate_credentials> certs)
: _endpoint(url.host)
, _credentials(std::move(c))
@@ -293,104 +306,87 @@ using namespace std::chrono_literals;
/**
* Performs a REST post/put/get with credential refresh/retry.
*/
future<>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, handler_func_ex handler, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
static constexpr auto max_retries = 10;
rest::request_wrapper req(_endpoint);
req.target(path);
req.method(op);
exponential_backoff_retry exr(10ms, 10000ms);
bool do_backoff = false;
for (auto& [k,v] : headers) {
req.add_header(k, v);
}
for (int retry = 0; ; ++retry) {
if (std::exchange(do_backoff, false)) {
co_await (as ? exr.retry(*as) : exr.retry());
}
std::visit(overloaded_functor {
[&](const std::string& s) { req.content(content_type, s); },
[&](const writer_and_size& ws) { req.content(content_type, ws.first, ws.second); }
}, body);
rest::request_wrapper req(_endpoint);
req.target(path);
req.method(op);
// GCP storage requires this even if content is empty
req.add_header("Content-Length", std::to_string(req.request().content_length));
if (_credentials) {
try {
try {
co_await _credentials->refresh(scope, &storage_scope_implies, _certs);
req.add_header(utils::gcp::AUTHORIZATION, format_bearer(_credentials->token));
} catch (httpd::unexpected_status_error& e) {
switch (e.status()) {
default:
if (reply::classify_status(e.status()) != reply::status_class::server_error) {
break;
}
[[fallthrough]];
case status_type::request_timeout:
case status_type::too_many_requests:
if (retry < max_retries) {
gcp_storage.debug("Got {}: {}", e.status(), std::current_exception());
// service unavailable etc -> retry
do_backoff = true;
continue;
}
break;
}
throw;
}
} catch (...) {
gcp_storage.error("Error refreshing credentials: {}", std::current_exception());
std::throw_with_nested(permission_error("Error refreshing credentials"));
}
}
for (auto& [k,v] : headers) {
req.add_header(k, v);
}
std::visit(overloaded_functor {
[&](const std::string& s) { req.content(content_type, s); },
[&](const writer_and_size& ws) { req.content(content_type, ws.first, ws.second); }
}, body);
// GCP storage requires this even if content is empty
req.add_header("Content-Length", std::to_string(req.request().content_length));
gcp_storage.trace("Sending: {}", redacted_request_type {
req.request(),
bearer_filter()
});
try {
try {
co_await rest::simple_send(_client, req, [&handler](const seastar::http::reply& res, seastar::input_stream<char>& in) -> future<> {
gcp_storage.trace("Result: {}", res);
if (res._status == status_type::unauthorized) {
throw permission_error(int(res._status), co_await get_gcp_error_message(in));
} else if (res._status == status_type::request_timeout || res._status == status_type::too_many_requests || reply::classify_status(res._status) == reply::status_class::server_error) {
throw storage_error(int(res._status), co_await get_gcp_error_message(in));
}
co_await handler(res, in);
}, as);
break;
} catch (storage_error& e) {
gcp_storage.debug("{}: Got unexpected response: {}", _endpoint, e.what());
auto s = status_type(e.status());
switch (s) {
default:
if (reply::classify_status(s) != reply::status_class::server_error) {
break;
}
[[fallthrough]];
case status_type::request_timeout:
case status_type::too_many_requests:
do_backoff = true;
[[fallthrough]];
case status_type::unauthorized:
if (retry < max_retries) {
continue; // retry loop.
}
break;
}
throw;
co_await authorize(req, scope);
} catch (...) {
// network, whatnot. maybe add retries here as well, but should really
// be on seastar level
throw;
// just disregard the failure, we will retry below in the wrapped handler
}
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, input_stream<char>& in) -> future<> {
auto _in = std::move(in);
auto status_class = reply::classify_status(rep._status);
/*
* Surprisingly, Google Cloud Storage (GCS) commonly returns HTTP 308 during resumable uploads, including when you use PUT. This is expected behavior and
* not an error. The 308 tells the client to continue the upload at the same URL without changing the method or body, which is exactly how GCS's
* resumable upload protocol works.
*/
if (status_class != reply::status_class::informational && status_class != reply::status_class::success &&
rep._status != status_type::permanent_redirect) {
if (rep._status == status_type::unauthorized) {
gcp_storage.warn("Request failed with status {}. Refreshing credentials.", rep._status);
co_await authorize(req, scope);
}
auto content = co_await util::read_entire_stream_contiguous(_in);
auto error_msg = get_gcp_error_message(std::string_view(content));
gcp_storage.debug("Got unexpected response status: {}, content: {}", rep._status, content);
co_await coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
}
std::exception_ptr eptr;
try {
// TODO: rename the fault injection point to something more generic
if (utils::get_local_injector().enter("s3_client_fail_authorization")) {
throw httpd::unexpected_status_error(status_type::unauthorized);
}
co_await handler(rep, _in);
} catch (...) {
eptr = std::current_exception();
}
if (eptr) {
co_await coroutine::return_exception_ptr(std::move(eptr));
}
};
object_storage_retry_strategy retry_strategy(10, 10ms, 10000ms, as);
co_return co_await rest::simple_send(_client, req, wrapped_handler, &retry_strategy, as);
} catch (...) {
try {
std::rethrow_exception(std::current_exception());
} catch (const httpd::unexpected_status_error& e) {
auto status = e.status();
if (reply::classify_status(status) == reply::status_class::redirection || status == reply::status_type::not_found) {
throw storage_io_error{ENOENT, format("GCP object doesn't exist ({})", status)};
}
if (status == reply::status_type::forbidden || status == reply::status_type::unauthorized) {
throw storage_io_error{EACCES, format("GCP access denied ({})", status)};
}
throw storage_io_error{EIO, format("GCP request failed with ({})", status)};
} catch (...) {
throw storage_io_error{EIO, format("GCP error ({})", std::current_exception())};
}
}
}
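The rework above folds the retry policy into a status classification: 408, 429, and 5xx are retried with backoff, 401 triggers a credential refresh before the retry, 308 is GCS's "continue the resumable upload" signal, and other error statuses fail fast. That decision table can be sketched as a small classifier (the function name and string labels are ours, for illustration only):

```python
def classify_status(status):
    """Map an HTTP status code to an action, mirroring the retry
    policy sketched in send_with_retry: transient server-side
    conditions are retried with backoff, 401 is retried after
    refreshing credentials, and the rest of the error classes
    fail fast.
    """
    if status in (408, 429) or 500 <= status < 600:
        return "retry_with_backoff"
    if status == 401:
        return "refresh_credentials_and_retry"
    if status == 308:
        return "continue_upload"  # GCS resumable-upload redirect, not an error
    if 200 <= status < 300:
        return "ok"
    return "fail"

print(classify_status(503))  # retry_with_backoff
print(classify_status(401))  # refresh_credentials_and_retry
print(classify_status(404))  # fail
```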
@@ -1003,21 +999,22 @@ future<> utils::gcp::storage::client::delete_object(std::string_view bucket_in,
auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, seastar::http::internal::url_encode(object_name));
auto res = co_await _impl->send_with_retry(path
, GCP_OBJECT_SCOPE_READ_WRITE
, ""s
, ""s
, httpclient::method_type::DELETE
);
httpclient::result_type res;
try {
res = co_await _impl->send_with_retry(path, GCP_OBJECT_SCOPE_READ_WRITE, ""s, ""s, httpclient::method_type::DELETE);
} catch (const storage_io_error& ex) {
if (ex.code().value() == ENOENT) {
gcp_storage.debug("Could not delete {}:{} - no such object", bucket, object_name);
co_return; // ok...?
}
std::rethrow_exception(std::current_exception());
}
switch (res.result()) {
case status_type::ok:
case status_type::no_content:
gcp_storage.debug("Deleted {}:{}", bucket, object_name);
co_return; // done and happy
case status_type::not_found:
gcp_storage.debug("Could not delete {}:{} - no such object", bucket, object_name);
co_return; // ok...?
default:
throw failed_operation(fmt::format("Could not delete object {}:{}: {} ({})", bucket, object_name, res.result()
, get_gcp_error_message(res.body())

View File

@@ -0,0 +1,52 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "object_storage_retry_strategy.hh"
#include "utils/exceptions.hh"
#include "utils/http_client_error_processing.hh"
#include <seastar/core/sleep.hh>
#include <seastar/http/exception.hh>
static logger rs_logger("gcp_retry_strategy");
namespace utils::gcp::storage {
object_storage_retry_strategy::object_storage_retry_strategy(unsigned max_retries,
std::chrono::milliseconds base_sleep_time,
std::chrono::milliseconds max_sleep_time,
abort_source* as)
: _max_retries(max_retries), _exr(base_sleep_time, max_sleep_time), _as(as) {
}
future<bool> object_storage_retry_strategy::should_retry(std::exception_ptr error, unsigned attempted_retries) const {
if (attempted_retries >= _max_retries) {
rs_logger.warn("Retries exhausted. Retry# {}", attempted_retries);
co_return false;
}
auto retryable = from_exception_ptr(error);
if (retryable) {
rs_logger.debug("GCP client request failed. Reason: {}. Retry# {}", error, attempted_retries);
co_await (_as ? _exr.retry(*_as) : _exr.retry());
} else {
rs_logger.warn("GCP client encountered non-retryable error. Reason: {}. Retry# {}", error, attempted_retries);
}
co_return retryable;
}
bool object_storage_retry_strategy::from_exception_ptr(std::exception_ptr exception) {
return dispatch_exception<bool>(
std::move(exception),
[](std::exception_ptr, std::string&&) { return false; },
[](const seastar::httpd::unexpected_status_error& ex) {
return http::from_http_code(ex.status()) == http::retryable::yes || ex.status() == seastar::http::reply::status_type::unauthorized;
},
[](const std::system_error& ex) { return http::from_system_error(ex) == http::retryable::yes; });
}
} // namespace utils::gcp::storage
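With the defaults used here (10 retries, 10 ms base, 10 000 ms cap), the backoff sleeps grow geometrically until they saturate at the cap. A quick sketch of that schedule, assuming plain doubling clamped at the cap (the exact doubling/jitter rule of `exponential_backoff_retry` may differ):

```python
def backoff_schedule(base_ms=10, cap_ms=10000, retries=10):
    """Sleep times (ms) for each retry attempt, assuming the delay
    doubles after every attempt and is clamped at cap_ms.
    """
    delays, d = [], base_ms
    for _ in range(retries):
        delays.append(d)
        d = min(d * 2, cap_ms)
    return delays

print(backoff_schedule())
# [10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120]
```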

View File

@@ -0,0 +1,34 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "utils/exponential_backoff_retry.hh"
#include <seastar/http/retry_strategy.hh>
namespace utils::gcp::storage {
// GCP object storage retry strategy
// General guidelines https://docs.cloud.google.com/storage/docs/retry-strategy
class object_storage_retry_strategy : public seastar::http::experimental::retry_strategy {
protected:
unsigned _max_retries;
mutable exponential_backoff_retry _exr;
abort_source* _as;
public:
object_storage_retry_strategy(unsigned max_retries = 10,
std::chrono::milliseconds base_sleep_time = std::chrono::milliseconds(10),
std::chrono::milliseconds max_sleep_time = std::chrono::milliseconds(10000),
abort_source* as = nullptr);
seastar::future<bool> should_retry(std::exception_ptr error, unsigned attempted_retries) const override;
static bool from_exception_ptr(std::exception_ptr exception);
};
} // namespace utils::gcp::storage

View File

@@ -126,6 +126,10 @@ seastar::future<> rest::httpclient::send(const handler_func& f, seastar::abort_s
}
seastar::future<> rest::simple_send(seastar::http::experimental::client& client, seastar::http::request& req, const handler_func_ex& f, seastar::abort_source* as) {
co_await simple_send(client, req, f, nullptr, as);
}
seastar::future<> rest::simple_send(seastar::http::experimental::client& client, seastar::http::request& req, const handler_func_ex& f, const http::experimental::retry_strategy* strategy, seastar::abort_source* as) {
if (as) {
as->check();
}
@@ -139,6 +143,14 @@ seastar::future<> rest::simple_send(seastar::http::experimental::client& client,
req._headers[httpclient::CONTENT_TYPE_HEADER] = "application/x-www-form-urlencoded";
}
if (strategy) {
co_return co_await client.make_request(std::move(req), [&](const http::reply& rep, input_stream<char>&& in) -> future<> {
// ensure these are on our coroutine frame.
auto& resp_handler = f;
auto in_stream = std::move(in);
co_await resp_handler(rep, in_stream);
}, *strategy, std::nullopt, as);
}
co_await client.make_request(std::move(req), [&](const http::reply& rep, input_stream<char>&& in) -> future<> {
// ensure these are on our coroutine frame.
auto& resp_handler = f;

View File

@@ -114,6 +114,7 @@ private:
using handler_func_ex = std::function<future<>(const seastar::http::reply&, seastar::input_stream<char>&)>;
seastar::future<> simple_send(seastar::http::experimental::client&, seastar::http::request&, const handler_func_ex&, seastar::abort_source* = nullptr);
seastar::future<> simple_send(seastar::http::experimental::client&, seastar::http::request&, const handler_func_ex&, const http::experimental::retry_strategy* strategy, seastar::abort_source* = nullptr);
// Interface for redacting sensitive data from HTTP requests and responses before logging.
class http_log_filter {

View File

@@ -56,7 +56,7 @@ dns::dns(logging::logger& logger, std::vector<seastar::sstring> hosts, listener_
co_await coroutine::return_exception_ptr(std::move(err));
}
auto addr = co_await std::move(f);
co_return addr.addr_list;
co_return addr.addr_entries | std::views::transform(&net::hostent::address_entry::addr) | std::ranges::to<std::vector>();
})
, _hosts(std::move(hosts))
, _listener(std::move(listener))