Mirror of https://github.com/scylladb/scylladb.git, synced 2026-04-28 12:17:02 +00:00.

Compare commits (7 commits): ykaul/skip ... fix-invali
| Author | SHA1 | Date |
|---|---|---|
| | 45d5f9b827 | |
| | 86472e43e1 | |
| | d5efd1f676 | |
| | d14d07a079 | |
| | 70261dc674 | |
| | e414b2b0b9 | |
| | 99ac36b353 | |
@@ -78,7 +78,7 @@ fi

 # Default scylla product/version tags
 PRODUCT=scylla
-VERSION=2026.2.0-dev
+VERSION=2026.3.0-dev

 if test -f version
 then
@@ -1088,7 +1088,7 @@ void compaction_manager::register_metrics() {
         sm::make_gauge("normalized_backlog", [this] { return _last_backlog / available_memory(); },
                        sm::description("Holds the sum of normalized compaction backlog for all tables in the system. Backlog is normalized by dividing backlog by shard's available memory.")),
         sm::make_counter("validation_errors", [this] { return _validation_errors; },
-                       sm::description("Holds the number of encountered validation errors.")).set_skip_when_empty(),
+                       sm::description("Holds the number of encountered validation errors.")),
     });
 }
@@ -45,7 +45,7 @@ Example:

 .. code-block:: console

-   nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
+   nodetool removenode 675ed9f4-6564-6dbd-ca08-43fddce952de

 To only mark the node as permanently down without doing actual removal, use :doc:`nodetool excludenode </operating-scylla/nodetool-commands/excludenode>`:

@@ -79,6 +79,6 @@ Example:

 .. code-block:: console

-   nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e 675ed9f4-6564-6dbd-can8-43fddce952gy
+   nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1db0-aac8-43fddce9123e 675ed9f4-6564-6dbd-ca08-43fddce952de

 .. include:: nodetool-index.rst

@@ -74,7 +74,7 @@ Procedure
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
    UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
-   UJ 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
+   UJ 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1

 Nodes in the cluster finished streaming data to the new node:
@@ -86,7 +86,7 @@ Procedure
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
    UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
-   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
+   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1

 #. When the new node status is Up Normal (UN), run the :doc:`nodetool cleanup </operating-scylla/nodetool-commands/cleanup>` command on all nodes in the cluster except for the new node that has just been added. Cleanup removes keys that were streamed to the newly added node and are no longer owned by the node.
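As background for the cleanup step referenced in the hunk above, a minimal sketch of running it from a shell; the keyspace name is a made-up example, and running ``nodetool cleanup`` with no arguments covers all keyspaces:

```console
# Run on every node that was already in the cluster (not on the newly added node).
nodetool cleanup my_keyspace
```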
@@ -192,7 +192,7 @@ Adding new nodes
    -- Address Load Tokens Owns Host ID Rack
    UN 192.168.1.10 500 MB 256 33.3% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c RACK0
    UN 192.168.1.11 500 MB 256 33.3% 125ed9f4-7777-1dbn-mac8-43fddce9123e RACK1
-   UN 192.168.1.12 500 MB 256 33.3% 675ed9f4-6564-6dbd-can8-43fddce952gy RACK2
+   UN 192.168.1.12 500 MB 256 33.3% 675ed9f4-6564-6dbd-ca08-43fddce952de RACK2
    UJ 192.168.2.10 250 MB 256 ? a1b2c3d4-5678-90ab-cdef-112233445566 RACK0

 **Example output after bootstrap completes:**

@@ -205,7 +205,7 @@ Adding new nodes
    -- Address Load Tokens Owns Host ID Rack
    UN 192.168.1.10 400 MB 256 25.0% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c RACK0
    UN 192.168.1.11 400 MB 256 25.0% 125ed9f4-7777-1dbn-mac8-43fddce9123e RACK1
-   UN 192.168.1.12 400 MB 256 25.0% 675ed9f4-6564-6dbd-can8-43fddce952gy RACK2
+   UN 192.168.1.12 400 MB 256 25.0% 675ed9f4-6564-6dbd-ca08-43fddce952de RACK2
    UN 192.168.2.10 400 MB 256 25.0% a1b2c3d4-5678-90ab-cdef-112233445566 RACK0

 #. For tablets-enabled clusters, wait for tablet load balancing to complete.

@@ -163,5 +163,5 @@ This example shows how to install and configure a three-node cluster using Gossi
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c 43
    UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e 44
-   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy 45
+   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de 45

@@ -19,7 +19,7 @@ Prerequisites
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
    UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-lac8-23fddce9123e B1
-   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
+   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1

 Datacenter: ASIA-DC
 Status=Up/Down

@@ -165,7 +165,7 @@ Procedure
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
    UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
-   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
+   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1

 Datacenter: EUROPE-DC
 Status=Up/Down

@@ -18,7 +18,7 @@ Removing a Running Node
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
    UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
-   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
+   UN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1

 #. If the node status is **Up Normal (UN)**, run the :doc:`nodetool decommission </operating-scylla/nodetool-commands/decommission>` command
    to remove the node you are connected to. Using ``nodetool decommission`` is the recommended method for cluster scale-down operations. It prevents data loss

@@ -75,7 +75,7 @@ command providing the Host ID of the node you are removing. See :doc:`nodetool r

 .. code-block:: console

-   nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
+   nodetool removenode 675ed9f4-6564-6dbd-ca08-43fddce952de

 The ``nodetool removenode`` command notifies other nodes that the token range it owns needs to be moved and
 the nodes should redistribute the data using streaming. Using the command does not guarantee the consistency of the rebalanced data if

@@ -23,7 +23,7 @@ Prerequisites
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
    DN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
-   DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
+   DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1

 Login to one of the nodes in the cluster with (UN) status, collect the following info from the node:

@@ -29,7 +29,7 @@ Down (DN), and the node can be replaced.
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
    UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
-   DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
+   DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1

 Remove the Data
 ==================

@@ -72,7 +72,7 @@ Procedure

 For example (using the Host ID of the failed node from above):

-``replace_node_first_boot: 675ed9f4-6564-6dbd-can8-43fddce952gy``
+``replace_node_first_boot: 675ed9f4-6564-6dbd-ca08-43fddce952de``

 #. Start the new node.

@@ -90,7 +90,7 @@ Procedure
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.1.201 112.82 KB 256 32.7% 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c B1
    UN 192.168.1.202 91.11 KB 256 32.9% 125ed9f4-7777-1dbn-mac8-43fddce9123e B1
-   DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-can8-43fddce952gy B1
+   DN 192.168.1.203 124.42 KB 256 32.6% 675ed9f4-6564-6dbd-ca08-43fddce952de B1

 ``192.168.1.203`` is the dead node.

@@ -121,7 +121,7 @@ Procedure
    /192.168.1.203
      generation:1553759866
      heartbeat:2147483647
-     HOST_ID:675ed9f4-6564-6dbd-can8-43fddce952gy
+     HOST_ID:675ed9f4-6564-6dbd-ca08-43fddce952de
      STATUS:shutdown,true
      RELEASE_VERSION:3.0.8
      X3:3

@@ -178,7 +178,7 @@ In this case, the node's data will be cleaned after restart. To remedy this, you

 .. code-block:: none

-   echo 'replace_node_first_boot: 675ed9f4-6564-6dbd-can8-43fddce952gy' | sudo tee --append /etc/scylla/scylla.yaml
+   echo 'replace_node_first_boot: 675ed9f4-6564-6dbd-ca08-43fddce952de' | sudo tee --append /etc/scylla/scylla.yaml

 #. Run the following command to re-setup RAID
@@ -5171,8 +5171,16 @@ static void test_sstable_write_large_row_f(schema_ptr s, reader_permit permit, r
 SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_row) {
     simple_schema s;
     tests::reader_concurrency_semaphore_wrapper semaphore;
-    // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
-    mutation partition(s.schema(), s.make_pkey());
+    // Use a fixed partition key. The row-size thresholds below are chosen at exact
+    // byte boundaries of the MC sstable row encoding: the first clustering row body
+    // encodes prev_row_size as a vint, and prev_row_size includes the partition
+    // header (which contains the partition key's serialized length+bytes). A
+    // random-size partition key (as produced by simple_schema::make_pkey() /
+    // tests::generate_partition_key(), which default to key_size{1,128}) would
+    // perturb the encoded row size by 1-2 bytes across runs and flip the threshold
+    // comparison, making this test flaky. Under smp=1 (which this test runs with),
+    // a fixed key is always shard-local, so no sharding-metadata issue arises.
+    mutation partition = s.new_mutation("pv");
     const partition_key& pk = partition.key();
     s.add_static_row(partition, "foo bar zed");
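To illustrate the flakiness mechanism the new comment describes, here is a deliberately simplified, hypothetical sketch (not ScyllaDB code; the threshold and payload sizes are made up): when a size threshold is compared against an encoded size that includes a randomly sized key, the comparison can land on either side of the boundary from run to run, while a fixed key makes it deterministic.

```python
import random

ROW_SIZE_THRESHOLD = 1024  # hypothetical threshold sitting at an exact byte boundary

def encoded_row_size(key_len: int, fixed_payload: int = 1020) -> int:
    """Toy model: the encoded row size includes the partition key's length."""
    return fixed_payload + key_len

# A fixed key gives the same comparison result on every run...
assert encoded_row_size(key_len=2) < ROW_SIZE_THRESHOLD

# ...while a random key length (1..128 bytes, mimicking key_size{1,128})
# can fall on either side of the threshold, flipping the outcome per run.
outcomes = {encoded_row_size(random.randint(1, 128)) >= ROW_SIZE_THRESHOLD for _ in range(1000)}
print(outcomes)  # almost certainly {False, True}
```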
@@ -5244,8 +5252,16 @@ static void test_sstable_write_large_cell_f(schema_ptr s, reader_permit permit,
 SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
     simple_schema s;
     tests::reader_concurrency_semaphore_wrapper semaphore;
-    // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
-    mutation partition(s.schema(), s.make_pkey());
+    // Use a fixed partition key. The cell-size thresholds below are chosen at exact
+    // byte boundaries of the MC sstable row encoding: the first clustering row body
+    // encodes prev_row_size as a vint, and prev_row_size includes the partition
+    // header (which contains the partition key's serialized length+bytes). A
+    // random-size partition key (as produced by simple_schema::make_pkey() /
+    // tests::generate_partition_key(), which default to key_size{1,128}) would
+    // perturb the encoded row size by 1-2 bytes across runs and flip the threshold
+    // comparison, making this test flaky. Under smp=1 (which this test runs with),
+    // a fixed key is always shard-local, so no sharding-metadata issue arises.
+    mutation partition = s.new_mutation("pv");
     const partition_key& pk = partition.key();
     s.add_static_row(partition, "foo bar zed");

@@ -5264,7 +5280,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_write_large_cell) {
 static void test_sstable_log_too_many_rows_f(int rows, int range_tombstones, uint64_t threshold, bool expected, sstable_version_types version) {
     simple_schema s;
     tests::reader_concurrency_semaphore_wrapper semaphore;
-    // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
     mutation p(s.schema(), s.make_pkey());
     sstring sv;
     for (auto idx = 0; idx < rows - 1; idx++) {

@@ -5326,7 +5341,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
 static void test_sstable_log_too_many_dead_rows_f(int rows, uint64_t threshold, bool expected, sstable_version_types version) {
     simple_schema s;
     tests::reader_concurrency_semaphore_wrapper semaphore;
-    // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
     mutation p(s.schema(), s.make_pkey());
     sstring sv;
     int live_rows = 0;

@@ -5436,7 +5450,6 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_dead_rows) {
 static void test_sstable_too_many_collection_elements_f(int elements, uint64_t threshold, bool expected, sstable_version_types version) {
     simple_schema s(simple_schema::with_static::no, simple_schema::with_collection::yes);
     tests::reader_concurrency_semaphore_wrapper semaphore;
-    // Use make_pkey() to generate a shard-local key (avoids "Failed to generate sharding metadata").
     mutation p(s.schema(), s.make_pkey());
     std::map<bytes, bytes> kv_map;
     for (auto i = 0; i < elements; i++) {

@@ -5512,7 +5525,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_round_trip) {
     // Create a mutation with a clustering row whose serialized cell value
     // exceeds the 1-byte thresholds, so partition_size, row_size, and
     // cell_size records are all generated.
-    // Use make_pkey() (no argument) to generate a key on this shard.
     auto pk = ss.make_pkey();
     mutation m(s, pk);
     auto ck = ss.make_ckey("ck1");

@@ -5622,7 +5634,6 @@ SEASTAR_THREAD_TEST_CASE(test_large_data_records_top_n_bounded) {
     // Create 6 partitions, each with one row of increasing size.
     // Since each partition has exactly one row, we get 6 row_size records
     // competing for 3 slots.
-    // Use make_pkeys() to generate shard-local keys.
     auto pkeys = ss.make_pkeys(6);
     utils::chunked_vector<mutation> muts;
     for (int i = 0; i < 6; i++) {

@@ -18,7 +18,7 @@ from concurrent.futures.thread import ThreadPoolExecutor
 from multiprocessing import Event
 from pathlib import Path
 from typing import TYPE_CHECKING
-from test import TOP_SRC_DIR, path_to
+from test import TOP_SRC_DIR, MODES_TIMEOUT_FACTOR, path_to
 from test.pylib.random_tables import RandomTables
 from test.pylib.skip_types import skip_env
 from test.pylib.util import unique_name
@@ -394,3 +394,8 @@ async def key_provider(request, tmpdir, scylla_binary):
     """Encryption providers fixture"""
     async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
         yield res
+
+
+@pytest.fixture(scope="function")
+def failure_detector_timeout(build_mode):
+    return 2000 * MODES_TIMEOUT_FACTOR[build_mode]
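As a rough illustration of how this fixture scales the 2000 ms base per build mode; the factor values below are hypothetical placeholders, not the actual contents of test.MODES_TIMEOUT_FACTOR:

```python
# Hypothetical stand-in for test.MODES_TIMEOUT_FACTOR; the real mapping lives in the test package.
MODES_TIMEOUT_FACTOR = {"release": 1, "dev": 1, "debug": 4}

def failure_detector_timeout(build_mode: str) -> int:
    # Same shape as the fixture above: a 2000 ms base scaled by the per-mode factor.
    return 2000 * MODES_TIMEOUT_FACTOR[build_mode]

print(failure_detector_timeout("dev"))    # 2000
print(failure_detector_timeout("debug"))  # 8000 with the placeholder factor of 4
```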
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)

 @pytest.mark.asyncio
 @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
+async def test_kill_coordinator_during_op(manager: ManagerClient, failure_detector_timeout) -> None:
     """ Kill coordinator with error injection while topology operation is running for cluster: decommission,
         bootstrap, removenode, replace.

@@ -41,7 +41,7 @@ async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
     """
     # Decrease the failure detector threshold so we don't have to wait for too long.
     config = {
-        'failure_detector_timeout_in_ms': 2000
+        'failure_detector_timeout_in_ms': failure_detector_timeout
     }
     cmdline = [
         '--logger-log-level', 'raft_topology=trace',

@@ -22,11 +22,11 @@ logger = logging.getLogger(__name__)

 @pytest.mark.asyncio
 @pytest.mark.nightly
-async def test_banned_node_notification(manager: ManagerClient) -> None:
+async def test_banned_node_notification(manager: ManagerClient, failure_detector_timeout) -> None:
     """Test that a node banned from the cluster get notification about been banned"""
     # Decrease the failure detector threshold so we don't have to wait for too long.
     config = {
-        'failure_detector_timeout_in_ms': 2000
+        'failure_detector_timeout_in_ms': failure_detector_timeout
     }
     srvs = await manager.servers_add(3, config=config, auto_rack_dc="dc")
     cql = manager.get_cql()

@@ -60,14 +60,14 @@ async def make_servers(manager: ManagerClient, servers_num: int,


 @pytest.mark.asyncio
-async def test_raft_replace_ignore_nodes(manager: ManagerClient) -> None:
+async def test_raft_replace_ignore_nodes(manager: ManagerClient, failure_detector_timeout) -> None:
     """Replace 3 dead nodes.

     This is a slow test with a 7 node cluster and 3 replace operations,
     we want to run it only in dev mode.
     """
     logger.info("Booting initial cluster")
-    servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': 2000})
+    servers = await make_servers(manager, 7, config={'failure_detector_timeout_in_ms': failure_detector_timeout})

     s1_id = await manager.get_host_id(servers[1].server_id)
     s2_id = await manager.get_host_id(servers[2].server_id)

@@ -21,9 +21,9 @@ logger = logging.getLogger(__name__)


 @pytest.mark.asyncio
-async def test_replace_different_ip(manager: ManagerClient) -> None:
+async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
     """Replace an existing node with new node using a different IP address"""
-    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
+    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
     logger.info(f"cluster started, servers {servers}")

     logger.info(f"replacing server {servers[0]}")

@@ -67,18 +67,18 @@ async def test_replace_different_ip(manager: ManagerClient) -> None:
         logger.info(f"server {s} system.peers and gossiper state is valid")

 @pytest.mark.asyncio
-async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
+async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
     """Replace an existing node with new node reusing the replaced node host id"""
-    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
+    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
     await manager.server_stop(servers[0].server_id)
     replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
     await manager.server_add(replace_cfg)
     await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

 @pytest.mark.asyncio
-async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
+async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
     """Replace an existing node with new node using the same IP address"""
-    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
+    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
     host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]

     logger.info(f"creating test table")

@@ -130,9 +130,9 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
     await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)

 @pytest.mark.asyncio
-async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
+async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
     """Replace an existing node with new node using the same IP address and same host id"""
-    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
+    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
     await manager.server_stop(servers[0].server_id)
     replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
     await manager.server_add(replace_cfg)

@@ -14,9 +14,9 @@ logger = logging.getLogger(__name__)


 @pytest.mark.asyncio
-async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
+async def test_replace_with_same_ip_twice(manager: ManagerClient, failure_detector_timeout) -> None:
     logger.info("starting a cluster with two nodes")
-    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
+    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
     logger.info(f"cluster started {servers}")

     async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:

@@ -119,14 +119,14 @@ async def test_tablet_transition_sanity(manager: ManagerClient, action):
 @pytest.mark.parametrize("fail_stage", ["streaming", "allow_write_both_read_old", "write_both_read_old", "write_both_read_new", "use_new", "cleanup", "cleanup_target", "end_migration", "revert_migration"])
 @pytest.mark.asyncio
 @pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage):
+async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail_replica, fail_stage, failure_detector_timeout):
     if fail_stage == 'cleanup' and fail_replica == 'destination':
         skip_env('Failing destination during cleanup is pointless')
     if fail_stage == 'cleanup_target' and fail_replica == 'source':
         skip_env('Failing source during target cleanup is pointless')

     logger.info("Bootstrapping cluster")
-    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': 2000}
+    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled', 'failure_detector_timeout_in_ms': failure_detector_timeout}
     host_ids = []
     servers = []
@@ -37,6 +37,6 @@ class ServerInfo(NamedTuple):
 class ServerUpState(IntEnum):
     PROCESS_STARTED = auto()
     HOST_ID_QUERIED = auto()
-    CQL_CONNECTED = auto()
-    CQL_QUERIED = auto()
+    CQL_ALTERNATOR_CONNECTED = auto()
+    CQL_ALTERNATOR_QUERIED = auto()
     SERVING = auto() # Scylla sent sd_notify("serving")
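Because ServerUpState is an IntEnum whose members are declared in readiness order, the `>=` comparisons used elsewhere in this diff (for example `server_up_state >= expected_server_up_state`) keep working after members are renamed. A small self-contained sketch; the enum here is a trimmed stand-in, not the full class above:

```python
from enum import IntEnum, auto

class ServerUpState(IntEnum):
    # Trimmed stand-in for the enum in the hunk above, in the same declaration order.
    PROCESS_STARTED = auto()
    HOST_ID_QUERIED = auto()
    CQL_ALTERNATOR_CONNECTED = auto()
    CQL_ALTERNATOR_QUERIED = auto()
    SERVING = auto()

# auto() assigns increasing integers, so "readiness" can be compared directly:
assert ServerUpState.SERVING >= ServerUpState.CQL_ALTERNATOR_QUERIED
assert ServerUpState.CQL_ALTERNATOR_QUERIED > ServerUpState.HOST_ID_QUERIED

def reached(current: ServerUpState, expected: ServerUpState) -> bool:
    # Mirrors the check pattern "server_up_state >= expected_server_up_state".
    return current >= expected

print(reached(ServerUpState.CQL_ALTERNATOR_CONNECTED, ServerUpState.CQL_ALTERNATOR_QUERIED))  # False
```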
@@ -389,7 +389,7 @@ class ManagerClient:
                          seeds: list[IPAddress] | None = None,
                          timeout: float | None = None,
                          connect_driver: bool = True,
-                         expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
+                         expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
                          cmdline_options_override: list[str] | None = None,
                          append_env_override: dict[str, str] | None = None,
                          auth_provider: dict[str, str] | None = None) -> None:

@@ -540,7 +540,7 @@ class ManagerClient:
                          seeds: Optional[List[IPAddress]] = None,
                          timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT,
                          server_encryption: str = "none",
-                         expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
+                         expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
                          connect_driver: bool = True) -> ServerInfo:
         """Add a new server"""
         if expected_error is not None:

@@ -41,6 +41,7 @@ import yaml
 import signal
 import glob
 import errno
+import json
 import re
 import platform
 import contextlib

@@ -557,7 +558,7 @@ class ScyllaServer:
     async def install_and_start(self,
                                 api: ScyllaRESTAPIClient,
                                 expected_error: Optional[str] = None,
-                                expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> None:
+                                expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> None:
         """Setup and start this server."""

         await self.install()
@@ -677,10 +678,63 @@ class ScyllaServer:
             return None
         return maintenance_socket_option

-    async def get_cql_up_state(self) -> ServerUpState | None:
-        """Get the CQL up state (a check we use at start up).
-
-        Return None if it fails to connect.
+    def _alternator_ports(self) -> list[tuple[str, int]]:
+        """Return (scheme, port) for every configured Alternator port."""
+        ports = []
+        if "alternator_port" in self.config:
+            ports.append(("http", self.config["alternator_port"]))
+        if "alternator_https_port" in self.config:
+            ports.append(("https", self.config["alternator_https_port"]))
+        return ports
+
+    async def check_alternator_connected(self, ports: list[tuple[str, int]]) -> bool:
+        """TCP connect to every configured Alternator port.
+
+        Returns True if all ports accept connections.
+        """
+        for _, port in ports:
+            try:
+                _, writer = await asyncio.wait_for(
+                    asyncio.open_connection(self.ip_addr, port), timeout=2)
+                writer.close()
+                await writer.wait_closed()
+            except (OSError, asyncio.TimeoutError):
+                return False
+        return True
+
+    async def check_alternator_queried(self, ports: list[tuple[str, int]]) -> bool:
+        """Sends a GetItem for a randomly-named nonexistent table and validates
+        that the response is a DynamoDB-shaped JSON error (contains __type),
+        confirming Alternator is processing DynamoDB API requests.
+
+        Returns True if all ports respond correctly.
+        """
+        table_name = f"nonexistent_table_{uuid.uuid4().hex}"
+        headers = {
+            "Content-Type": "application/x-amz-json-1.0",
+            "X-Amz-Target": "DynamoDB_20120810.GetItem",
+        }
+        body = json.dumps({"TableName": table_name, "Key": {"k": {"S": "k"}}})
+        timeout = aiohttp.ClientTimeout(total=2)
+        async with aiohttp.ClientSession(timeout=timeout) as session:
+            for scheme, port in ports:
+                url = f"{scheme}://{self.ip_addr}:{port}/"
+                try:
+                    # ssl=False skips certificate verification
+                    async with session.post(url, headers=headers, data=body, ssl=False) as resp:
+                        response_body = await resp.json(content_type=None)
+                        if "__type" not in response_body:
+                            return False
+                except Exception as exc:
+                    self.logger.debug("Alternator query check failed for %s: %s", url, exc)
+                    return False
+        return True
+
+    async def get_cql_up_state(self) -> tuple[bool, bool]:
+        """Check CQL connectivity.
+
+        Returns (connected, queried) indicating whether a CQL connection
+        was established and whether a query executed successfully.
         """
         caslog = logging.getLogger('cassandra')
         oldlevel = caslog.getEffectiveLevel()
@@ -708,6 +762,7 @@ class ScyllaServer:
             request_timeout=self.TOPOLOGY_TIMEOUT)
         contact_points=[self.rpc_address]
         connected = False
+        cql_queried = False
        try:
            # In a cluster setup, it's possible that the CQL
            # here is directed to a node different from the initial contact
@@ -730,13 +785,32 @@ class ScyllaServer:
                 control_connection_timeout=self.TOPOLOGY_TIMEOUT,
                 auth_provider=self.auth_provider)
             self.control_connection = self.control_cluster.connect()
-            return ServerUpState.CQL_QUERIED
+            cql_queried = True
         except (NoHostAvailable, InvalidRequest, OperationTimedOut) as exc:
             self.logger.debug("Exception when checking if CQL is up: %s", exc)
-            return ServerUpState.CQL_CONNECTED if connected else None
         finally:
             caslog.setLevel(oldlevel)
+        # Any other exception may indicate a problem, and is passed to the caller.
+        return connected, cql_queried
+
+    async def get_alternator_up_state(self, ports: list[tuple[str, int]]) -> tuple[bool, bool]:
+        connected = await self.check_alternator_connected(ports)
+        queried = connected and await self.check_alternator_queried(ports)
+        return connected, queried
+
+    async def get_cql_alternator_up_state(self) -> ServerUpState | None:
+        """Get the combined CQL + Alternator up state."""
+        cql_connected, cql_queried = await self.get_cql_up_state()
+        alt_connected, alt_queried = False, False
+        alt_ports = self._alternator_ports()  # `alt_ports` empty = no Alternator
+        if alt_ports:
+            alt_connected, alt_queried = await self.get_alternator_up_state(alt_ports)
+        if cql_queried and (alt_queried or not alt_ports):
+            return ServerUpState.CQL_ALTERNATOR_QUERIED
+        if not cql_connected or (alt_ports and not alt_connected):
+            return None
+        # Here both CQL and Alternator (if exists) are at least connected
+        return ServerUpState.CQL_ALTERNATOR_CONNECTED

     def _setup_notify_socket(self) -> None:
         """Create a Unix datagram socket for receiving sd_notify messages from Scylla."""
@@ -824,7 +898,7 @@ class ScyllaServer:
     async def start(self,
                     api: ScyllaRESTAPIClient,
                     expected_error: Optional[str] = None,
-                    expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
+                    expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
                     cmdline_options_override: list[str] | None = None,
                     append_env_override: dict[str, str] | None = None) -> None:
         """Start an installed server.

@@ -897,9 +971,9 @@ class ScyllaServer:
             if await self.try_get_host_id(api):
                 if server_up_state == ServerUpState.PROCESS_STARTED:
                     server_up_state = ServerUpState.HOST_ID_QUERIED
-                server_up_state = await self.get_cql_up_state() or server_up_state
+                server_up_state = await self.get_cql_alternator_up_state() or server_up_state
                 # Check for SERVING state (sd_notify "serving" message)
-                if server_up_state >= ServerUpState.CQL_QUERIED and self.check_serving_notification():
+                if server_up_state >= ServerUpState.CQL_ALTERNATOR_QUERIED and self.check_serving_notification():
                     server_up_state = ServerUpState.SERVING
             if server_up_state >= expected_server_up_state:
                 if expected_error is not None:

@@ -1194,7 +1268,7 @@ class ScyllaCluster:
                          seeds: Optional[List[IPAddress]] = None,
                          server_encryption: str = "none",
                          expected_error: Optional[str] = None,
-                         expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED) -> ServerInfo:
+                         expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED) -> ServerInfo:
         """Add a new server to the cluster"""
         self.is_dirty = True

@@ -1434,7 +1508,7 @@ class ScyllaCluster:
                            server_id: ServerNum,
                            expected_error: str | None = None,
                            seeds: list[IPAddress] | None = None,
-                           expected_server_up_state: ServerUpState = ServerUpState.CQL_QUERIED,
+                           expected_server_up_state: ServerUpState = ServerUpState.CQL_ALTERNATOR_QUERIED,
                            cmdline_options_override: list[str] | None = None,
                            append_env_override: dict[str, str] | None = None,
                            auth_provider: dict[str, str] | None = None) -> None:

@@ -1917,7 +1991,7 @@ class ScyllaClusterManager:
             server_id=server_id,
             expected_error=data.get("expected_error"),
             seeds=data.get("seeds"),
-            expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
+            expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
             cmdline_options_override=data.get("cmdline_options_override"),
             append_env_override=data.get("append_env_override"),
             auth_provider=data.get("auth_provider"),

@@ -1952,7 +2026,7 @@ class ScyllaClusterManager:
             seeds=data.get("seeds"),
             server_encryption=data.get("server_encryption", "none"),
             expected_error=data.get("expected_error"),
-            expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_QUERIED")),
+            expected_server_up_state=getattr(ServerUpState, data.get("expected_server_up_state", "CQL_ALTERNATOR_QUERIED")),
         )
         return s_info.as_dict()