Compare commits
24 Commits
debug_form
...
scylla-202
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
28b8896680 | ||
|
|
e9cae4be17 | ||
|
|
daf1c96ad3 | ||
|
|
1a1893078a | ||
|
|
d7e3ab2226 | ||
|
|
cf589222a0 | ||
|
|
156800a3dd | ||
|
|
d1e8b02260 | ||
|
|
a51888694e | ||
|
|
68f134ee23 | ||
|
|
b623c237bc | ||
|
|
8379d545c5 | ||
|
|
58d13d0daf | ||
|
|
4def507b1b | ||
|
|
69ad9350cc | ||
|
|
29e5f5f54d | ||
|
|
379b3fa46c | ||
|
|
3bb8039359 | ||
|
|
9f3838e614 | ||
|
|
366212f997 | ||
|
|
c0637aff81 | ||
|
|
dcf436eb84 | ||
|
|
8e754e9d41 | ||
|
|
f407799f25 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=2025.1.0-dev
|
||||
VERSION=2025.1.0-rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -1112,7 +1112,9 @@ future<bool> generation_service::legacy_do_handle_cdc_generation(cdc::generation
|
||||
auto sys_dist_ks = get_sys_dist_ks();
|
||||
auto gen = co_await retrieve_generation_data(gen_id, _sys_ks.local(), *sys_dist_ks, { _token_metadata.get()->count_normal_token_owners() });
|
||||
if (!gen) {
|
||||
throw std::runtime_error(fmt::format(
|
||||
// This may happen during raft upgrade when a node gossips about a generation that
|
||||
// was propagated through raft and we didn't apply it yet.
|
||||
throw generation_handling_nonfatal_exception(fmt::format(
|
||||
"Could not find CDC generation {} in distributed system tables (current time: {}),"
|
||||
" even though some node gossiped about it.",
|
||||
gen_id, db_clock::now()));
|
||||
|
||||
@@ -186,7 +186,7 @@ bool cdc::metadata::prepare(db_clock::time_point tp) {
|
||||
}
|
||||
|
||||
auto ts = to_ts(tp);
|
||||
auto emplaced = _gens.emplace(to_ts(tp), std::nullopt).second;
|
||||
auto [it, emplaced] = _gens.emplace(to_ts(tp), std::nullopt);
|
||||
|
||||
if (_last_stream_timestamp != api::missing_timestamp) {
|
||||
auto last_correct_gen = gen_used_at(_last_stream_timestamp);
|
||||
@@ -201,5 +201,5 @@ bool cdc::metadata::prepare(db_clock::time_point tp) {
|
||||
}
|
||||
}
|
||||
|
||||
return emplaced;
|
||||
return !it->second;
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ SSTable Version Support
|
||||
- ScyllaDB Enterprise Version
|
||||
- ScyllaDB Open Source Version
|
||||
* - 3.x ('me')
|
||||
- 2022.2
|
||||
- 2022.2 and above
|
||||
- 5.1 and above
|
||||
* - 3.x ('md')
|
||||
- 2021.1
|
||||
|
||||
@@ -9,11 +9,7 @@ ScyllaDB SSTable Format
|
||||
|
||||
.. include:: _common/sstable_what_is.rst
|
||||
|
||||
* In ScyllaDB 6.0 and above, *me* format is enabled by default.
|
||||
|
||||
* In ScyllaDB Enterprise 2021.1, ScyllaDB 4.3 and above, *md* format is enabled by default.
|
||||
|
||||
* In ScyllaDB 3.1 and above, *mc* format is enabled by default.
|
||||
In ScyllaDB 6.0 and above, *me* format is enabled by default.
|
||||
|
||||
For more information on each of the SSTable formats, see below:
|
||||
|
||||
|
||||
@@ -12,17 +12,7 @@ ScyllaDB SSTable - 3.x
|
||||
|
||||
.. include:: ../_common/sstable_what_is.rst
|
||||
|
||||
* In ScyllaDB 6.0 and above, the ``me`` format is mandatory, and ``md`` format is used only when upgrading from an existing cluster using ``md``. The ``sstable_format`` parameter is ignored if it is set to ``md``.
|
||||
* In ScyllaDB 5.1 and above, the ``me`` format is enabled by default.
|
||||
* In ScyllaDB 4.3 to 5.0, the ``md`` format is enabled by default.
|
||||
* In ScyllaDB 3.1 to 4.2, the ``mc`` format is enabled by default.
|
||||
* In ScyllaDB 3.0, the ``mc`` format is disabled by default. You can enable it by adding the ``enable_sstables_mc_format`` parameter set to ``true`` in the ``scylla.yaml`` file. For example:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
enable_sstables_mc_format: true
|
||||
|
||||
.. REMOVE IN FUTURE VERSIONS - Remove the note above in version 5.2.
|
||||
In ScyllaDB 6.0 and above, the ``me`` format is mandatory, and ``md`` format is used only when upgrading from an existing cluster using ``md``. The ``sstable_format`` parameter is ignored if it is set to ``md``.
|
||||
|
||||
Additional Information
|
||||
-------------------------
|
||||
|
||||
@@ -202,18 +202,14 @@ An example that excludes a datacenter while using ``replication_factor``::
|
||||
|
||||
DESCRIBE KEYSPACE excalibur
|
||||
CREATE KEYSPACE excalibur WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': '3'} AND durable_writes = true;
|
||||
|
||||
|
||||
|
||||
.. only:: opensource
|
||||
|
||||
Keyspace storage options :label-caution:`Experimental`
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Keyspace storage options :label-caution:`Experimental`
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
By default, SSTables of a keyspace are stored locally.
|
||||
As an alternative, you can configure your keyspace to be stored
|
||||
on Amazon S3 or another S3-compatible object store.
|
||||
See :ref:`Keyspace storage options <admin-keyspace-storage-options>` for details.
|
||||
By default, SSTables of a keyspace are stored locally.
|
||||
As an alternative, you can configure your keyspace to be stored
|
||||
on Amazon S3 or another S3-compatible object store.
|
||||
See :ref:`Keyspace storage options <admin-keyspace-storage-options>` for details.
|
||||
|
||||
.. _tablets:
|
||||
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-prerequisites>`_ on other x86_64 or aarch64 platforms, without any guarantees.
|
||||
|
||||
+----------------------------+--------------------+-------+---------------+
|
||||
| Linux Distributions |Ubuntu | Debian|Rocky / CentOS |
|
||||
| | | |/ RHEL |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| ScyllaDB Version / Version |20.04 |22.04 |24.04 | 11 | 8 | 9 |
|
||||
+============================+======+======+======+=======+=======+=======+
|
||||
| 6.2 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+
|
||||
|
||||
* The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
* All releases are available as a Docker container and EC2 AMI, GCP, and Azure images.
|
||||
|
||||
Supported Architecture
|
||||
-----------------------------
|
||||
|
||||
ScyllaDB Open Source supports x86_64 for all versions and AArch64 starting from ScyllaDB 4.6 and nightly build.
|
||||
In particular, aarch64 support includes AWS EC2 Graviton.
|
||||
@@ -4,7 +4,7 @@ ScyllaDB Web Installer for Linux
|
||||
|
||||
ScyllaDB Web Installer is a platform-agnostic installation script you can run with ``curl`` to install ScyllaDB on Linux.
|
||||
|
||||
See `ScyllaDB Download Center <https://www.scylladb.com/download/#core>`_ for information on manually installing ScyllaDB with platform-specific installation packages.
|
||||
See :doc:`Install ScyllaDB Linux Packages </getting-started/install-scylla/install-on-linux/>` for information on manually installing ScyllaDB with platform-specific installation packages.
|
||||
|
||||
Prerequisites
|
||||
--------------
|
||||
@@ -20,44 +20,50 @@ To install ScyllaDB with Web Installer, run:
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash
|
||||
|
||||
By default, running the script installs the latest official version of ScyllaDB Open Source. You can use the following
|
||||
options to install a different version or ScyllaDB Enterprise:
|
||||
|
||||
.. list-table::
|
||||
:widths: 20 25 55
|
||||
:header-rows: 1
|
||||
|
||||
* - Option
|
||||
- Acceptable values
|
||||
- Description
|
||||
* - ``--scylla-product``
|
||||
- ``scylla`` | ``scylla-enterprise``
|
||||
- Specifies the ScyllaDB product to install: Open Source (``scylla``) or Enterprise (``scylla-enterprise``). The default is ``scylla``.
|
||||
* - ``--scylla-version``
|
||||
- ``<version number>``
|
||||
- Specifies the ScyllaDB version to install. You can specify the major release (``x.y``) to install the latest patch for that version or a specific patch release (``x.y.x``). The default is the latest official version.
|
||||
By default, running the script installs the latest official version of ScyllaDB.
|
||||
|
||||
You can run the command with the ``-h`` or ``--help`` flag to print information about the script.
|
||||
|
||||
Examples
|
||||
===========
|
||||
Installing a Non-default Version
|
||||
---------------------------------------
|
||||
|
||||
Installing ScyllaDB Open Source 6.0.1:
|
||||
You can install a version other than the default.
|
||||
|
||||
Versions 2025.1 and Later
|
||||
==============================
|
||||
|
||||
Run the command with the ``--scylla-version`` option to specify the version
|
||||
you want to install.
|
||||
|
||||
**Example**
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 2025.1.1
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 6.0.1
|
||||
|
||||
Installing the latest patch release for ScyllaDB Open Source 6.0:
|
||||
Versions Earlier than 2025.1
|
||||
================================
|
||||
|
||||
To install a supported version of *ScyllaDB Enterprise*, run the command with:
|
||||
|
||||
* ``--scylla-product scylla-enterprise`` to specify that you want to install
|
||||
ScyllaDB Enterprise.
|
||||
* ``--scylla-version`` to specify the version you want to install.
|
||||
|
||||
For example:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-product scylla-enterprise --scylla-version 2024.1
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 6.0
|
||||
To install a supported version of *ScyllaDB Open Source*, run the command with
|
||||
the ``--scylla-version`` option to specify the version you want to install.
|
||||
|
||||
Installing ScyllaDB Enterprise 2024.1:
|
||||
For example:
|
||||
|
||||
.. code:: console
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-product scylla-enterprise --scylla-version 2024.1
|
||||
|
||||
curl -sSf get.scylladb.com/server | sudo bash -s -- --scylla-version 6.2.1
|
||||
|
||||
.. include:: /getting-started/_common/setup-after-install.rst
|
||||
@@ -1,13 +1,36 @@
|
||||
OS Support by Linux Distributions and Version
|
||||
==============================================
|
||||
|
||||
The following matrix shows which Linux distributions, containers, and images are supported with which versions of ScyllaDB.
|
||||
The following matrix shows which Linux distributions, containers, and images
|
||||
are :ref:`supported <os-support-definition>` with which versions of ScyllaDB.
|
||||
|
||||
Where *supported* in this scope means:
|
||||
+-------------------------------+--------------------+-------+------------------+---------------+
|
||||
| Linux Distributions |Ubuntu | Debian| Rocky / CentOS / | Amazon Linux |
|
||||
| | | | RHEL | |
|
||||
+-------------------------------+------+------+------+-------+-------+----------+---------------+
|
||||
| ScyllaDB Version / OS Version |20.04 |22.04 |24.04 | 11 | 8 | 9 | 2023 |
|
||||
+===============================+======+======+======+=======+=======+==========+===============+
|
||||
| Enterprise 2025.1 | |v| | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+-------------------------------+------+------+------+-------+-------+----------+---------------+
|
||||
| Enterprise 2024.2 | |v| | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+-------------------------------+------+------+------+-------+-------+----------+---------------+
|
||||
| Enterprise 2024.1 | |v| | |v| | |x| | |v| | |v| | |v| | |x| |
|
||||
+-------------------------------+------+------+------+-------+-------+----------+---------------+
|
||||
| Open Source 6.2 | |v| | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+-------------------------------+------+------+------+-------+-------+----------+---------------+
|
||||
|
||||
All releases are available as a Docker container, EC2 AMI, GCP, and Azure images.
|
||||
|
||||
.. _os-support-definition:
|
||||
|
||||
By *supported*, it is meant that:
|
||||
|
||||
- A binary installation package is available to `download <https://www.scylladb.com/download/>`_.
|
||||
- The download and install procedures are tested as part of ScyllaDB release process for each version.
|
||||
- An automated install is included from :doc:`ScyllaDB Web Installer for Linux tool </getting-started/installation-common/scylla-web-installer>` (for latest versions)
|
||||
- The download and install procedures are tested as part of the ScyllaDB release process for each version.
|
||||
- An automated install is included from :doc:`ScyllaDB Web Installer for Linux tool </getting-started/installation-common/scylla-web-installer>` (for the latest versions).
|
||||
|
||||
You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-prerequisites>`_
|
||||
on other x86_64 or aarch64 platforms, without any guarantees.
|
||||
|
||||
|
||||
.. scylladb_include_flag:: os-support-info.rst
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ ScyllaDB Requirements
|
||||
:hidden:
|
||||
|
||||
system-requirements
|
||||
os-support
|
||||
OS Support <os-support>
|
||||
Cloud Instance Recommendations <cloud-instance-recommendations>
|
||||
scylla-in-a-shared-environment
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
* :doc:`cassandra-stress </operating-scylla/admin-tools/cassandra-stress/>` A tool for benchmarking and load testing ScyllaDB and Cassandra clusters.
|
||||
* :doc:`SSTabledump </operating-scylla/admin-tools/sstabledump>`
|
||||
* :doc:`SSTableMetadata </operating-scylla/admin-tools/sstablemetadata>`
|
||||
* configuration_encryptor - :doc:`encrypt at rest </operating-scylla/security/encryption-at-rest>` sensitive scylla configuration entries using system key.
|
||||
* scylla local-file-key-generator - Generate a local file (system) key for :doc:`encryption at rest </operating-scylla/security/encryption-at-rest>`, with the provided length, Key algorithm, Algorithm block mode and Algorithm padding method.
|
||||
* `scyllatop <https://www.scylladb.com/2016/03/22/scyllatop/>`_ - A terminal-based top-like tool for scylladb collectd/prometheus metrics.
|
||||
* :doc:`scylla_dev_mode_setup</getting-started/installation-common/dev-mod>` - run ScyllaDB in Developer Mode.
|
||||
|
||||
@@ -2,9 +2,6 @@
|
||||
ScyllaDB Auditing Guide
|
||||
========================
|
||||
|
||||
:label-tip:`ScyllaDB Enterprise`
|
||||
|
||||
|
||||
Auditing allows the administrator to monitor activities on a Scylla cluster, including queries and data changes.
|
||||
The information is stored in a Syslog or a Scylla table.
|
||||
|
||||
|
||||
@@ -143,8 +143,6 @@ Depending on your key provider, you will either have the option of allowing Scyl
|
||||
* Replicated Key Provider - you must generate a system key yourself
|
||||
* Local Key Provider - If you do not generate your own secret key, ScyllaDB will create one for you
|
||||
|
||||
When encrypting ScyllaDB config by ``configuration_encryptor``, you also need to generate a secret key and upload the key to all nodes.
|
||||
|
||||
|
||||
Use the key generator script
|
||||
================================
|
||||
@@ -820,32 +818,6 @@ Once this encryption is enabled, it is used for all system data.
|
||||
|
||||
.. wasn't able to test this successfully
|
||||
|
||||
.. Encrypt and Decrypt Configuration Files
|
||||
.. =======================================
|
||||
|
||||
.. Using the Configuration Encryption tool, you can encrypt parts of the scylla.yaml file which contain encryption configuration settings.
|
||||
|
||||
.. **Procedure**
|
||||
|
||||
.. 1. Run the Configuration Encryption script:
|
||||
|
||||
.. test code-block: none
|
||||
|
||||
.. /bin/configuration_encryptor [options] [key-path]
|
||||
|
||||
.. Where:
|
||||
|
||||
.. * ``-c, --config`` - the path to the configuration file (/etc/scylla/scylla.yaml, for example)
|
||||
.. * ``-d, --decrypt`` - decrypts the configuration file at the specified path
|
||||
.. * ``-o, --output`` - (optional) writes the configuration file to a specified target. This can be the same location as the source file.
|
||||
.. * ``-h, --help`` - help for this command
|
||||
|
||||
.. For example:
|
||||
|
||||
.. test code-block: none
|
||||
|
||||
.. sudo -u scylla /bin/configuration_encryptor -c /etc/scylla/scylla.yaml /etc/scylla/encryption_keys/secret_key
|
||||
.. end of test
|
||||
|
||||
When a Key is Lost
|
||||
----------------------
|
||||
|
||||
@@ -7,10 +7,6 @@ LDAP Authentication
|
||||
|
||||
saslauthd
|
||||
|
||||
:label-tip:`ScyllaDB Enterprise`
|
||||
|
||||
.. versionadded:: 2021.1.2
|
||||
|
||||
Scylla supports user authentication via an LDAP server by leveraging the SaslauthdAuthenticator.
|
||||
By configuring saslauthd correctly against your LDAP server, you enable Scylla to check the user’s credentials through it.
|
||||
|
||||
|
||||
@@ -2,10 +2,6 @@
|
||||
LDAP Authorization (Role Management)
|
||||
=====================================
|
||||
|
||||
:label-tip:`ScyllaDB Enterprise`
|
||||
|
||||
.. versionadded:: 2021.1.2
|
||||
|
||||
Scylla Enterprise customers can manage and authorize users’ privileges via an :abbr:`LDAP (Lightweight Directory Access Protocol)` server.
|
||||
LDAP is an open, vendor-neutral, industry-standard protocol for accessing and maintaining distributed user access control over a standard IP network.
|
||||
If your users are already stored in an LDAP directory, you can now use the same LDAP server to regulate their roles in Scylla.
|
||||
|
||||
@@ -31,11 +31,9 @@ Encryption on Transit, Client to Node and Node to Node
|
||||
Encryption on Transit protects your communication against third-party interception on the network connection.
|
||||
Configure ScyllaDB to use TLS/SSL for all the connections. Use TLS/SSL to encrypt communication between ScyllaDB nodes and client applications.
|
||||
|
||||
.. only:: enterprise
|
||||
|
||||
Starting with version 2023.1.1, you can run ScyllaDB Enterprise on FIPS-enabled Ubuntu,
|
||||
which uses FIPS 140-2 certified libraries (such as OpenSSL, GnuTLS, and more) and Linux
|
||||
kernel in FIPS mode.
|
||||
You can run ScyllaDB on FIPS-enabled Ubuntu,
|
||||
which uses FIPS 140-2 certified libraries (such as OpenSSL, GnuTLS, and more) and Linux
|
||||
kernel in FIPS mode.
|
||||
|
||||
* :doc:`Encryption Data in Transit Client to Node </operating-scylla/security/client-node-encryption>`
|
||||
|
||||
|
||||
@@ -223,7 +223,18 @@ size_t encrypted_file_impl::transform(uint64_t pos, const void* buffer, size_t l
|
||||
throw std::invalid_argument("Output data not aligned");
|
||||
}
|
||||
_key->transform_unpadded(m, i + off, align_down(rem, b), o + off, iv.data());
|
||||
return l - pos;
|
||||
// #22236 - ensure we don't wrap numbers here.
|
||||
// If reading past actual end of file (_file_length), we can be decoding
|
||||
// 1-<key block size> bytes here, that are at the boundary of last
|
||||
// (fake) block of the file.
|
||||
// Example:
|
||||
// File data size: 4095 bytes
|
||||
// Physical file size: 4095 + 16 (assume 16 bytes key block)
|
||||
// Read 0:4096 -> 4095 bytes
|
||||
// If caller now ignores this and just reads 4096 (or more)
|
||||
// bytes at next block (4096), we read 15 bytes and decode.
|
||||
// But would be past _file_length -> ensure we return zero here.
|
||||
return std::max(l, pos) - pos;
|
||||
}
|
||||
_key->transform_unpadded(m, i + off, block_size, o + off, iv.data());
|
||||
}
|
||||
@@ -271,6 +282,9 @@ future<size_t> encrypted_file_impl::write_dma(uint64_t pos, std::vector<iovec> i
|
||||
future<size_t> encrypted_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, io_intent* intent) {
|
||||
assert(!(len & (block_size - 1)));
|
||||
return verify_file_length().then([this, pos, buffer, len, intent] {
|
||||
if (pos >= *_file_length) {
|
||||
return make_ready_future<size_t>(0);
|
||||
}
|
||||
return _file.dma_read(pos, buffer, len, intent).then([this, pos, buffer](size_t len) {
|
||||
return transform(pos, buffer, len, buffer, mode::decrypt);
|
||||
});
|
||||
@@ -279,6 +293,9 @@ future<size_t> encrypted_file_impl::read_dma(uint64_t pos, void* buffer, size_t
|
||||
|
||||
future<size_t> encrypted_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) {
|
||||
return verify_file_length().then([this, pos, iov = std::move(iov), intent]() mutable {
|
||||
if (pos >= *_file_length) {
|
||||
return make_ready_future<size_t>(0);
|
||||
}
|
||||
auto f = _file.dma_read(pos, iov, intent);
|
||||
return f.then([this, pos, iov = std::move(iov)](size_t len) mutable {
|
||||
size_t off = 0;
|
||||
@@ -292,6 +309,9 @@ future<size_t> encrypted_file_impl::read_dma(uint64_t pos, std::vector<iovec> io
|
||||
|
||||
future<temporary_buffer<uint8_t>> encrypted_file_impl::dma_read_bulk(uint64_t offset, size_t range_size, io_intent* intent) {
|
||||
return verify_file_length().then([this, offset, range_size, intent]() mutable {
|
||||
if (offset >= *_file_length) {
|
||||
return make_ready_future<temporary_buffer<uint8_t>>();
|
||||
}
|
||||
auto front = offset & (block_size - 1);
|
||||
offset -= front;
|
||||
range_size += front;
|
||||
@@ -305,7 +325,8 @@ future<temporary_buffer<uint8_t>> encrypted_file_impl::dma_read_bulk(uint64_t of
|
||||
auto s = transform(offset, result.get(), result.size(), result.get_write(), mode::decrypt);
|
||||
// never give back more than asked for.
|
||||
result.trim(std::min(s, range_size));
|
||||
result.trim_front(front);
|
||||
// #22236 - ensure we don't overtrim if we get a short read.
|
||||
result.trim_front(std::min(front, result.size()));
|
||||
return result;
|
||||
});
|
||||
});
|
||||
|
||||
@@ -657,6 +657,8 @@ future<> global_vnode_effective_replication_map::get_keyspace_erms(sharded<repli
|
||||
// all under the lock.
|
||||
auto lk = co_await db.get_shared_token_metadata().get_lock();
|
||||
auto erm = db.find_keyspace(keyspace_name).get_vnode_effective_replication_map();
|
||||
utils::get_local_injector().inject("get_keyspace_erms_throw_no_such_keyspace",
|
||||
[&keyspace_name] { throw data_dictionary::no_such_keyspace{keyspace_name}; });
|
||||
auto ring_version = erm->get_token_metadata().get_ring_version();
|
||||
_erms[0] = make_foreign(std::move(erm));
|
||||
co_await coroutine::parallel_for_each(std::views::iota(1u, smp::count), [this, &sharded_db, keyspace_name, ring_version] (unsigned shard) -> future<> {
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:7cc51cd5c699c529239d1fe6fc3a7ec5dfceb3389236257388d07415f1870340
|
||||
size 5800128
|
||||
oid sha256:ec31910490135f3d85f5fd44d378a6b014ddef2ca8d7e0fe61ff39b9f5ff90f8
|
||||
size 5880332
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:daaeb5d5740d25ef0dcc79034959382042d007d569389940d0a7baf51fa1da59
|
||||
size 5795820
|
||||
oid sha256:2ab4f5e0de2b4ff4a8f1b46f49f239fd527c056fdc1d678bf298ba429d66cee9
|
||||
size 5884124
|
||||
|
||||
@@ -1487,7 +1487,16 @@ future<> repair::data_sync_repair_task_impl::run() {
|
||||
auto& keyspace = _status.keyspace;
|
||||
auto& sharded_db = rs.get_db();
|
||||
auto& db = sharded_db.local();
|
||||
auto germs = make_lw_shared(co_await locator::make_global_effective_replication_map(sharded_db, keyspace));
|
||||
auto germs_fut = co_await coroutine::as_future(locator::make_global_effective_replication_map(sharded_db, keyspace));
|
||||
if (germs_fut.failed()) {
|
||||
auto ex = germs_fut.get_exception();
|
||||
if (try_catch<data_dictionary::no_such_keyspace>(ex)) {
|
||||
rlogger.warn("sync data: keyspace {} does not exist, skipping", keyspace);
|
||||
co_return;
|
||||
}
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
auto germs = make_lw_shared(germs_fut.get());
|
||||
|
||||
auto id = get_repair_uniq_id();
|
||||
|
||||
|
||||
@@ -413,20 +413,17 @@ static locator::node::state to_topology_node_state(node_state ns) {
|
||||
on_internal_error(rtlogger, format("unhandled node state: {}", ns));
|
||||
}
|
||||
|
||||
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
|
||||
// gossiper) to align it with the other raft topology nodes.
|
||||
future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
|
||||
nodes_to_notify_after_sync nodes_to_notify;
|
||||
|
||||
rtlogger.trace("Start sync_raft_topology_nodes target_node={}", target_node);
|
||||
|
||||
const auto& am =_address_map;
|
||||
future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify) {
|
||||
const auto& t = _topology_state_machine._topology;
|
||||
raft::server_id raft_id{id.uuid()};
|
||||
|
||||
auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
|
||||
tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
|
||||
to_topology_node_state(rs.state), rs.shard_count);
|
||||
};
|
||||
std::vector<future<>> sys_ks_futures;
|
||||
|
||||
auto node = t.find(raft_id);
|
||||
|
||||
if (!node) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
using host_id_to_ip_map_t = std::unordered_map<locator::host_id, gms::inet_address>;
|
||||
auto get_host_id_to_ip_map = [&, map = std::optional<host_id_to_ip_map_t>{}]() mutable -> future<const host_id_to_ip_map_t*> {
|
||||
@@ -445,50 +442,13 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
co_return &*map;
|
||||
};
|
||||
|
||||
std::vector<future<>> sys_ks_futures;
|
||||
const auto& rs = node->second;
|
||||
|
||||
auto remove_ip = [&](inet_address ip, locator::host_id host_id, bool notify) -> future<> {
|
||||
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(ip));
|
||||
|
||||
if (const auto ep = _gossiper.get_endpoint_state_ptr(ip); ep && ep->get_host_id() == host_id) {
|
||||
co_await _gossiper.force_remove_endpoint(ip, gms::null_permit_id);
|
||||
if (notify) {
|
||||
nodes_to_notify.left.push_back({ip, host_id});
|
||||
switch (rs.state) {
|
||||
case node_state::normal: {
|
||||
if (is_me(ip)) {
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto process_left_node = [&] (raft::server_id id) -> future<> {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
|
||||
if (const auto ip = am.find(host_id)) {
|
||||
co_await remove_ip(*ip, host_id, true);
|
||||
}
|
||||
|
||||
if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
|
||||
update_topology(host_id, t.left_nodes_rs.at(id));
|
||||
}
|
||||
|
||||
// However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
|
||||
co_await _messaging.local().ban_host(host_id);
|
||||
};
|
||||
|
||||
auto process_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
auto ip = am.find(host_id);
|
||||
|
||||
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
|
||||
// Save tokens, not needed for raft topology management, but needed by legacy
|
||||
// Also ip -> id mapping is needed for address map recreation on reboot
|
||||
if (is_me(host_id)) {
|
||||
sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
|
||||
co_await _gossiper.add_local_application_state(
|
||||
std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
|
||||
std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
|
||||
std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
|
||||
);
|
||||
} else if (ip && !is_me(*ip)) {
|
||||
// In replace-with-same-ip scenario the replaced node IP will be the same
|
||||
// as ours, we shouldn't put it into system.peers.
|
||||
|
||||
@@ -501,7 +461,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
// Populate the table with the state from the gossiper here since storage_service::on_change()
|
||||
// (which is called each time gossiper state changes) may have skipped it because the tokens
|
||||
// for the node were not in the 'normal' state yet
|
||||
auto info = get_peer_info_for_update(*ip);
|
||||
auto info = get_peer_info_for_update(ip);
|
||||
if (info) {
|
||||
// And then amend with the info from raft
|
||||
info->tokens = rs.ring.value().tokens;
|
||||
@@ -509,29 +469,97 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
info->rack = rs.rack;
|
||||
info->release_version = rs.release_version;
|
||||
info->supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
|
||||
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, *info));
|
||||
}
|
||||
if (!prev_normal.contains(id)) {
|
||||
nodes_to_notify.joined.push_back(*ip);
|
||||
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, *info));
|
||||
}
|
||||
|
||||
if (const auto it = host_id_to_ip_map.find(host_id); it != host_id_to_ip_map.end() && it->second != *ip) {
|
||||
if (nodes_to_notify) {
|
||||
nodes_to_notify->joined.emplace_back(ip);
|
||||
}
|
||||
|
||||
if (const auto it = host_id_to_ip_map.find(id); it != host_id_to_ip_map.end() && it->second != ip) {
|
||||
utils::get_local_injector().inject("crash-before-prev-ip-removed", [] {
|
||||
slogger.info("crash-before-prev-ip-removed hit, killing the node");
|
||||
_exit(1);
|
||||
});
|
||||
// IP change is not expected to emit REMOVED_NODE notifications
|
||||
co_await remove_ip(it->second, host_id, false);
|
||||
|
||||
auto old_ip = it->second;
|
||||
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
|
||||
|
||||
if (const auto ep = _gossiper.get_endpoint_state_ptr(old_ip); ep && ep->get_host_id() == id) {
|
||||
co_await _gossiper.force_remove_endpoint(old_ip, gms::null_permit_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
case node_state::bootstrapping:
|
||||
if (!is_me(ip)) {
|
||||
utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
|
||||
rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
|
||||
_exit(1);
|
||||
});
|
||||
|
||||
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
|
||||
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, {}));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
|
||||
}
|
||||
|
||||
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
|
||||
// gossiper) to align it with the other raft topology nodes.
|
||||
future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal) {
|
||||
nodes_to_notify_after_sync nodes_to_notify;
|
||||
|
||||
rtlogger.trace("Start sync_raft_topology_nodes");
|
||||
|
||||
const auto& t = _topology_state_machine._topology;
|
||||
|
||||
// Forwards the location (datacenter/rack), the converted node state and the shard
// count recorded in the raft replica state `rs` to the token metadata being built.
auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
    const auto node_state = to_topology_node_state(rs.state);
    tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack}, node_state, rs.shard_count);
};
|
||||
|
||||
std::vector<future<>> sys_ks_futures;
|
||||
|
||||
// Handles a node listed in the topology's `left_nodes` set.
//
// `id` / `host_id` identify the node; `ip` is its address if one is known.
// When an address is known, the node's row is removed from the system keyspace
// (the future is queued in `sys_ks_futures`) and, if gossiper still maps that
// address to this host id, the endpoint is force-removed from gossiper and a
// "left" notification is recorded in `nodes_to_notify`.
// Finally the host is banned at the messaging layer.
auto process_left_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip) -> future<> {
    if (ip) {
        sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(*ip));

        // Only touch gossiper state if the address still belongs to this host id.
        if (const auto ep = _gossiper.get_endpoint_state_ptr(*ip); ep && ep->get_host_id() == host_id) {
            co_await _gossiper.force_remove_endpoint(*ip, gms::null_permit_id);
            nodes_to_notify.left.push_back({*ip, host_id});
        }
    }

    // Look the replica state up once and reuse the iterator instead of find() + at().
    if (const auto left_it = t.left_nodes_rs.find(id); left_it != t.left_nodes_rs.end()) {
        update_topology(host_id, left_it->second);
    }

    // NOTE(review): the sentence below looks like the tail of a longer comment whose
    // earlier lines are not visible in this view - confirm against the full file.
    // However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
    co_await _messaging.local().ban_host(host_id);
};
|
||||
|
||||
// Applies the state of a token-owning ("normal") node to the token metadata.
//
// `id` / `host_id` identify the node, `ip` is its address if known (only logged
// here) and `rs` is its raft replica state; `rs.ring` must hold a value.
// For the local node the tokens are additionally persisted in the system keyspace
// and published through gossiper together with the latest committed CDC
// generation id and the NORMAL status.
auto process_normal_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
    // The format string now has one placeholder per argument; previously `rs.cleanup`
    // was passed but had no matching `{}` and was therefore never printed.
    rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={} cleanup={}",
                  id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
    // Save tokens, not needed for raft topology management, but needed by legacy
    // Also ip -> id mapping is needed for address map recreation on reboot
    if (is_me(host_id)) {
        sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
        co_await _gossiper.add_local_application_state(
            std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
            std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
            std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
        );
    }
    update_topology(host_id, rs);
    co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
};
|
||||
|
||||
auto process_transition_node = [&](raft::server_id id, const replica_state& rs) -> future<> {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
auto ip = am.find(host_id);
|
||||
|
||||
auto process_transition_node = [&](raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
|
||||
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate,
|
||||
seastar::value_of([&] () -> sstring {
|
||||
@@ -541,29 +569,16 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
switch (rs.state) {
|
||||
case node_state::bootstrapping:
|
||||
if (rs.ring.has_value()) {
|
||||
if (ip) {
|
||||
if (!is_me(*ip)) {
|
||||
utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
|
||||
rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
|
||||
_exit(1);
|
||||
});
|
||||
|
||||
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
|
||||
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, {}));
|
||||
}
|
||||
update_topology(host_id, rs);
|
||||
if (_topology_state_machine._topology.normal_nodes.empty()) {
|
||||
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
|
||||
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
|
||||
// (such as the CDC generation write).
|
||||
// It doesn't break anything to set the tokens to normal early in this single-node case.
|
||||
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
|
||||
} else {
|
||||
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
|
||||
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
|
||||
}
|
||||
} else if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_new) {
|
||||
on_internal_error(rtlogger, format("Bootstrapping node {} does not have IP mapping but the topology is in the write_both_read_new state", id));
|
||||
update_topology(host_id, rs);
|
||||
if (_topology_state_machine._topology.normal_nodes.empty()) {
|
||||
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
|
||||
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
|
||||
// (such as the CDC generation write).
|
||||
// It doesn't break anything to set the tokens to normal early in this single-node case.
|
||||
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
|
||||
} else {
|
||||
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
|
||||
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -576,7 +591,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
case node_state::removing:
|
||||
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
|
||||
// no need for double writes anymore since op failed
|
||||
co_await process_normal_node(id, rs);
|
||||
co_await process_normal_node(id, host_id, ip, rs);
|
||||
break;
|
||||
}
|
||||
update_topology(host_id, rs);
|
||||
@@ -587,7 +602,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
case node_state::replacing: {
|
||||
SCYLLA_ASSERT(_topology_state_machine._topology.req_param.contains(id));
|
||||
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
|
||||
auto existing_ip = am.find(locator::host_id{replaced_id.uuid()});
|
||||
auto existing_ip = _address_map.find(locator::host_id{replaced_id.uuid()});
|
||||
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
|
||||
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
|
||||
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
|
||||
@@ -599,38 +614,43 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
break;
|
||||
case node_state::rebuilding:
|
||||
// Rebuilding node is normal
|
||||
co_await process_normal_node(id, rs);
|
||||
co_await process_normal_node(id, host_id, ip, rs);
|
||||
break;
|
||||
default:
|
||||
on_fatal_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id));
|
||||
}
|
||||
};
|
||||
|
||||
if (target_node) {
|
||||
raft::server_id raft_id{target_node->uuid()};
|
||||
if (t.left_nodes.contains(raft_id)) {
|
||||
co_await process_left_node(raft_id);
|
||||
} else if (auto it = t.normal_nodes.find(raft_id); it != t.normal_nodes.end()) {
|
||||
co_await process_normal_node(raft_id, it->second);
|
||||
} else if ((it = t.transition_nodes.find(raft_id)) != t.transition_nodes.end()) {
|
||||
co_await process_transition_node(raft_id, it->second);
|
||||
sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
|
||||
|
||||
for (const auto& id: t.left_nodes) {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
auto ip = _address_map.find(host_id);
|
||||
co_await process_left_node(id, host_id, ip);
|
||||
if (ip) {
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
|
||||
}
|
||||
} else {
|
||||
sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
|
||||
for (const auto& id: t.left_nodes) {
|
||||
co_await process_left_node(id);
|
||||
}
|
||||
for (const auto& [id, rs]: t.normal_nodes) {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
auto ip = _address_map.find(host_id);
|
||||
co_await process_normal_node(id, host_id, ip, rs);
|
||||
if (ip) {
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, prev_normal.contains(id) ? nullptr : &nodes_to_notify));
|
||||
}
|
||||
for (const auto& [id, rs]: t.normal_nodes) {
|
||||
co_await process_normal_node(id, rs);
|
||||
}
|
||||
for (const auto& [id, rs]: t.transition_nodes) {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
auto ip = _address_map.find(host_id);
|
||||
co_await process_transition_node(id, host_id, ip, rs);
|
||||
if (ip) {
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
|
||||
}
|
||||
for (const auto& [id, rs]: t.transition_nodes) {
|
||||
co_await process_transition_node(id, rs);
|
||||
}
|
||||
for (auto id : t.get_excluded_nodes()) {
|
||||
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
|
||||
if (n) {
|
||||
n->set_excluded(true);
|
||||
}
|
||||
}
|
||||
for (auto id : t.get_excluded_nodes()) {
|
||||
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
|
||||
if (n) {
|
||||
n->set_excluded(true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -746,6 +766,8 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
|
||||
[[fallthrough]];
|
||||
case topology::transition_state::left_token_ring:
|
||||
[[fallthrough]];
|
||||
case topology::transition_state::truncate_table:
|
||||
[[fallthrough]];
|
||||
case topology::transition_state::rollback_to_normal:
|
||||
return read_new_t::no;
|
||||
case topology::transition_state::write_both_read_new:
|
||||
@@ -754,7 +776,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
|
||||
}, _topology_state_machine._topology.tstate);
|
||||
tmptr->set_read_new(read_new);
|
||||
|
||||
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::nullopt, std::move(prev_normal));
|
||||
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal));
|
||||
|
||||
std::optional<locator::tablet_metadata> tablets;
|
||||
if (hint.tablets_hint) {
|
||||
@@ -943,14 +965,12 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
|
||||
// If we call sync_raft_topology_nodes here directly, a gossiper lock and
|
||||
// the _group0.read_apply_mutex could be taken in cross-order leading to a deadlock.
|
||||
// To avoid this, we don't wait for sync_raft_topology_nodes to finish.
|
||||
(void)futurize_invoke(ensure_alive([this, id, h = _ss._async_gate.hold()]() -> future<> {
|
||||
(void)futurize_invoke(ensure_alive([this, id, endpoint, h = _ss._async_gate.hold()]() -> future<> {
|
||||
auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source);
|
||||
co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
|
||||
storage_service::nodes_to_notify_after_sync nodes_to_notify;
|
||||
auto lock = co_await _ss.get_token_metadata_lock();
|
||||
co_await _ss.mutate_token_metadata([this, id, &nodes_to_notify](mutable_token_metadata_ptr t) -> future<> {
|
||||
nodes_to_notify = co_await _ss.sync_raft_topology_nodes(std::move(t), id, {});
|
||||
}, storage_service::acquire_merge_lock::no);
|
||||
// Set notify_join to true since here we detected address change and drivers have to be notified
|
||||
nodes_to_notify_after_sync nodes_to_notify;
|
||||
co_await _ss.raft_topology_update_ip(id, endpoint, &nodes_to_notify);
|
||||
co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify));
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -964,11 +964,12 @@ private:
|
||||
std::vector<gms::inet_address> joined;
|
||||
};
|
||||
|
||||
future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify);
|
||||
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
|
||||
// gossiper) to align it with the other raft topology nodes.
|
||||
// Optional target_node can be provided to restrict the synchronization to the specified node.
|
||||
// Returns a structure that describes which notifications to trigger after token metadata is updated.
|
||||
future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal);
|
||||
future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal);
|
||||
// Triggers notifications (on_joined, on_left) based on the recent changes to token metadata, as described by the passed in structure.
|
||||
// This function should be called on the result of `sync_raft_topology_nodes`, after the global token metadata is updated.
|
||||
future<> notify_nodes_after_sync(nodes_to_notify_after_sync&& nodes_to_notify);
|
||||
|
||||
@@ -929,130 +929,14 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
break;
|
||||
case global_topology_request::truncate_table: {
|
||||
// Execute a barrier to make sure the nodes we are performing truncate on see the session
|
||||
// and are able to create a topology_guard using the frozen_guard we are sending over RPC
|
||||
// TODO: Exclude nodes which don't contain replicas of the table we are truncating
|
||||
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
|
||||
|
||||
const utils::UUID& global_request_id = _topo_sm._topology.global_request_id.value();
|
||||
std::optional<sstring> error;
|
||||
// We should perform TRUNCATE only if the session is still valid. It could be cleared if a previous truncate
|
||||
// handler performed the truncate and cleared the session, but crashed before finalizing the request
|
||||
if (_topo_sm._topology.session) {
|
||||
const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id, true);
|
||||
const table_id& table_id = topology_requests_entry.truncate_table_id;
|
||||
lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(table_id);
|
||||
|
||||
if (table) {
|
||||
const sstring& ks_name = table->schema()->ks_name();
|
||||
const sstring& cf_name = table->schema()->cf_name();
|
||||
|
||||
rtlogger.info("Performing TRUNCATE TABLE global topology request for {}.{}", ks_name, cf_name);
|
||||
|
||||
// Collect the IDs of the hosts with replicas, but ignore excluded nodes
|
||||
std::unordered_set<locator::host_id> replica_hosts;
|
||||
const std::unordered_set<raft::server_id> excluded_nodes = _topo_sm._topology.get_excluded_nodes();
|
||||
const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
|
||||
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
|
||||
for (const locator::tablet_replica& replica: tinfo.replicas) {
|
||||
if (!excluded_nodes.contains(raft::server_id(replica.host.uuid()))) {
|
||||
replica_hosts.insert(replica.host);
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
// Release the guard to avoid blocking group0 for long periods of time while invoking RPCs
|
||||
release_guard(std::move(guard));
|
||||
|
||||
co_await utils::get_local_injector().inject("truncate_table_wait", [] (auto& handler) {
|
||||
rtlogger.info("truncate_table_wait: start");
|
||||
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
|
||||
});
|
||||
|
||||
// Check if all the nodes with replicas are alive
|
||||
for (const locator::host_id& replica_host: replica_hosts) {
|
||||
if (!_gossiper.is_alive(replica_host)) {
|
||||
throw std::runtime_error(::format("Cannot perform TRUNCATE on table {}.{} because host {} is down", ks_name, cf_name, replica_host));
|
||||
}
|
||||
}
|
||||
|
||||
// Send the RPC to all replicas
|
||||
const service::frozen_topology_guard frozen_guard { _topo_sm._topology.session };
|
||||
co_await coroutine::parallel_for_each(replica_hosts, [&] (const locator::host_id& host_id) -> future<> {
|
||||
co_await ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard);
|
||||
});
|
||||
} else {
|
||||
error = ::format("Table with UUID {} does not exist.", table_id);
|
||||
}
|
||||
|
||||
// Clear the session and save the error message
|
||||
while (true) {
|
||||
if (!guard) {
|
||||
guard = co_await start_operation();
|
||||
}
|
||||
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.push_back(topology_mutation_builder(guard.write_timestamp())
|
||||
.del_session()
|
||||
.build());
|
||||
if (error) {
|
||||
updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
|
||||
.set("error", *error)
|
||||
.build());
|
||||
}
|
||||
|
||||
sstring reason = "Clear truncate session";
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("handle_global_request(): concurrent modification, retrying");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
utils::get_local_injector().inject("truncate_crash_after_session_clear", [] {
|
||||
rtlogger.info("truncate_crash_after_session_clear hit, killing the node");
|
||||
_exit(1);
|
||||
});
|
||||
|
||||
// Execute a barrier to ensure the TRUNCATE RPC can't run on any nodes after this point
|
||||
if (!guard) {
|
||||
guard = co_await start_operation();
|
||||
}
|
||||
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
|
||||
|
||||
// Finalize the request
|
||||
while (true) {
|
||||
if (!guard) {
|
||||
guard = co_await start_operation();
|
||||
}
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.push_back(topology_mutation_builder(guard.write_timestamp())
|
||||
.del_global_topology_request()
|
||||
.del_global_topology_request_id()
|
||||
.build());
|
||||
updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
|
||||
.set("end_time", db_clock::now())
|
||||
.set("done", true)
|
||||
.build());
|
||||
|
||||
sstring reason = "Truncate has completed";
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.info("handle_global_request(): concurrent modification, retrying");
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
rtlogger.info("TRUNCATE TABLE requested");
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.push_back(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_transition_state(topology::transition_state::truncate_table)
|
||||
.build());
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "TRUNCATE TABLE requested");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1738,6 +1622,123 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet split finalization"));
|
||||
}
|
||||
|
||||
// Handles the `truncate_table` transition state.
//
// Steps, in order:
//  1. Run a global barrier so every node sees the truncate session.
//  2. If the session is still set, look up the table from the tracked request,
//     collect the non-excluded hosts holding its tablet replicas, release the
//     group0 guard, verify those hosts are alive and send them the truncate RPC.
//     Afterwards clear the session (recording an error if the table is gone).
//  3. Run another global barrier so no truncate RPC can run after this point.
//  4. Finalize the request: drop the transition state and the global request,
//     and mark the tracked request as done.
//
// `guard` is the group0 guard owned by the caller; it is consumed here.
// Throws std::runtime_error if a host holding a replica is down.
future<> handle_truncate_table(group0_guard guard) {
    // Execute a barrier to make sure the nodes we are performing truncate on see the session
    // and are able to create a topology_guard using the frozen_guard we are sending over RPC
    // TODO: Exclude nodes which don't contain replicas of the table we are truncating
    guard = co_await global_tablet_token_metadata_barrier(std::move(guard));

    // Keep a copy rather than a reference: the id is used again after the guard has been
    // released and after several suspension points, so it must not depend on the
    // in-memory topology object staying untouched for the whole duration.
    const utils::UUID global_request_id = _topo_sm._topology.global_request_id.value();
    std::optional<sstring> error;
    // We should perform TRUNCATE only if the session is still valid. It could be cleared if a previous truncate
    // handler performed the truncate and cleared the session, but crashed before finalizing the request
    if (_topo_sm._topology.session) {
        const auto topology_requests_entry = co_await _sys_ks.get_topology_request_entry(global_request_id, true);
        const table_id& table_id = topology_requests_entry.truncate_table_id;
        lw_shared_ptr<replica::table> table = _db.get_tables_metadata().get_table_if_exists(table_id);

        if (table) {
            // Copy the names: they are used after the guard is released and across
            // co_awaits, so they should not alias the table's current schema object.
            const sstring ks_name = table->schema()->ks_name();
            const sstring cf_name = table->schema()->cf_name();

            rtlogger.info("Performing TRUNCATE TABLE for {}.{}", ks_name, cf_name);

            // Collect the IDs of the hosts with replicas, but ignore excluded nodes
            std::unordered_set<locator::host_id> replica_hosts;
            const std::unordered_set<raft::server_id> excluded_nodes = _topo_sm._topology.get_excluded_nodes();
            const locator::tablet_map& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
            co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& tinfo) {
                for (const locator::tablet_replica& replica: tinfo.replicas) {
                    if (!excluded_nodes.contains(raft::server_id(replica.host.uuid()))) {
                        replica_hosts.insert(replica.host);
                    }
                }
                return make_ready_future<>();
            });

            // Release the guard to avoid blocking group0 for long periods of time while invoking RPCs
            release_guard(std::move(guard));

            co_await utils::get_local_injector().inject("truncate_table_wait", [] (auto& handler) {
                rtlogger.info("truncate_table_wait: start");
                return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
            });

            // Check if all the nodes with replicas are alive
            for (const locator::host_id& replica_host: replica_hosts) {
                if (!_gossiper.is_alive(replica_host)) {
                    throw std::runtime_error(::format("Cannot perform TRUNCATE on table {}.{} because host {} is down", ks_name, cf_name, replica_host));
                }
            }

            // Send the RPC to all replicas
            const service::frozen_topology_guard frozen_guard { _topo_sm._topology.session };
            co_await coroutine::parallel_for_each(replica_hosts, [&] (const locator::host_id& host_id) -> future<> {
                co_await ser::storage_proxy_rpc_verbs::send_truncate_with_tablets(&_messaging, host_id, ks_name, cf_name, frozen_guard);
            });
        } else {
            error = ::format("Cannot TRUNCATE table with UUID {} because it does not exist.", table_id);
        }

        // Clear the session and save the error message
        while (true) {
            if (!guard) {
                guard = co_await start_operation();
            }

            std::vector<canonical_mutation> updates;
            updates.push_back(topology_mutation_builder(guard.write_timestamp())
                                .del_session()
                                .build());
            if (error) {
                updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
                                    .set("error", *error)
                                    .build());
            }

            try {
                co_await update_topology_state(std::move(guard), std::move(updates), "Clear truncate session");
                break;
            } catch (group0_concurrent_modification&) {
                // Intentionally empty: go around the loop, take a fresh guard and
                // rebuild the mutations against the new write timestamp.
            }
        }
    }

    utils::get_local_injector().inject("truncate_crash_after_session_clear", [] {
        rtlogger.info("truncate_crash_after_session_clear hit, killing the node");
        _exit(1);
    });

    // Execute a barrier to ensure the TRUNCATE RPC can't run on any nodes after this point
    if (!guard) {
        guard = co_await start_operation();
    }
    guard = co_await global_tablet_token_metadata_barrier(std::move(guard));

    // Finalize the request
    while (true) {
        if (!guard) {
            guard = co_await start_operation();
        }
        std::vector<canonical_mutation> updates;
        updates.push_back(topology_mutation_builder(guard.write_timestamp())
                            .del_transition_state()
                            .del_global_topology_request()
                            .del_global_topology_request_id()
                            .build());
        updates.push_back(topology_request_tracking_mutation_builder(global_request_id)
                            .set("end_time", db_clock::now())
                            .set("done", true)
                            .build());

        try {
            co_await update_topology_state(std::move(guard), std::move(updates), "Truncate has completed");
            break;
        } catch (group0_concurrent_modification&) {
            // Intentionally empty: retry the finalization with a fresh guard.
        }
    }
}
|
||||
|
||||
// This function must not release and reacquire the guard, callers rely
|
||||
// on the fact that the block which calls this is atomic.
|
||||
// FIXME: Don't take the ownership of the guard to make the above guarantee explicit.
|
||||
@@ -2462,6 +2463,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
co_await update_topology_state(std::move(node.guard), {builder.build(), rtbuilder.build()}, str);
|
||||
}
|
||||
break;
|
||||
case topology::transition_state::truncate_table:
|
||||
co_await handle_truncate_table(std::move(guard));
|
||||
break;
|
||||
}
|
||||
co_return true;
|
||||
};
|
||||
|
||||
@@ -152,6 +152,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
|
||||
{topology::transition_state::tablet_draining, "tablet draining"},
|
||||
{topology::transition_state::left_token_ring, "left token ring"},
|
||||
{topology::transition_state::rollback_to_normal, "rollback to normal"},
|
||||
{topology::transition_state::truncate_table, "truncate table"},
|
||||
};
|
||||
|
||||
// Allows old deprecated names to be recognized and point to the correct transition.
|
||||
|
||||
@@ -119,6 +119,7 @@ struct topology {
|
||||
tablet_resize_finalization,
|
||||
left_token_ring,
|
||||
rollback_to_normal,
|
||||
truncate_table,
|
||||
};
|
||||
|
||||
std::optional<transition_state> tstate;
|
||||
|
||||
@@ -15,12 +15,14 @@
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/core/fstream.hh>
|
||||
|
||||
#include <seastar/testing/test_case.hh>
|
||||
|
||||
#include "ent/encryption/encryption.hh"
|
||||
#include "ent/encryption/symmetric_key.hh"
|
||||
#include "ent/encryption/encrypted_file_impl.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/exception_utils.hh"
|
||||
@@ -181,6 +183,66 @@ SEASTAR_TEST_CASE(test_short) {
|
||||
}
|
||||
}
|
||||
|
||||
// Creates an encrypted file whose size is one byte short of a full disk block and
// checks that a read from offset 0 yields exactly `size` zero bytes, while reads
// starting at the next block boundary yield nothing, for each dma_read flavour
// (plain buffer, iovec and bulk).
SEASTAR_TEST_CASE(test_read_across_size_boundary) {
    auto file_name = "test_read_across_size_boundary";

    // Write side: only set the size, no payload is written.
    auto [writer, key] = co_await make_file(file_name, open_flags::rw|open_flags::create);
    auto size = writer.disk_write_dma_alignment() - 1;
    co_await writer.truncate(size);
    co_await writer.close();

    // Read side: reopen read-only with the same key.
    auto [reader, _] = co_await make_file(file_name, open_flags::ro, key);
    auto block = reader.disk_write_dma_alignment();
    auto mem_align = reader.memory_dma_alignment();

    // Read starting at the beginning of the file.
    auto head_buf = temporary_buffer<char>::aligned(mem_align, block);
    auto head_len = co_await reader.dma_read(0, head_buf.get_write(), head_buf.size());

    // Plain-buffer read starting one block in, i.e. past the end of the data.
    auto tail_buf = temporary_buffer<char>::aligned(mem_align, block);
    auto tail_len = co_await reader.dma_read(block, tail_buf.get_write(), tail_buf.size());

    // Same position, iovec flavour.
    auto iov_buf = temporary_buffer<char>::aligned(mem_align, block);
    std::vector<iovec> iov({{iov_buf.get_write(), iov_buf.size()}});
    auto iov_len = co_await reader.dma_read(block, std::move(iov));

    // Same position, bulk flavour.
    auto bulk = co_await reader.dma_read_bulk<char>(block, size_t(block));

    co_await reader.close();

    BOOST_REQUIRE_EQUAL(size, head_len);
    head_buf.trim(head_len);
    for (auto byte : head_buf) {
        BOOST_REQUIRE_EQUAL(byte, 0);
    }

    BOOST_REQUIRE_EQUAL(0, tail_len);
    BOOST_REQUIRE_EQUAL(0, iov_len);
    BOOST_REQUIRE_EQUAL(0, bulk.size());
}
|
||||
|
||||
// Shared body for the unaligned boundary tests.
//
// Creates an encrypted file of size (block + size_off), reopens it read-only and
// issues a one-block bulk read at offset (block + read_off), where `block` is the
// file's disk write DMA alignment. The read is required to return no data.
static future<> test_read_across_size_boundary_unaligned_helper(int64_t size_off, int64_t read_off) {
    auto file_name = "test_read_across_size_boundary_unaligned";

    auto [writer, key] = co_await make_file(file_name, open_flags::rw|open_flags::create);
    auto size = writer.disk_write_dma_alignment() + size_off;
    co_await writer.truncate(size);
    co_await writer.close();

    auto [reader, key2] = co_await make_file(file_name, open_flags::ro, key);
    auto read_pos = reader.disk_write_dma_alignment() + read_off;
    auto read_len = size_t(reader.disk_write_dma_alignment());
    auto data = co_await reader.dma_read_bulk<char>(read_pos, read_len);

    co_await reader.close();

    BOOST_REQUIRE_EQUAL(0, data.size());
}
|
||||
|
||||
// File is one byte short of two blocks' start (size = block - 1); the bulk read
// begins one byte past the block boundary (offset = block + 1).
SEASTAR_TEST_CASE(test_read_across_size_boundary_unaligned) {
    constexpr int64_t size_off = -1;
    constexpr int64_t read_off = 1;
    co_await test_read_across_size_boundary_unaligned_helper(size_off, read_off);
}
|
||||
|
||||
// File size is block - 2; the bulk read begins at offset block - 1, i.e. one byte
// past the end of the file but still inside the first block.
SEASTAR_TEST_CASE(test_read_across_size_boundary_unaligned2) {
    constexpr int64_t size_off = -2;
    constexpr int64_t read_off = -1;
    co_await test_read_across_size_boundary_unaligned_helper(size_off, read_off);
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_truncating_empty) {
|
||||
auto name = "test_truncating_empty";
|
||||
auto t = co_await make_file(name, open_flags::rw|open_flags::create);
|
||||
@@ -263,3 +325,60 @@ SEASTAR_TEST_CASE(test_truncating_extend) {
|
||||
co_await f.close();
|
||||
}
|
||||
|
||||
// Reproducer for https://github.com/scylladb/scylladb/issues/22236
|
||||
SEASTAR_TEST_CASE(test_read_from_padding) {
|
||||
key_info kinfo {"AES/CBC/PKCSPadding", 128};
|
||||
shared_ptr<symmetric_key> k = make_shared<symmetric_key>(kinfo);
|
||||
testlog.info("Created symmetric key: info={} key={} ", k->info(), k->key());
|
||||
|
||||
size_t block_size;
|
||||
size_t buf_size;
|
||||
|
||||
constexpr auto& filename = "encrypted_file";
|
||||
const auto& filepath = dir.path() / filename;
|
||||
|
||||
testlog.info("Creating encrypted file {}", filepath.string());
|
||||
{
|
||||
auto [file, _] = co_await make_file(filename, open_flags::create | open_flags::wo, k);
|
||||
auto ostream = co_await make_file_output_stream(file);
|
||||
|
||||
block_size = file.disk_write_dma_alignment();
|
||||
buf_size = block_size - 1;
|
||||
|
||||
auto wbuf = seastar::temporary_buffer<char>::aligned(file.memory_dma_alignment(), buf_size);
|
||||
co_await ostream.write(wbuf.get(), wbuf.size());
|
||||
testlog.info("Wrote {} bytes to encrypted file {}", wbuf.size(), filepath.string());
|
||||
|
||||
co_await ostream.close();
|
||||
testlog.info("Length of {}: {} bytes", filename, co_await file.size());
|
||||
}
|
||||
|
||||
testlog.info("Testing DMA reads from padding area of file {}", filepath.string());
|
||||
{
|
||||
auto [file, _] = co_await make_file(filename, open_flags::ro, k);
|
||||
|
||||
// Triggering the bug requires reading from the padding area:
|
||||
// `buf_size < read_pos < file.size()`
|
||||
//
|
||||
// For `dma_read()`, we have the additional requirement that `read_pos` must be aligned.
|
||||
// For `dma_read_bulk()`, it doesn't have to.
|
||||
uint64_t read_pos = block_size;
|
||||
size_t read_len = block_size;
|
||||
auto rbuf = seastar::temporary_buffer<char>::aligned(file.memory_dma_alignment(), read_len);
|
||||
std::vector<iovec> iov {{static_cast<void*>(rbuf.get_write()), rbuf.size()}};
|
||||
|
||||
auto res = co_await file.dma_read_bulk<char>(read_pos, read_len);
|
||||
BOOST_CHECK_MESSAGE(res.size() == 0, seastar::format(
|
||||
"Bulk DMA read on pos {}, len {}: returned {} bytes instead of zero", read_pos, read_len, res.size()));
|
||||
|
||||
auto res_len = co_await file.dma_read(read_pos, iov);
|
||||
BOOST_CHECK_MESSAGE(res_len == 0, seastar::format(
|
||||
"IOV DMA read on pos {}, len {}: returned {} bytes instead of zero", read_pos, read_len, res_len));
|
||||
|
||||
res_len = co_await file.dma_read<char>(read_pos, rbuf.get_write(), read_len);
|
||||
BOOST_CHECK_MESSAGE(res_len == 0, seastar::format(
|
||||
"DMA read on pos {}, len {}: returned {} bytes instead of zero", read_pos, read_len, res_len));
|
||||
|
||||
co_await file.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,3 +259,19 @@ async def test_repair_abort(manager):
|
||||
await manager.api.client.get_json(f"/task_manager/wait_task/{id}", host=servers[0].ip_addr)
|
||||
statuses = await manager.api.client.get_json(f"/task_manager/task_status_recursive/{id}", host=servers[0].ip_addr)
|
||||
assert all([status["state"] == "failed" for status in statuses])
|
||||
|
||||
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_keyspace_drop_during_data_sync_repair(manager):
    """Add a second node while the `get_keyspace_erms_throw_no_such_keyspace`
    injection is enabled at startup on both nodes.

    The first node is started with tablets disabled, a keyspace and a table
    using repair-mode tombstone GC are created on it, and then a second node
    with the same configuration is added.
    """
    node_config = {
        'enable_tablets': False,
        'error_injections_at_startup': ['get_keyspace_erms_throw_no_such_keyspace'],
    }
    await manager.server_add(config=node_config)

    session = manager.get_cql()

    # Schema must exist before the second node joins.
    session.execute("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
    session.execute("CREATE TABLE ks.tbl (pk int, ck int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {'mode': 'repair'}")

    await manager.server_add(config=node_config)
|
||||
|
||||
Reference in New Issue
Block a user