Compare commits

...

41 Commits

Author SHA1 Message Date
Jenkins Promoter
bafd185087 Update pgo profiles - aarch64 2026-02-01 05:06:48 +02:00
Jenkins Promoter
07d1f8f48a Update pgo profiles - x86_64 2026-02-01 04:20:45 +02:00
Botond Dénes
0cf9f41649 Merge '[Backport 2026.1] docs: add documentation for automatic repair' from Scylladb[bot]
Explain what automatic repair is and how to configure it. While at it, improve the existing repair documentation a bit.

Fixes: SCYLLADB-130

This PR missed the 2026.1 branch date, so it needs a backport to 2026.1, where the auto repair feature debuts.

- (cherry picked from commit a84b1b8b78)

- (cherry picked from commit 57b2cd2c16)

- (cherry picked from commit 1713d75c0d)

Parent PR: #28199

Closes scylladb/scylladb#28424

* github.com:scylladb/scylladb:
  docs: add feature page for automatic repair
  docs: inter-link incremental-repair and repair documents
  docs: incremental-repair: fix curl example
2026-01-30 16:01:03 +02:00
Botond Dénes
dc89e2ea37 Merge '[Backport 2026.1] test: test_alternator_proxy_protocol: fix race between node startup and test start' from Scylladb[bot]
test_alternator_proxy_protocol starts a node and connects via the alternator ports.
Starting a node, by default, waits until the CQL ports are up. This does not guarantee
that the alternator ports are up (they will be up very soon after this), so there is a short
window where a connection to the alternator ports will fail.

Fix by adding a ServerUpState=SERVING mode, which waits for the node to report
to its supervisor (systemd, which we are pretending to be) that its ports are open.
The test is then adjusted to request this new ServerUpState.

Fixes #28210
Fixes #28211

Flaky tests are only in master and branch-2026.1, so backporting there.

- (cherry picked from commit ebac810c4e)

- (cherry picked from commit 59f2a3ce72)

Parent PR: #28291

Closes scylladb/scylladb#28443

* github.com:scylladb/scylladb:
  test: test_alternator_proxy_protocol: wait for the node to report itself as serving
  test: cluster_manager: add ability to wait for supervisor STATUS=serving
2026-01-30 15:59:09 +02:00
Tomasz Grabiec
797f56cb45 Merge '[Backport 2026.1] Improve load balancer logging and other minor cleanups' from Scylladb[bot]
Contains various improvements to the tablet load balancer. Batched together to save on the CI bill.

Most notably:
 - Make plan summary more concise, and print info only about present elements.
 - Print rack name in addition to DC name when making a per-rack plan
 - Print "Not possible to achieve balance" only when this is the final plan with no active migrations
 - Print per-node stats when "Not possible to achieve balance" is printed
 - amortize metrics lookup cost
 - avoid spamming logs with per-node "Node {} does not have complete tablet stats, ignoring"

Backport to 2026.1: the changes enhance debuggability and are relatively low risk.

Fixes #28423
Fixes #28422

- (cherry picked from commit 32b336e062)

- (cherry picked from commit df32318f66)

- (cherry picked from commit f2b0146f0f)

- (cherry picked from commit 0d090aa47b)

- (cherry picked from commit 12fdd205d6)

- (cherry picked from commit 615b86e88b)

- (cherry picked from commit 7228bd1502)

- (cherry picked from commit 4a161bff2d)

- (cherry picked from commit ef0e9ad34a)

- (cherry picked from commit 9715965d0c)

- (cherry picked from commit 8e831a7b6d)

Parent PR: #28337

Closes scylladb/scylladb#28428

* github.com:scylladb/scylladb:
  tablets: tablet_allocator.cc: Convert tabs to spaces
  tablets: load_balancer: Warn about incomplete stats once for all offending nodes
  tablets: load_balancer: Improve node stats printout
  tablets: load_balancer: Warn about imbalance only when there are no more active migrations
  tablets: load_balancer: Extract print_node_stats()
  tablet: load_balancer: Use empty() instead of size() where applicable
  tablets: Fix redundancy in migration_plan::empty()
  tablets: Cache pointer to stats during plan-making
  tablets: load_balancer: Print rack in addition to DC when giving context
  tablets: load_balancer: Make plan summary concise
  tablets: load_balancer: Move "tablet_migration_bypass" injection point to make_plan()
2026-01-30 14:08:34 +01:00
Pawel Pery
be1d418bc0 vector_search: allow full secondary indexes syntax while creating the vector index
The Vector Search feature needs to support creating vector indexes with
additional filtering columns. There will be two types of indexes: global,
which indexes vectors per table, and local, which indexes vectors per
partition key. The new syntaxes are based on ScyllaDB's Global Secondary
Index and Local Secondary Index. Vector indexes don't use secondary index
functionality in any way - all indexing, filtering, and data processing is
done on the Vector Store side.

This patch allows creating vector indexes using this CQL syntax:

```
CREATE TABLE IF NOT EXISTS cycling.comments_vs (
  commenter text,
  comment text,
  comment_vector VECTOR <FLOAT, 5>,
  created_at timestamp,
  discussion_board_id int,
  country text,
  lang text,
  PRIMARY KEY ((commenter, discussion_board_id), created_at)
);

CREATE CUSTOM INDEX IF NOT EXISTS global_ann_index
  ON cycling.comments_vs(comment_vector, country, lang) USING 'vector_index'
  WITH OPTIONS = { 'similarity_function': 'DOT_PRODUCT' };

CREATE CUSTOM INDEX IF NOT EXISTS local_ann_index
  ON cycling.comments_vs((commenter, discussion_board_id), comment_vector, country, lang)
  USING 'vector_index'
  WITH OPTIONS = { 'similarity_function': 'DOT_PRODUCT' };
```

Currently, running these queries to create the indexes produces the following errors:

```
InvalidRequest: Error from server: code=2200 [Invalid query] message="Vector index can only be created on a single column"
InvalidRequest: Error from server: code=2200 [Invalid query] message="Local index definition must contain full partition key only. Redundant column: XYZ"
```

This commit refactors `vector_index::check_target` to correctly validate
the columns that make up the index. Vector Store currently supports
filtering by native types only, so the column types are checked. The first
column in the list must be a vector (the index is built on these vectors),
so this is checked as well.

The allowed column types are the native types, excluding counter (a table
cannot contain both a counter and a vector column) and duration (durations
cannot be compared correctly, and the type is not allowed even in secondary
indexes).

This commit also adds a cqlpy test checking the errors raised while
creating indexes.

Fixes: SCYLLADB-298

This needs to be backported to version 2026.1 as this is a fix for filtering support.

Closes scylladb/scylladb#28366

(cherry picked from commit f49c9e896a)

Closes scylladb/scylladb#28448
2026-01-30 11:25:01 +01:00
Patryk Jędrzejczak
46923f7358 Merge '[Backport 2026.1] Introduce TTL and retries to address resolution' from Scylladb[bot]
In production environments, we observed cases where the S3 client would repeatedly fail to connect due to DNS entries becoming stale. Because the existing logic only attempted the first resolved address and lacked a way to refresh DNS state, the client could get stuck in a failure loop.

Introduce RR TTL and connection failure retry (see the sketch after this list) to
- re-resolve the RR in a timely manner
- forcefully reset and re-resolve addresses on connection failure
- handle the special case where the TTL is 0 and the record must be resolved for every request
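
For intuition, a minimal self-contained sketch of the scheme follows. This is not the actual seastar-based `dns_connection_factory` code; `resolve_all()` and `connect_to()` are hypothetical stand-ins for the real DNS lookup and socket connect:

```cpp
#include <chrono>
#include <string>
#include <vector>

// Hypothetical stand-ins for the real DNS lookup and socket connect.
std::vector<std::string> resolve_all() { return {"10.0.0.1", "10.0.0.2"}; }
bool connect_to(const std::string& addr) { (void)addr; return false; }

struct resolver_cache {
    std::chrono::seconds ttl;
    std::vector<std::string> addresses;
    std::chrono::steady_clock::time_point refreshed_at{};

    const std::vector<std::string>& get() {
        auto now = std::chrono::steady_clock::now();
        // Re-resolve when the cache is empty, the TTL has expired, or
        // TTL == 0 (the record must be resolved for every request).
        if (addresses.empty() || ttl.count() == 0 || now - refreshed_at >= ttl) {
            addresses = resolve_all();
            refreshed_at = now;
        }
        return addresses;
    }

    void invalidate() { addresses.clear(); }  // force re-resolution on next get()
};

bool connect(resolver_cache& cache) {
    for (int attempt = 0; attempt < 2; ++attempt) {  // retry once, and only once
        for (const auto& addr : cache.get()) {       // try all resolved addresses
            if (connect_to(addr)) {
                return true;
            }
        }
        cache.invalidate();  // forcefully reset and re-resolve before the retry
    }
    return false;
}
```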

Fixes: CUSTOMER-96
Fixes: CUSTOMER-139

Should be backported to 2025.3/4 and 2026.1, since we already encountered it in production clusters running 2025.3.

- (cherry picked from commit bd9d5ad75b)

- (cherry picked from commit 359d0b7a3e)

- (cherry picked from commit ce0c7b5896)

- (cherry picked from commit 5b3e513cba)

- (cherry picked from commit 66a33619da)

- (cherry picked from commit 6eb7dba352)

- (cherry picked from commit a05a4593a6)

- (cherry picked from commit 3a31380b2c)

- (cherry picked from commit 912c48a806)

Parent PR: #27891

Closes scylladb/scylladb#28405

* https://github.com/scylladb/scylladb:
  connection_factory: includes cleanup
  dns_connection_factory: refine the move constructor
  connection_factory: retry on failure
  connection_factory: introduce TTL timer
  connection_factory: get rid of shared_future in dns_connection_factory
  connection_factory: extract connection logic into a member
  connection_factory: remove unnecessary `else`
  connection_factory: use all resolved DNS addresses
  s3_test: remove client double-close
2026-01-30 11:10:48 +01:00
Avi Kivity
4032e95715 test: test_alternator_proxy_protocol: wait for the node to report itself as serving
Use the new ServerUpState=SERVING mechanism to wait for the alternator
ports to be up, rather than relying on the default of waiting for CQL,
which happens earlier and therefore opens a window where a connection to
the alternator ports will fail.

(cherry picked from commit 59f2a3ce72)
2026-01-29 22:46:11 +02:00
Avi Kivity
eab10c00b1 test: cluster_manager: add ability to wait for supervisor STATUS=serving
When running under systemd, ScyllaDB sends a STATUS=serving message
to systemd. Co-opt this mechanism by setting up NOTIFY_SOCKET, thus
making the cluster manager pretend it is systemd. Users of the cluster
manager can now wait for the node to report itself up, rather than
having to parse log files or retry connections.
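
To illustrate the protocol being co-opted (the actual cluster manager lives in the Python test framework; the names below are hypothetical), a supervisor binds a unix datagram socket, exports its path via NOTIFY_SOCKET, and waits for a `STATUS=serving` datagram:

```cpp
#include <cstdlib>
#include <cstring>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>

// Bind a unix datagram socket and advertise it via NOTIFY_SOCKET so the
// child service sends its sd_notify() messages to us instead of systemd.
int make_notify_socket(const std::string& path) {
    int fd = socket(AF_UNIX, SOCK_DGRAM, 0);
    sockaddr_un addr{};
    addr.sun_family = AF_UNIX;
    std::strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);
    bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
    setenv("NOTIFY_SOCKET", path.c_str(), 1);
    return fd;
}

// Block until the service reports STATUS=serving (error handling omitted).
void wait_for_serving(int fd) {
    char buf[4096];
    for (;;) {
        ssize_t n = recv(fd, buf, sizeof(buf) - 1, 0);
        if (n <= 0) {
            continue;
        }
        buf[n] = '\0';
        // sd_notify() payloads are newline-separated KEY=VALUE pairs.
        if (std::string(buf).find("STATUS=serving") != std::string::npos) {
            return;
        }
    }
}
```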

(cherry picked from commit ebac810c4e)
2026-01-29 19:48:53 +00:00
Patryk Jędrzejczak
091c3b4e22 test: test_gossiper_orphan_remover: get host ID of the bootstrapping node before it crashes
The test is currently flaky. It tries to get the host ID of the bootstrapping
node via the REST API after the node crashes. This can obviously fail. The
test usually doesn't fail, though, as it relies on the host ID being saved
in `ScyllaServer._host_id` at this point by `ScyllaServer.try_get_host_id()`,
which is repeatedly called in `ScyllaServer.start()`. However, with a very
fast crash and unlucky timings, no such call may succeed.

We deflake the test by getting the host ID before the crash. Note that at this
point, the bootstrapping node must be serving the REST API requests because
`await log.wait_for("finished do_send_ack2_msg")` above guarantees that the
node has started the gossip shadow round, which happens after starting the REST
API.

Fixes #28385

Closes scylladb/scylladb#28388

(cherry picked from commit a2c1569e04)

Closes scylladb/scylladb#28417
2026-01-29 11:25:10 +01:00
Tomasz Grabiec
19eadafdef tablets: tablet_allocator.cc: Convert tabs to spaces
(cherry picked from commit 8e831a7b6d)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
358fc15893 tablets: load_balancer: Warn about incomplete stats once for all offending nodes
To reduce log spamming when all nodes are missing stats.

(cherry picked from commit 9715965d0c)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
32124d209e tablets: load_balancer: Improve node stats printout
Make it more concise:
- reduce precision for load to 6 fractional digits
- reduce precision for tablets/shard to 3 fractional digits
- print "dc1/rack1" instead of "dc=dc1 rack=rack1", like in other places
- print "rd=0 wr=0" instead of "stream_read=0 stream_write=0"

Example:

 load_balancer - Node 477569c0-f937-11f0-ab6f-541ce4a00601: dc10/rack10c load=170.666667 tablets=1 shards=12 tablets/shard=0.083 state=normal cap=64424509440 stream: rd=0 wr=0
 load_balancer - Node 47678711-f937-11f0-ab6f-541ce4a00601: dc10/rack10c load=0.000000 tablets=0 shards=12 tablets/shard=0.000 state=normal cap=64424509440 stream: rd=0 wr=0
 load_balancer - Node 47832560-f937-11f0-ab6f-541ce4a00601: dc10/rack10c load=0.000000 tablets=0 shards=12 tablets/shard=0.000 state=normal cap=64424509440 stream: rd=0 wr=0

(cherry picked from commit ef0e9ad34a)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
c7f4bda459 tablets: load_balancer: Warn about imbalance only when there are no more active migrations
Otherwise, it may only be a temporary situation due to a lack of
candidates, and the warning may be needlessly alarming.

Also, print node stats to allow assessing on the spot how bad the
situation is. Those stats can hint at a cause of imbalance, e.g., when
balancing is per-DC and racks have different capacities.

(cherry picked from commit 4a161bff2d)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
568af3cd8d tablets: load_balancer: Extract print_node_stats()
(cherry picked from commit 7228bd1502)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
bd694dd1a1 tablet: load_balancer: Use empty() instead of size() where applicable
(cherry picked from commit 615b86e88b)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
9672e0171f tablets: Fix redundancy in migration_plan::empty()
(cherry picked from commit 12fdd205d6)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
8cec41acf2 tablets: Cache pointer to stats during plan-making
Saves on lookup cost, especially for candidate evaluation. This showed
up in perf profiles in the past.

Also lays the groundwork for splitting stats per rack.

(cherry picked from commit 0d090aa47b)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
d207de0d76 tablets: load_balancer: Print rack in addition to DC when giving context
Load balancing can now be per-rack instead of per-DC, so printing just
"in DC" is confusing. If we're balancing a rack, we should print which
rack it is.

(cherry picked from commit f2b0146f0f)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
edde4e878e tablets: load_balancer: Make plan summary concise
Before:

  load_balancer - Prepared 1 migration plans, out of which there were 1 tablet migration(s) and 0 resize decision(s) and 0 tablet repair(s) and 0 rack-list colocation(s)

After:

  load_balancer - Prepared plan: migrations: 1

We print stats only for elements which are present.

(cherry picked from commit df32318f66)
2026-01-29 09:06:49 +00:00
Tomasz Grabiec
be1c674f1a tablets: load_balancer: Move "tablet_migration_bypass" injection point to make_plan()
Just a cleanup. After this, we don't have a new scope in the outermost
make_plan() just for injection handling.

(cherry picked from commit 32b336e062)
2026-01-29 09:06:49 +00:00
Botond Dénes
a7cff37024 docs: add feature page for automatic repair
Explain what the feature is and how to configure it.
Inter-link all the repair related pages, so one can discover all about
repair, regardless of which page they land on.

(cherry picked from commit 1713d75c0d)
2026-01-29 00:25:21 +00:00
Botond Dénes
9431bc5628 docs: inter-link incremental-repair and repair documents
The user can now discover the general explanation of repair when reading
about incremental repair, which is useful if they don't know what repair is.
The user can now discover incremental repair while reading the generic
repair procedure document.

(cherry picked from commit 57b2cd2c16)
2026-01-29 00:25:21 +00:00
Botond Dénes
14db8375ac docs: incremental-repair: fix curl example
Currently it is regular text; make it a code block so it is easier to
read and copy-paste.

(cherry picked from commit a84b1b8b78)
2026-01-29 00:25:21 +00:00
Ernest Zaslavsky
614020b5d5 aws_error: handle all restartable nested exception types
Previously we only inspected std::system_error inside
std::nested_exception to support a specific TLS-related failure
mode. However, nested exceptions may contain any type, including
other restartable (retryable) errors. This change unwraps one
nested exception per iteration and re-applies all known handlers
until a match is found or the chain is exhausted.
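
A minimal sketch of that loop in standard C++ (hypothetical names, not the actual aws_error code): apply every known handler to the current exception; if none matches, unwrap one nested level and try again:

```cpp
#include <exception>
#include <functional>
#include <vector>

using handler = std::function<bool(const std::exception&)>;

// Walk a std::nested_exception chain, re-applying all known handlers at
// each level until one matches or the chain is exhausted.
bool is_retryable(std::exception_ptr ep, const std::vector<handler>& handlers) {
    while (ep) {
        try {
            std::rethrow_exception(ep);
        } catch (const std::exception& e) {
            for (const auto& h : handlers) {
                if (h(e)) {
                    return true;  // a handler recognized a restartable error
                }
            }
            // Unwrap one nested exception per iteration, if there is one.
            auto nested = dynamic_cast<const std::nested_exception*>(&e);
            ep = nested ? nested->nested_ptr() : nullptr;
        } catch (...) {
            ep = nullptr;  // not a std::exception: nothing left to inspect
        }
    }
    return false;
}
```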

Closes scylladb/scylladb#28240

(cherry picked from commit cb2aa85cf5)

Closes scylladb/scylladb#28345
2026-01-28 14:58:28 +02:00
Anna Stuchlik
e091afb400 doc: add the version name to the Install pages
This is a follow-up to https://github.com/scylladb/scylladb/pull/28022.
It adds the version name to more install pages.

Closes scylladb/scylladb#28289

(cherry picked from commit c25b770342)

Closes scylladb/scylladb#28362
2026-01-28 12:52:23 +02:00
Ernest Zaslavsky
edc46fe6a1 connection_factory: includes cleanup
(cherry picked from commit 912c48a806)
2026-01-27 22:43:08 +00:00
Ernest Zaslavsky
f8b9b767c2 dns_connection_factory: refine the move constructor
Clean up the awkward move constructor that was declared in the header
but defaulted in a separate compilation unit, improving clarity and
consistency.

(cherry picked from commit 3a31380b2c)
2026-01-27 22:43:08 +00:00
Ernest Zaslavsky
23d038b385 connection_factory: retry on failure
If connecting to a provided address throws, renew the address list and
retry once (and only once) before giving up.

(cherry picked from commit a05a4593a6)
2026-01-27 22:43:08 +00:00
Ernest Zaslavsky
3e2d1384bf connection_factory: introduce TTL timer
Add a TTL-based timer to connection_factory to automatically refresh
resolved host name addresses when they expire.

(cherry picked from commit 6eb7dba352)
2026-01-27 22:43:08 +00:00
Ernest Zaslavsky
bd7481e30c connection_factory: get rid of shared_future in dns_connection_factory
Move state management from `dns_connection_factory` into the state class
itself, encapsulating its internal state rather than managing it from
`dns_connection_factory`.

(cherry picked from commit 66a33619da)
2026-01-27 22:43:08 +00:00
Ernest Zaslavsky
16d7b65754 connection_factory: extract connection logic into a member
Extract the connection logic into a private member function to make it reusable.

(cherry picked from commit 5b3e513cba)
2026-01-27 22:43:08 +00:00
Ernest Zaslavsky
e30c01eae6 connection_factory: remove unnecessary else
(cherry picked from commit ce0c7b5896)
2026-01-27 22:43:08 +00:00
Ernest Zaslavsky
d0f3725887 connection_factory: use all resolved DNS addresses
Improve dns_connection_factory to iterate over all resolved
addresses instead of using only the first one.

(cherry picked from commit 359d0b7a3e)
2026-01-27 22:43:07 +00:00
Ernest Zaslavsky
c12168b7ef s3_test: remove client double-close
`test_chunked_download_data_source_with_delays` was calling `close()` on a client twice; remove the unnecessary call.

(cherry picked from commit bd9d5ad75b)
2026-01-27 22:43:07 +00:00
Yaron Kaikov
76c0162060 .github/workflows/backport-pr-fixes-validation: support Atlassian URL format in backport PR fixes validation
Add support for matching full Atlassian JIRA URLs in the format
https://scylladb.atlassian.net/browse/SCYLLADB-400 in addition to
the bare JIRA key format (SCYLLADB-400).

This makes the validation more flexible by accepting both formats
that developers commonly use when referencing JIRA issues.
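
For illustration, both of these trailer forms (reusing the issue key from above) now pass the check:

```
Fixes: SCYLLADB-400
Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-400
```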

Fixes: https://github.com/scylladb/scylladb/issues/28373

Closes scylladb/scylladb#28374

(cherry picked from commit 3f10f44232)

Closes scylladb/scylladb#28394
2026-01-27 16:05:06 +02:00
Avi Kivity
c9620d9573 test/cqlpy: restore LWT tests marked XFAIL for tablets
Commit 0156e97560 ("storage_proxy: cas: reject for
tablets-enabled tables") marked a bunch of LWT tests as
XFAIL with tablets enabled, pending resolution of #18066.
But since that event is now in the past, we undo the XFAIL
markings (or in some cases, use an any-keyspace fixture
instead of a vnodes-only fixture).

Ref #18066.

Closes scylladb/scylladb#28336

(cherry picked from commit ec70cea2a1)

Closes scylladb/scylladb#28365
2026-01-27 12:20:18 +02:00
Anna Stuchlik
91cf77d016 doc: update the GPG keys
Update the keys in the installation instructions (linux packages).

Fixes https://github.com/scylladb/scylladb/issues/28330

Closes scylladb/scylladb#28357

(cherry picked from commit edc291961b)

Closes scylladb/scylladb#28370
2026-01-27 11:20:11 +02:00
Anna Stuchlik
2c2f0693ab doc: remove the troubleshooting section on upgrades from OSS
This commit removes a document originally created to troubleshoot
upgrades from Open Source to Enterprise.

Since we no longer support Open Source, this document is now redundant.

Fixes https://github.com/scylladb/scylladb/issues/28246

Closes scylladb/scylladb#28248

(cherry picked from commit 84281f900f)

Closes scylladb/scylladb#28360
2026-01-26 19:17:28 +02:00
Jenkins Promoter
2c73d0e6b5 Update ScyllaDB version to: 2026.1.0-rc1 2026-01-26 11:06:45 +02:00
Yaron Kaikov
f94296e0ae Update ScyllaDB version to: 2026.1.0-rc0 2026-01-25 11:07:17 +02:00
42 changed files with 705 additions and 362 deletions

View File

@@ -18,7 +18,7 @@ jobs:
// Regular expression pattern to check for "Fixes" prefix
// Adjusted to dynamically insert the repository full name
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|(?:https://scylladb\\.atlassian\\.net/browse/)?([A-Z]+-\\d+))`;
const regex = new RegExp(pattern);
if (!regex.test(body)) {

View File

@@ -78,7 +78,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=2026.1.0-dev
VERSION=2026.1.0-rc1
if test -f version
then

View File

@@ -5,6 +5,10 @@
/stable/kb/perftune-modes-sync.html: /stable/kb/index.html
# Remove the troubleshooting page relevant for Open Source only
/stable/troubleshooting/missing-dotmount-files.html: /troubleshooting/index.html
# Move the driver information to another project
/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html

View File

@@ -281,7 +281,8 @@ For example::
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
or columns provided in a definition of the index.
For example::

View File

@@ -140,17 +140,83 @@ Vector Index :label-note:`ScyllaDB Cloud`
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient
similarity search on vector data.
similarity search on vector data. Vector indexes can be a global index for indexing vectors per table or a local
index for indexing vectors per partition.
The vector index is the only custom type index supported in ScyllaDB. It is created using
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. Example:
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. It is also possible to
add additional columns to the index for filtering the search results. The partition column
specified in the global vector index definition must be the vector column, and any subsequent
columns are treated as filtering columns. The local vector index requires that the partition key
of the base table also be the partition key of the index, and that the vector column be the first
of the remaining columns.
Example of a simple index:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed to enable similarity search using
a global vector index. Additional filtering can be performed on the primary key
columns of the base table.
Example of a global vector index with additional filtering:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding, category, info)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed to enable similarity search using
a global index. Additional columns are added for filtering the search results.
The filtering is possible on ``category``, ``info`` and all primary key columns
of the base table.
Example of a local vector index:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings ((id, created_at), embedding, category, info)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed for similarity search (a local
index) and additional columns are added for filtering the search results. The
filtering is possible on ``category``, ``info`` and all primary key columns of
the base table. The columns ``id`` and ``created_at`` must be the partition key
of the base table.
Vector indexes support additional filtering columns of native data types
(excluding counter and duration). The indexed column itself must be a vector
column, while the extra columns can be used to filter search results.
The supported types are:
* ``ascii``
* ``bigint``
* ``blob``
* ``boolean``
* ``date``
* ``decimal``
* ``double``
* ``float``
* ``inet``
* ``int``
* ``smallint``
* ``text``
* ``varchar``
* ``time``
* ``timestamp``
* ``timeuuid``
* ``tinyint``
* ``uuid``
* ``varint``
The following options are supported for vector indexes. All of them are optional.
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+

View File

@@ -0,0 +1,23 @@
.. _automatic-repair:
Automatic Repair
================
Traditionally, launching `repairs </operating-scylla/procedures/maintenance/repair>`_ in a ScyllaDB cluster is left to an external process, typically done via `Scylla Manager <https://manager.docs.scylladb.com/stable/repair/index.html>`_.
Automatic repair offers built-in scheduling in ScyllaDB itself. If the time since the last repair is greater than the configured repair interval, ScyllaDB will start a repair for the `tablet </architecture/tablets>`_ automatically.
Repairs are spread over time and among nodes and shards, to avoid load spikes or any adverse effects on user workloads.
To enable automatic repair, add this to the configuration (``scylla.yaml``):
.. code-block:: yaml
auto_repair_enabled_default: true
auto_repair_threshold_default_in_seconds: 86400
This will enable automatic repair for all tables with a repair period of 1 day. This configuration has to be set on each node, to an identical value.
More featureful configuration methods will be implemented in the future.
To disable, set ``auto_repair_enabled_default: false``.
Automatic repair relies on `Incremental Repair </features/incremental-repair>`_ and as such it only works with `tablet </architecture/tablets>`_ tables.

View File

@@ -3,7 +3,7 @@
Incremental Repair
==================
ScyllaDB's standard repair process scans and processes all the data on a node, regardless of whether it has changed since the last repair. This operation can be resource-intensive and time-consuming. The Incremental Repair feature provides a much more efficient and lightweight alternative for maintaining data consistency.
ScyllaDB's standard `repair </operating-scylla/procedures/maintenance/repair>`_ process scans and processes all the data on a node, regardless of whether it has changed since the last repair. This operation can be resource-intensive and time-consuming. The Incremental Repair feature provides a much more efficient and lightweight alternative for maintaining data consistency.
The core idea of incremental repair is to repair only the data that has been written or changed since the last repair was run. It intelligently skips data that has already been verified, dramatically reducing the time, I/O, and CPU resources required for the repair operation.
@@ -37,7 +37,12 @@ The available modes are:
* ``disabled``: Completely disables the incremental repair logic for the current operation. The repair behaves like a classic, non-incremental repair, and it does not read or update any incremental repair status markers.
The incremental_mode parameter can be specified using nodetool cluster repair, e.g., nodetool cluster repair --incremental-mode incremental. It can also be specified with the REST API, e.g., curl -X POST "http://127.0.0.1:10000/storage_service/tablets/repair?ks=ks1&table=tb1&tokens=all&incremental_mode=incremental"
The incremental_mode parameter can be specified using nodetool cluster repair, e.g., nodetool cluster repair --incremental-mode incremental.
It can also be specified with the REST API, e.g.:
.. code::
curl -X POST "http://127.0.0.1:10000/storage_service/tablets/repair?ks=ks1&table=tb1&tokens=all&incremental_mode=incremental"
Benefits of Incremental Repair
------------------------------
@@ -46,6 +51,8 @@ Benefits of Incremental Repair
* **Reduced Resource Usage:** Consumes significantly less CPU, I/O, and network bandwidth compared to a full repair.
* **More Frequent Repairs:** The efficiency of incremental repair allows you to run it more frequently, ensuring a higher level of data consistency across your cluster at all times.
Tables using Incremental Repair can schedule repairs in ScyllaDB itself, with `Automatic Repair </features/automatic-repair>`_.
Notes
-----

View File

@@ -17,6 +17,7 @@ This document highlights ScyllaDB's key data modeling features.
Workload Prioritization </features/workload-prioritization>
Backup and Restore </features/backup-and-restore>
Incremental Repair </features/incremental-repair/>
Automatic Repair </features/automatic-repair/>
Vector Search </features/vector-search/>
.. panel-box::
@@ -44,5 +45,7 @@ This document highlights ScyllaDB's key data modeling features.
* :doc:`Incremental Repair </features/incremental-repair/>` provides a much more
efficient and lightweight approach to maintaining data consistency by
repairing only the data that has changed since the last repair.
* :doc:`Automatic Repair </features/automatic-repair/>` schedules and runs repairs
directly in ScyllaDB, without external schedulers.
* :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables
similarity-based queries on vector embeddings.

View File

@@ -24,9 +24,9 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
:id: "getting-started"
:class: my-panel
* :doc:`Launch ScyllaDB on AWS </getting-started/install-scylla/launch-on-aws>`
* :doc:`Launch ScyllaDB on GCP </getting-started/install-scylla/launch-on-gcp>`
* :doc:`Launch ScyllaDB on Azure </getting-started/install-scylla/launch-on-azure>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on AWS </getting-started/install-scylla/launch-on-aws>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on GCP </getting-started/install-scylla/launch-on-gcp>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on Azure </getting-started/install-scylla/launch-on-azure>`
.. panel-box::
@@ -35,7 +35,7 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
:class: my-panel
* :doc:`Install ScyllaDB with Web Installer (recommended) </getting-started/installation-common/scylla-web-installer>`
* :doc:`Install ScyllaDB Linux Packages </getting-started/install-scylla/install-on-linux>`
* :doc:`Install ScyllaDB |CURRENT_VERSION| Linux Packages </getting-started/install-scylla/install-on-linux>`
* :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
* :doc:`Install ScyllaDB Without root Privileges </getting-started/installation-common/unified-installer>`
* :doc:`Air-gapped Server Installation </getting-started/installation-common/air-gapped-install>`

View File

@@ -4,9 +4,9 @@
.. |RHEL_EPEL_8| replace:: https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm
.. |RHEL_EPEL_9| replace:: https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
======================================
Install ScyllaDB Linux Packages
======================================
========================================================
Install ScyllaDB |CURRENT_VERSION| Linux Packages
========================================================
We recommend installing ScyllaDB using :doc:`ScyllaDB Web Installer for Linux </getting-started/installation-common/scylla-web-installer/>`,
a platform-agnostic installation script, to install ScyllaDB on any supported Linux platform.
@@ -46,8 +46,8 @@ Install ScyllaDB
.. code-block:: console
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys a43e06657bac99e3
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --export --armor a43e06657bac99e3 | gpg --dearmor > /etc/apt/keyrings/scylladb.gpg
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys c503c686b007f39e
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --export --armor c503c686b007f39e | gpg --dearmor > /etc/apt/keyrings/scylladb.gpg
.. code-block:: console
:substitutions:

View File

@@ -1,6 +1,6 @@
==========================
Launch ScyllaDB on AWS
==========================
===============================================
Launch ScyllaDB |CURRENT_VERSION| on AWS
===============================================
This article will guide you through self-managed ScyllaDB deployment on AWS. For a fully-managed deployment of ScyllaDB
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.

View File

@@ -1,6 +1,6 @@
==========================
Launch ScyllaDB on Azure
==========================
===============================================
Launch ScyllaDB |CURRENT_VERSION| on Azure
===============================================
This article will guide you through self-managed ScyllaDB deployment on Azure. For a fully-managed deployment of ScyllaDB
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.

View File

@@ -1,6 +1,6 @@
==========================
Launch ScyllaDB on GCP
==========================
=============================================
Launch ScyllaDB |CURRENT_VERSION| on GCP
=============================================
This article will guide you through self-managed ScyllaDB deployment on GCP. For a fully-managed deployment of ScyllaDB
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.

View File

@@ -58,4 +58,12 @@ See also
* `Blog: ScyllaDB Open Source 3.1: Efficiently Maintaining Consistency with Row-Level Repair <https://www.scylladb.com/2019/08/13/scylla-open-source-3-1-efficiently-maintaining-consistency-with-row-level-repair/>`_
Incremental Repair
------------------
Built on top of `Row-level Repair <row-level-repair_>`_ and `Tablets </architecture/tablets>`_, Incremental Repair enables frequent and quick repairs. For more details, see `Incremental Repair </features/incremental-repair>`_.
Automatic Repair
----------------
Built on top of `Incremental Repair </features/incremental-repair>`_, `Automatic Repair </features/automatic-repair>`_ offers repair scheduling and execution directly in ScyllaDB, without external processes.

View File

@@ -8,7 +8,6 @@ Troubleshooting ScyllaDB
support/index
startup/index
upgrade/index
cluster/index
modeling/index
storage/index
@@ -29,7 +28,6 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
* :doc:`Errors and ScyllaDB Customer Support <support/index>`
* :doc:`ScyllaDB Startup <startup/index>`
* :doc:`ScyllaDB Cluster and Node <cluster/index>`
* :doc:`ScyllaDB Upgrade <upgrade/index>`
* :doc:`Data Modeling <modeling/index>`
* :doc:`Data Storage and SSTables <storage/index>`
* :doc:`CQL errors <CQL/index>`

View File

@@ -1,79 +0,0 @@
Inaccessible "/var/lib/scylla" and "/var/lib/systemd/coredump" after ScyllaDB upgrade
======================================================================================
Problem
^^^^^^^
When you reboot the machine after a ScyllaDB upgrade, you cannot access data directories under ``/var/lib/scylla``, and
coredump saves to ``rootfs``.
The problem may occur when you upgrade ScyllaDB Open Source 4.6 or later to a version of ScyllaDB Enterprise if
the ``/etc/systemd/system/var-lib-scylla.mount`` and ``/etc/systemd/system/var-lib-systemd-coredump.mount`` are
deleted by RPM.
To avoid losing the files, the upgrade procedure includes a step to backup the .mount files. The following
example shows the command to backup the files before the upgrade from version 5.0:
.. code-block:: console
for conf in $( rpm -qc $(rpm -qa | grep scylla) | grep -v contains ) /etc/systemd/system/{var-lib-scylla,var-lib-systemd-coredump}.mount; do sudo cp -v $conf $conf.backup-5.0; done
If you don't backup the .mount files before the upgrade, the files may be lost.
Solution
^^^^^^^^
If you didn't backup the .mount files before the upgrade and the files were deleted during the upgrade,
you need to restore them manually.
To restore ``/etc/systemd/system/var-lib-systemd-coredump.mount``, run the following:
.. code-block:: console
$ cat << EOS | sudo tee /etc/systemd/system/var-lib-systemd-coredump.mount
[Unit]
Description=Save coredump to scylla data directory
Conflicts=umount.target
Before=scylla-server.service
After=local-fs.target
DefaultDependencies=no
[Mount]
What=/var/lib/scylla/coredump
Where=/var/lib/systemd/coredump
Type=none
Options=bind
[Install]
WantedBy=multi-user.target
EOS
To restore ``/etc/systemd/system/var-lib-scylla.mount``, run the following (specifying your data disk):
.. code-block:: console
$ UUID=`blkid -s UUID -o value <specify your data disk, eg: /dev/md0>`
$ cat << EOS | sudo tee /etc/systemd/system/var-lib-scylla.mount
[Unit]
Description=ScyllaDB data directory
Before=scylla-server.service
After=local-fs.target
DefaultDependencies=no
[Mount]
What=/dev/disk/by-uuid/$UUID
Where=/var/lib/scylla
Type=xfs
Options=noatime
[Install]
WantedBy=multi-user.target
EOS
After restoring .mount files, you need to enable them:
.. code-block:: console
$ sudo systemctl daemon-reload
$ sudo systemctl enable --now var-lib-scylla.mount
$ sudo systemctl enable --now var-lib-systemd-coredump.mount
.. include:: /troubleshooting/_common/ts-return.rst

View File

@@ -1,16 +0,0 @@
Upgrade
=================
.. toctree::
:hidden:
:maxdepth: 2
Inaccessible configuration files after ScyllaDB upgrade </troubleshooting/missing-dotmount-files>
.. panel-box::
:title: Upgrade Issues
:id: "getting-started"
:class: my-panel
* :doc:`Inaccessible "/var/lib/scylla" and "/var/lib/systemd/coredump" after ScyllaDB upgrade </troubleshooting//missing-dotmount-files>`

View File

@@ -17,11 +17,11 @@
#include "index/secondary_index.hh"
#include "index/secondary_index_manager.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "utils/managed_string.hh"
#include <seastar/core/sstring.hh>
#include <boost/algorithm/string.hpp>
namespace secondary_index {
static void validate_positive_option(int max, const sstring& value_name, const sstring& value) {
@@ -147,17 +147,88 @@ std::optional<cql3::description> vector_index::describe(const index_metadata& im
}
void vector_index::check_target(const schema& schema, const std::vector<::shared_ptr<cql3::statements::index_target>>& targets) const {
if (targets.size() != 1) {
throw exceptions::invalid_request_exception("Vector index can only be created on a single column");
}
auto target = targets[0];
auto c_def = schema.get_column_definition(to_bytes(target->column_name()));
if (!c_def) {
throw exceptions::invalid_request_exception(format("Column {} not found in schema", target->column_name()));
}
auto type = c_def->type;
if (!type->is_vector() || static_cast<const vector_type_impl*>(type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
throw exceptions::invalid_request_exception(format("Vector indexes are only supported on columns of vectors of floats", target->column_name()));
struct validate_visitor {
const class schema& schema;
bool& is_vector;
/// Vector indexes support filtering on native types that can be used as primary key columns.
/// There is no counter (it cannot be used with vector columns)
/// and no duration (it cannot be used as a primary key or in secondary indexes).
static bool is_supported_filtering_column(abstract_type const & kind_type) {
switch (kind_type.get_kind()) {
case abstract_type::kind::ascii:
case abstract_type::kind::boolean:
case abstract_type::kind::byte:
case abstract_type::kind::bytes:
case abstract_type::kind::date:
case abstract_type::kind::decimal:
case abstract_type::kind::double_kind:
case abstract_type::kind::float_kind:
case abstract_type::kind::inet:
case abstract_type::kind::int32:
case abstract_type::kind::long_kind:
case abstract_type::kind::short_kind:
case abstract_type::kind::simple_date:
case abstract_type::kind::time:
case abstract_type::kind::timestamp:
case abstract_type::kind::timeuuid:
case abstract_type::kind::utf8:
case abstract_type::kind::uuid:
case abstract_type::kind::varint:
return true;
default:
break;
}
return false;
}
void validate(cql3::column_identifier const& column, bool is_vector) const {
auto const& c_name = column.to_string();
auto const* c_def = schema.get_column_definition(column.name());
if (c_def == nullptr) {
throw exceptions::invalid_request_exception(format("Column {} not found in schema", c_name));
}
auto type = c_def->type;
if (is_vector) {
auto const* vector_type = dynamic_cast<const vector_type_impl*>(type.get());
if (vector_type == nullptr) {
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
}
auto elements_type = vector_type->get_elements_type();
if (elements_type->get_kind() != abstract_type::kind::float_kind) {
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
}
return;
}
if (!is_supported_filtering_column(*type)) {
throw exceptions::invalid_request_exception(format("Unsupported vector index filtering column {} type", c_name));
}
}
void operator()(const std::vector<::shared_ptr<cql3::column_identifier>>& columns) const {
for (const auto& column : columns) {
// CQL restricts the secondary local index to have multiple columns with partition key only.
// Vectors shouldn't be partition key columns and they aren't supported as a filtering column,
// so we can assume here that these are non-vectors filtering columns.
validate(*column, false);
}
}
void operator()(const ::shared_ptr<cql3::column_identifier>& column) {
validate(*column, is_vector);
// The first column is the vector column, the rest mustn't be vectors.
is_vector = false;
}
};
bool is_vector = true;
for (const auto& target : targets) {
std::visit(validate_visitor{.schema = schema, .is_vector = is_vector}, target->value);
}
}

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:a4710f1f0b0bb329721c21d133618e811e820f2e70553b0aca28fb278bff89c9
size 6492280
oid sha256:a7c482a396374b635341f7923969ac0b649bda69810b12de22407938bb6505f7
size 6524480

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:2433f7a1fc5cda0dd990ab59587eb6046dca0fe1ae48d599953d1936fe014ed9
size 6492176
oid sha256:f0303b6705733d1d236700c3e36652c97eb02e8e78b2e04e8008dffd23804759
size 6526408

View File

@@ -90,14 +90,14 @@ load_balancer_stats_manager::load_balancer_stats_manager(sstring group_name):
setup_metrics(_cluster_stats);
}
load_balancer_dc_stats& load_balancer_stats_manager::for_dc(const dc_name& dc) {
const lw_shared_ptr<load_balancer_dc_stats>& load_balancer_stats_manager::for_dc(const dc_name& dc) {
auto it = _dc_stats.find(dc);
if (it == _dc_stats.end()) {
auto stats = std::make_unique<load_balancer_dc_stats>();
auto stats = make_lw_shared<load_balancer_dc_stats>();
setup_metrics(dc, *stats);
it = _dc_stats.emplace(dc, std::move(stats)).first;
}
return *it->second;
return it->second;
}
load_balancer_node_stats& load_balancer_stats_manager::for_node(const dc_name& dc, host_id node) {
@@ -149,22 +149,22 @@ db::tablet_options combine_tablet_options(R&& opts) {
static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
auto tokens_view = s | std::views::split(delimiter)
| std::views::transform([](auto&& range) {
return std::string_view(&*range.begin(), std::ranges::distance(range));
})
| std::views::transform([](std::string_view sv) {
return locator::tablet_id(std::stoul(std::string(sv)));
});
| std::views::transform([](auto&& range) {
return std::string_view(&*range.begin(), std::ranges::distance(range));
})
| std::views::transform([](std::string_view sv) {
return locator::tablet_id(std::stoul(std::string(sv)));
});
return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
}
struct repair_plan {
locator::global_tablet_id gid;
locator::tablet_info tinfo;
dht::token_range range;
dht::token last_token;
db_clock::duration repair_time_diff;
bool is_user_reuqest;
locator::global_tablet_id gid;
locator::tablet_info tinfo;
dht::token_range range;
dht::token last_token;
db_clock::duration repair_time_diff;
bool is_user_reuqest;
};
// Used to compare different migration choices in regard to impact on load imbalance.
@@ -291,6 +291,12 @@ struct rack_list_colocation_state {
}
};
/// Formattable wrapper for migration_plan, whose formatter prints a short summary of the plan.
struct plan_summary {
migration_plan& plan;
explicit plan_summary(migration_plan& plan) : plan(plan) {}
};
future<rack_list_colocation_state> find_required_rack_list_colocations(
replica::database& db,
token_metadata_ptr tmptr,
@@ -452,7 +458,36 @@ struct fmt::formatter<service::repair_plan> : fmt::formatter<std::string_view> {
template <typename FormatContext>
auto format(const service::repair_plan& p, FormatContext& ctx) const {
auto diff_seconds = std::chrono::duration<float>(p.repair_time_diff).count();
fmt::format_to(ctx.out(), "{{tablet={} last_token={} is_user_req={} diff_seconds={}}}", p.gid, p.last_token, p.is_user_reuqest, diff_seconds);
fmt::format_to(ctx.out(), "{{tablet={} last_token={} is_user_req={} diff_seconds={}}}", p.gid, p.last_token, p.is_user_reuqest, diff_seconds);
return ctx.out();
}
};
template<>
struct fmt::formatter<service::plan_summary> : fmt::formatter<std::string_view> {
template <typename FormatContext>
auto format(const service::plan_summary& p, FormatContext& ctx) const {
auto& plan = p.plan;
std::string_view delim = "";
auto get_delim = [&] { return std::exchange(delim, ", "); };
if (plan.migrations().size()) {
fmt::format_to(ctx.out(), "{}migrations: {}", get_delim(), plan.migrations().size());
}
if (plan.repair_plan().repairs().size()) {
fmt::format_to(ctx.out(), "{}repairs: {}", get_delim(), plan.repair_plan().repairs().size());
}
if (plan.resize_plan().resize.size()) {
fmt::format_to(ctx.out(), "{}resize: {}", get_delim(), plan.resize_plan().resize.size());
}
if (plan.resize_plan().finalize_resize.size()) {
fmt::format_to(ctx.out(), "{}resize-ready: {}", get_delim(), plan.resize_plan().finalize_resize.size());
}
if (plan.rack_list_colocation_plan().size()) {
fmt::format_to(ctx.out(), "{}rack-list colocation ready: {}", get_delim(), plan.rack_list_colocation_plan().request_to_resume());
}
if (delim.empty()) {
fmt::format_to(ctx.out(), "empty");
}
return ctx.out();
}
};
@@ -868,9 +903,12 @@ class load_balancer {
absl::flat_hash_map<table_id, uint64_t> _disk_used_per_table;
dc_name _dc;
std::optional<sstring> _rack; // Set when plan making is limited to a single rack.
sstring _location; // Name of the current scope of plan making. DC or DC+rack.
lw_shared_ptr<load_balancer_dc_stats> _current_stats; // Stats for current scope of plan making.
size_t _total_capacity_shards; // Total number of non-drained shards in the balanced node set.
size_t _total_capacity_nodes; // Total number of non-drained nodes in the balanced node set.
uint64_t _total_capacity_storage; // Total storage of non-drained nodes in the balanced node set.
size_t _migrating_candidates; // Number of candidate replicas skipped because tablet is migrating.
locator::load_stats_ptr _table_load_stats;
load_balancer_stats_manager& _stats;
std::unordered_set<host_id> _skiplist;
@@ -995,22 +1033,21 @@ public:
migration_plan plan;
auto rack_list_colocation = ongoing_rack_list_colocation();
if (!utils::get_local_injector().enter("tablet_migration_bypass")) {
// Prepare plans for each DC separately and combine them to be executed in parallel.
for (auto&& dc : topo.get_datacenters()) {
if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation) {
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
auto rack_plan = co_await make_plan(dc, rack);
auto level = rack_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Prepared {} migrations in rack {} in DC {}", rack_plan.size(), rack, dc);
plan.merge(std::move(rack_plan));
}
} else {
auto dc_plan = co_await make_plan(dc);
auto level = dc_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Prepared {} migrations in DC {}", dc_plan.size(), dc);
plan.merge(std::move(dc_plan));
// Prepare plans for each DC separately and combine them to be executed in parallel.
for (auto&& dc : topo.get_datacenters()) {
if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation) {
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
auto rack_plan = co_await make_plan(dc, rack);
auto level = rack_plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
lblogger.log(level, "Plan for {}/{}: {}", dc, rack, plan_summary(rack_plan));
plan.merge(std::move(rack_plan));
}
} else {
auto dc_plan = co_await make_plan(dc);
auto level = dc_plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
lblogger.log(level, "Plan for {}: {}", dc, plan_summary(dc_plan));
plan.merge(std::move(dc_plan));
}
}
@@ -1027,9 +1064,8 @@ public:
plan.set_repair_plan(co_await make_repair_plan(plan));
}
auto level = plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s) and {} tablet repair(s) and {} rack-list colocation(s)",
plan.size(), plan.tablet_migration_count(), plan.resize_decision_count(), plan.tablet_repair_count(), plan.tablet_rack_list_colocation_count());
auto level = plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
lblogger.log(level, "Prepared plan: {}", plan_summary(plan));
co_return std::move(plan);
}
@@ -1408,7 +1444,7 @@ public:
co_return all_colocated;
}
future<migration_plan> make_merge_colocation_plan(const dc_name& dc, node_load_map& nodes) {
future<migration_plan> make_merge_colocation_plan(node_load_map& nodes) {
migration_plan plan;
table_resize_plan resize_plan;
@@ -1565,7 +1601,7 @@ public:
if (cross_rack_migration(src, dst)) {
// FIXME: This is illegal if table has views, as it breaks base-view pairing.
// Can happen when RF!=#racks.
_stats.for_dc(_dc).cross_rack_collocations++;
_current_stats->cross_rack_collocations++;
lblogger.debug("Cross-rack co-location migration for {}@{} (rack: {}) to co-habit {}@{} (rack: {})",
t2_id, src, rack_of(src), t1_id, dst, rack_of(dst));
utils::get_local_injector().inject("forbid_cross_rack_migration_attempt", [&] {
@@ -2215,7 +2251,7 @@ public:
// Evaluates impact on load balance of migrating a tablet set of a given table to dst.
migration_badness evaluate_dst_badness(node_load_map& nodes, table_id table, tablet_replica dst, uint64_t tablet_set_disk_size) {
_stats.for_dc(_dc).candidates_evaluated++;
_current_stats->candidates_evaluated++;
auto& node_info = nodes[dst.host];
@@ -2254,7 +2290,7 @@ public:
// Evaluates impact on load balance of migrating a tablet set of a given table from src.
migration_badness evaluate_src_badness(node_load_map& nodes, table_id table, tablet_replica src, uint64_t tablet_set_disk_size) {
_stats.for_dc(_dc).candidates_evaluated++;
_current_stats->candidates_evaluated++;
auto& node_info = nodes[src.host];
@@ -2603,15 +2639,15 @@ public:
auto mig_streaming_info = get_migration_streaming_infos(_tm->get_topology(), tmap, mig);
if (!can_accept_load(nodes, mig_streaming_info)) {
_stats.for_dc(node_load.dc()).migrations_skipped++;
_current_stats->migrations_skipped++;
lblogger.debug("Unable to balance {}: load limit reached", host);
break;
}
apply_load(nodes, mig_streaming_info);
lblogger.debug("Adding migration: {} size: {}", mig, tablets.tablet_set_disk_size);
_stats.for_dc(node_load.dc()).migrations_produced++;
_stats.for_dc(node_load.dc()).intranode_migrations_produced++;
_current_stats->migrations_produced++;
_current_stats->intranode_migrations_produced++;
mark_as_scheduled(mig);
plan.add(std::move(mig));
@@ -2718,21 +2754,21 @@ public:
auto targets = get_viable_targets();
if (rs->is_rack_based(_dc)) {
lblogger.debug("candidate tablet {} skipped because RF is rack-based and it's in a different rack", tablet);
_stats.for_dc(src_info.dc()).tablets_skipped_rack++;
_current_stats->tablets_skipped_rack++;
return skip_info{std::move(targets)};
}
if (!targets.contains(dst_info.id)) {
auto new_rack_load = rack_load[dst_info.rack()] + 1;
lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}",
tablet, dst_info.rack(), new_rack_load, max_rack_load);
_stats.for_dc(src_info.dc()).tablets_skipped_rack++;
_current_stats->tablets_skipped_rack++;
return skip_info{std::move(targets)};
}
}
for (auto&& r : tmap.get_tablet_info(tablet.tablet).replicas) {
if (r.host == dst_info.id) {
_stats.for_dc(src_info.dc()).tablets_skipped_node++;
_current_stats->tablets_skipped_node++;
lblogger.debug("candidate tablet {} skipped because it has a replica on target node", tablet);
if (need_viable_targets) {
return skip_info{get_viable_targets()};
@@ -2939,7 +2975,7 @@ public:
};
if (min_candidate.badness.is_bad() && _use_table_aware_balancing) {
_stats.for_dc(_dc).bad_first_candidates++;
_current_stats->bad_first_candidates++;
// Consider better alternatives.
if (drain_skipped) {
@@ -3060,7 +3096,7 @@ public:
lblogger.debug("Table {} shard overcommit: {}", table, overcommit);
}
future<migration_plan> make_internode_plan(const dc_name& dc, node_load_map& nodes,
future<migration_plan> make_internode_plan(node_load_map& nodes,
const std::unordered_set<host_id>& nodes_to_drain,
host_id target) {
migration_plan plan;
@@ -3120,7 +3156,7 @@ public:
if (nodes_by_load.empty()) {
lblogger.debug("No more candidate nodes");
_stats.for_dc(dc).stop_no_candidates++;
_current_stats->stop_no_candidates++;
break;
}
@@ -3191,7 +3227,7 @@ public:
if (nodes_by_load_dst.empty()) {
lblogger.debug("No more target nodes");
_stats.for_dc(dc).stop_no_candidates++;
_current_stats->stop_no_candidates++;
break;
}
@@ -3221,7 +3257,7 @@ public:
const load_type max_load = std::max(max_off_candidate_load, src_node_info.avg_load);
if (is_balanced(target_info.avg_load, max_load)) {
lblogger.debug("Balance achieved.");
_stats.for_dc(dc).stop_balance++;
_current_stats->stop_balance++;
break;
}
}
@@ -3255,7 +3291,7 @@ public:
auto& tmap = tmeta.get_tablet_map(source_tablets.table());
if (can_check_convergence && !check_convergence(src_node_info, target_info, source_tablets)) {
lblogger.debug("No more candidates. Load would be inverted.");
_stats.for_dc(dc).stop_load_inversion++;
_current_stats->stop_load_inversion++;
break;
}
@@ -3289,11 +3325,11 @@ public:
}
}
if (candidate.badness.is_bad()) {
_stats.for_dc(_dc).bad_migrations++;
_current_stats->bad_migrations++;
}
if (drain_skipped) {
_stats.for_dc(_dc).migrations_from_skiplist++;
_current_stats->migrations_from_skiplist++;
}
if (src_node_info.req && *src_node_info.req == topology_request::leave && src_node_info.excluded) {
@@ -3313,7 +3349,7 @@ public:
if (can_accept_load(nodes, mig_streaming_info)) {
apply_load(nodes, mig_streaming_info);
lblogger.debug("Adding migration: {} size: {}", mig, source_tablets.tablet_set_disk_size);
_stats.for_dc(dc).migrations_produced++;
_current_stats->migrations_produced++;
mark_as_scheduled(mig);
plan.add(std::move(mig));
} else {
@@ -3324,10 +3360,10 @@ public:
// Just because the next migration is blocked doesn't mean we could not proceed with migrations
// for other shards which are produced by the planner subsequently.
skipped_migrations++;
_stats.for_dc(dc).migrations_skipped++;
_current_stats->migrations_skipped++;
if (skipped_migrations >= max_skipped_migrations) {
lblogger.debug("Too many migrations skipped, aborting balancing");
_stats.for_dc(dc).stop_skip_limit++;
_current_stats->stop_skip_limit++;
break;
}
}
@@ -3346,7 +3382,7 @@ public:
}
if (plan.size() == batch_size) {
_stats.for_dc(dc).stop_batch_size++;
_current_stats->stop_batch_size++;
}
if (plan.empty()) {
@@ -3363,7 +3399,13 @@ public:
// If there are 7 tablets and RF=3, each node must have 1 tablet replica.
// So node3 will have average load of 1, and node1 and node2 will have
// average shard load of 7.
lblogger.info("Not possible to achieve balance.");
// Show when this is the final plan with no active migrations left to execute,
// otherwise it may just be a temporary situation due to lack of candidates.
if (_migrating_candidates == 0) {
lblogger.info("Not possible to achieve balance in {}", _location);
print_node_stats(nodes, only_active::no);
}
}
co_return std::move(plan);
@@ -3420,11 +3462,37 @@ public:
}
};
using only_active = bool_class<struct only_active_tag>;
void print_node_stats(node_load_map& nodes, only_active only_active_) {
for (auto&& [host, load] : nodes) {
size_t read = 0;
size_t write = 0;
for (auto& shard_load : load.shards) {
read += shard_load.streaming_read_load;
write += shard_load.streaming_write_load;
}
auto level = (!only_active_ || (read + write) > 0) ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Node {}: {}/{} load={:.6f} tablets={} shards={} tablets/shard={:.3f} state={} cap={}"
" rd={} wr={}",
host, load.dc(), load.rack(), load.avg_load, load.tablet_count, load.shard_count,
load.tablets_per_shard(), load.state(), load.dusage->capacity, read, write);
}
}
future<migration_plan> make_plan(dc_name dc, std::optional<sstring> rack = std::nullopt) {
migration_plan plan;
if (utils::get_local_injector().enter("tablet_migration_bypass")) {
co_return std::move(plan);
}
_dc = dc;
_rack = rack;
_location = fmt::format("{}{}", dc, rack ? fmt::format("/{}", *rack) : "");
_current_stats = _stats.for_dc(dc);
auto _ = seastar::defer([&] { _current_stats = nullptr; });
_migrating_candidates = 0;
auto node_filter = [&] (const locator::node& node) {
return node.dc_rack().dc == dc && (!rack || node.dc_rack().rack == *rack);
@@ -3433,7 +3501,7 @@ public:
// Causes load balancer to move some tablet even though load is balanced.
auto shuffle = in_shuffle_mode();
_stats.for_dc(dc).calls++;
_current_stats->calls++;
lblogger.debug("Examining DC {} rack {} (shuffle={}, balancing={}, tablets_per_shard_goal={}, force_capacity_based_balancing={})",
dc, rack, shuffle, _tm->tablets().balancing_enabled(), _tablets_per_shard_goal, _force_capacity_based_balancing);
@@ -3529,7 +3597,7 @@ public:
if (nodes.empty()) {
lblogger.debug("No nodes to balance.");
_stats.for_dc(dc).stop_balance++;
_current_stats->stop_balance++;
co_return plan;
}
@@ -3552,15 +3620,23 @@ public:
// If we don't have nodes to drain, remove nodes that don't have complete tablet stats
if (nodes_to_drain.empty()) {
std::optional<host_id> incomplete_host;
size_t incomplete_count = 0;
for (auto nodes_i = nodes.begin(); nodes_i != nodes.end();) {
host_id host = nodes_i->first;
if (!_load_sketch->has_complete_data(host)) {
lblogger.info("Node {} does not have complete tablet stats, ignoring", nodes_i->first);
incomplete_host.emplace(host);
incomplete_count++;
nodes_i = nodes.erase(nodes_i);
} else {
++nodes_i;
}
}
if (incomplete_host) {
lblogger.info("Ignoring {} node(s) with incomplete tablet stats, e.g. {}", incomplete_count, *incomplete_host);
}
}
plan.set_has_nodes_to_drain(!nodes_to_drain.empty());
@@ -3594,11 +3670,11 @@ public:
});
if (!has_dest_nodes) {
for (auto host : nodes_to_drain) {
plan.add(drain_failure(host, format("No candidate nodes in DC {} to drain {}."
" Consider adding new nodes or reducing replication factor.", dc, host)));
plan.add(drain_failure(host, format("No candidate nodes in {} to drain {}."
" Consider adding new nodes or reducing replication factor.", _location, host)));
}
lblogger.debug("No candidate nodes");
_stats.for_dc(dc).stop_no_candidates++;
_current_stats->stop_no_candidates++;
co_return plan;
}
@@ -3704,6 +3780,8 @@ public:
if (!migrating(t1) && !migrating(t2)) {
auto candidate = colocated_tablets{global_tablet_id{table, t1.tid}, global_tablet_id{table, t2->tid}};
add_candidate(shard_load_info, migration_tablet_set{std::move(candidate), tablet_sizes_sum});
} else {
_migrating_candidates++;
}
} else {
if (tids.size() != tablet_sizes.size()) {
@@ -3712,6 +3790,8 @@ public:
for (size_t i = 0; i < tids.size(); i++) {
if (!migrating(get_table_desc(tids[i]))) { // migrating tablets are not candidates
add_candidate(shard_load_info, migration_tablet_set{global_tablet_id{table, tids[i]}, tablet_sizes[i]});
} else {
_migrating_candidates++;
}
}
}
@@ -3749,26 +3829,14 @@ public:
}
}
for (auto&& [host, load] : nodes) {
size_t read = 0;
size_t write = 0;
for (auto& shard_load : load.shards) {
read += shard_load.streaming_read_load;
write += shard_load.streaming_write_load;
}
auto level = (read + write) > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Node {}: dc={} rack={} load={} tablets={} shards={} tablets/shard={} state={} cap={}"
" stream_read={} stream_write={}",
host, dc, load.rack(), load.avg_load, load.tablet_count, load.shard_count,
load.tablets_per_shard(), load.state(), load.dusage->capacity, read, write);
}
print_node_stats(nodes, only_active::yes);
if (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || !is_balanced(min_load, max_load)))) {
host_id target = *min_load_node;
lblogger.info("target node: {}, avg_load: {}, max: {}", target, min_load, max_load);
plan.merge(co_await make_internode_plan(dc, nodes, nodes_to_drain, target));
plan.merge(co_await make_internode_plan(nodes, nodes_to_drain, target));
} else {
_stats.for_dc(dc).stop_balance++;
_current_stats->stop_balance++;
}
if (_tm->tablets().balancing_enabled()) {
@@ -3776,9 +3844,9 @@ public:
}
if (_tm->tablets().balancing_enabled() && plan.empty() && !ongoing_rack_list_colocation()) {
auto dc_merge_plan = co_await make_merge_colocation_plan(dc, nodes);
auto dc_merge_plan = co_await make_merge_colocation_plan(nodes);
auto level = dc_merge_plan.tablet_migration_count() > 0 ? seastar::log_level::info : seastar::log_level::debug;
lblogger.log(level, "Prepared {} migrations for co-locating sibling tablets in DC {}", dc_merge_plan.tablet_migration_count(), dc);
lblogger.log(level, "Prepared {} migrations for co-locating sibling tablets in {}", dc_merge_plan.tablet_migration_count(), _location);
plan.merge(std::move(dc_merge_plan));
}
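
The hunks above replace every per-increment `_stats.for_dc(dc)` map lookup with a `_current_stats` pointer that `make_plan()` resolves once and clears via `seastar::defer` when the plan is done, amortizing the lookup cost over the whole planning pass. A minimal Python sketch of the same caching pattern (names are illustrative, not Scylla's):

from collections import defaultdict
from dataclasses import dataclass

@dataclass
class DcStats:
    calls: int = 0
    migrations_produced: int = 0
    stop_balance: int = 0

class StatsManager:
    def __init__(self):
        # Shared, stable per-DC objects, analogous to the lw_shared_ptr
        # change in the header diff below.
        self._dc_stats = defaultdict(DcStats)

    def for_dc(self, dc: str) -> DcStats:
        return self._dc_stats[dc]

class Balancer:
    def __init__(self, stats: StatsManager):
        self._stats = stats
        self._current_stats = None

    def make_plan(self, dc: str):
        self._current_stats = self._stats.for_dc(dc)  # one lookup per call
        try:
            self._current_stats.calls += 1
            # ... the planning loop bumps self._current_stats.* directly ...
        finally:
            self._current_stats = None  # mirrors the seastar::defer reset

Because `for_dc()` now hands out a shared object, the cached reference stays valid for the duration of the pass even across yields.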

View File

@@ -100,7 +100,7 @@ class load_balancer_stats_manager {
using host_id = locator::host_id;
sstring group_name;
std::unordered_map<dc_name, std::unique_ptr<load_balancer_dc_stats>> _dc_stats;
std::unordered_map<dc_name, lw_shared_ptr<load_balancer_dc_stats>> _dc_stats;
std::unordered_map<host_id, std::unique_ptr<load_balancer_node_stats>> _node_stats;
load_balancer_cluster_stats _cluster_stats;
seastar::metrics::label dc_label{"target_dc"};
@@ -113,7 +113,7 @@ class load_balancer_stats_manager {
public:
load_balancer_stats_manager(sstring group_name);
load_balancer_dc_stats& for_dc(const dc_name& dc);
const lw_shared_ptr<load_balancer_dc_stats>& for_dc(const dc_name& dc);
load_balancer_node_stats& for_node(const dc_name& dc, host_id node);
load_balancer_cluster_stats& for_cluster();
@@ -196,7 +196,7 @@ public:
bool has_nodes_to_drain() const { return _has_nodes_to_drain; }
const migrations_vector& migrations() const { return _migrations; }
bool empty() const { return _migrations.empty() && !_resize_plan.size() && !_repair_plan.size() && !_rack_list_colocation_plan.size() && _drain_failures.empty(); }
bool empty() const { return !size(); }
size_t size() const { return _migrations.size() + _resize_plan.size() + _repair_plan.size() + _rack_list_colocation_plan.size() + _drain_failures.size(); }
size_t tablet_migration_count() const { return _migrations.size(); }
size_t resize_decision_count() const { return _resize_plan.size(); }

View File

@@ -11,6 +11,7 @@
#include "utils/s3/aws_error.hh"
#include <boost/test/unit_test.hpp>
#include <seastar/core/sstring.hh>
#include <seastar/http/exception.hh>
enum class message_style : uint8_t { singular = 1, plural = 2 };
@@ -122,7 +123,7 @@ BOOST_AUTO_TEST_CASE(TestNestedException) {
std::throw_with_nested(std::logic_error("Higher level logic_error"));
}
} catch (...) {
auto error = aws::aws_error::from_maybe_nested_exception(std::current_exception());
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
BOOST_REQUIRE_EQUAL(aws::aws_error_type::NETWORK_CONNECTION, error.get_error_type());
BOOST_REQUIRE_EQUAL("Software caused connection abort", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
@@ -136,7 +137,7 @@ BOOST_AUTO_TEST_CASE(TestNestedException) {
std::throw_with_nested(std::runtime_error("Higher level runtime_error"));
}
} catch (...) {
auto error = aws::aws_error::from_maybe_nested_exception(std::current_exception());
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
BOOST_REQUIRE_EQUAL("Higher level runtime_error", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
@@ -146,7 +147,7 @@ BOOST_AUTO_TEST_CASE(TestNestedException) {
try {
throw std::runtime_error("Something bad happened");
} catch (...) {
auto error = aws::aws_error::from_maybe_nested_exception(std::current_exception());
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
BOOST_REQUIRE_EQUAL("Something bad happened", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
@@ -156,9 +157,39 @@ BOOST_AUTO_TEST_CASE(TestNestedException) {
try {
throw "foo";
} catch (...) {
auto error = aws::aws_error::from_maybe_nested_exception(std::current_exception());
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
BOOST_REQUIRE_EQUAL("No error message was provided, exception content: char const*", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
}
// Test system_error
try {
throw std::system_error(std::error_code(ECONNABORTED, std::system_category()));
} catch (...) {
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
BOOST_REQUIRE_EQUAL(aws::aws_error_type::NETWORK_CONNECTION, error.get_error_type());
BOOST_REQUIRE_EQUAL("Software caused connection abort", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
}
// Test aws_exception
try {
throw aws::aws_exception(aws::aws_error::get_errors().at("HTTP_TOO_MANY_REQUESTS"));
} catch (...) {
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
BOOST_REQUIRE_EQUAL(aws::aws_error_type::HTTP_TOO_MANY_REQUESTS, error.get_error_type());
BOOST_REQUIRE_EQUAL("", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
}
// Test httpd::unexpected_status_error
try {
throw seastar::httpd::unexpected_status_error(seastar::http::reply::status_type::network_connect_timeout);
} catch (...) {
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
BOOST_REQUIRE_EQUAL(aws::aws_error_type::HTTP_NETWORK_CONNECT_TIMEOUT, error.get_error_type());
BOOST_REQUIRE_EQUAL(" HTTP code: 599 Network Connect Timeout", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
}
}

View File

@@ -767,7 +767,6 @@ void test_chunked_download_data_source(const client_maker_function& client_maker
#endif
cln->delete_object(object_name).get();
cln->close().get();
}
SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_minio) {

View File

@@ -26,6 +26,7 @@ import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.cluster.conftest import skip_mode
from test.pylib.internal_types import ServerUpState
logger = logging.getLogger(__name__)
@@ -198,8 +199,14 @@ ALTERNATOR_PROXY_SERVER_CONFIG = {
@pytest.fixture(scope="function")
async def alternator_proxy_server(manager: ManagerClient):
"""Fixture that creates a server with Alternator proxy protocol ports enabled."""
server = await manager.server_add(config=ALTERNATOR_PROXY_SERVER_CONFIG)
"""Fixture that creates a server with Alternator proxy protocol ports enabled.
Waits for SERVING state to ensure Alternator ports are ready.
"""
server = await manager.server_add(
config=ALTERNATOR_PROXY_SERVER_CONFIG,
expected_server_up_state=ServerUpState.SERVING
)
yield (server, manager)

View File

@@ -30,6 +30,7 @@ async def test_crashed_node_substitution(manager: ManagerClient):
log = await manager.server_open_log(failed_server.server_id)
await log.wait_for("finished do_send_ack2_msg")
failed_id = await manager.get_host_id(failed_server.server_id)
await manager.api.message_injection(failed_server.ip_addr, 'crash_before_group0_join')
await task
@@ -50,7 +51,6 @@ async def test_crashed_node_substitution(manager: ManagerClient):
[await manager.api.message_injection(s.ip_addr, 'fast_orphan_removal_fiber') for s in servers]
log = await manager.server_open_log(servers[0].server_id)
failed_id = await manager.get_host_id(failed_server.server_id)
await log.wait_for(f"Finished to force remove node {failed_id}")
post_wait_live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr)

View File

@@ -96,9 +96,8 @@ def dotestCreateAndDropIndex(cql, table, indexName, addKeyspaceOnDrop):
f"DROP INDEX {KEYSPACE}.{indexName}")
@pytest.fixture(scope="module")
# FIXME: LWT is not supported with tablets yet. See #18066
def table1(cql, test_keyspace_vnodes):
with create_table(cql, test_keyspace_vnodes, "(a int primary key, b int)") as table:
def table1(cql, test_keyspace):
with create_table(cql, test_keyspace, "(a int primary key, b int)") as table:
yield table
# Reproduces #8717 (CREATE INDEX IF NOT EXISTS was broken):
@@ -454,7 +453,7 @@ TOO_BIG = 1024 * 65
# Reproduces #8627
@pytest.mark.xfail(reason="issue #8627")
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testIndexOnCompositeValueOver64k(cql, test_keyspace):
too_big = bytearray([1])*TOO_BIG
@@ -476,7 +475,7 @@ def testIndexOnCompositeValueOver64k(cql, test_keyspace):
too_big)
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testIndexOnPartitionKeyInsertValueOver64k(cql, test_keyspace):
too_big = bytearray([1])*TOO_BIG
@@ -533,7 +532,7 @@ def testIndexOnPartitionKeyWithStaticColumnAndNoRows(cql, test_keyspace):
assert_rows(execute(cql, table, "SELECT * FROM %s WHERE pk2 = ?", 20), [1, 20, None, 9, None])
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testIndexOnClusteringColumnInsertValueOver64k(cql, test_keyspace):
too_big = bytearray([1])*TOO_BIG
@@ -568,7 +567,7 @@ def testIndexOnClusteringColumnInsertValueOver64k(cql, test_keyspace):
# Reproduces #8627
@pytest.mark.xfail(reason="issue #8627")
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testIndexOnFullCollectionEntryInsertCollectionValueOver64k(cql, test_keyspace):
too_big = bytearray([1])*TOO_BIG
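
Each parametrize hunk above reduces `pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")])` to a plain `"tablets"` string, presumably because the LWT-on-tablets limitation tracked by #18066 no longer applies. With `indirect=True`, these values are delivered to the `test_keyspace` fixture rather than to the test function. A self-contained sketch of that pytest pattern (the fixture body is illustrative):

import pytest

@pytest.fixture
def test_keyspace(request):
    # With indirect=True the parameter arrives here, not in the test,
    # so the fixture can build a keyspace of the requested flavor.
    flavor = request.param  # "tablets" or "vnodes"
    return f"ks_{flavor}"

@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True)
def test_uses_keyspace(test_keyspace):
    assert test_keyspace in ("ks_tablets", "ks_vnodes")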

View File

@@ -62,7 +62,7 @@ def testTimestampTTL(cql, test_keyspace):
# Migrated from cql_tests.py:TestCQL.invalid_custom_timestamp_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testInvalidCustomTimestamp(cql, test_keyspace):
# Conditional updates

View File

@@ -1239,7 +1239,7 @@ def testInsertWithCompactStorageAndTwoClusteringColumns(cql, test_keyspace, forc
# Test for CAS with compact storage table, and CASSANDRA-6813 in particular,
# migrated from cql_tests.py:TestCQL.cas_and_compact_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testCompactStorage(cql, test_keyspace):
with create_table(cql, test_keyspace, "(partition text, key text, owner text, PRIMARY KEY (partition, key)) WITH COMPACT STORAGE") as table:

View File

@@ -28,7 +28,7 @@ def is_scylla(cql):
yield any('scylla' in name for name in names)
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testInsertSetIfNotExists(cql, test_keyspace, is_scylla):
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, s set<int>)") as table:
@@ -478,7 +478,7 @@ def check_invalid_list(cql, table, condition, expected):
# Migrated from cql_tests.py:TestCQL.list_item_conditional_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testListItem(cql, test_keyspace):
for frozen in [False, True]:
@@ -505,7 +505,7 @@ def testListItem(cql, test_keyspace):
# Test expanded functionality from CASSANDRA-6839,
# migrated from cql_tests.py:TestCQL.expanded_list_item_conditional_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testExpandedListItem(cql, test_keyspace):
for frozen in [False, True]:
@@ -682,7 +682,7 @@ def testWholeMap(cql, test_keyspace):
# Migrated from cql_tests.py:TestCQL.map_item_conditional_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testMapItem(cql, test_keyspace):
for frozen in [False, True]:
@@ -711,7 +711,7 @@ def testMapItem(cql, test_keyspace):
assert list(execute(cql, table, "UPDATE %s set m['foo'] = 'bar', m['bar'] = 'foo' WHERE k = 1 IF m[?] IN (?, ?)", "foo", "blah", None))[0][0] == True
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testFrozenWithNullValues(cql, test_keyspace):
with create_table(cql, test_keyspace, f"(k int PRIMARY KEY, m frozen<list<text>>)") as table:
@@ -732,7 +732,7 @@ def testFrozenWithNullValues(cql, test_keyspace):
# Test expanded functionality from CASSANDRA-6839,
# migrated from cql_tests.py:TestCQL.expanded_map_item_conditional_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testExpandedMapItem(cql, test_keyspace):
for frozen in [False, True]:

View File

@@ -32,7 +32,7 @@ def is_scylla(cql):
# Migrated from cql_tests.py:TestCQL.static_columns_cas_test()
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testStaticColumnsCas(cql, test_keyspace, is_scylla):
with create_table(cql, test_keyspace, "(id int, k text, version int static, v text, PRIMARY KEY (id, k))") as table:
@@ -153,7 +153,7 @@ def testStaticColumnsCas(cql, test_keyspace, is_scylla):
# Test CASSANDRA-10532
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testStaticColumnsCasDelete(cql, test_keyspace, is_scylla):
with create_table(cql, test_keyspace, "(pk int, ck int, static_col int static, value int, PRIMARY KEY (pk, ck))") as table:
@@ -216,7 +216,7 @@ def testStaticColumnsCasDelete(cql, test_keyspace, is_scylla):
row(1, 7, null, 8))
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testStaticColumnsCasUpdate(cql, test_keyspace, is_scylla):
with create_table(cql, test_keyspace, "(pk int, ck int, static_col int static, value int, PRIMARY KEY (pk, ck))") as table:
@@ -271,7 +271,7 @@ def testStaticColumnsCasUpdate(cql, test_keyspace, is_scylla):
row(1, 7, 1, 8))
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testConditionalUpdatesOnStaticColumns(cql, test_keyspace, is_scylla):
with create_table(cql, test_keyspace, "(a int, b int, s int static, d text, PRIMARY KEY (a, b))") as table:
@@ -305,7 +305,7 @@ def testConditionalUpdatesOnStaticColumns(cql, test_keyspace, is_scylla):
row(8, null, 8, null))
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testStaticsWithMultipleConditions(cql, test_keyspace, is_scylla):
with create_table(cql, test_keyspace, "(a int, b int, s1 int static, s2 int static, d int, PRIMARY KEY (a, b))") as table:
@@ -343,7 +343,7 @@ def testStaticsWithMultipleConditions(cql, test_keyspace, is_scylla):
[row(false,None,None,None,None,None),row(false,None,None,None,None,None),row(false,None,None,None,None,None),row(false,None,None,None,None,None)] if is_scylla else [row(false)])
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testStaticColumnsCasUpdateWithNullStaticColumn(cql, test_keyspace, is_scylla):
with create_table(cql, test_keyspace, "(pk int, ck int, s1 int static, s2 int static, value int, PRIMARY KEY (pk, ck))") as table:
@@ -363,7 +363,7 @@ def testStaticColumnsCasUpdateWithNullStaticColumn(cql, test_keyspace, is_scylla
assertRows(execute(cql, table, "SELECT * FROM %s WHERE pk = ?", 2), row(2, null, 2, 1, null))
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def testStaticColumnsCasDeleteWithNullStaticColumn(cql, test_keyspace, is_scylla):
with create_table(cql, test_keyspace, "(pk int, ck int, s1 int static, s2 int static, value int, PRIMARY KEY (pk, ck))") as table:

View File

@@ -15,10 +15,9 @@ from cassandra.protocol import InvalidRequest
from .util import new_test_table, unique_key_int
@pytest.fixture(scope="module")
# FIXME: LWT is not supported with tablets yet. See #18066
def table1(cql, test_keyspace_vnodes):
def table1(cql, test_keyspace):
schema='p int, c int, r int, s int static, PRIMARY KEY(p, c)'
with new_test_table(cql, test_keyspace_vnodes, schema) as table:
with new_test_table(cql, test_keyspace, schema) as table:
yield table
# An LWT UPDATE whose condition uses non-static columns begins by reading

View File

@@ -47,31 +47,31 @@ def lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, pk_type, fn):
assert len(rows) == num_iterations * 2
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def test_lwt_uuid_fn_pk_insert(cql, test_keyspace):
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "uuid", "uuid")
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def test_lwt_currenttimestamp_fn_pk_insert(cql, test_keyspace):
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "timestamp", "currenttimestamp")
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def test_lwt_currenttime_fn_pk_insert(cql, test_keyspace):
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "time", "currenttime")
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def test_lwt_currenttimeuuid_fn_pk_insert(cql, test_keyspace):
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "timeuuid", "currenttimeuuid")
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def test_lwt_now_fn_pk_insert(cql, test_keyspace):
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "timeuuid", "now")

View File

@@ -201,12 +201,9 @@ def test_unset_insert_where(cql, table2):
# NOT EXISTS"). Test that using an UNSET_VALUE in an LWT condition causes
# a clear error, not silent skip and not a crash as in issue #13001.
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def test_unset_insert_where_lwt(cql, test_keyspace):
# FIXME: new_test_table is used here due to https://github.com/scylladb/scylladb/issues/18066
# When fixed, this test can go back to using the `table2` fixture.
with new_test_table(cql, test_keyspace, "p int, c int, PRIMARY KEY (p, c)") as table2:
def test_unset_insert_where_lwt(cql, table2):
p = unique_key_int()
stmt = cql.prepare(f'INSERT INTO {table2} (p, c) VALUES ({p}, ?) IF NOT EXISTS')
with pytest.raises(InvalidRequest, match="unset"):
@@ -225,12 +222,9 @@ def test_unset_update_where(cql, table3):
# Python driver doesn't allow sending an UNSET_VALUE for the partition key,
# so only the clustering key is tested.
@pytest.mark.parametrize("test_keyspace",
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
["tablets", "vnodes"],
indirect=True)
def test_unset_update_where_lwt(cql, test_keyspace):
# FIXME: new_test_table is used here due to https://github.com/scylladb/scylladb/issues/18066
# When fixed, this test can go back to using the `table3` fixture.
with new_test_table(cql, test_keyspace, "p int, c int, r int, PRIMARY KEY (p, c)") as table3:
def test_unset_update_where_lwt(cql, table3):
stmt = cql.prepare(f"UPDATE {table3} SET r = 42 WHERE p = 0 AND c = ? IF r = ?")
with pytest.raises(InvalidRequest, match="unset"):

View File

@@ -10,6 +10,41 @@ import pytest
from .util import new_test_table, is_scylla, unique_name
from cassandra.protocol import InvalidRequest, ConfigurationException
supported_filtering_types = [
'ascii',
'bigint',
'blob',
'boolean',
'date',
'decimal',
'double',
'float',
'inet',
'int',
'smallint',
'text',
'varchar',
'time',
'timestamp',
'timeuuid',
'tinyint',
'uuid',
'varint',
]
unsupported_filtering_types = [
'duration',
'map<int, int>',
'list<int>',
'set<int>',
'tuple<int, int>',
'vector<float, 3>',
'frozen<map<int, int>>',
'frozen<list<int>>',
'frozen<set<int>>',
'frozen<tuple<int, int>>',
]
def test_create_vector_search_index(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p int primary key, v vector<float, 3>'
with new_test_table(cql, test_keyspace, schema) as table:
@@ -45,6 +80,57 @@ def test_create_vector_search_index_on_nonvector_column(cql, test_keyspace, scyl
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v) USING 'vector_index'")
def test_create_vector_search_global_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v, f1, f2) USING 'vector_index'")
def test_create_vector_search_local_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
def test_create_vector_search_local_index_with_filtering_columns_on_nonvector_column(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v int, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
def test_create_vector_search_index_with_supported_and_unsupported_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
supported_columns = ', '.join([f's{idx} {typ}' for idx, typ in enumerate(supported_filtering_types)])
unsupported_columns = ', '.join([f'u{idx} {typ}' for idx, typ in enumerate(unsupported_filtering_types)])
schema = f'p int, c int, v vector<float, 3>, {supported_columns}, {unsupported_columns}, primary key (p, c)'
with new_test_table(cql, test_keyspace, schema) as table:
for idx in range(len(supported_filtering_types)):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, s{idx}) USING 'vector_index'")
cql.execute(f"DROP INDEX {test_keyspace}.global_idx")
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, s{idx}) USING 'vector_index'")
cql.execute(f"DROP INDEX {test_keyspace}.local_idx")
for idx in range(len(unsupported_filtering_types)):
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, u{idx}) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, u{idx}) USING 'vector_index'")
def test_create_vector_search_local_index_with_unsupported_partition_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
for filter_type in unsupported_filtering_types:
schema = f'p {filter_type}, c int, v vector<float, 3>, f int, primary key (p, c)'
with pytest.raises(InvalidRequest, match="Unsupported|Invalid"):
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p), v, f) USING 'vector_index'")
def test_create_vector_search_index_with_duplicated_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p int, c int, v vector<float, 3>, x int, primary key (p, c)'
with new_test_table(cql, test_keyspace, schema) as table:
with pytest.raises(InvalidRequest, match="Cannot create secondary index on partition key column p"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, p) USING 'vector_index'")
with pytest.raises(InvalidRequest, match="Duplicate column x in index target list"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, x, x) USING 'vector_index'")
with pytest.raises(InvalidRequest, match="Cannot create secondary index on partition key column p"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, p) USING 'vector_index'")
with pytest.raises(InvalidRequest, match="Duplicate column x in index target list"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, x, x) USING 'vector_index'")
def test_create_vector_search_index_with_bad_options(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p int primary key, v vector<float, 3>'
with new_test_table(cql, test_keyspace, schema) as table:

View File

@@ -154,13 +154,13 @@ rebalance_stats rebalance_tablets(cql_test_env& e, locator::load_stats_ptr load_
auto max_iterations = 1 + get_tablet_count(stm.get()->tablets()) * 10;
for (size_t i = 0; i < max_iterations; ++i) {
auto prev_lb_stats = talloc.stats().for_dc(dc);
auto prev_lb_stats = *talloc.stats().for_dc(dc);
auto start_time = std::chrono::steady_clock::now();
auto plan = talloc.balance_tablets(stm.get(), nullptr, nullptr, load_stats, skiplist).get();
auto end_time = std::chrono::steady_clock::now();
auto lb_stats = talloc.stats().for_dc(dc) - prev_lb_stats;
auto lb_stats = *talloc.stats().for_dc(dc) - prev_lb_stats;
auto elapsed = std::chrono::duration_cast<seconds_double>(end_time - start_time);
rebalance_stats iteration_stats = {

View File

@@ -39,3 +39,4 @@ class ServerUpState(IntEnum):
HOST_ID_QUERIED = auto()
CQL_CONNECTED = auto()
CQL_QUERIED = auto()
SERVING = auto() # Scylla sent sd_notify("serving")
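
`ServerUpState` is an `IntEnum` populated with `auto()`, so appending `SERVING` after `CQL_QUERIED` makes it compare as the most advanced state; the startup loop in the conftest diff further down relies on this when it switches from `==` to an ordered `>=` check. A minimal sketch of the ordering property:

from enum import IntEnum, auto

class State(IntEnum):
    PROCESS_STARTED = auto()
    HOST_ID_QUERIED = auto()
    CQL_CONNECTED = auto()
    CQL_QUERIED = auto()
    SERVING = auto()  # appended last, so it is the largest value

assert State.SERVING > State.CQL_QUERIED
# The ">=" form lets a caller that asked for CQL_QUERIED succeed
# even if the server already advanced to SERVING.
assert State.SERVING >= State.CQL_QUERIED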

View File

@@ -44,6 +44,7 @@ import platform
import contextlib
import fcntl
import urllib
import socket
import psutil
@@ -385,6 +386,10 @@ class ScyllaServer:
prefix=f"scylladb-{f'{xdist_worker_id}-' if xdist_worker_id else ''}{self.server_id}-test.py-"
)
self.maintenance_socket_path = f"{self.maintenance_socket_dir.name}/cql.m"
# Unix socket for receiving sd_notify messages from Scylla
self.notify_socket_path = pathlib.Path(self.maintenance_socket_dir.name) / "notify.sock"
self.notify_socket: Optional[socket.socket] = None
self._received_serving = False
self.exe = pathlib.Path(version.path).resolve()
self.vardir = pathlib.Path(vardir)
self.logger = logger
@@ -712,6 +717,50 @@ class ScyllaServer:
caslog.setLevel(oldlevel)
# Any other exception may indicate a problem, and is passed to the caller.
def _setup_notify_socket(self) -> None:
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
if self.notify_socket is not None:
return
# Remove existing socket file if present
self.notify_socket_path.unlink(missing_ok=True)
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK | socket.SOCK_CLOEXEC)
self.notify_socket.bind(str(self.notify_socket_path))
self._received_serving = False
def _cleanup_notify_socket(self) -> None:
"""Clean up the sd_notify socket."""
if self.notify_socket is not None:
self.notify_socket.close()
self.notify_socket = None
self.notify_socket_path.unlink(missing_ok=True)
def check_serving_notification(self) -> bool:
"""Check if Scylla has sent the 'serving' sd_notify message.
Returns True if the SERVING state has been reached.
"""
if self._received_serving:
return True
if self.notify_socket is None:
return False
# Try to read all available messages from the socket
while True:
try:
data = self.notify_socket.recv(4096)
# sd_notify message format: "STATUS=serving\n" or "READY=1\nSTATUS=serving\n"
message = data.decode('utf-8', errors='replace')
if 'STATUS=serving' in message:
self._received_serving = True
self.logger.debug("Received sd_notify 'serving' message")
return True
except BlockingIOError:
# No more messages available
break
except Exception as e:
self.logger.debug("Error reading from notify socket: %s", e)
break
return False
async def try_get_host_id(self, api: ScyllaRESTAPIClient) -> Optional[HostID]:
"""Try to get the host id (also tests Scylla REST API is serving)"""
@@ -754,6 +803,10 @@ class ScyllaServer:
env['UBSAN_OPTIONS'] = f'halt_on_error=1:abort_on_error=1:suppressions={TOP_SRC_DIR / "ubsan-suppressions.supp"}'
env['ASAN_OPTIONS'] = f'disable_coredump=0:abort_on_error=1:detect_stack_use_after_return=1'
# Set up socket for receiving sd_notify messages from Scylla
self._setup_notify_socket()
env['NOTIFY_SOCKET'] = str(self.notify_socket_path)
# Reopen log file if it was closed (e.g., after a previous stop)
if self.log_file is None or self.log_file.closed:
self.log_file = self.log_filename.open("ab") # append mode to preserve previous logs
@@ -808,7 +861,10 @@ class ScyllaServer:
if server_up_state == ServerUpState.PROCESS_STARTED:
server_up_state = ServerUpState.HOST_ID_QUERIED
server_up_state = await self.get_cql_up_state() or server_up_state
if server_up_state == expected_server_up_state:
# Check for SERVING state (sd_notify "serving" message)
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
server_up_state = ServerUpState.SERVING
if server_up_state >= expected_server_up_state:
if expected_error is not None:
await report_error(
f"the node has reached {server_up_state} state,"
@@ -847,13 +903,14 @@ class ScyllaServer:
session.execute("DROP KEYSPACE k")
async def shutdown_control_connection(self) -> None:
"""Shut down driver connection"""
"""Shut down driver connection and notify socket"""
if self.control_connection is not None:
self.control_connection.shutdown()
self.control_connection = None
if self.control_cluster is not None:
self.control_cluster.shutdown()
self.control_cluster = None
self._cleanup_notify_socket()
@stop_event
@start_stop_lock

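The harness above plays the role of systemd: it binds a unix datagram socket, hands its path to Scylla through `NOTIFY_SOCKET`, and scans incoming datagrams for `STATUS=serving`. A self-contained illustration of that handshake (paths and messages are illustrative):

import os
import socket
import tempfile

notify_path = os.path.join(tempfile.mkdtemp(), "notify.sock")

# Supervisor side: bind a non-blocking unix datagram socket.
supervisor = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
supervisor.bind(notify_path)
supervisor.setblocking(False)
os.environ["NOTIFY_SOCKET"] = notify_path  # what the child process would inherit

# Service side: send the same kind of message Scylla sends via sd_notify.
service = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
service.sendto(b"READY=1\nSTATUS=serving\n", os.environ["NOTIFY_SOCKET"])

# Supervisor side: drain queued datagrams and look for STATUS=serving.
try:
    while True:
        message = supervisor.recv(4096).decode("utf-8", errors="replace")
        if "STATUS=serving" in message:
            print("server reported itself as serving")
except BlockingIOError:
    pass  # no more queued messages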
View File

@@ -23,39 +23,75 @@ future<shared_ptr<tls::certificate_credentials>> utils::http::system_trust_crede
co_return system_trust_credentials;
}
utils::http::dns_connection_factory::state::state(shared_ptr<tls::certificate_credentials> cin)
: creds(std::move(cin))
{}
future<> utils::http::dns_connection_factory::initialize(lw_shared_ptr<state> state, std::string host, int port, bool use_https, logging::logger& logger) {
co_await coroutine::all(
[state, host, port] () -> future<> {
auto hent = co_await net::dns::get_host_by_name(host, net::inet_address::family::INET);
state->addr = socket_address(hent.addr_list.front(), port);
},
[state, use_https] () -> future<> {
if (use_https && !state->creds) {
state->creds = co_await system_trust_credentials();
}
if (!use_https) {
state->creds = {};
}
}
);
state->initialized = true;
logger.debug("Initialized factory, address={} tls={}", state->addr, state->creds == nullptr ? "no" : "yes");
future<> utils::http::dns_connection_factory::init_credentials() {
if (_use_https && !_creds) {
_creds = co_await system_trust_credentials();
}
if (!_use_https) {
_creds = {};
}
_logger.debug("Initialized credentials, tls={}", _creds == nullptr ? "no" : "yes");
}
utils::http::dns_connection_factory::dns_connection_factory(dns_connection_factory&&) = default;
future<net::inet_address> utils::http::dns_connection_factory::get_address() {
auto get_addr = [this] () -> net::inet_address {
const auto& addresses = _addr_list.value();
return addresses[_addr_pos++ % addresses.size()];
};
if (_addr_list) {
co_return get_addr();
}
auto units = co_await get_units(_init_semaphore, 1);
if (!_addr_list) {
auto hent = co_await net::dns::get_host_by_name(_host, net::inet_address::family::INET);
_address_ttl = std::ranges::min_element(hent.addr_entries, [](const net::hostent::address_entry& lhs, const net::hostent::address_entry& rhs) {
return lhs.ttl < rhs.ttl;
})->ttl;
if (_address_ttl.count() == 0) {
co_return hent.addr_entries[_addr_pos++ % hent.addr_entries.size()].addr;
}
_addr_list = hent.addr_entries | std::views::transform(&net::hostent::address_entry::addr) | std::ranges::to<std::vector>();
_addr_update_timer.rearm(lowres_clock::now() + _address_ttl);
}
co_return get_addr();
}
future<shared_ptr<tls::certificate_credentials>> utils::http::dns_connection_factory::get_creds() {
if (!_creds_init) [[unlikely]] {
auto units = co_await get_units(_init_semaphore, 1);
if (!_creds_init) {
co_await init_credentials();
_creds_init = true;
}
}
co_return _creds;
}
future<connected_socket> utils::http::dns_connection_factory::connect(net::inet_address address) {
auto socket_addr = socket_address(address, _port);
if (auto creds = co_await get_creds()) {
_logger.debug("Making new HTTPS connection addr={} host={}", socket_addr, _host);
co_return co_await tls::connect(creds, socket_addr, tls::tls_options{.server_name = _host});
}
_logger.debug("Making new HTTP connection addr={} host={}", socket_addr, _host);
co_return co_await seastar::connect(socket_addr, {}, transport::TCP);
}
utils::http::dns_connection_factory::dns_connection_factory(std::string host, int port, bool use_https, logging::logger& logger, shared_ptr<tls::certificate_credentials> certs)
: _host(std::move(host))
, _port(port)
, _logger(logger)
, _state(make_lw_shared<state>(std::move(certs)))
, _done(initialize(_state, _host, _port, use_https, _logger))
{}
, _creds(std::move(certs))
, _use_https(use_https)
, _addr_update_timer([this] {
if (auto units = try_get_units(_init_semaphore, 1)) {
_addr_list.reset();
}
}) {
_addr_update_timer.arm(lowres_clock::now());
}
utils::http::dns_connection_factory::dns_connection_factory(std::string uri, logging::logger& logger, shared_ptr<tls::certificate_credentials> certs)
: dns_connection_factory([&] {
@@ -68,18 +104,21 @@ utils::http::dns_connection_factory::dns_connection_factory(std::string uri, log
{}
future<connected_socket> utils::http::dns_connection_factory::make(abort_source*) {
if (!_state->initialized) {
_logger.debug("Waiting for factory to initialize");
co_await _done.get_future();
try {
auto address = co_await get_address();
co_return co_await connect(address);
} catch (...) {
// On failure, forcefully renew address resolution and try again
_logger.debug("Connection failed, resetting address provider and retrying: {}", std::current_exception());
}
_addr_list.reset();
auto address = co_await get_address();
co_return co_await connect(address);
}
if (_state->creds) {
_logger.debug("Making new HTTPS connection addr={} host={}", _state->addr, _host);
co_return co_await tls::connect(_state->creds, _state->addr, tls::tls_options{.server_name = _host});
} else {
_logger.debug("Making new HTTP connection addr={} host={}", _state->addr, _host);
co_return co_await seastar::connect(_state->addr, {}, transport::TCP);
}
future<> utils::http::dns_connection_factory::close() {
_addr_update_timer.cancel();
co_await get_units(_init_semaphore, 1);
}
static const char HTTPS[] = "https";
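
The rewritten factory resolves the host lazily, caches every returned address for the minimum record TTL, hands them out round-robin, and drops the cache when a connect fails. A rough Python analogue of that policy using the stdlib resolver, which exposes no TTLs, so a fixed TTL stands in for the minimum the C++ code extracts from `addr_entries`:

import socket
import time

class CachedResolver:
    def __init__(self, host: str, port: int, ttl_seconds: float = 30.0):
        self._host, self._port = host, port
        self._ttl = ttl_seconds
        self._addrs: list[str] = []
        self._expires = 0.0
        self._pos = 0

    def get_address(self) -> str:
        now = time.monotonic()
        if not self._addrs or now >= self._expires:
            # Cache miss or TTL expiry: re-resolve and restart the clock.
            infos = socket.getaddrinfo(self._host, self._port,
                                       socket.AF_INET, socket.SOCK_STREAM)
            self._addrs = [info[4][0] for info in infos]
            self._expires = now + self._ttl
        addr = self._addrs[self._pos % len(self._addrs)]  # round-robin
        self._pos += 1
        return addr

    def invalidate(self) -> None:
        # Mirrors make()'s retry path: drop the cache after a failed
        # connect so the next attempt resolves afresh.
        self._addrs = []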

View File

@@ -9,7 +9,6 @@
#pragma once
#include <seastar/core/seastar.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/http/client.hh>
#include <seastar/net/dns.hh>
#include <seastar/net/tls.hh>
@@ -26,23 +25,26 @@ protected:
std::string _host;
int _port;
logging::logger& _logger;
struct state {
bool initialized = false;
socket_address addr;
shared_ptr<tls::certificate_credentials> creds;
state(shared_ptr<tls::certificate_credentials>);
};
lw_shared_ptr<state> _state;
shared_future<> _done;
semaphore _init_semaphore{1};
bool _creds_init = false;
std::optional<std::vector<net::inet_address>> _addr_list;
shared_ptr<tls::certificate_credentials> _creds;
uint16_t _addr_pos{0};
bool _use_https;
std::chrono::seconds _address_ttl{0};
timer<lowres_clock> _addr_update_timer;
// This method can out-live the factory instance, in case `make()` is never called before the instance is destroyed.
static future<> initialize(lw_shared_ptr<state> state, std::string host, int port, bool use_https, logging::logger& logger);
future<> init_credentials();
future<net::inet_address> get_address();
future<shared_ptr<tls::certificate_credentials>> get_creds();
future<connected_socket> connect(net::inet_address address);
public:
dns_connection_factory(dns_connection_factory&&);
dns_connection_factory(dns_connection_factory&&) = default;
dns_connection_factory(std::string host, int port, bool use_https, logging::logger& logger, shared_ptr<tls::certificate_credentials> = {});
dns_connection_factory(std::string endpoint_url, logging::logger& logger, shared_ptr<tls::certificate_credentials> = {});
virtual future<connected_socket> make(abort_source*) override;
future<> close() override;
};
// simple URL parser, just enough to handle required aspects for normal endpoint usage

View File

@@ -160,43 +160,9 @@ aws_error aws_error::from_system_error(const std::system_error& system_error) {
}
}
aws_error aws_error::from_maybe_nested_exception(std::exception_ptr eptr) {
std::string original_message;
while (eptr) {
try {
std::rethrow_exception(eptr);
} catch (const std::exception& e) {
if (original_message.empty()) {
original_message = e.what();
}
if (auto* sys = dynamic_cast<const std::system_error*>(&e)) {
return from_system_error(*sys);
}
try {
std::rethrow_if_nested(e);
} catch (...) {
eptr = std::current_exception();
continue;
}
break;
} catch (...) {
// Non-std::exception, should not happen in general
break;
}
}
if (original_message.empty()) {
original_message = fmt::format("No error message was provided, exception content: {}", eptr);
}
return {aws_error_type::UNKNOWN, std::move(original_message), retryable::no};
}
aws_error aws_error::from_exception_ptr(std::exception_ptr exception) {
if (exception) {
std::string original_message;
while (exception) {
try {
std::rethrow_exception(exception);
} catch (const aws_exception& ex) {
@@ -205,10 +171,20 @@ aws_error aws_error::from_exception_ptr(std::exception_ptr exception) {
return from_http_code(ex.status());
} catch (const std::system_error& ex) {
return from_system_error(ex);
} catch (const std::exception&) {
return from_maybe_nested_exception(std::current_exception());
} catch (const std::exception& ex) {
if (original_message.empty()) {
original_message = ex.what();
}
try {
std::rethrow_if_nested(ex);
} catch (...) {
exception = std::current_exception();
continue;
}
return aws_error{aws_error_type::UNKNOWN, std::move(original_message), retryable::no};
} catch (...) {
return aws_error{aws_error_type::UNKNOWN, seastar::format("{}", std::current_exception()), retryable::no};
return aws_error{aws_error_type::UNKNOWN, seastar::format("No error message was provided, exception content: {}", std::current_exception()), retryable::no};
}
}
return aws_error{aws_error_type::UNKNOWN, "No exception was provided to `aws_error::from_exception_ptr` function call", retryable::no};

View File

@@ -106,7 +106,6 @@ public:
static std::optional<aws_error> parse(seastar::sstring&& body);
static aws_error from_http_code(seastar::http::reply::status_type http_code);
static aws_error from_system_error(const std::system_error& system_error);
static aws_error from_maybe_nested_exception(std::exception_ptr maybe_nested_error);
static aws_error from_exception_ptr(std::exception_ptr exception);
static const aws_errors& get_errors();
};