mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 04:06:59 +00:00
Compare commits
41 Commits
copilot/fi
...
scylla-202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bafd185087 | ||
|
|
07d1f8f48a | ||
|
|
0cf9f41649 | ||
|
|
dc89e2ea37 | ||
|
|
797f56cb45 | ||
|
|
be1d418bc0 | ||
|
|
46923f7358 | ||
|
|
4032e95715 | ||
|
|
eab10c00b1 | ||
|
|
091c3b4e22 | ||
|
|
19eadafdef | ||
|
|
358fc15893 | ||
|
|
32124d209e | ||
|
|
c7f4bda459 | ||
|
|
568af3cd8d | ||
|
|
bd694dd1a1 | ||
|
|
9672e0171f | ||
|
|
8cec41acf2 | ||
|
|
d207de0d76 | ||
|
|
edde4e878e | ||
|
|
be1c674f1a | ||
|
|
a7cff37024 | ||
|
|
9431bc5628 | ||
|
|
14db8375ac | ||
|
|
614020b5d5 | ||
|
|
e091afb400 | ||
|
|
edc46fe6a1 | ||
|
|
f8b9b767c2 | ||
|
|
23d038b385 | ||
|
|
3e2d1384bf | ||
|
|
bd7481e30c | ||
|
|
16d7b65754 | ||
|
|
e30c01eae6 | ||
|
|
d0f3725887 | ||
|
|
c12168b7ef | ||
|
|
76c0162060 | ||
|
|
c9620d9573 | ||
|
|
91cf77d016 | ||
|
|
2c2f0693ab | ||
|
|
2c73d0e6b5 | ||
|
|
f94296e0ae |
@@ -18,7 +18,7 @@ jobs:
|
||||
|
||||
// Regular expression pattern to check for "Fixes" prefix
|
||||
// Adjusted to dynamically insert the repository full name
|
||||
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|([A-Z]+-\\d+))`;
|
||||
const pattern = `Fixes:? ((?:#|${repo.replace('/', '\\/')}#|https://github\\.com/${repo.replace('/', '\\/')}/issues/)(\\d+)|(?:https://scylladb\\.atlassian\\.net/browse/)?([A-Z]+-\\d+))`;
|
||||
const regex = new RegExp(pattern);
|
||||
|
||||
if (!regex.test(body)) {
|
||||
|
||||
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2026.1.0-dev
|
||||
VERSION=2026.1.0-rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -5,6 +5,10 @@
|
||||
|
||||
/stable/kb/perftune-modes-sync.html: /stable/kb/index.html
|
||||
|
||||
# Remove the troubleshooting page relevant for Open Source only
|
||||
|
||||
/stable/troubleshooting/missing-dotmount-files.html: /troubleshooting/index.html
|
||||
|
||||
# Move the driver information to another project
|
||||
|
||||
/stable/using-scylla/drivers/index.html: https://docs.scylladb.com/stable/drivers/index.html
|
||||
|
||||
@@ -281,7 +281,8 @@ For example::
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
|
||||
or columns provided in a definition of the index.
|
||||
|
||||
For example::
|
||||
|
||||
|
||||
@@ -140,17 +140,83 @@ Vector Index :label-note:`ScyllaDB Cloud`
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
|
||||
|
||||
ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient
|
||||
similarity search on vector data.
|
||||
similarity search on vector data. A vector index can be either global, indexing vectors across the whole table, or local,
|
||||
indexing vectors within each partition.
|
||||
|
||||
The vector index is the only custom type index supported in ScyllaDB. It is created using
|
||||
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. Example:
|
||||
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. It is also possible to
|
||||
add additional columns to the index for filtering the search results. The partition column
|
||||
specified in the global vector index definition must be the vector column, and any subsequent
|
||||
columns are treated as filtering columns. The local vector index requires that the partition key
|
||||
of the base table is also the partition key of the index, and that the vector column is the first
|
||||
column listed after the partition key.
|
||||
|
||||
Example of a simple index:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed to enable similarity search using
|
||||
a global vector index. Additional filtering can be performed on the primary key
|
||||
columns of the base table.
|
||||
|
||||
Example of a global vector index with additional filtering:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding, category, info)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed to enable similarity search using
|
||||
a global index. Additional columns are added for filtering the search results.
|
||||
The filtering is possible on ``category``, ``info`` and all primary key columns
|
||||
of the base table.
|
||||
|
||||
Example of a local vector index:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings ((id, created_at), embedding, category, info)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed for similarity search (a local
|
||||
index) and additional columns are added for filtering the search results. The
|
||||
filtering is possible on ``category``, ``info`` and all primary key columns of
|
||||
the base table. The columns ``id`` and ``created_at`` must be the partition key
|
||||
of the base table.
|
||||
|
||||
Vector indexes support additional filtering columns of native data types
|
||||
(excluding counter and duration). The indexed column itself must be a vector
|
||||
column, while the extra columns can be used to filter search results.
|
||||
|
||||
The supported types are:
|
||||
|
||||
* ``ascii``
|
||||
* ``bigint``
|
||||
* ``blob``
|
||||
* ``boolean``
|
||||
* ``date``
|
||||
* ``decimal``
|
||||
* ``double``
|
||||
* ``float``
|
||||
* ``inet``
|
||||
* ``int``
|
||||
* ``smallint``
|
||||
* ``text``
|
||||
* ``varchar``
|
||||
* ``time``
|
||||
* ``timestamp``
|
||||
* ``timeuuid``
|
||||
* ``tinyint``
|
||||
* ``uuid``
|
||||
* ``varint``
|
||||
|
||||
|
||||
The following options are supported for vector indexes. All of them are optional.
|
||||
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
|
||||
23
docs/features/automatic-repair.rst
Normal file
23
docs/features/automatic-repair.rst
Normal file
@@ -0,0 +1,23 @@
|
||||
.. _automatic-repair:
|
||||
|
||||
Automatic Repair
|
||||
================
|
||||
|
||||
Traditionally, launching `repairs </operating-scylla/procedures/maintenance/repair>`_ in a ScyllaDB cluster is left to an external process, typically done via `Scylla Manager <https://manager.docs.scylladb.com/stable/repair/index.html>`_.
|
||||
|
||||
Automatic repair offers built-in scheduling in ScyllaDB itself. If the time since the last repair is greater than the configured repair interval, ScyllaDB will start a repair for the `tablet </architecture/tablets>`_ automatically.
|
||||
Repairs are spread over time and among nodes and shards, to avoid load spikes or any adverse effects on user workloads.
|
||||
|
||||
To enable automatic repair, add this to the configuration (``scylla.yaml``):
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
auto_repair_enabled_default: true
|
||||
auto_repair_threshold_default_in_seconds: 86400
|
||||
|
||||
This will enable automatic repair for all tables with a repair period of 1 day. This configuration has to be set on each node, to an identical value.
|
||||
More featureful configuration methods will be implemented in the future.
|
||||
|
||||
To disable, set ``auto_repair_enabled_default: false``.
|
||||
|
||||
Automatic repair relies on `Incremental Repair </features/incremental-repair>`_ and as such it only works with `tablet </architecture/tablets>`_ tables.
|
||||
@@ -3,7 +3,7 @@
|
||||
Incremental Repair
|
||||
==================
|
||||
|
||||
ScyllaDB's standard repair process scans and processes all the data on a node, regardless of whether it has changed since the last repair. This operation can be resource-intensive and time-consuming. The Incremental Repair feature provides a much more efficient and lightweight alternative for maintaining data consistency.
|
||||
ScyllaDB's standard `repair </operating-scylla/procedures/maintenance/repair>`_ process scans and processes all the data on a node, regardless of whether it has changed since the last repair. This operation can be resource-intensive and time-consuming. The Incremental Repair feature provides a much more efficient and lightweight alternative for maintaining data consistency.
|
||||
|
||||
The core idea of incremental repair is to repair only the data that has been written or changed since the last repair was run. It intelligently skips data that has already been verified, dramatically reducing the time, I/O, and CPU resources required for the repair operation.
|
||||
|
||||
@@ -37,7 +37,12 @@ The available modes are:
|
||||
* ``disabled``: Completely disables the incremental repair logic for the current operation. The repair behaves like a classic, non-incremental repair, and it does not read or update any incremental repair status markers.
|
||||
|
||||
|
||||
The incremental_mode parameter can be specified using nodetool cluster repair, e.g., nodetool cluster repair --incremental-mode incremental. It can also be specified with the REST API, e.g., curl -X POST "http://127.0.0.1:10000/storage_service/tablets/repair?ks=ks1&table=tb1&tokens=all&incremental_mode=incremental"
|
||||
The ``incremental_mode`` parameter can be specified using ``nodetool cluster repair``, e.g., ``nodetool cluster repair --incremental-mode incremental``.
|
||||
It can also be specified with the REST API, e.g.:
|
||||
|
||||
.. code::
|
||||
|
||||
curl -X POST "http://127.0.0.1:10000/storage_service/tablets/repair?ks=ks1&table=tb1&tokens=all&incremental_mode=incremental"
|
||||
|
||||
Benefits of Incremental Repair
|
||||
------------------------------
|
||||
@@ -46,6 +51,8 @@ Benefits of Incremental Repair
|
||||
* **Reduced Resource Usage:** Consumes significantly less CPU, I/O, and network bandwidth compared to a full repair.
|
||||
* **More Frequent Repairs:** The efficiency of incremental repair allows you to run it more frequently, ensuring a higher level of data consistency across your cluster at all times.
|
||||
|
||||
Tables using Incremental Repair can schedule repairs in ScyllaDB itself, with `Automatic Repair </features/automatic-repair>`_.
|
||||
|
||||
Notes
|
||||
-----
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ This document highlights ScyllaDB's key data modeling features.
|
||||
Workload Prioritization </features/workload-prioritization>
|
||||
Backup and Restore </features/backup-and-restore>
|
||||
Incremental Repair </features/incremental-repair/>
|
||||
Automatic Repair </features/automatic-repair/>
|
||||
Vector Search </features/vector-search/>
|
||||
|
||||
.. panel-box::
|
||||
@@ -44,5 +45,7 @@ This document highlights ScyllaDB's key data modeling features.
|
||||
* :doc:`Incremental Repair </features/incremental-repair/>` provides a much more
|
||||
efficient and lightweight approach to maintaining data consistency by
|
||||
repairing only the data that has changed since the last repair.
|
||||
* :doc:`Automatic Repair </features/automatic-repair/>` schedules and runs repairs
|
||||
directly in ScyllaDB, without external schedulers.
|
||||
* :doc:`Vector Search in ScyllaDB </features/vector-search/>` enables
|
||||
similarity-based queries on vector embeddings.
|
||||
|
||||
@@ -24,9 +24,9 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`Launch ScyllaDB on AWS </getting-started/install-scylla/launch-on-aws>`
|
||||
* :doc:`Launch ScyllaDB on GCP </getting-started/install-scylla/launch-on-gcp>`
|
||||
* :doc:`Launch ScyllaDB on Azure </getting-started/install-scylla/launch-on-azure>`
|
||||
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on AWS </getting-started/install-scylla/launch-on-aws>`
|
||||
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on GCP </getting-started/install-scylla/launch-on-gcp>`
|
||||
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on Azure </getting-started/install-scylla/launch-on-azure>`
|
||||
|
||||
|
||||
.. panel-box::
|
||||
@@ -35,7 +35,7 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`Install ScyllaDB with Web Installer (recommended) </getting-started/installation-common/scylla-web-installer>`
|
||||
* :doc:`Install ScyllaDB Linux Packages </getting-started/install-scylla/install-on-linux>`
|
||||
* :doc:`Install ScyllaDB |CURRENT_VERSION| Linux Packages </getting-started/install-scylla/install-on-linux>`
|
||||
* :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
|
||||
* :doc:`Install ScyllaDB Without root Privileges </getting-started/installation-common/unified-installer>`
|
||||
* :doc:`Air-gapped Server Installation </getting-started/installation-common/air-gapped-install>`
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
.. |RHEL_EPEL_8| replace:: https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm
|
||||
.. |RHEL_EPEL_9| replace:: https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
|
||||
|
||||
======================================
|
||||
Install ScyllaDB Linux Packages
|
||||
======================================
|
||||
========================================================
|
||||
Install ScyllaDB |CURRENT_VERSION| Linux Packages
|
||||
========================================================
|
||||
|
||||
We recommend installing ScyllaDB using :doc:`ScyllaDB Web Installer for Linux </getting-started/installation-common/scylla-web-installer/>`,
|
||||
a platform-agnostic installation script, to install ScyllaDB on any supported Linux platform.
|
||||
@@ -46,8 +46,8 @@ Install ScyllaDB
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys a43e06657bac99e3
|
||||
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --export --armor a43e06657bac99e3 | gpg --dearmor > /etc/apt/keyrings/scylladb.gpg
|
||||
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys c503c686b007f39e
|
||||
sudo gpg --homedir /tmp --no-default-keyring --keyring /tmp/temp.gpg --export --armor c503c686b007f39e | gpg --dearmor > /etc/apt/keyrings/scylladb.gpg
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
==========================
|
||||
Launch ScyllaDB on AWS
|
||||
==========================
|
||||
===============================================
|
||||
Launch ScyllaDB |CURRENT_VERSION| on AWS
|
||||
===============================================
|
||||
|
||||
This article will guide you through self-managed ScyllaDB deployment on AWS. For a fully-managed deployment of ScyllaDB
|
||||
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
==========================
|
||||
Launch ScyllaDB on Azure
|
||||
==========================
|
||||
===============================================
|
||||
Launch ScyllaDB |CURRENT_VERSION| on Azure
|
||||
===============================================
|
||||
|
||||
This article will guide you through self-managed ScyllaDB deployment on Azure. For a fully-managed deployment of ScyllaDB
|
||||
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
==========================
|
||||
Launch ScyllaDB on GCP
|
||||
==========================
|
||||
=============================================
|
||||
Launch ScyllaDB |CURRENT_VERSION| on GCP
|
||||
=============================================
|
||||
|
||||
This article will guide you through self-managed ScyllaDB deployment on GCP. For a fully-managed deployment of ScyllaDB
|
||||
as-a-service, see `ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/>`_.
|
||||
|
||||
@@ -58,4 +58,12 @@ See also
|
||||
|
||||
* `Blog: ScyllaDB Open Source 3.1: Efficiently Maintaining Consistency with Row-Level Repair <https://www.scylladb.com/2019/08/13/scylla-open-source-3-1-efficiently-maintaining-consistency-with-row-level-repair/>`_
|
||||
|
||||
Incremental Repair
|
||||
------------------
|
||||
|
||||
Built on top of `Row-level Repair <row-level-repair_>`_ and `Tablets </architecture/tablets>`_, Incremental Repair enables frequent and quick repairs. For more details, see `Incremental Repair </features/incremental-repair>`_.
|
||||
|
||||
Automatic Repair
|
||||
----------------
|
||||
|
||||
Built on top of `Incremental Repair </features/incremental-repair>`_, `Automatic Repair </features/automatic-repair>`_ offers repair scheduling and execution directly in ScyllaDB, without external processes.
|
||||
|
||||
@@ -8,7 +8,6 @@ Troubleshooting ScyllaDB
|
||||
|
||||
support/index
|
||||
startup/index
|
||||
upgrade/index
|
||||
cluster/index
|
||||
modeling/index
|
||||
storage/index
|
||||
@@ -29,7 +28,6 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
|
||||
* :doc:`Errors and ScyllaDB Customer Support <support/index>`
|
||||
* :doc:`ScyllaDB Startup <startup/index>`
|
||||
* :doc:`ScyllaDB Cluster and Node <cluster/index>`
|
||||
* :doc:`ScyllaDB Upgrade <upgrade/index>`
|
||||
* :doc:`Data Modeling <modeling/index>`
|
||||
* :doc:`Data Storage and SSTables <storage/index>`
|
||||
* :doc:`CQL errors <CQL/index>`
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
Inaccessible "/var/lib/scylla" and "/var/lib/systemd/coredump" after ScyllaDB upgrade
|
||||
======================================================================================
|
||||
|
||||
Problem
|
||||
^^^^^^^
|
||||
When you reboot the machine after a ScyllaDB upgrade, you cannot access data directories under ``/var/lib/scylla``, and
|
||||
coredump saves to ``rootfs``.
|
||||
|
||||
|
||||
The problem may occur when you upgrade ScyllaDB Open Source 4.6 or later to a version of ScyllaDB Enterprise if
|
||||
the ``/etc/systemd/system/var-lib-scylla.mount`` and ``/etc/systemd/system/var-lib-systemd-coredump.mount`` are
|
||||
deleted by RPM.
|
||||
|
||||
To avoid losing the files, the upgrade procedure includes a step to back up the .mount files. The following
|
||||
example shows the command to back up the files before the upgrade from version 5.0:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
for conf in $( rpm -qc $(rpm -qa | grep scylla) | grep -v contains ) /etc/systemd/system/{var-lib-scylla,var-lib-systemd-coredump}.mount; do sudo cp -v $conf $conf.backup-5.0; done
|
||||
|
||||
If you don't back up the .mount files before the upgrade, the files may be lost.
|
||||
|
||||
|
||||
Solution
|
||||
^^^^^^^^
|
||||
|
||||
If you didn't back up the .mount files before the upgrade and the files were deleted during the upgrade,
|
||||
you need to restore them manually.
|
||||
|
||||
To restore ``/etc/systemd/system/var-lib-systemd-coredump.mount``, run the following:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ cat << EOS | sudo tee /etc/systemd/system/var-lib-systemd-coredump.mount
|
||||
[Unit]
|
||||
Description=Save coredump to scylla data directory
|
||||
Conflicts=umount.target
|
||||
Before=scylla-server.service
|
||||
After=local-fs.target
|
||||
DefaultDependencies=no
|
||||
[Mount]
|
||||
What=/var/lib/scylla/coredump
|
||||
Where=/var/lib/systemd/coredump
|
||||
Type=none
|
||||
Options=bind
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
EOS
|
||||
|
||||
To restore ``/etc/systemd/system/var-lib-scylla.mount``, run the following (specifying your data disk):
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ UUID=`blkid -s UUID -o value <specify your data disk, eg: /dev/md0>`
|
||||
$ cat << EOS | sudo tee /etc/systemd/system/var-lib-scylla.mount
|
||||
[Unit]
|
||||
Description=ScyllaDB data directory
|
||||
Before=scylla-server.service
|
||||
After=local-fs.target
|
||||
DefaultDependencies=no
|
||||
[Mount]
|
||||
What=/dev/disk/by-uuid/$UUID
|
||||
Where=/var/lib/scylla
|
||||
Type=xfs
|
||||
Options=noatime
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
EOS
|
||||
|
||||
After restoring .mount files, you need to enable them:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ sudo systemctl daemon-reload
|
||||
$ sudo systemctl enable --now var-lib-scylla.mount
|
||||
$ sudo systemctl enable --now var-lib-systemd-coredump.mount
|
||||
|
||||
|
||||
.. include:: /troubleshooting/_common/ts-return.rst
|
||||
@@ -1,16 +0,0 @@
|
||||
Upgrade
|
||||
=================
|
||||
|
||||
.. toctree::
|
||||
:hidden:
|
||||
:maxdepth: 2
|
||||
|
||||
Inaccessible configuration files after ScyllaDB upgrade </troubleshooting/missing-dotmount-files>
|
||||
|
||||
.. panel-box::
|
||||
:title: Upgrade Issues
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* :doc:`Inaccessible "/var/lib/scylla" and "/var/lib/systemd/coredump" after ScyllaDB upgrade </troubleshooting/missing-dotmount-files>`
|
||||
|
||||
@@ -17,11 +17,11 @@
|
||||
#include "index/secondary_index.hh"
|
||||
#include "index/secondary_index_manager.hh"
|
||||
#include "types/concrete_types.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/managed_string.hh"
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
|
||||
namespace secondary_index {
|
||||
|
||||
static void validate_positive_option(int max, const sstring& value_name, const sstring& value) {
|
||||
@@ -147,17 +147,88 @@ std::optional<cql3::description> vector_index::describe(const index_metadata& im
|
||||
}
|
||||
|
||||
void vector_index::check_target(const schema& schema, const std::vector<::shared_ptr<cql3::statements::index_target>>& targets) const {
|
||||
if (targets.size() != 1) {
|
||||
throw exceptions::invalid_request_exception("Vector index can only be created on a single column");
|
||||
}
|
||||
auto target = targets[0];
|
||||
auto c_def = schema.get_column_definition(to_bytes(target->column_name()));
|
||||
if (!c_def) {
|
||||
throw exceptions::invalid_request_exception(format("Column {} not found in schema", target->column_name()));
|
||||
}
|
||||
auto type = c_def->type;
|
||||
if (!type->is_vector() || static_cast<const vector_type_impl*>(type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
|
||||
throw exceptions::invalid_request_exception(format("Vector indexes are only supported on columns of vectors of floats", target->column_name()));
|
||||
|
||||
struct validate_visitor {
|
||||
const class schema& schema;
|
||||
bool& is_vector;
|
||||
|
||||
/// Vector indexes support filtering on native types that can be used as primary key columns.
|
||||
/// There is no counter (it cannot be used with vector columns)
|
||||
/// and no duration (it cannot be used as a primary key or in secondary indexes).
|
||||
static bool is_supported_filtering_column(abstract_type const & kind_type) {
|
||||
switch (kind_type.get_kind()) {
|
||||
case abstract_type::kind::ascii:
|
||||
case abstract_type::kind::boolean:
|
||||
case abstract_type::kind::byte:
|
||||
case abstract_type::kind::bytes:
|
||||
case abstract_type::kind::date:
|
||||
case abstract_type::kind::decimal:
|
||||
case abstract_type::kind::double_kind:
|
||||
case abstract_type::kind::float_kind:
|
||||
case abstract_type::kind::inet:
|
||||
case abstract_type::kind::int32:
|
||||
case abstract_type::kind::long_kind:
|
||||
case abstract_type::kind::short_kind:
|
||||
case abstract_type::kind::simple_date:
|
||||
case abstract_type::kind::time:
|
||||
case abstract_type::kind::timestamp:
|
||||
case abstract_type::kind::timeuuid:
|
||||
case abstract_type::kind::utf8:
|
||||
case abstract_type::kind::uuid:
|
||||
case abstract_type::kind::varint:
|
||||
return true;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void validate(cql3::column_identifier const& column, bool is_vector) const {
|
||||
auto const& c_name = column.to_string();
|
||||
auto const* c_def = schema.get_column_definition(column.name());
|
||||
if (c_def == nullptr) {
|
||||
throw exceptions::invalid_request_exception(format("Column {} not found in schema", c_name));
|
||||
}
|
||||
|
||||
auto type = c_def->type;
|
||||
|
||||
if (is_vector) {
|
||||
auto const* vector_type = dynamic_cast<const vector_type_impl*>(type.get());
|
||||
if (vector_type == nullptr) {
|
||||
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
|
||||
}
|
||||
|
||||
auto elements_type = vector_type->get_elements_type();
|
||||
if (elements_type->get_kind() != abstract_type::kind::float_kind) {
|
||||
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!is_supported_filtering_column(*type)) {
|
||||
throw exceptions::invalid_request_exception(format("Unsupported vector index filtering column {} type", c_name));
|
||||
}
|
||||
}
|
||||
|
||||
void operator()(const std::vector<::shared_ptr<cql3::column_identifier>>& columns) const {
|
||||
for (const auto& column : columns) {
|
||||
// CQL restricts the secondary local index to have multiple columns with partition key only.
|
||||
// Vectors shouldn't be partition key columns and they aren't supported as a filtering column,
|
||||
// so we can assume here that these are non-vectors filtering columns.
|
||||
validate(*column, false);
|
||||
}
|
||||
}
|
||||
|
||||
void operator()(const ::shared_ptr<cql3::column_identifier>& column) {
|
||||
validate(*column, is_vector);
|
||||
// The first column is the vector column, the rest mustn't be vectors.
|
||||
is_vector = false;
|
||||
}
|
||||
};
|
||||
|
||||
bool is_vector = true;
|
||||
for (const auto& target : targets) {
|
||||
std::visit(validate_visitor{.schema = schema, .is_vector = is_vector}, target->value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:a4710f1f0b0bb329721c21d133618e811e820f2e70553b0aca28fb278bff89c9
|
||||
size 6492280
|
||||
oid sha256:a7c482a396374b635341f7923969ac0b649bda69810b12de22407938bb6505f7
|
||||
size 6524480
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:2433f7a1fc5cda0dd990ab59587eb6046dca0fe1ae48d599953d1936fe014ed9
|
||||
size 6492176
|
||||
oid sha256:f0303b6705733d1d236700c3e36652c97eb02e8e78b2e04e8008dffd23804759
|
||||
size 6526408
|
||||
|
||||
@@ -90,14 +90,14 @@ load_balancer_stats_manager::load_balancer_stats_manager(sstring group_name):
|
||||
setup_metrics(_cluster_stats);
|
||||
}
|
||||
|
||||
load_balancer_dc_stats& load_balancer_stats_manager::for_dc(const dc_name& dc) {
|
||||
const lw_shared_ptr<load_balancer_dc_stats>& load_balancer_stats_manager::for_dc(const dc_name& dc) {
|
||||
auto it = _dc_stats.find(dc);
|
||||
if (it == _dc_stats.end()) {
|
||||
auto stats = std::make_unique<load_balancer_dc_stats>();
|
||||
auto stats = make_lw_shared<load_balancer_dc_stats>();
|
||||
setup_metrics(dc, *stats);
|
||||
it = _dc_stats.emplace(dc, std::move(stats)).first;
|
||||
}
|
||||
return *it->second;
|
||||
return it->second;
|
||||
}
|
||||
|
||||
load_balancer_node_stats& load_balancer_stats_manager::for_node(const dc_name& dc, host_id node) {
|
||||
@@ -149,22 +149,22 @@ db::tablet_options combine_tablet_options(R&& opts) {
|
||||
|
||||
static std::unordered_set<locator::tablet_id> split_string_to_tablet_id(std::string_view s, char delimiter) {
|
||||
auto tokens_view = s | std::views::split(delimiter)
|
||||
| std::views::transform([](auto&& range) {
|
||||
return std::string_view(&*range.begin(), std::ranges::distance(range));
|
||||
})
|
||||
| std::views::transform([](std::string_view sv) {
|
||||
return locator::tablet_id(std::stoul(std::string(sv)));
|
||||
});
|
||||
| std::views::transform([](auto&& range) {
|
||||
return std::string_view(&*range.begin(), std::ranges::distance(range));
|
||||
})
|
||||
| std::views::transform([](std::string_view sv) {
|
||||
return locator::tablet_id(std::stoul(std::string(sv)));
|
||||
});
|
||||
return std::unordered_set<locator::tablet_id>{tokens_view.begin(), tokens_view.end()};
|
||||
}
|
||||
|
||||
struct repair_plan {
|
||||
locator::global_tablet_id gid;
|
||||
locator::tablet_info tinfo;
|
||||
dht::token_range range;
|
||||
dht::token last_token;
|
||||
db_clock::duration repair_time_diff;
|
||||
bool is_user_reuqest;
|
||||
locator::global_tablet_id gid;
|
||||
locator::tablet_info tinfo;
|
||||
dht::token_range range;
|
||||
dht::token last_token;
|
||||
db_clock::duration repair_time_diff;
|
||||
bool is_user_reuqest;
|
||||
};
|
||||
|
||||
// Used to compare different migration choices in regard to impact on load imbalance.
|
||||
@@ -291,6 +291,12 @@ struct rack_list_colocation_state {
|
||||
}
|
||||
};
|
||||
|
||||
/// Formattable wrapper for migration_plan, whose formatter prints a short summary of the plan.
|
||||
// Holds the plan by reference only, so a plan_summary must not outlive the plan it wraps.
struct plan_summary {
|
||||
// Referenced (not copied) plan.
migration_plan& plan;
|
||||
// Explicit, so a migration_plan is never converted to a plan_summary implicitly.
explicit plan_summary(migration_plan& plan) : plan(plan) {}
|
||||
};
|
||||
|
||||
future<rack_list_colocation_state> find_required_rack_list_colocations(
|
||||
replica::database& db,
|
||||
token_metadata_ptr tmptr,
|
||||
@@ -452,7 +458,36 @@ struct fmt::formatter<service::repair_plan> : fmt::formatter<std::string_view> {
|
||||
template <typename FormatContext>
|
||||
auto format(const service::repair_plan& p, FormatContext& ctx) const {
|
||||
auto diff_seconds = std::chrono::duration<float>(p.repair_time_diff).count();
|
||||
fmt::format_to(ctx.out(), "{{tablet={} last_token={} is_user_req={} diff_seconds={}}}", p.gid, p.last_token, p.is_user_reuqest, diff_seconds);
|
||||
fmt::format_to(ctx.out(), "{{tablet={} last_token={} is_user_req={} diff_seconds={}}}", p.gid, p.last_token, p.is_user_reuqest, diff_seconds);
|
||||
return ctx.out();
|
||||
}
|
||||
};
|
||||
|
||||
// Formatter for service::plan_summary.
// Writes one "<label>: <value>" entry for each non-empty part of the wrapped plan,
// separated by ", ". Writes "empty" when no entry was produced.
template<>
|
||||
struct fmt::formatter<service::plan_summary> : fmt::formatter<std::string_view> {
|
||||
template <typename FormatContext>
|
||||
auto format(const service::plan_summary& p, FormatContext& ctx) const {
|
||||
auto& plan = p.plan;
|
||||
// Separator to emit before the next entry; stays empty until the first entry is written.
std::string_view delim = "";
|
||||
// Returns the current separator and switches it to ", " for all following entries.
auto get_delim = [&] { return std::exchange(delim, ", "); };
|
||||
if (plan.migrations().size()) {
|
||||
fmt::format_to(ctx.out(), "{}migrations: {}", get_delim(), plan.migrations().size());
|
||||
}
|
||||
if (plan.repair_plan().repairs().size()) {
|
||||
fmt::format_to(ctx.out(), "{}repairs: {}", get_delim(), plan.repair_plan().repairs().size());
|
||||
}
|
||||
if (plan.resize_plan().resize.size()) {
|
||||
fmt::format_to(ctx.out(), "{}resize: {}", get_delim(), plan.resize_plan().resize.size());
|
||||
}
|
||||
if (plan.resize_plan().finalize_resize.size()) {
|
||||
fmt::format_to(ctx.out(), "{}resize-ready: {}", get_delim(), plan.resize_plan().finalize_resize.size());
|
||||
}
|
||||
if (plan.rack_list_colocation_plan().size()) {
|
||||
// Unlike the entries above, this one prints request_to_resume() rather than a size.
fmt::format_to(ctx.out(), "{}rack-list colocation ready: {}", get_delim(), plan.rack_list_colocation_plan().request_to_resume());
|
||||
}
|
||||
// delim is still empty only if get_delim() was never called, i.e. nothing was written above.
if (delim.empty()) {
|
||||
fmt::format_to(ctx.out(), "empty");
|
||||
}
|
||||
return ctx.out();
|
||||
}
|
||||
};
|
||||
@@ -868,9 +903,12 @@ class load_balancer {
|
||||
absl::flat_hash_map<table_id, uint64_t> _disk_used_per_table;
|
||||
dc_name _dc;
|
||||
std::optional<sstring> _rack; // Set when plan making is limited to a single rack.
|
||||
sstring _location; // Name of the current scope of plan making. DC or DC+rack.
|
||||
lw_shared_ptr<load_balancer_dc_stats> _current_stats; // Stats for current scope of plan making.
|
||||
size_t _total_capacity_shards; // Total number of non-drained shards in the balanced node set.
|
||||
size_t _total_capacity_nodes; // Total number of non-drained nodes in the balanced node set.
|
||||
uint64_t _total_capacity_storage; // Total storage of non-drained nodes in the balanced node set.
|
||||
size_t _migrating_candidates; // Number of candidate replicas skipped because tablet is migrating.
|
||||
locator::load_stats_ptr _table_load_stats;
|
||||
load_balancer_stats_manager& _stats;
|
||||
std::unordered_set<host_id> _skiplist;
|
||||
@@ -995,22 +1033,21 @@ public:
|
||||
migration_plan plan;
|
||||
|
||||
auto rack_list_colocation = ongoing_rack_list_colocation();
|
||||
if (!utils::get_local_injector().enter("tablet_migration_bypass")) {
|
||||
// Prepare plans for each DC separately and combine them to be executed in parallel.
|
||||
for (auto&& dc : topo.get_datacenters()) {
|
||||
if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation) {
|
||||
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
|
||||
auto rack_plan = co_await make_plan(dc, rack);
|
||||
auto level = rack_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Prepared {} migrations in rack {} in DC {}", rack_plan.size(), rack, dc);
|
||||
plan.merge(std::move(rack_plan));
|
||||
}
|
||||
} else {
|
||||
auto dc_plan = co_await make_plan(dc);
|
||||
auto level = dc_plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Prepared {} migrations in DC {}", dc_plan.size(), dc);
|
||||
plan.merge(std::move(dc_plan));
|
||||
|
||||
// Prepare plans for each DC separately and combine them to be executed in parallel.
|
||||
for (auto&& dc : topo.get_datacenters()) {
|
||||
if (_db.get_config().rf_rack_valid_keyspaces() || _db.get_config().enforce_rack_list() || rack_list_colocation) {
|
||||
for (auto rack : topo.get_datacenter_racks().at(dc) | std::views::keys) {
|
||||
auto rack_plan = co_await make_plan(dc, rack);
|
||||
auto level = rack_plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
|
||||
lblogger.log(level, "Plan for {}/{}: {}", dc, rack, plan_summary(rack_plan));
|
||||
plan.merge(std::move(rack_plan));
|
||||
}
|
||||
} else {
|
||||
auto dc_plan = co_await make_plan(dc);
|
||||
auto level = dc_plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
|
||||
lblogger.log(level, "Plan for {}: {}", dc, plan_summary(dc_plan));
|
||||
plan.merge(std::move(dc_plan));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1027,9 +1064,8 @@ public:
|
||||
plan.set_repair_plan(co_await make_repair_plan(plan));
|
||||
}
|
||||
|
||||
auto level = plan.size() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Prepared {} migration plans, out of which there were {} tablet migration(s) and {} resize decision(s) and {} tablet repair(s) and {} rack-list colocation(s)",
|
||||
plan.size(), plan.tablet_migration_count(), plan.resize_decision_count(), plan.tablet_repair_count(), plan.tablet_rack_list_colocation_count());
|
||||
auto level = plan.empty() ? seastar::log_level::debug : seastar::log_level::info;
|
||||
lblogger.log(level, "Prepared plan: {}", plan_summary(plan));
|
||||
co_return std::move(plan);
|
||||
}
|
||||
|
||||
@@ -1408,7 +1444,7 @@ public:
|
||||
co_return all_colocated;
|
||||
}
|
||||
|
||||
future<migration_plan> make_merge_colocation_plan(const dc_name& dc, node_load_map& nodes) {
|
||||
future<migration_plan> make_merge_colocation_plan(node_load_map& nodes) {
|
||||
migration_plan plan;
|
||||
table_resize_plan resize_plan;
|
||||
|
||||
@@ -1565,7 +1601,7 @@ public:
|
||||
if (cross_rack_migration(src, dst)) {
|
||||
// FIXME: This is illegal if table has views, as it breaks base-view pairing.
|
||||
// Can happen when RF!=#racks.
|
||||
_stats.for_dc(_dc).cross_rack_collocations++;
|
||||
_current_stats->cross_rack_collocations++;
|
||||
lblogger.debug("Cross-rack co-location migration for {}@{} (rack: {}) to co-habit {}@{} (rack: {})",
|
||||
t2_id, src, rack_of(src), t1_id, dst, rack_of(dst));
|
||||
utils::get_local_injector().inject("forbid_cross_rack_migration_attempt", [&] {
|
||||
@@ -2215,7 +2251,7 @@ public:
|
||||
|
||||
// Evaluates impact on load balance of migrating a tablet set of a given table to dst.
|
||||
migration_badness evaluate_dst_badness(node_load_map& nodes, table_id table, tablet_replica dst, uint64_t tablet_set_disk_size) {
|
||||
_stats.for_dc(_dc).candidates_evaluated++;
|
||||
_current_stats->candidates_evaluated++;
|
||||
|
||||
auto& node_info = nodes[dst.host];
|
||||
|
||||
@@ -2254,7 +2290,7 @@ public:
|
||||
|
||||
// Evaluates impact on load balance of migrating a tablet set of a given table from src.
|
||||
migration_badness evaluate_src_badness(node_load_map& nodes, table_id table, tablet_replica src, uint64_t tablet_set_disk_size) {
|
||||
_stats.for_dc(_dc).candidates_evaluated++;
|
||||
_current_stats->candidates_evaluated++;
|
||||
|
||||
auto& node_info = nodes[src.host];
|
||||
|
||||
@@ -2603,15 +2639,15 @@ public:
|
||||
auto mig_streaming_info = get_migration_streaming_infos(_tm->get_topology(), tmap, mig);
|
||||
|
||||
if (!can_accept_load(nodes, mig_streaming_info)) {
|
||||
_stats.for_dc(node_load.dc()).migrations_skipped++;
|
||||
_current_stats->migrations_skipped++;
|
||||
lblogger.debug("Unable to balance {}: load limit reached", host);
|
||||
break;
|
||||
}
|
||||
|
||||
apply_load(nodes, mig_streaming_info);
|
||||
lblogger.debug("Adding migration: {} size: {}", mig, tablets.tablet_set_disk_size);
|
||||
_stats.for_dc(node_load.dc()).migrations_produced++;
|
||||
_stats.for_dc(node_load.dc()).intranode_migrations_produced++;
|
||||
_current_stats->migrations_produced++;
|
||||
_current_stats->intranode_migrations_produced++;
|
||||
mark_as_scheduled(mig);
|
||||
plan.add(std::move(mig));
|
||||
|
||||
@@ -2718,21 +2754,21 @@ public:
|
||||
auto targets = get_viable_targets();
|
||||
if (rs->is_rack_based(_dc)) {
|
||||
lblogger.debug("candidate tablet {} skipped because RF is rack-based and it's in a different rack", tablet);
|
||||
_stats.for_dc(src_info.dc()).tablets_skipped_rack++;
|
||||
_current_stats->tablets_skipped_rack++;
|
||||
return skip_info{std::move(targets)};
|
||||
}
|
||||
if (!targets.contains(dst_info.id)) {
|
||||
auto new_rack_load = rack_load[dst_info.rack()] + 1;
|
||||
lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}",
|
||||
tablet, dst_info.rack(), new_rack_load, max_rack_load);
|
||||
_stats.for_dc(src_info.dc()).tablets_skipped_rack++;
|
||||
_current_stats->tablets_skipped_rack++;
|
||||
return skip_info{std::move(targets)};
|
||||
}
|
||||
}
|
||||
|
||||
for (auto&& r : tmap.get_tablet_info(tablet.tablet).replicas) {
|
||||
if (r.host == dst_info.id) {
|
||||
_stats.for_dc(src_info.dc()).tablets_skipped_node++;
|
||||
_current_stats->tablets_skipped_node++;
|
||||
lblogger.debug("candidate tablet {} skipped because it has a replica on target node", tablet);
|
||||
if (need_viable_targets) {
|
||||
return skip_info{get_viable_targets()};
|
||||
@@ -2939,7 +2975,7 @@ public:
|
||||
};
|
||||
|
||||
if (min_candidate.badness.is_bad() && _use_table_aware_balancing) {
|
||||
_stats.for_dc(_dc).bad_first_candidates++;
|
||||
_current_stats->bad_first_candidates++;
|
||||
|
||||
// Consider better alternatives.
|
||||
if (drain_skipped) {
|
||||
@@ -3060,7 +3096,7 @@ public:
|
||||
lblogger.debug("Table {} shard overcommit: {}", table, overcommit);
|
||||
}
|
||||
|
||||
future<migration_plan> make_internode_plan(const dc_name& dc, node_load_map& nodes,
|
||||
future<migration_plan> make_internode_plan(node_load_map& nodes,
|
||||
const std::unordered_set<host_id>& nodes_to_drain,
|
||||
host_id target) {
|
||||
migration_plan plan;
|
||||
@@ -3120,7 +3156,7 @@ public:
|
||||
|
||||
if (nodes_by_load.empty()) {
|
||||
lblogger.debug("No more candidate nodes");
|
||||
_stats.for_dc(dc).stop_no_candidates++;
|
||||
_current_stats->stop_no_candidates++;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -3191,7 +3227,7 @@ public:
|
||||
|
||||
if (nodes_by_load_dst.empty()) {
|
||||
lblogger.debug("No more target nodes");
|
||||
_stats.for_dc(dc).stop_no_candidates++;
|
||||
_current_stats->stop_no_candidates++;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -3221,7 +3257,7 @@ public:
|
||||
const load_type max_load = std::max(max_off_candidate_load, src_node_info.avg_load);
|
||||
if (is_balanced(target_info.avg_load, max_load)) {
|
||||
lblogger.debug("Balance achieved.");
|
||||
_stats.for_dc(dc).stop_balance++;
|
||||
_current_stats->stop_balance++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -3255,7 +3291,7 @@ public:
|
||||
auto& tmap = tmeta.get_tablet_map(source_tablets.table());
|
||||
if (can_check_convergence && !check_convergence(src_node_info, target_info, source_tablets)) {
|
||||
lblogger.debug("No more candidates. Load would be inverted.");
|
||||
_stats.for_dc(dc).stop_load_inversion++;
|
||||
_current_stats->stop_load_inversion++;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -3289,11 +3325,11 @@ public:
|
||||
}
|
||||
}
|
||||
if (candidate.badness.is_bad()) {
|
||||
_stats.for_dc(_dc).bad_migrations++;
|
||||
_current_stats->bad_migrations++;
|
||||
}
|
||||
|
||||
if (drain_skipped) {
|
||||
_stats.for_dc(_dc).migrations_from_skiplist++;
|
||||
_current_stats->migrations_from_skiplist++;
|
||||
}
|
||||
|
||||
if (src_node_info.req && *src_node_info.req == topology_request::leave && src_node_info.excluded) {
|
||||
@@ -3313,7 +3349,7 @@ public:
|
||||
if (can_accept_load(nodes, mig_streaming_info)) {
|
||||
apply_load(nodes, mig_streaming_info);
|
||||
lblogger.debug("Adding migration: {} size: {}", mig, source_tablets.tablet_set_disk_size);
|
||||
_stats.for_dc(dc).migrations_produced++;
|
||||
_current_stats->migrations_produced++;
|
||||
mark_as_scheduled(mig);
|
||||
plan.add(std::move(mig));
|
||||
} else {
|
||||
@@ -3324,10 +3360,10 @@ public:
|
||||
// Just because the next migration is blocked doesn't mean we could not proceed with migrations
|
||||
// for other shards which are produced by the planner subsequently.
|
||||
skipped_migrations++;
|
||||
_stats.for_dc(dc).migrations_skipped++;
|
||||
_current_stats->migrations_skipped++;
|
||||
if (skipped_migrations >= max_skipped_migrations) {
|
||||
lblogger.debug("Too many migrations skipped, aborting balancing");
|
||||
_stats.for_dc(dc).stop_skip_limit++;
|
||||
_current_stats->stop_skip_limit++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -3346,7 +3382,7 @@ public:
|
||||
}
|
||||
|
||||
if (plan.size() == batch_size) {
|
||||
_stats.for_dc(dc).stop_batch_size++;
|
||||
_current_stats->stop_batch_size++;
|
||||
}
|
||||
|
||||
if (plan.empty()) {
|
||||
@@ -3363,7 +3399,13 @@ public:
|
||||
// If there are 7 tablets and RF=3, each node must have 1 tablet replica.
|
||||
// So node3 will have average load of 1, and node1 and node2 will have
|
||||
// average shard load of 7.
|
||||
lblogger.info("Not possible to achieve balance.");
|
||||
|
||||
// Show when this is the final plan with no active migrations left to execute,
|
||||
// otherwise it may just be a temporary situation due to lack of candidates.
|
||||
if (_migrating_candidates == 0) {
|
||||
lblogger.info("Not possible to achieve balance in {}", _location);
|
||||
print_node_stats(nodes, only_active::no);
|
||||
}
|
||||
}
|
||||
|
||||
co_return std::move(plan);
|
||||
@@ -3420,11 +3462,37 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
using only_active = bool_class<struct only_active_tag>;
|
||||
|
||||
// Logs one line per entry of `nodes` with the node's dc/rack, average load,
// tablet and shard counts, tablets per shard, state, disk capacity and the
// streaming read/write load summed over all of the node's shards.
// When only_active_ is set, nodes with no streaming load are logged at debug
// level instead of info; otherwise every node is logged at info level.
void print_node_stats(node_load_map& nodes, only_active only_active_) {
|
||||
for (auto&& [host, load] : nodes) {
|
||||
// Per-node totals of streaming load, accumulated across shards.
size_t read = 0;
|
||||
size_t write = 0;
|
||||
for (auto& shard_load : load.shards) {
|
||||
read += shard_load.streaming_read_load;
|
||||
write += shard_load.streaming_write_load;
|
||||
}
|
||||
// Parsed as (!only_active_ || (read + write) > 0) ? info : debug.
auto level = !only_active_ || (read + write) > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Node {}: {}/{} load={:.6f} tablets={} shards={} tablets/shard={:.3f} state={} cap={}"
|
||||
" rd={} wr={}",
|
||||
host, load.dc(), load.rack(), load.avg_load, load.tablet_count, load.shard_count,
|
||||
load.tablets_per_shard(), load.state(), load.dusage->capacity, read, write);
|
||||
}
|
||||
}
|
||||
|
||||
future<migration_plan> make_plan(dc_name dc, std::optional<sstring> rack = std::nullopt) {
|
||||
migration_plan plan;
|
||||
|
||||
if (utils::get_local_injector().enter("tablet_migration_bypass")) {
|
||||
co_return std::move(plan);
|
||||
}
|
||||
|
||||
_dc = dc;
|
||||
_rack = rack;
|
||||
_location = fmt::format("{}{}", dc, rack ? fmt::format("/{}", *rack) : "");
|
||||
_current_stats = _stats.for_dc(dc);
|
||||
auto _ = seastar::defer([&] { _current_stats = nullptr; });
|
||||
_migrating_candidates = 0;
|
||||
|
||||
auto node_filter = [&] (const locator::node& node) {
|
||||
return node.dc_rack().dc == dc && (!rack || node.dc_rack().rack == *rack);
|
||||
@@ -3433,7 +3501,7 @@ public:
|
||||
// Causes load balancer to move some tablet even though load is balanced.
|
||||
auto shuffle = in_shuffle_mode();
|
||||
|
||||
_stats.for_dc(dc).calls++;
|
||||
_current_stats->calls++;
|
||||
lblogger.debug("Examining DC {} rack {} (shuffle={}, balancing={}, tablets_per_shard_goal={}, force_capacity_based_balancing={})",
|
||||
dc, rack, shuffle, _tm->tablets().balancing_enabled(), _tablets_per_shard_goal, _force_capacity_based_balancing);
|
||||
|
||||
@@ -3529,7 +3597,7 @@ public:
|
||||
|
||||
if (nodes.empty()) {
|
||||
lblogger.debug("No nodes to balance.");
|
||||
_stats.for_dc(dc).stop_balance++;
|
||||
_current_stats->stop_balance++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
@@ -3552,15 +3620,23 @@ public:
|
||||
|
||||
// If we don't have nodes to drain, remove nodes which don't have complete tablet sizes
|
||||
if (nodes_to_drain.empty()) {
|
||||
std::optional<host_id> incomplete_host;
|
||||
size_t incomplete_count = 0;
|
||||
|
||||
for (auto nodes_i = nodes.begin(); nodes_i != nodes.end();) {
|
||||
host_id host = nodes_i->first;
|
||||
if (!_load_sketch->has_complete_data(host)) {
|
||||
lblogger.info("Node {} does not have complete tablet stats, ignoring", nodes_i->first);
|
||||
incomplete_host.emplace(host);
|
||||
incomplete_count++;
|
||||
nodes_i = nodes.erase(nodes_i);
|
||||
} else {
|
||||
++nodes_i;
|
||||
}
|
||||
}
|
||||
|
||||
if (incomplete_host) {
|
||||
lblogger.info("Ignoring {} node(s) with incomplete tablet stats, e.g. {}", incomplete_count, *incomplete_host);
|
||||
}
|
||||
}
|
||||
|
||||
plan.set_has_nodes_to_drain(!nodes_to_drain.empty());
|
||||
@@ -3594,11 +3670,11 @@ public:
|
||||
});
|
||||
if (!has_dest_nodes) {
|
||||
for (auto host : nodes_to_drain) {
|
||||
plan.add(drain_failure(host, format("No candidate nodes in DC {} to drain {}."
|
||||
" Consider adding new nodes or reducing replication factor.", dc, host)));
|
||||
plan.add(drain_failure(host, format("No candidate nodes in {} to drain {}."
|
||||
" Consider adding new nodes or reducing replication factor.", _location, host)));
|
||||
}
|
||||
lblogger.debug("No candidate nodes");
|
||||
_stats.for_dc(dc).stop_no_candidates++;
|
||||
_current_stats->stop_no_candidates++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
@@ -3704,6 +3780,8 @@ public:
|
||||
if (!migrating(t1) && !migrating(t2)) {
|
||||
auto candidate = colocated_tablets{global_tablet_id{table, t1.tid}, global_tablet_id{table, t2->tid}};
|
||||
add_candidate(shard_load_info, migration_tablet_set{std::move(candidate), tablet_sizes_sum});
|
||||
} else {
|
||||
_migrating_candidates++;
|
||||
}
|
||||
} else {
|
||||
if (tids.size() != tablet_sizes.size()) {
|
||||
@@ -3712,6 +3790,8 @@ public:
|
||||
for (size_t i = 0; i < tids.size(); i++) {
|
||||
if (!migrating(get_table_desc(tids[i]))) { // migrating tablets are not candidates
|
||||
add_candidate(shard_load_info, migration_tablet_set{global_tablet_id{table, tids[i]}, tablet_sizes[i]});
|
||||
} else {
|
||||
_migrating_candidates++;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3749,26 +3829,14 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
for (auto&& [host, load] : nodes) {
|
||||
size_t read = 0;
|
||||
size_t write = 0;
|
||||
for (auto& shard_load : load.shards) {
|
||||
read += shard_load.streaming_read_load;
|
||||
write += shard_load.streaming_write_load;
|
||||
}
|
||||
auto level = (read + write) > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Node {}: dc={} rack={} load={} tablets={} shards={} tablets/shard={} state={} cap={}"
|
||||
" stream_read={} stream_write={}",
|
||||
host, dc, load.rack(), load.avg_load, load.tablet_count, load.shard_count,
|
||||
load.tablets_per_shard(), load.state(), load.dusage->capacity, read, write);
|
||||
}
|
||||
print_node_stats(nodes, only_active::yes);
|
||||
|
||||
if (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || !is_balanced(min_load, max_load)))) {
|
||||
host_id target = *min_load_node;
|
||||
lblogger.info("target node: {}, avg_load: {}, max: {}", target, min_load, max_load);
|
||||
plan.merge(co_await make_internode_plan(dc, nodes, nodes_to_drain, target));
|
||||
plan.merge(co_await make_internode_plan(nodes, nodes_to_drain, target));
|
||||
} else {
|
||||
_stats.for_dc(dc).stop_balance++;
|
||||
_current_stats->stop_balance++;
|
||||
}
|
||||
|
||||
if (_tm->tablets().balancing_enabled()) {
|
||||
@@ -3776,9 +3844,9 @@ public:
|
||||
}
|
||||
|
||||
if (_tm->tablets().balancing_enabled() && plan.empty() && !ongoing_rack_list_colocation()) {
|
||||
auto dc_merge_plan = co_await make_merge_colocation_plan(dc, nodes);
|
||||
auto dc_merge_plan = co_await make_merge_colocation_plan(nodes);
|
||||
auto level = dc_merge_plan.tablet_migration_count() > 0 ? seastar::log_level::info : seastar::log_level::debug;
|
||||
lblogger.log(level, "Prepared {} migrations for co-locating sibling tablets in DC {}", dc_merge_plan.tablet_migration_count(), dc);
|
||||
lblogger.log(level, "Prepared {} migrations for co-locating sibling tablets in {}", dc_merge_plan.tablet_migration_count(), _location);
|
||||
plan.merge(std::move(dc_merge_plan));
|
||||
}
|
||||
|
||||
|
||||
@@ -100,7 +100,7 @@ class load_balancer_stats_manager {
|
||||
using host_id = locator::host_id;
|
||||
|
||||
sstring group_name;
|
||||
std::unordered_map<dc_name, std::unique_ptr<load_balancer_dc_stats>> _dc_stats;
|
||||
std::unordered_map<dc_name, lw_shared_ptr<load_balancer_dc_stats>> _dc_stats;
|
||||
std::unordered_map<host_id, std::unique_ptr<load_balancer_node_stats>> _node_stats;
|
||||
load_balancer_cluster_stats _cluster_stats;
|
||||
seastar::metrics::label dc_label{"target_dc"};
|
||||
@@ -113,7 +113,7 @@ class load_balancer_stats_manager {
|
||||
public:
|
||||
load_balancer_stats_manager(sstring group_name);
|
||||
|
||||
load_balancer_dc_stats& for_dc(const dc_name& dc);
|
||||
const lw_shared_ptr<load_balancer_dc_stats>& for_dc(const dc_name& dc);
|
||||
load_balancer_node_stats& for_node(const dc_name& dc, host_id node);
|
||||
load_balancer_cluster_stats& for_cluster();
|
||||
|
||||
@@ -196,7 +196,7 @@ public:
|
||||
bool has_nodes_to_drain() const { return _has_nodes_to_drain; }
|
||||
|
||||
const migrations_vector& migrations() const { return _migrations; }
|
||||
bool empty() const { return _migrations.empty() && !_resize_plan.size() && !_repair_plan.size() && !_rack_list_colocation_plan.size() && _drain_failures.empty(); }
|
||||
bool empty() const { return !size(); }
|
||||
size_t size() const { return _migrations.size() + _resize_plan.size() + _repair_plan.size() + _rack_list_colocation_plan.size() + _drain_failures.size(); }
|
||||
size_t tablet_migration_count() const { return _migrations.size(); }
|
||||
size_t resize_decision_count() const { return _resize_plan.size(); }
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "utils/s3/aws_error.hh"
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/http/exception.hh>
|
||||
|
||||
enum class message_style : uint8_t { singular = 1, plural = 2 };
|
||||
|
||||
@@ -122,7 +123,7 @@ BOOST_AUTO_TEST_CASE(TestNestedException) {
|
||||
std::throw_with_nested(std::logic_error("Higher level logic_error"));
|
||||
}
|
||||
} catch (...) {
|
||||
auto error = aws::aws_error::from_maybe_nested_exception(std::current_exception());
|
||||
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::NETWORK_CONNECTION, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("Software caused connection abort", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
|
||||
@@ -136,7 +137,7 @@ BOOST_AUTO_TEST_CASE(TestNestedException) {
|
||||
std::throw_with_nested(std::runtime_error("Higher level runtime_error"));
|
||||
}
|
||||
} catch (...) {
|
||||
auto error = aws::aws_error::from_maybe_nested_exception(std::current_exception());
|
||||
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("Higher level runtime_error", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
@@ -146,7 +147,7 @@ BOOST_AUTO_TEST_CASE(TestNestedException) {
|
||||
try {
|
||||
throw std::runtime_error("Something bad happened");
|
||||
} catch (...) {
|
||||
auto error = aws::aws_error::from_maybe_nested_exception(std::current_exception());
|
||||
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("Something bad happened", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
@@ -156,9 +157,39 @@ BOOST_AUTO_TEST_CASE(TestNestedException) {
|
||||
try {
|
||||
throw "foo";
|
||||
} catch (...) {
|
||||
auto error = aws::aws_error::from_maybe_nested_exception(std::current_exception());
|
||||
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("No error message was provided, exception content: char const*", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
}
|
||||
|
||||
// Test system_error
|
||||
try {
|
||||
throw std::system_error(std::error_code(ECONNABORTED, std::system_category()));
|
||||
} catch (...) {
|
||||
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::NETWORK_CONNECTION, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("Software caused connection abort", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
|
||||
}
|
||||
|
||||
// Test aws_exception
|
||||
try {
|
||||
throw aws::aws_exception(aws::aws_error::get_errors().at("HTTP_TOO_MANY_REQUESTS"));
|
||||
} catch (...) {
|
||||
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::HTTP_TOO_MANY_REQUESTS, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
|
||||
}
|
||||
|
||||
// Test httpd::unexpected_status_error
|
||||
try {
|
||||
throw seastar::httpd::unexpected_status_error(seastar::http::reply::status_type::network_connect_timeout);
|
||||
} catch (...) {
|
||||
auto error = aws::aws_error::from_exception_ptr(std::current_exception());
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::HTTP_NETWORK_CONNECT_TIMEOUT, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(" HTTP code: 599 Network Connect Timeout", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -767,7 +767,6 @@ void test_chunked_download_data_source(const client_maker_function& client_maker
|
||||
#endif
|
||||
|
||||
cln->delete_object(object_name).get();
|
||||
cln->close().get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_chunked_download_data_source_with_delays_minio) {
|
||||
|
||||
@@ -26,6 +26,7 @@ import pytest
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.pylib.internal_types import ServerUpState
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -198,8 +199,14 @@ ALTERNATOR_PROXY_SERVER_CONFIG = {
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
async def alternator_proxy_server(manager: ManagerClient):
|
||||
"""Fixture that creates a server with Alternator proxy protocol ports enabled."""
|
||||
server = await manager.server_add(config=ALTERNATOR_PROXY_SERVER_CONFIG)
|
||||
"""Fixture that creates a server with Alternator proxy protocol ports enabled.
|
||||
|
||||
Waits for SERVING state to ensure Alternator ports are ready.
|
||||
"""
|
||||
server = await manager.server_add(
|
||||
config=ALTERNATOR_PROXY_SERVER_CONFIG,
|
||||
expected_server_up_state=ServerUpState.SERVING
|
||||
)
|
||||
yield (server, manager)
|
||||
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ async def test_crashed_node_substitution(manager: ManagerClient):
|
||||
|
||||
log = await manager.server_open_log(failed_server.server_id)
|
||||
await log.wait_for("finished do_send_ack2_msg")
|
||||
failed_id = await manager.get_host_id(failed_server.server_id)
|
||||
await manager.api.message_injection(failed_server.ip_addr, 'crash_before_group0_join')
|
||||
|
||||
await task
|
||||
@@ -50,7 +51,6 @@ async def test_crashed_node_substitution(manager: ManagerClient):
|
||||
[await manager.api.message_injection(s.ip_addr, 'fast_orphan_removal_fiber') for s in servers]
|
||||
|
||||
log = await manager.server_open_log(servers[0].server_id)
|
||||
failed_id = await manager.get_host_id(failed_server.server_id)
|
||||
await log.wait_for(f"Finished to force remove node {failed_id}")
|
||||
|
||||
post_wait_live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr)
|
||||
|
||||
@@ -96,9 +96,8 @@ def dotestCreateAndDropIndex(cql, table, indexName, addKeyspaceOnDrop):
|
||||
f"DROP INDEX {KEYSPACE}.{indexName}")
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
# FIXME: LWT is not supported with tablets yet. See #18066
|
||||
def table1(cql, test_keyspace_vnodes):
|
||||
with create_table(cql, test_keyspace_vnodes, "(a int primary key, b int)") as table:
|
||||
def table1(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(a int primary key, b int)") as table:
|
||||
yield table
|
||||
|
||||
# Reproduces #8717 (CREATE INDEX IF NOT EXISTS was broken):
|
||||
@@ -454,7 +453,7 @@ TOO_BIG = 1024 * 65
|
||||
# Reproduces #8627
|
||||
@pytest.mark.xfail(reason="issue #8627")
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testIndexOnCompositeValueOver64k(cql, test_keyspace):
|
||||
too_big = bytearray([1])*TOO_BIG
|
||||
@@ -476,7 +475,7 @@ def testIndexOnCompositeValueOver64k(cql, test_keyspace):
|
||||
too_big)
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testIndexOnPartitionKeyInsertValueOver64k(cql, test_keyspace):
|
||||
too_big = bytearray([1])*TOO_BIG
|
||||
@@ -533,7 +532,7 @@ def testIndexOnPartitionKeyWithStaticColumnAndNoRows(cql, test_keyspace):
|
||||
assert_rows(execute(cql, table, "SELECT * FROM %s WHERE pk2 = ?", 20), [1, 20, None, 9, None])
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testIndexOnClusteringColumnInsertValueOver64k(cql, test_keyspace):
|
||||
too_big = bytearray([1])*TOO_BIG
|
||||
@@ -568,7 +567,7 @@ def testIndexOnClusteringColumnInsertValueOver64k(cql, test_keyspace):
|
||||
# Reproduces #8627
|
||||
@pytest.mark.xfail(reason="issue #8627")
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testIndexOnFullCollectionEntryInsertCollectionValueOver64k(cql, test_keyspace):
|
||||
too_big = bytearray([1])*TOO_BIG
|
||||
|
||||
@@ -62,7 +62,7 @@ def testTimestampTTL(cql, test_keyspace):
|
||||
|
||||
# Migrated from cql_tests.py:TestCQL.invalid_custom_timestamp_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testInvalidCustomTimestamp(cql, test_keyspace):
|
||||
# Conditional updates
|
||||
|
||||
@@ -1239,7 +1239,7 @@ def testInsertWithCompactStorageAndTwoClusteringColumns(cql, test_keyspace, forc
|
||||
# Test for CAS with compact storage table, and CASSANDRA-6813 in particular,
|
||||
# migrated from cql_tests.py:TestCQL.cas_and_compact_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testCompactStorage(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(partition text, key text, owner text, PRIMARY KEY (partition, key)) WITH COMPACT STORAGE") as table:
|
||||
|
||||
@@ -28,7 +28,7 @@ def is_scylla(cql):
|
||||
yield any('scylla' in name for name in names)
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testInsertSetIfNotExists(cql, test_keyspace, is_scylla):
|
||||
with create_table(cql, test_keyspace, "(k int PRIMARY KEY, s set<int>)") as table:
|
||||
@@ -478,7 +478,7 @@ def check_invalid_list(cql, table, condition, expected):
|
||||
|
||||
# Migrated from cql_tests.py:TestCQL.list_item_conditional_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testListItem(cql, test_keyspace):
|
||||
for frozen in [False, True]:
|
||||
@@ -505,7 +505,7 @@ def testListItem(cql, test_keyspace):
|
||||
# Test expanded functionality from CASSANDRA-6839,
|
||||
# migrated from cql_tests.py:TestCQL.expanded_list_item_conditional_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testExpandedListItem(cql, test_keyspace):
|
||||
for frozen in [False, True]:
|
||||
@@ -682,7 +682,7 @@ def testWholeMap(cql, test_keyspace):
|
||||
|
||||
# Migrated from cql_tests.py:TestCQL.map_item_conditional_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testMapItem(cql, test_keyspace):
|
||||
for frozen in [False, True]:
|
||||
@@ -711,7 +711,7 @@ def testMapItem(cql, test_keyspace):
|
||||
assert list(execute(cql, table, "UPDATE %s set m['foo'] = 'bar', m['bar'] = 'foo' WHERE k = 1 IF m[?] IN (?, ?)", "foo", "blah", None))[0][0] == True
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testFrozenWithNullValues(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, f"(k int PRIMARY KEY, m frozen<list<text>>)") as table:
|
||||
@@ -732,7 +732,7 @@ def testFrozenWithNullValues(cql, test_keyspace):
|
||||
# Test expanded functionality from CASSANDRA-6839,
|
||||
# migrated from cql_tests.py:TestCQL.expanded_map_item_conditional_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testExpandedMapItem(cql, test_keyspace):
|
||||
for frozen in [False, True]:
|
||||
|
||||
@@ -32,7 +32,7 @@ def is_scylla(cql):
|
||||
|
||||
# Migrated from cql_tests.py:TestCQL.static_columns_cas_test()
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testStaticColumnsCas(cql, test_keyspace, is_scylla):
|
||||
with create_table(cql, test_keyspace, "(id int, k text, version int static, v text, PRIMARY KEY (id, k))") as table:
|
||||
@@ -153,7 +153,7 @@ def testStaticColumnsCas(cql, test_keyspace, is_scylla):
|
||||
|
||||
# Test CASSANDRA-10532
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testStaticColumnsCasDelete(cql, test_keyspace, is_scylla):
|
||||
with create_table(cql, test_keyspace, "(pk int, ck int, static_col int static, value int, PRIMARY KEY (pk, ck))") as table:
|
||||
@@ -216,7 +216,7 @@ def testStaticColumnsCasDelete(cql, test_keyspace, is_scylla):
|
||||
row(1, 7, null, 8))
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testStaticColumnsCasUpdate(cql, test_keyspace, is_scylla):
|
||||
with create_table(cql, test_keyspace, "(pk int, ck int, static_col int static, value int, PRIMARY KEY (pk, ck))") as table:
|
||||
@@ -271,7 +271,7 @@ def testStaticColumnsCasUpdate(cql, test_keyspace, is_scylla):
|
||||
row(1, 7, 1, 8))
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testConditionalUpdatesOnStaticColumns(cql, test_keyspace, is_scylla):
|
||||
with create_table(cql, test_keyspace, "(a int, b int, s int static, d text, PRIMARY KEY (a, b))") as table:
|
||||
@@ -305,7 +305,7 @@ def testConditionalUpdatesOnStaticColumns(cql, test_keyspace, is_scylla):
|
||||
row(8, null, 8, null))
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testStaticsWithMultipleConditions(cql, test_keyspace, is_scylla):
|
||||
with create_table(cql, test_keyspace, "(a int, b int, s1 int static, s2 int static, d int, PRIMARY KEY (a, b))") as table:
|
||||
@@ -343,7 +343,7 @@ def testStaticsWithMultipleConditions(cql, test_keyspace, is_scylla):
|
||||
[row(false,None,None,None,None,None),row(false,None,None,None,None,None),row(false,None,None,None,None,None),row(false,None,None,None,None,None)] if is_scylla else [row(false)])
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testStaticColumnsCasUpdateWithNullStaticColumn(cql, test_keyspace, is_scylla):
|
||||
with create_table(cql, test_keyspace, "(pk int, ck int, s1 int static, s2 int static, value int, PRIMARY KEY (pk, ck))") as table:
|
||||
@@ -363,7 +363,7 @@ def testStaticColumnsCasUpdateWithNullStaticColumn(cql, test_keyspace, is_scylla
|
||||
assertRows(execute(cql, table, "SELECT * FROM %s WHERE pk = ?", 2), row(2, null, 2, 1, null))
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def testStaticColumnsCasDeleteWithNullStaticColumn(cql, test_keyspace, is_scylla):
|
||||
with create_table(cql, test_keyspace, "(pk int, ck int, s1 int static, s2 int static, value int, PRIMARY KEY (pk, ck))") as table:
|
||||
|
||||
@@ -15,10 +15,9 @@ from cassandra.protocol import InvalidRequest
|
||||
from .util import new_test_table, unique_key_int
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
# FIXME: LWT is not supported with tablets yet. See #18066
|
||||
def table1(cql, test_keyspace_vnodes):
|
||||
def table1(cql, test_keyspace):
|
||||
schema='p int, c int, r int, s int static, PRIMARY KEY(p, c)'
|
||||
with new_test_table(cql, test_keyspace_vnodes, schema) as table:
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
yield table
|
||||
|
||||
# An LWT UPDATE whose condition uses non-static columns begins by reading
|
||||
|
||||
@@ -47,31 +47,31 @@ def lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, pk_type, fn):
|
||||
assert len(rows) == num_iterations * 2
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def test_lwt_uuid_fn_pk_insert(cql, test_keyspace):
|
||||
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "uuid", "uuid")
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def test_lwt_currenttimestamp_fn_pk_insert(cql, test_keyspace):
|
||||
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "timestamp", "currenttimestamp")
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def test_lwt_currenttime_fn_pk_insert(cql, test_keyspace):
|
||||
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "time", "currenttime")
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def test_lwt_currenttimeuuid_fn_pk_insert(cql, test_keyspace):
|
||||
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "timeuuid", "currenttimeuuid")
|
||||
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def test_lwt_now_fn_pk_insert(cql, test_keyspace):
|
||||
lwt_nondeterm_fn_repeated_execute(cql, test_keyspace, "timeuuid", "now")
|
||||
|
||||
@@ -201,12 +201,9 @@ def test_unset_insert_where(cql, table2):
|
||||
# NOT EXISTS"). Test that using an UNSET_VALUE in an LWT condition causes
|
||||
# a clear error, not silent skip and not a crash as in issue #13001.
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def test_unset_insert_where_lwt(cql, test_keyspace):
|
||||
# FIXME: new_test_table is used here due to https://github.com/scylladb/scylladb/issues/18066
|
||||
# When fixed, this test can go back to using the `table2` fixture.
|
||||
with new_test_table(cql, test_keyspace, "p int, c int, PRIMARY KEY (p, c)") as table2:
|
||||
def test_unset_insert_where_lwt(cql, table2):
|
||||
p = unique_key_int()
|
||||
stmt = cql.prepare(f'INSERT INTO {table2} (p, c) VALUES ({p}, ?) IF NOT EXISTS')
|
||||
with pytest.raises(InvalidRequest, match="unset"):
|
||||
@@ -225,12 +222,9 @@ def test_unset_update_where(cql, table3):
|
||||
# Python driver doesn't allow sending an UNSET_VALUE for the partition key,
|
||||
# so only the clustering key is tested.
|
||||
@pytest.mark.parametrize("test_keyspace",
|
||||
[pytest.param("tablets", marks=[pytest.mark.xfail(reason="issue #18066")]), "vnodes"],
|
||||
["tablets", "vnodes"],
|
||||
indirect=True)
|
||||
def test_unset_update_where_lwt(cql, test_keyspace):
|
||||
# FIXME: new_test_table is used here due to https://github.com/scylladb/scylladb/issues/18066
|
||||
# When fixed, this test can go back to using the `table3` fixture.
|
||||
with new_test_table(cql, test_keyspace, "p int, c int, r int, PRIMARY KEY (p, c)") as table3:
|
||||
def test_unset_update_where_lwt(cql, table3):
|
||||
stmt = cql.prepare(f"UPDATE {table3} SET r = 42 WHERE p = 0 AND c = ? IF r = ?")
|
||||
|
||||
with pytest.raises(InvalidRequest, match="unset"):
|
||||
|
||||
@@ -10,6 +10,41 @@ import pytest
|
||||
from .util import new_test_table, is_scylla, unique_name
|
||||
from cassandra.protocol import InvalidRequest, ConfigurationException
|
||||
|
||||
supported_filtering_types = [
|
||||
'ascii',
|
||||
'bigint',
|
||||
'blob',
|
||||
'boolean',
|
||||
'date',
|
||||
'decimal',
|
||||
'double',
|
||||
'float',
|
||||
'inet',
|
||||
'int',
|
||||
'smallint',
|
||||
'text',
|
||||
'varchar',
|
||||
'time',
|
||||
'timestamp',
|
||||
'timeuuid',
|
||||
'tinyint',
|
||||
'uuid',
|
||||
'varint',
|
||||
]
|
||||
|
||||
unsupported_filtering_types = [
|
||||
'duration',
|
||||
'map<int, int>',
|
||||
'list<int>',
|
||||
'set<int>',
|
||||
'tuple<int, int>',
|
||||
'vector<float, 3>',
|
||||
'frozen<map<int, int>>',
|
||||
'frozen<list<int>>',
|
||||
'frozen<set<int>>',
|
||||
'frozen<tuple<int, int>>',
|
||||
]
|
||||
|
||||
def test_create_vector_search_index(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p int primary key, v vector<float, 3>'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
@@ -45,6 +80,57 @@ def test_create_vector_search_index_on_nonvector_column(cql, test_keyspace, scyl
|
||||
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_global_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v, f1, f2) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_local_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_local_index_with_filtering_columns_on_nonvector_column(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p1 int, p2 int, c1 int, c2 int, v int, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_index_with_supported_and_unsupported_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
supported_columns = ', '.join([f's{idx} {typ}' for idx, typ in enumerate(supported_filtering_types)])
|
||||
unsupported_columns = ', '.join([f'u{idx} {typ}' for idx, typ in enumerate(unsupported_filtering_types)])
|
||||
schema = f'p int, c int, v vector<float, 3>, {supported_columns}, {unsupported_columns}, primary key (p, c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
for idx in range(len(supported_filtering_types)):
|
||||
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, s{idx}) USING 'vector_index'")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.global_idx")
|
||||
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, s{idx}) USING 'vector_index'")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.local_idx")
|
||||
for idx in range(len(unsupported_filtering_types)):
|
||||
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, u{idx}) USING 'vector_index'")
|
||||
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, u{idx}) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_local_index_with_unsupported_partition_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
for filter_type in unsupported_filtering_types:
|
||||
schema = f'p {filter_type}, c int, v vector<float, 3>, f int, primary key (p, c)'
|
||||
with pytest.raises(InvalidRequest, match="Unsupported|Invalid"):
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p), v, f) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_index_with_duplicated_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = f'p int, c int, v vector<float, 3>, x int, primary key (p, c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
with pytest.raises(InvalidRequest, match=f"Cannot create secondary index on partition key column p"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, p) USING 'vector_index'")
|
||||
with pytest.raises(InvalidRequest, match=f"Duplicate column x in index target list"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, x, x) USING 'vector_index'")
|
||||
with pytest.raises(InvalidRequest, match=f"Cannot create secondary index on partition key column p"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, p) USING 'vector_index'")
|
||||
with pytest.raises(InvalidRequest, match=f"Duplicate column x in index target list"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, x, x) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_index_with_bad_options(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p int primary key, v vector<float, 3>'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
|
||||
@@ -154,13 +154,13 @@ rebalance_stats rebalance_tablets(cql_test_env& e, locator::load_stats_ptr load_
|
||||
auto max_iterations = 1 + get_tablet_count(stm.get()->tablets()) * 10;
|
||||
|
||||
for (size_t i = 0; i < max_iterations; ++i) {
|
||||
auto prev_lb_stats = talloc.stats().for_dc(dc);
|
||||
auto prev_lb_stats = *talloc.stats().for_dc(dc);
|
||||
auto start_time = std::chrono::steady_clock::now();
|
||||
|
||||
auto plan = talloc.balance_tablets(stm.get(), nullptr, nullptr, load_stats, skiplist).get();
|
||||
|
||||
auto end_time = std::chrono::steady_clock::now();
|
||||
auto lb_stats = talloc.stats().for_dc(dc) - prev_lb_stats;
|
||||
auto lb_stats = *talloc.stats().for_dc(dc) - prev_lb_stats;
|
||||
|
||||
auto elapsed = std::chrono::duration_cast<seconds_double>(end_time - start_time);
|
||||
rebalance_stats iteration_stats = {
|
||||
|
||||
@@ -39,3 +39,4 @@ class ServerUpState(IntEnum):
|
||||
HOST_ID_QUERIED = auto()
|
||||
CQL_CONNECTED = auto()
|
||||
CQL_QUERIED = auto()
|
||||
SERVING = auto() # Scylla sent sd_notify("serving")
|
||||
|
||||
@@ -44,6 +44,7 @@ import platform
|
||||
import contextlib
|
||||
import fcntl
|
||||
import urllib
|
||||
import socket
|
||||
|
||||
import psutil
|
||||
|
||||
@@ -385,6 +386,10 @@ class ScyllaServer:
|
||||
prefix=f"scylladb-{f'{xdist_worker_id}-' if xdist_worker_id else ''}{self.server_id}-test.py-"
|
||||
)
|
||||
self.maintenance_socket_path = f"{self.maintenance_socket_dir.name}/cql.m"
|
||||
# Unix socket for receiving sd_notify messages from Scylla
|
||||
self.notify_socket_path = pathlib.Path(self.maintenance_socket_dir.name) / "notify.sock"
|
||||
self.notify_socket: Optional[socket.socket] = None
|
||||
self._received_serving = False
|
||||
self.exe = pathlib.Path(version.path).resolve()
|
||||
self.vardir = pathlib.Path(vardir)
|
||||
self.logger = logger
|
||||
@@ -712,6 +717,50 @@ class ScyllaServer:
|
||||
caslog.setLevel(oldlevel)
|
||||
# Any other exception may indicate a problem, and is passed to the caller.
|
||||
|
||||
def _setup_notify_socket(self) -> None:
|
||||
"""Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
|
||||
if self.notify_socket is not None:
|
||||
return
|
||||
# Remove existing socket file if present
|
||||
self.notify_socket_path.unlink(missing_ok=True)
|
||||
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK | socket.SOCK_CLOEXEC)
|
||||
self.notify_socket.bind(str(self.notify_socket_path))
|
||||
self._received_serving = False
|
||||
|
||||
def _cleanup_notify_socket(self) -> None:
|
||||
"""Clean up the sd_notify socket."""
|
||||
if self.notify_socket is not None:
|
||||
self.notify_socket.close()
|
||||
self.notify_socket = None
|
||||
self.notify_socket_path.unlink(missing_ok=True)
|
||||
|
||||
def check_serving_notification(self) -> bool:
|
||||
"""Check if Scylla has sent the 'serving' sd_notify message.
|
||||
|
||||
Returns True if the SERVING state has been reached.
|
||||
"""
|
||||
if self._received_serving:
|
||||
return True
|
||||
if self.notify_socket is None:
|
||||
return False
|
||||
# Try to read all available messages from the socket
|
||||
while True:
|
||||
try:
|
||||
data = self.notify_socket.recv(4096)
|
||||
# sd_notify message format: "STATUS=serving\n" or "READY=1\nSTATUS=serving\n"
|
||||
message = data.decode('utf-8', errors='replace')
|
||||
if 'STATUS=serving' in message:
|
||||
self._received_serving = True
|
||||
self.logger.debug("Received sd_notify 'serving' message")
|
||||
return True
|
||||
except BlockingIOError:
|
||||
# No more messages available
|
||||
break
|
||||
except Exception as e:
|
||||
self.logger.debug("Error reading from notify socket: %s", e)
|
||||
break
|
||||
return False
|
||||
|
||||
async def try_get_host_id(self, api: ScyllaRESTAPIClient) -> Optional[HostID]:
|
||||
"""Try to get the host id (also tests Scylla REST API is serving)"""
|
||||
|
||||
@@ -754,6 +803,10 @@ class ScyllaServer:
|
||||
env['UBSAN_OPTIONS'] = f'halt_on_error=1:abort_on_error=1:suppressions={TOP_SRC_DIR / "ubsan-suppressions.supp"}'
|
||||
env['ASAN_OPTIONS'] = f'disable_coredump=0:abort_on_error=1:detect_stack_use_after_return=1'
|
||||
|
||||
# Set up socket for receiving sd_notify messages from Scylla
|
||||
self._setup_notify_socket()
|
||||
env['NOTIFY_SOCKET'] = self.notify_socket_path
|
||||
|
||||
# Reopen log file if it was closed (e.g., after a previous stop)
|
||||
if self.log_file is None or self.log_file.closed:
|
||||
self.log_file = self.log_filename.open("ab") # append mode to preserve previous logs
|
||||
@@ -808,7 +861,10 @@ class ScyllaServer:
|
||||
if server_up_state == ServerUpState.PROCESS_STARTED:
|
||||
server_up_state = ServerUpState.HOST_ID_QUERIED
|
||||
server_up_state = await self.get_cql_up_state() or server_up_state
|
||||
if server_up_state == expected_server_up_state:
|
||||
# Check for SERVING state (sd_notify "serving" message)
|
||||
if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
|
||||
server_up_state = ServerUpState.SERVING
|
||||
if server_up_state >= expected_server_up_state:
|
||||
if expected_error is not None:
|
||||
await report_error(
|
||||
f"the node has reached {server_up_state} state,"
|
||||
@@ -847,13 +903,14 @@ class ScyllaServer:
|
||||
session.execute("DROP KEYSPACE k")
|
||||
|
||||
async def shutdown_control_connection(self) -> None:
|
||||
"""Shut down driver connection"""
|
||||
"""Shut down driver connection and notify socket"""
|
||||
if self.control_connection is not None:
|
||||
self.control_connection.shutdown()
|
||||
self.control_connection = None
|
||||
if self.control_cluster is not None:
|
||||
self.control_cluster.shutdown()
|
||||
self.control_cluster = None
|
||||
self._cleanup_notify_socket()
|
||||
|
||||
@stop_event
|
||||
@start_stop_lock
|
||||
|
||||
111
utils/http.cc
111
utils/http.cc
@@ -23,39 +23,75 @@ future<shared_ptr<tls::certificate_credentials>> utils::http::system_trust_crede
|
||||
co_return system_trust_credentials;
|
||||
}
|
||||
|
||||
utils::http::dns_connection_factory::state::state(shared_ptr<tls::certificate_credentials> cin)
|
||||
: creds(std::move(cin))
|
||||
{}
|
||||
|
||||
future<> utils::http::dns_connection_factory::initialize(lw_shared_ptr<state> state, std::string host, int port, bool use_https, logging::logger& logger) {
|
||||
co_await coroutine::all(
|
||||
[state, host, port] () -> future<> {
|
||||
auto hent = co_await net::dns::get_host_by_name(host, net::inet_address::family::INET);
|
||||
state->addr = socket_address(hent.addr_list.front(), port);
|
||||
},
|
||||
[state, use_https] () -> future<> {
|
||||
if (use_https && !state->creds) {
|
||||
state->creds = co_await system_trust_credentials();
|
||||
}
|
||||
if (!use_https) {
|
||||
state->creds = {};
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
state->initialized = true;
|
||||
logger.debug("Initialized factory, address={} tls={}", state->addr, state->creds == nullptr ? "no" : "yes");
|
||||
future<> utils::http::dns_connection_factory::init_credentials() {
|
||||
if (_use_https && !_creds) {
|
||||
_creds = co_await system_trust_credentials();
|
||||
}
|
||||
if (!_use_https) {
|
||||
_creds = {};
|
||||
}
|
||||
_logger.debug("Initialized credentials, tls={}", _creds == nullptr ? "no" : "yes");
|
||||
}
|
||||
|
||||
utils::http::dns_connection_factory::dns_connection_factory(dns_connection_factory&&) = default;
|
||||
future<net::inet_address> utils::http::dns_connection_factory::get_address() {
|
||||
auto get_addr = [this] -> net::inet_address {
|
||||
const auto& addresses = _addr_list.value();
|
||||
return addresses[_addr_pos++ % addresses.size()];
|
||||
};
|
||||
|
||||
if (_addr_list) {
|
||||
co_return get_addr();
|
||||
}
|
||||
auto units = co_await get_units(_init_semaphore, 1);
|
||||
if (!_addr_list) {
|
||||
auto hent = co_await net::dns::get_host_by_name(_host, net::inet_address::family::INET);
|
||||
_address_ttl = std::ranges::min_element(hent.addr_entries, [](const net::hostent::address_entry& lhs, const net::hostent::address_entry& rhs) {
|
||||
return lhs.ttl < rhs.ttl;
|
||||
})->ttl;
|
||||
if (_address_ttl.count() == 0) {
|
||||
co_return hent.addr_entries[_addr_pos++ % hent.addr_entries.size()].addr;
|
||||
}
|
||||
_addr_list = hent.addr_entries | std::views::transform(&net::hostent::address_entry::addr) | std::ranges::to<std::vector>();
|
||||
_addr_update_timer.rearm(lowres_clock::now() + _address_ttl);
|
||||
}
|
||||
|
||||
co_return get_addr();
|
||||
}
|
||||
|
||||
future<shared_ptr<tls::certificate_credentials>> utils::http::dns_connection_factory::get_creds() {
|
||||
if (!_creds_init) [[unlikely]] {
|
||||
auto units = co_await get_units(_init_semaphore, 1);
|
||||
if (!_creds_init) {
|
||||
co_await init_credentials();
|
||||
_creds_init = true;
|
||||
}
|
||||
}
|
||||
co_return _creds;
|
||||
}
|
||||
|
||||
future<connected_socket> utils::http::dns_connection_factory::connect(net::inet_address address) {
|
||||
auto socket_addr = socket_address(address, _port);
|
||||
if (auto creds = co_await get_creds()) {
|
||||
_logger.debug("Making new HTTPS connection addr={} host={}", socket_addr, _host);
|
||||
co_return co_await tls::connect(creds, socket_addr, tls::tls_options{.server_name = _host});
|
||||
}
|
||||
_logger.debug("Making new HTTP connection addr={} host={}", socket_addr, _host);
|
||||
co_return co_await seastar::connect(socket_addr, {}, transport::TCP);
|
||||
}
|
||||
|
||||
utils::http::dns_connection_factory::dns_connection_factory(std::string host, int port, bool use_https, logging::logger& logger, shared_ptr<tls::certificate_credentials> certs)
|
||||
: _host(std::move(host))
|
||||
, _port(port)
|
||||
, _logger(logger)
|
||||
, _state(make_lw_shared<state>(std::move(certs)))
|
||||
, _done(initialize(_state, _host, _port, use_https, _logger))
|
||||
{}
|
||||
,_creds(std::move(certs))
|
||||
, _use_https(use_https)
|
||||
, _addr_update_timer([this] {
|
||||
if (auto units = try_get_units(_init_semaphore, 1)) {
|
||||
_addr_list.reset();
|
||||
}
|
||||
}) {
|
||||
_addr_update_timer.arm(lowres_clock::now());
|
||||
}
|
||||
|
||||
utils::http::dns_connection_factory::dns_connection_factory(std::string uri, logging::logger& logger, shared_ptr<tls::certificate_credentials> certs)
|
||||
: dns_connection_factory([&] {
|
||||
@@ -68,18 +104,21 @@ utils::http::dns_connection_factory::dns_connection_factory(std::string uri, log
|
||||
{}
|
||||
|
||||
future<connected_socket> utils::http::dns_connection_factory::make(abort_source*) {
|
||||
if (!_state->initialized) {
|
||||
_logger.debug("Waiting for factory to initialize");
|
||||
co_await _done.get_future();
|
||||
try {
|
||||
auto address = co_await get_address();
|
||||
co_return co_await connect(address);
|
||||
} catch (...) {
|
||||
// On failure, forcefully renew address resolution and try again
|
||||
_logger.debug("Connection failed, resetting address provider and retrying: {}", std::current_exception());
|
||||
}
|
||||
_addr_list.reset();
|
||||
auto address = co_await get_address();
|
||||
co_return co_await connect(address);
|
||||
}
|
||||
|
||||
if (_state->creds) {
|
||||
_logger.debug("Making new HTTPS connection addr={} host={}", _state->addr, _host);
|
||||
co_return co_await tls::connect(_state->creds, _state->addr, tls::tls_options{.server_name = _host});
|
||||
} else {
|
||||
_logger.debug("Making new HTTP connection addr={} host={}", _state->addr, _host);
|
||||
co_return co_await seastar::connect(_state->addr, {}, transport::TCP);
|
||||
}
|
||||
future<> utils::http::dns_connection_factory::close() {
|
||||
_addr_update_timer.cancel();
|
||||
co_await get_units(_init_semaphore, 1);
|
||||
}
|
||||
|
||||
static const char HTTPS[] = "https";
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <seastar/http/client.hh>
|
||||
#include <seastar/net/dns.hh>
|
||||
#include <seastar/net/tls.hh>
|
||||
@@ -26,23 +25,26 @@ protected:
|
||||
std::string _host;
|
||||
int _port;
|
||||
logging::logger& _logger;
|
||||
struct state {
|
||||
bool initialized = false;
|
||||
socket_address addr;
|
||||
shared_ptr<tls::certificate_credentials> creds;
|
||||
state(shared_ptr<tls::certificate_credentials>);
|
||||
};
|
||||
lw_shared_ptr<state> _state;
|
||||
shared_future<> _done;
|
||||
semaphore _init_semaphore{1};
|
||||
bool _creds_init = false;
|
||||
std::optional<std::vector<net::inet_address>> _addr_list;
|
||||
shared_ptr<tls::certificate_credentials> _creds;
|
||||
uint16_t _addr_pos{0};
|
||||
bool _use_https;
|
||||
std::chrono::seconds _address_ttl{0};
|
||||
timer<lowres_clock> _addr_update_timer;
|
||||
|
||||
// This method can out-live the factory instance, in case `make()` is never called before the instance is destroyed.
|
||||
static future<> initialize(lw_shared_ptr<state> state, std::string host, int port, bool use_https, logging::logger& logger);
|
||||
future<> init_credentials();
|
||||
future<net::inet_address> get_address();
|
||||
future<shared_ptr<tls::certificate_credentials>> get_creds();
|
||||
future<connected_socket> connect(net::inet_address address);
|
||||
public:
|
||||
dns_connection_factory(dns_connection_factory&&);
|
||||
dns_connection_factory(dns_connection_factory&&) = default;
|
||||
dns_connection_factory(std::string host, int port, bool use_https, logging::logger& logger, shared_ptr<tls::certificate_credentials> = {});
|
||||
dns_connection_factory(std::string endpoint_url, logging::logger& logger, shared_ptr<tls::certificate_credentials> = {});
|
||||
|
||||
virtual future<connected_socket> make(abort_source*) override;
|
||||
future<> close() override;
|
||||
};
|
||||
|
||||
// simple URL parser, just enough to handle required aspects for normal endpoint usage
|
||||
|
||||
@@ -160,43 +160,9 @@ aws_error aws_error::from_system_error(const std::system_error& system_error) {
|
||||
}
|
||||
}
|
||||
|
||||
aws_error aws_error::from_maybe_nested_exception(std::exception_ptr eptr) {
|
||||
std::string original_message;
|
||||
while (eptr) {
|
||||
try {
|
||||
std::rethrow_exception(eptr);
|
||||
} catch (const std::exception& e) {
|
||||
if (original_message.empty()) {
|
||||
original_message = e.what();
|
||||
}
|
||||
|
||||
if (auto* sys = dynamic_cast<const std::system_error*>(&e)) {
|
||||
return from_system_error(*sys);
|
||||
}
|
||||
|
||||
try {
|
||||
std::rethrow_if_nested(e);
|
||||
} catch (...) {
|
||||
eptr = std::current_exception();
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
} catch (...) {
|
||||
// Non-std::exception, should not happen in general
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (original_message.empty()) {
|
||||
original_message = fmt::format("No error message was provided, exception content: {}", eptr);
|
||||
}
|
||||
|
||||
return {aws_error_type::UNKNOWN, std::move(original_message), retryable::no};
|
||||
}
|
||||
|
||||
|
||||
aws_error aws_error::from_exception_ptr(std::exception_ptr exception) {
|
||||
if (exception) {
|
||||
std::string original_message;
|
||||
while (exception) {
|
||||
try {
|
||||
std::rethrow_exception(exception);
|
||||
} catch (const aws_exception& ex) {
|
||||
@@ -205,10 +171,20 @@ aws_error aws_error::from_exception_ptr(std::exception_ptr exception) {
|
||||
return from_http_code(ex.status());
|
||||
} catch (const std::system_error& ex) {
|
||||
return from_system_error(ex);
|
||||
} catch (const std::exception&) {
|
||||
return from_maybe_nested_exception(std::current_exception());
|
||||
} catch (const std::exception& ex) {
|
||||
if (original_message.empty()) {
|
||||
original_message = ex.what();
|
||||
}
|
||||
|
||||
try {
|
||||
std::rethrow_if_nested(ex);
|
||||
} catch (...) {
|
||||
exception = std::current_exception();
|
||||
continue;
|
||||
}
|
||||
return aws_error{aws_error_type::UNKNOWN, std::move(original_message), retryable::no};
|
||||
} catch (...) {
|
||||
return aws_error{aws_error_type::UNKNOWN, seastar::format("{}", std::current_exception()), retryable::no};
|
||||
return aws_error{aws_error_type::UNKNOWN, seastar::format("No error message was provided, exception content: {}", std::current_exception()), retryable::no};
|
||||
}
|
||||
}
|
||||
return aws_error{aws_error_type::UNKNOWN, "No exception was provided to `aws_error::from_exception_ptr` function call", retryable::no};
|
||||
|
||||
@@ -106,7 +106,6 @@ public:
|
||||
static std::optional<aws_error> parse(seastar::sstring&& body);
|
||||
static aws_error from_http_code(seastar::http::reply::status_type http_code);
|
||||
static aws_error from_system_error(const std::system_error& system_error);
|
||||
static aws_error from_maybe_nested_exception(std::exception_ptr maybe_nested_error);
|
||||
static aws_error from_exception_ptr(std::exception_ptr exception);
|
||||
static const aws_errors& get_errors();
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user