Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
b399606af1 Replace std::vector with utils::chunked_vector in schema diff structures
Co-authored-by: mykaul <4655593+mykaul@users.noreply.github.com>
2025-12-18 11:09:14 +00:00
copilot-swe-agent[bot]
5dc8dce827 Initial plan 2025-12-18 11:05:05 +00:00
26 changed files with 68 additions and 932 deletions

View File

@@ -24,6 +24,7 @@
#include <seastar/core/sharded.hh>
#include <unordered_map>
#include "utils/chunked_vector.hh"
namespace db {
@@ -109,9 +110,9 @@ struct frozen_schema_diff {
extended_frozen_schema old_schema;
extended_frozen_schema new_schema;
};
std::vector<extended_frozen_schema> created;
std::vector<altered_schema> altered;
std::vector<extended_frozen_schema> dropped;
utils::chunked_vector<extended_frozen_schema> created;
utils::chunked_vector<altered_schema> altered;
utils::chunked_vector<extended_frozen_schema> dropped;
};
// schema_diff represents what is happening with tables or views during schema merge
@@ -121,9 +122,9 @@ struct schema_diff_per_shard {
schema_ptr new_schema;
};
std::vector<schema_ptr> created;
std::vector<altered_schema> altered;
std::vector<schema_ptr> dropped;
utils::chunked_vector<schema_ptr> created;
utils::chunked_vector<altered_schema> altered;
utils::chunked_vector<schema_ptr> dropped;
future<frozen_schema_diff> freeze() const;
@@ -143,7 +144,7 @@ struct affected_tables_and_views_per_shard {
schema_diff_per_shard tables;
schema_diff_per_shard cdc;
schema_diff_per_shard views;
std::vector<bool> columns_changed;
utils::chunked_vector<bool> columns_changed;
};
struct affected_tables_and_views {

View File

@@ -3157,10 +3157,7 @@ static bool must_have_tokens(service::node_state nst) {
// A decommissioning node doesn't have tokens at the end, they are
// removed during transition to the left_token_ring state.
case service::node_state::decommissioning: return false;
// A removing node might or might not have tokens depending on whether
// REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
// cases, we allow removing nodes to not have tokens.
case service::node_state::removing: return false;
case service::node_state::removing: return true;
case service::node_state::rebuilding: return true;
case service::node_state::normal: return true;
case service::node_state::left: return false;

View File

@@ -1043,8 +1043,6 @@ The following modes are available:
* - ``immediate``
- Tombstone GC is immediately performed. There is no wait time or repair requirement. This mode is useful for a table that uses the TWCS compaction strategy with no user deletes. After data is expired after TTL, ScyllaDB can perform compaction to drop the expired data immediately.
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
.. _cql-per-table-tablet-options:
Per-table tablet options

View File

@@ -86,7 +86,6 @@ stateDiagram-v2
de_left_token_ring --> [*]
}
state removing {
re_left_token_ring : left_token_ring
re_tablet_draining : tablet_draining
re_tablet_migration : tablet_migration
re_write_both_read_old : write_both_read_old
@@ -99,8 +98,7 @@ stateDiagram-v2
re_tablet_draining --> re_write_both_read_old
re_write_both_read_old --> re_write_both_read_new: streaming completed
re_write_both_read_old --> re_rollback_to_normal: rollback
re_write_both_read_new --> re_left_token_ring
re_left_token_ring --> [*]
re_write_both_read_new --> [*]
}
rebuilding --> normal: streaming completed
decommissioning --> left: operation succeeded
@@ -124,10 +122,9 @@ Note that these are not all states, as there are other states specific to tablet
Writes to vnodes-based tables are going to both new and old replicas (new replicas means calculated according
to modified token ring), reads are using old replicas.
- `write_both_read_new` - as above, but reads are using new replicas.
- `left_token_ring` - the decommissioning or removing node left the token ring, but we still need to wait until other
nodes observe it and stop sending writes to this node. For decommission, we tell the node to shut down,
then remove it from group 0. For removenode, the node is already down, so we skip the shutdown step.
We also use this state to rollback a failed bootstrap or decommission.
- `left_token_ring` - the decommissioning node left the token ring, but we still need to wait until other
nodes observe it and stop sending writes to this node. Then, we tell the node to shut down and remove
it from group 0. We also use this state to rollback a failed bootstrap or decommission.
- `rollback_to_normal` - the decommission or removenode operation failed. Rollback the operation by
moving the node we tried to decommission/remove back to the normal state.
- `lock` - the topology stays in this state until externally changed (to null state), preventing topology
@@ -144,9 +141,7 @@ reads that started before this point exist in the system. Finally we remove the
transitioning state.
Decommission, removenode and replace work similarly, except they don't go through
`commit_cdc_generation`. Both decommission and removenode go through the
`left_token_ring` state to run a global barrier ensuring all nodes are aware
of the topology change before the operation completes.
`commit_cdc_generation`.
The state machine may also go only through the `commit_cdc_generation` state
after getting a request from the user to create a new CDC generation if the

View File

@@ -25,7 +25,8 @@ Getting Started
:id: "getting-started"
:class: my-panel
* :doc:`Install ScyllaDB </getting-started/install-scylla/index/>`
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/#core>`_ - Links to the ScyllaDB Download Center
* :doc:`Configure ScyllaDB </getting-started/system-configuration/>`
* :doc:`Run ScyllaDB in a Shared Environment </getting-started/scylla-in-a-shared-environment>`
* :doc:`Create a ScyllaDB Cluster - Single Data Center (DC) </operating-scylla/procedures/cluster-management/create-cluster/>`

View File

@@ -3,7 +3,8 @@
ScyllaDB Housekeeping and how to disable it
============================================
It is always recommended to run the latest stable version of ScyllaDB.
It is always recommended to run the latest version of ScyllaDB.
The latest stable release version is always available from the `Download Center <https://www.scylladb.com/download/>`_.
When you install ScyllaDB, it installs by default two services: **scylla-housekeeping-restart** and **scylla-housekeeping-daily**. These services check for the latest ScyllaDB version and prompt the user if they are using a version that is older than what is publicly available.
Information about your ScyllaDB deployment, including the ScyllaDB version currently used, as well as unique user and server identifiers, are collected by a centralized service.

View File

@@ -9,8 +9,6 @@ Running ``cluster repair`` on a **single node** synchronizes all data on all nod
To synchronize all data in clusters that have both tablets-based and vnodes-based keyspaces, run :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair/>` on **all**
of the nodes in the cluster, and :doc:`nodetool cluster repair </operating-scylla/nodetool-commands/cluster/repair/>` on **any** of the nodes in the cluster.
.. warning:: :term:`Colocated Tables <Colocated Table>` cannot be synchronized using cluster repair in this version.
To check if a keyspace enables tablets, use:
.. code-block:: cql

View File

@@ -202,7 +202,3 @@ Glossary
The name comes from two basic operations, multiply (MU) and rotate (R), used in its inner loop.
The MurmurHash3 version used in ScyllaDB originated from `Apache Cassandra <https://commons.apache.org/proper/commons-codec/apidocs/org/apache/commons/codec/digest/MurmurHash3.html>`_, and is **not** identical to the `official MurmurHash3 calculation <https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/MurmurHash.java#L31-L33>`_. More `here <https://github.com/russss/murmur3-cassandra>`_.
Colocated Table
An internal table of a special type in a :doc:`tablets </architecture/tablets>` enabled keyspace that is colocated with another base table, meaning it always has the same tablet replicas as the base table.
Current types of colocated tables include CDC log tables, local indexes, and materialized views that have the same partition key as their base table.

View File

@@ -177,7 +177,6 @@ public:
gms::feature driver_service_level { *this, "DRIVER_SERVICE_LEVEL"sv };
gms::feature strongly_consistent_tables { *this, "STRONGLY_CONSISTENT_TABLES"sv };
gms::feature client_routes { *this, "CLIENT_ROUTES"sv };
gms::feature removenode_with_left_token_ring { *this, "REMOVENODE_WITH_LEFT_TOKEN_RING"sv };
public:
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;

View File

@@ -2793,7 +2793,6 @@ future<> database::flush_all_tables() {
});
_all_tables_flushed_at = db_clock::now();
co_await _commitlog->wait_for_pending_deletes();
dblog.info("Forcing new commitlog segment and flushing all tables complete");
}
future<db_clock::time_point> database::get_all_tables_flushed_at(sharded<database>& sharded_db) {

View File

@@ -105,7 +105,7 @@ seastar::future<> service::client_routes_service::delete_client_routes_inner(con
seastar::future<> service::client_routes_service::set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) -> future<> {
return cr.with_retry([&cr, route_entries = std::move(route_entries)] () mutable {
return cr.with_retry([&] {
return cr.set_client_routes_inner(route_entries);
});
});
@@ -113,7 +113,7 @@ seastar::future<> service::client_routes_service::set_client_routes(const std::v
seastar::future<> service::client_routes_service::delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) -> future<> {
return cr.with_retry([&cr, route_keys = std::move(route_keys)] () mutable {
return cr.with_retry([&] {
return cr.delete_client_routes_inner(route_keys);
});
});

View File

@@ -588,16 +588,12 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
}
break;
case node_state::decommissioning:
[[fallthrough]];
case node_state::removing:
// A decommissioning or removing node loses its tokens when topology moves to left_token_ring.
// A decommissioning node loses its tokens when topology moves to left_token_ring.
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
if (rs.state == node_state::removing && !_feature_service.removenode_with_left_token_ring) {
on_internal_error(
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
}
break;
}
[[fallthrough]];
case node_state::removing:
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
// no need for double writes anymore since op failed
co_await process_normal_node(id, host_id, ip, rs);

View File

@@ -2672,7 +2672,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
while (utils::get_local_injector().enter("topology_coordinator_pause_after_streaming")) {
co_await sleep_abortable(std::chrono::milliseconds(10), _as);
}
const bool removenode_with_left_token_ring = _feature_service.removenode_with_left_token_ring;
auto node = get_node_to_work_on(std::move(guard));
bool barrier_failed = false;
// In this state writes goes to old and new replicas but reads start to be done from new replicas
@@ -2727,9 +2726,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
break;
case node_state::removing: {
co_await utils::get_local_injector().inject("delay_node_removal", utils::wait_for_message(std::chrono::minutes(5)));
if (!removenode_with_left_token_ring) {
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
}
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
}
[[fallthrough]];
case node_state::decommissioning: {
@@ -2737,10 +2734,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
node_state next_state;
utils::chunked_vector<canonical_mutation> muts;
muts.reserve(2);
if (removenode_with_left_token_ring || node.rs->state == node_state::decommissioning) {
// Both decommission and removenode go through left_token_ring state
// to ensure a global barrier is executed before the request is marked as done.
// This ensures all nodes have observed the topology change.
if (node.rs->state == node_state::decommissioning) {
next_state = node.rs->state;
builder.set_transition_state(topology::transition_state::left_token_ring);
} else {
@@ -2815,16 +2809,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case topology::transition_state::left_token_ring: {
auto node = get_node_to_work_on(std::move(guard));
// Need to be captured as the node variable might become invalid (e.g. moved out) at particular points.
const auto node_rs_state = node.rs->state;
const bool is_removenode = node_rs_state == node_state::removing;
if (is_removenode && !_feature_service.removenode_with_left_token_ring) {
on_internal_error(
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
}
auto finish_left_token_ring_transition = [&](node_to_work_on& node) -> future<> {
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
@@ -2844,16 +2828,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
muts.push_back(builder.build());
co_await remove_view_build_statuses_on_left_node(muts, node.guard, node.id);
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(node.id.uuid()), muts);
auto str = std::invoke([&]() {
switch (node_rs_state) {
case node_state::decommissioning:
return ::format("finished decommissioning node {}", node.id);
case node_state::removing:
return ::format("finished removing node {}", node.id);
default:
return ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
}
});
auto str = node.rs->state == node_state::decommissioning
? ::format("finished decommissioning node {}", node.id)
: ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
};
@@ -2866,11 +2843,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
if (node.id == _raft.id()) {
// Removed node must be dead, so it shouldn't enter here (it can't coordinate its own removal).
if (is_removenode) {
on_internal_error(rtlogger, "removenode operation cannot be coordinated by the removed node itself");
}
// Someone else needs to coordinate the rest of the decommission process,
// because the decommissioning node is going to shut down in the middle of this state.
rtlogger.info("coordinator is decommissioning; giving up leadership");
@@ -2884,13 +2856,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
bool barrier_failed = false;
// Wait until other nodes observe the new token ring and stop sending writes to this node.
auto excluded_nodes = get_excluded_nodes_for_topology_request(node);
try {
// Removed node is added to ignored nodes, so it should be automatically excluded.
if (is_removenode && !excluded_nodes.contains(node.id)) {
on_internal_error(rtlogger, "removenode operation must have the removed node in excluded_nodes");
}
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), std::move(excluded_nodes)), node.id);
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
@@ -2907,17 +2874,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
if (barrier_failed) {
// If barrier above failed it means there may be unfinished writes to a decommissioned node,
// or some nodes might not have observed the new topology yet (one purpose of the barrier
// is to make sure all nodes observed the new topology before completing the request).
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
// Lets wait for the ring delay for those writes to complete and new topology to propagate
// before continuing.
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
// Make decommissioning/removed node a non voter before reporting operation completion below.
// Otherwise the node may see the completion and exit before it is removed from
// Make decommissioning node a non voter before reporting operation completion below.
// Otherwise the decommissioned node may see the completion and exit before it is removed from
// the config at which point the removal from the config will hang if the cluster had only two
// nodes before the decommission.
co_await _voter_handler.on_node_removed(node.id, _as);
@@ -2928,7 +2893,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state");
// For decommission/rollback: Tell the node to shut down.
// Tell the node to shut down.
// This is done to improve user experience when there are no failures.
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
// so there's no guarantee that it would learn about entering that state even if it was still
@@ -2937,19 +2902,15 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// There is the possibility that the node will never get the message
// and decommission will hang on that node.
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
//
// For removenode: The node is already dead, no need to send shutdown command.
auto node_id = node.id;
bool shutdown_failed = false;
if (!is_removenode) {
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
}
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
}
if (shutdown_failed) {
node = retake_node(co_await start_operation(), node_id);

View File

@@ -604,14 +604,18 @@ async def test_driver_service_creation_failure(manager: ManagerClient) -> None:
service_level_names = [sl.service_level for sl in service_levels]
assert "driver" not in service_level_names
def get_processed_tasks_for_group(metrics, group):
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
if res is None:
return 0
return res
@pytest.mark.asyncio
async def _verify_tasks_processed_metrics(manager, server, used_group, unused_group, func):
number_of_requests = 3000
number_of_requests = 1000
def get_processed_tasks_for_group(metrics, group):
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
logger.info(f"group={group}, tasks_processed={res}")
if res is None:
return 0
return res
@@ -623,10 +627,8 @@ async def _verify_tasks_processed_metrics(manager, server, used_group, unused_gr
await asyncio.gather(*[asyncio.to_thread(func) for i in range(number_of_requests)])
metrics = await manager.metrics.query(server.ip_addr)
tasks_processed_by_used_group = get_processed_tasks_for_group(metrics, used_group)
tasks_processed_by_unused_group = get_processed_tasks_for_group(metrics, unused_group)
assert tasks_processed_by_used_group - initial_tasks_processed_by_used_group > number_of_requests
assert tasks_processed_by_unused_group - initial_tasks_processed_by_unused_group < number_of_requests
assert get_processed_tasks_for_group(metrics, used_group) - initial_tasks_processed_by_used_group > number_of_requests
assert get_processed_tasks_for_group(metrics, unused_group) - initial_tasks_processed_by_unused_group < number_of_requests
@pytest.mark.asyncio
async def test_driver_service_level_not_used_for_user_queries(manager: ManagerClient) -> None:

View File

@@ -52,18 +52,6 @@ KNOWN_LOG_LEVELS = {
"OFF": "info",
}
# Captures the aggregate metric before the "[READ ..., WRITE ...]" block.
STRESS_SUMMARY_PATTERN = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*')
# Extracts the READ metric number inside the "[READ ..., WRITE ...]" block.
STRESS_READ_PATTERN = re.compile(r'.*READ:\s*([\d\.\,]+\d?)[^\d].*')
# Extracts the WRITE metric number inside the "[READ ..., WRITE ...]" block.
STRESS_WRITE_PATTERN = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*')
# Splits a "key : value" line into key and value.
STRESS_KEY_VALUE_PATTERN = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$')
class NodeError(Exception):
def __init__(self, msg: str, process: int | None = None):
@@ -540,15 +528,6 @@ class ScyllaNode:
return self.cluster.manager.server_get_workdir(server_id=self.server_id)
def stress(self, stress_options: list[str], **kwargs):
"""
Run `cassandra-stress` against this node.
This method does not do any result parsing.
:param stress_options: List of options to pass to `cassandra-stress`.
:param kwargs: Additional arguments to pass to `subprocess.Popen()`.
:return: Named tuple with `stdout`, `stderr`, and `rc` (return code).
"""
cmd_args = ["cassandra-stress"] + stress_options
if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")):
@@ -570,73 +549,6 @@ class ScyllaNode:
except KeyboardInterrupt:
pass
def _set_stress_val(self, key, val, res):
"""
Normalize a stress result string and populate aggregate/read/write metrics.
Removes comma-thousands separators from numbers, converts to float,
stores the aggregate metric under `key`.
If the value contains a "[READ ..., WRITE ...]" block, also stores the
read and write metrics under `key:read` and `key:write`.
:param key: The metric name
:param val: The metric value string
:param res: The dictionary to populate
"""
def parse_num(s):
return float(s.replace(',', ''))
if "[" in val:
p = STRESS_SUMMARY_PATTERN
m = p.match(val)
if m:
res[key] = parse_num(m.group(1))
p = STRESS_READ_PATTERN
m = p.match(val)
if m:
res[key + ":read"] = parse_num(m.group(1))
p = STRESS_WRITE_PATTERN
m = p.match(val)
if m:
res[key + ":write"] = parse_num(m.group(1))
else:
try:
res[key] = parse_num(val)
except ValueError:
res[key] = val
def stress_object(self, stress_options=None, ignore_errors=None, **kwargs):
"""
Run stress test and return results as a structured metrics dictionary.
Runs `stress()`, finds the `Results:` section in `stdout`, and then
processes each `key : value` line, putting it into a dictionary.
:param stress_options: List of stress options to pass to `stress()`.
:param ignore_errors: Deprecated (no effect).
:param kwargs: Additional arguments to pass to `stress()`.
:return: Dictionary of stress test results.
"""
if ignore_errors:
self.warning("passing `ignore_errors` to stress_object() is deprecated")
ret = self.stress(stress_options, **kwargs)
p = STRESS_KEY_VALUE_PATTERN
res = {}
start = False
for line in (s.strip() for s in ret.stdout.splitlines()):
if start:
m = p.match(line)
if m:
self._set_stress_val(m.group(1).strip().lower(), m.group(2).strip(), res)
else:
if line == 'Results:':
start = True
return res
def flush(self, ks: str | None = None, table: str | None = None, **kwargs) -> None:
cmd = ["flush"]
if ks:

View File

@@ -1,690 +0,0 @@
#
# Copyright (C) 2015-present The Apache Software Foundation
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import functools
import logging
import string
import threading
import time
from concurrent import futures
from typing import NamedTuple
import pytest
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import SimpleStatement, dict_factory
from concurrent.futures import ThreadPoolExecutor
from dtest_class import Tester, create_cf, create_ks, read_barrier
from tools.assertions import assert_all, assert_invalid
from tools.cluster_topology import generate_cluster_topology
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list
logger = logging.getLogger(__name__)
class TestSchemaManagement(Tester):
def prepare(self, racks_num: int, has_config: bool = True):
cluster = self.cluster
cluster_topology = generate_cluster_topology(rack_num=racks_num)
if has_config:
config = {
"ring_delay_ms": 5000,
}
cluster.set_configuration_options(values=config)
cluster.populate(cluster_topology)
cluster.start(wait_other_notice=True)
return cluster
def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema...")
create_ks(session, "ks", 3)
session.execute(
"""
CREATE TABLE users (
id int,
firstname text,
lastname text,
PRIMARY KEY (id)
);
"""
)
insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
insert_statement.consistency_level = ConsistencyLevel.ALL
session.execute(insert_statement, [0])
logger.debug("Altering schema")
session.execute("ALTER TABLE users WITH comment = 'updated'")
logger.debug("Restarting node2")
node2.stop(gently=True)
node2.start(wait_for_binary_proto=True)
logger.debug("Restarting node3")
node3.stop(gently=True)
node3.start(wait_for_binary_proto=True, wait_other_notice=True)
n_partitions = 20
for i in range(n_partitions):
session.execute(insert_statement, [i])
rows = session.execute("SELECT * FROM users")
res = sorted(rows)
assert len(res) == n_partitions
for i in range(n_partitions):
expected = [i, "A", "B"]
assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"
def test_dropping_keyspace_with_many_columns(self):
"""
Exploits https://github.com/scylladb/scylla/issues/1484
"""
cluster = self.prepare(racks_num=1, has_config=False)
node1 = cluster.nodelist()[0]
session = self.patient_cql_connection(node1)
session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
for i in range(8):
session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
session.execute("drop keyspace testxyz")
for node in cluster.nodelist():
s = self.patient_cql_connection(node)
s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
s.execute("drop keyspace testxyz")
def test_multiple_create_table_in_parallel(self):
"""
Run multiple create table statements via different nodes
1. Create a cluster of 3 nodes
2. Run create table with different table names in parallel - check all complete
3. Run create table with the same table name in parallel - check if they complete
"""
logger.debug("1. Create a cluster of 3 nodes")
nodes_count = 3
cluster = self.prepare(racks_num=nodes_count)
sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
ks = "ks"
create_ks(sessions[0], ks, nodes_count)
def create_table(session, table_name):
create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
logger.debug(f"create_statement {create_statement}")
session.execute(create_statement)
logger.debug("2. Run create table with different table names in parallel - check all complete")
step2_tables = [f"t{i}" for i in range(nodes_count)]
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
list(executor.map(create_table, sessions, step2_tables))
for table in step2_tables:
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
logger.debug("3. Run create table with the same table name in parallel - check if they complete")
step3_table = "test"
step3_tables = [step3_table for i in range(nodes_count)]
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
for res_future in res_futures:
try:
res_future.result()
except AlreadyExists as e:
logger.info(f"expected cassandra.AlreadyExists error {e}")
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
@pytest.mark.parametrize("case", ("write", "read", "mixed"))
def test_alter_table_in_parallel_to_read_and_write(self, case):
"""
Create a table and write into while altering the table
1. Create a cluster of 3 nodes and populate a table
2. Run write/read/read_and_write" statement in a loop
3. Alter table while inserts are running
"""
logger.debug("1. Create a cluster of 3 nodes and populate a table")
cluster = self.prepare(racks_num=3)
col_number = 20
[node1, node2, node3] = cluster.nodelist()
session = self.patient_exclusive_cql_connection(node1)
def run_stress(stress_type, col=col_number - 2):
node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])
logger.debug("Populate")
run_stress("write", col_number)
with ThreadPoolExecutor(max_workers=1) as executor:
logger.debug(f"2. Run {case} statement in a loop")
statement_future = executor.submit(functools.partial(run_stress, case))
logger.debug(f"let's {case} statement work some time")
time.sleep(2)
logger.debug("3. Alter table while inserts are running")
alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
logger.debug(f"alter_statement {alter_statement}")
alter_result = session.execute(alter_statement)
logger.debug(alter_result.all())
logger.debug(f"wait till {case} statement finished")
statement_future.result()
rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"
logger.debug("read and check data")
run_stress("read")
@pytest.mark.skip("unimplemented")
def commitlog_replays_after_schema_change(self):
"""
Commitlog can be replayed even though schema has been changed
1. Create a table and insert data
2. Alter table
3. Kill node
4. Boot node and verify that commitlog have been replayed and that all data is restored
"""
raise NotImplementedError
@pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
def test_update_schema_while_node_is_killed(self, case):
"""
Check that a node that is killed durring a table creation/alter/drop is able to rejoin and to synch on schema
"""
logger.debug("1. Create a cluster and insert data")
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
def create_table_case():
try:
logger.debug("Creating table")
create_c1c2_table(session)
logger.debug("Populating")
insert_c1c2(session, n=10)
except AlreadyExists:
# the CQL command can be called multiple time case of retries
pass
def alter_table_case():
try:
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
except InvalidRequest as exc:
# the CQL command can be called multiple time case of retries
assert "Invalid column name c3" in str(exc)
def drop_table_case():
try:
session.execute("DROP TABLE cf;", timeout=180)
except InvalidRequest as exc:
# the CQL command can be called multiple time case of retries
assert "Cannot drop non existing table" in str(exc)
logger.debug("Creating keyspace")
create_ks(session, "ks", 3)
if case != "create_table":
create_table_case()
case_map = {
"create_table": create_table_case,
"alter_table": alter_table_case,
"drop_table": drop_table_case,
}
with ThreadPoolExecutor(max_workers=1) as executor:
logger.debug(f"2. kill node during {case}")
kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
case_map[case]()
kill_node_future.result()
logger.debug("3. Start the stopped node2")
node2.start(wait_for_binary_proto=True)
session = self.patient_exclusive_cql_connection(node2)
read_barrier(session)
def create_or_alter_table_expected_result(col_mun):
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
assert len(rows_to_list(rows)[0]) == col_mun, f"Expected {col_mun} columns but got rows:{rows} instead"
for key in range(10):
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)
expected_case_result_map = {
"create_table": functools.partial(create_or_alter_table_expected_result, 3),
"alter_table": functools.partial(create_or_alter_table_expected_result, 4),
"drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
}
logger.debug("verify that commitlog has been replayed and that all data is restored")
expected_case_result_map[case]()
@pytest.mark.parametrize("is_gently_stop", [True, False])
def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
"""
Nodes rejoining the cluster synch on schema changes
1. Create a cluster and insert data
2. Stop a node
3. Alter table
4. Insert additional data
5. Start the stopped node
6. Verify the stopped node synchs on the updated schema
"""
logger.debug("1. Create a cluster and insert data")
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
create_c1c2_table(session)
create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})
logger.debug("Populating")
insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)
logger.debug("2 Stop a node1")
node1.stop(gently=is_gently_stop, wait_other_notice=True)
logger.debug("3 Alter table")
session = self.patient_cql_connection(node2)
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
logger.debug("4 Insert additional data")
session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))
logger.debug("5. Start the stopped node1")
node1.start(wait_for_binary_proto=True)
logger.debug("6. Verify the stopped node synchs on the updated schema")
session = self.patient_exclusive_cql_connection(node1)
read_barrier(session)
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
expected = [["test", "test", "test", "test"]]
assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
for key in range(10):
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)
def test_reads_schema_recreated_while_node_down(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Populating")
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
logger.debug("Stopping node2")
node2.stop(gently=True)
logger.debug("Re-creating schema")
session.execute("DROP TABLE cf;")
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_cql_connection(node2)
read_barrier(session2)
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"
def test_writes_schema_recreated_while_node_down(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Populating")
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
logger.debug("Stopping node2")
node2.stop(gently=True, wait_other_notice=True)
logger.debug("Re-creating schema")
session.execute("DROP TABLE cf;")
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_cql_connection(node2)
read_barrier(session2)
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
expected = [[2, "2"]]
assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"
class TestLargePartitionAlterSchema(Tester):
# Issue scylladb/scylla: #5135:
#
# Issue: Cache reads may miss some writes if schema alter followed by a read happened concurrently with preempted
# partition entry update
# Affects only tables with multi-row partitions, which are the only ones that can experience the update of partition
# entry being preempted.
#
# The scenario in which the problem could have happened has to involve:
# - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
# - appending writes to the partition (not overwrites)
# - scans of the partition
# - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
# column lands in the middle (in alphabetical order) of the old column set.
#
# Memtable flush has to happen after a schema alter concurrently with a read.
#
# The bug could result in cache corruption which manifests as some past writes being missing (not visible to reads).
PARTITIONS = 50
STRING_VALUE = string.ascii_lowercase
def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
if not self.cluster.nodelist():
self.cluster.populate(cluster_topology)
self.cluster.start(wait_other_notice=True)
node1 = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node=node1)
self.create_schema(session=session, rf=rf)
return session
def create_schema(self, session, rf):
logger.debug("Creating schema")
create_ks(session=session, name="ks", rf=rf)
session.execute(
"""
CREATE TABLE lp_table (
pk int,
ck1 int,
val1 text,
val2 text,
PRIMARY KEY (pk, ck1)
);
"""
)
def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event = None):
ck = ck_start
def _populate_loop():
nonlocal ck
while True:
if stop_populating is not None and stop_populating.is_set():
return
if ck_end is not None and ck >= ck_end:
return
for pk in range(self.PARTITIONS):
row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
data.append(row)
yield tuple(row)
ck += 1
records_written = ck - ck_start
logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")
parameters = _populate_loop()
stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")
execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)
logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
return data
def read(self, session, ck_max, stop_reading: threading.Event = None):
def _read_loop():
while True:
for ck in range(ck_max):
for pk in range(self.PARTITIONS):
if stop_reading is not None and stop_reading.is_set():
return
session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
if stop_reading is None:
return
logger.debug(f"Start reading..")
_read_loop()
logger.debug(f"Finish reading..")
def add_column(self, session, column_name, column_type):
logger.debug(f"Add {column_name} column")
session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")
def drop_column(self, session, column_name):
logger.debug(f"Drop {column_name} column")
session.execute(f"ALTER TABLE lp_table DROP {column_name}")
def test_large_partition_with_add_column(self):
cluster_topology = generate_cluster_topology()
session = self.prepare(cluster_topology, rf=1)
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
threads = []
timeout = 300
ck_end = 5000
if self.cluster.scylla_mode == "debug":
timeout = 900
ck_end = 500
with ThreadPoolExecutor(max_workers=2) as executor:
stop_populating = threading.Event()
stop_reading = threading.Event()
# Insert new rows in background
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
# Wait for running load
time.sleep(10)
self.add_column(session, "new_clmn", "int")
# Memtable flush has to happen after a schema alter concurrently with a read
logger.debug("Flush data")
self.cluster.nodelist()[0].flush()
# Stop populating and reading soon after flush
time.sleep(1)
logger.debug("Stop populating and reading")
stop_populating.set()
stop_reading.set()
for future in futures.as_completed(threads, timeout=timeout):
try:
future.result()
except Exception as exc: # noqa: BLE001
pytest.fail(f"Generated an exception: {exc}")
# Add 'null' values for the new column `new_clmn` in the expected data
for i, _ in enumerate(data):
data[i].append(None)
assert_all(session, f"select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)
def test_large_partition_with_drop_column(self):
cluster_topology = generate_cluster_topology()
session = self.prepare(cluster_topology, rf=1)
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
threads = []
timeout = 300
ck_end = 5000
if self.cluster.scylla_mode == "debug":
timeout = 900
ck_end = 500
with ThreadPoolExecutor(max_workers=2) as executor:
stop_populating = threading.Event()
stop_reading = threading.Event()
# Insert new rows in background
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
# Wait for running load
time.sleep(10)
self.drop_column(session=session, column_name="val1")
# Memtable flush has to happen after a schema alter concurrently with a read
logger.debug("Flush data")
self.cluster.nodelist()[0].flush()
# Stop populating and reading soon after flush
time.sleep(1)
logger.debug("Stop populating and reading")
stop_populating.set()
stop_reading.set()
result = []
for future in futures.as_completed(threads, timeout=timeout):
try:
result.append(future.result())
except Exception as exc: # noqa: BLE001
# "Unknown identifier val1" is expected error
if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
pytest.fail(f"Generated an exception: {exc}")
class HistoryVerifier:
def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
"""
Initialize parameters for further verification of schema history.
:param table_name: table thats we change it's schema and verify schema history accordingly.
"""
self.table_name = table_name
self.keyspace_name = keyspace_name
self.versions = []
self.versions_dict = {}
self.query = ""
def verify(self, session, expected_current_diff, expected_prev_diff, query):
"""
Verify current schema history entry by comparing to previous schema entry.
:param session: python cql session
:param expected_current_diff: difference of current schema from previous schema
:param expected_prev_diff: difference of previous schema from current schema
:param query: The query that created new schema
"""
def get_table_id(session, keyspace_name, table_name):
assert keyspace_name, f"Input kesyspcase should have value, keyspace_name={keyspace_name}"
assert table_name, f"Input table_name should have value, table_name={table_name}"
query = "select keyspace_name,table_name,id from system_schema.tables"
query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
current_rows = session.execute(query).current_rows
assert len(current_rows) == 1, f"Not found table description, ks={keyspace_name} table_name={table_name}"
res = current_rows[0]
return res["id"]
def read_schema_history_table(session, cf_id):
"""
read system.scylla_table_schema_history and verify current version diff from previous vesion
:param session: python cql session
:param cf_id: uuid of the table we changed it's schema
"""
query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
res = session.execute(query).current_rows
new_versions = list({
entry["schema_version"]
for entry in res
if str(entry["schema_version"]) not in self.versions
})
msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
assert len(new_versions) == 1, msg
current_version = str(new_versions[0])
logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
columns_list = (
{"column_name": entry["column_name"], "type": entry["type"]}
for entry in res
if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
)
self.versions_dict[current_version] = {}
for item in columns_list:
self.versions_dict[current_version][item["column_name"]] = item["type"]
self.versions.append(current_version)
if len(self.versions) > 1:
current_id = self.versions[-1]
previous_id = self.versions[-2]
set_current = set(self.versions_dict[current_id].items())
set_previous = set(self.versions_dict[previous_id].items())
current_diff = set_current - set_previous
previous_diff = set_previous - set_current
msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
if current_diff != expected_current_diff:
logger.debug(msg1 + msg2)
assert current_diff == expected_current_diff, msg1 + msg2
msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
assert previous_diff == expected_prev_diff, msg1 + msg2
self.query = query
cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
read_schema_history_table(session, cf_id)
class DDL(NamedTuple):
ddl_command: str
expected_current_diff: set | None
expected_prev_diff: set | None
class TestSchemaHistory(Tester):
def prepare(self):
cluster = self.cluster
# in case support tablets and rf-rack-valid-keyspaces
# create cluster with 3 racks with 1 node in each rack
cluster_topology = generate_cluster_topology(rack_num=3)
rf = 3
cluster.populate(cluster_topology).start(wait_other_notice=True)
self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
create_ks(self.session, "lwt_load_ks", rf)
def test_schema_history_alter_table(self):
"""test schema history changes following alter table cql commands"""
self.prepare()
verifier = HistoryVerifier(table_name="table2")
queries_and_expected_diffs = [
DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
]
for ddl in queries_and_expected_diffs:
self.session.execute(ddl.ddl_command)
verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)

View File

@@ -218,18 +218,6 @@ def assert_row_count_in_select_less(
assert count < max_rows_expected, f'Expected a row count < of {max_rows_expected} in query "{query}", but got {count}'
def assert_length_equal(object_with_length, expected_length):
"""
Assert an object has a specific length.
@param object_with_length The object whose length will be checked
@param expected_length The expected length of the object
Examples:
assert_length_equal(res, nb_counter)
"""
assert len(object_with_length) == expected_length, f"Expected {object_with_length} to have length {expected_length}, but instead is of length {len(object_with_length)}"
def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
"""
asserts that the contents of the two provided lists are equal

View File

@@ -14,7 +14,6 @@ from cassandra.query import SimpleStatement
from cassandra.concurrent import execute_concurrent_with_args
from test.cluster.dtest.dtest_class import create_cf
from test.cluster.dtest.tools import assertions
logger = logging.getLogger(__name__)
@@ -52,27 +51,6 @@ def insert_c1c2( # noqa: PLR0913
execute_concurrent_with_args(session, statement, [[f"k{k}"] for k in keys], concurrency=concurrency)
def query_c1c2( # noqa: PLR0913
session,
key,
consistency=ConsistencyLevel.QUORUM,
tolerate_missing=False,
must_be_missing=False,
c1_value="value1",
c2_value="value2",
ks="ks",
cf="cf",
):
query = SimpleStatement(f"SELECT c1, c2 FROM {ks}.{cf} WHERE key='k{key}'", consistency_level=consistency)
rows = list(session.execute(query))
if not tolerate_missing and not must_be_missing:
assertions.assert_length_equal(rows, 1)
res = rows[0]
assert len(res) == 2 and res[0] == c1_value and res[1] == c2_value, res
if must_be_missing:
assertions.assert_length_equal(rows, 0)
def rows_to_list(rows):
new_list = [list(row) for row in rows]
return new_list

View File

@@ -181,14 +181,11 @@ async def test_random_failures(manager: ManagerClient,
LOGGER.info("Found following message in the coordinator's log:\n\t%s", matches[-1][0])
await manager.server_stop(server_id=s_info.server_id)
BANNED_NOTIFICATION = "received notification of being banned from the cluster from"
STARTUP_FAILED_PATTERN = f"init - Startup failed:|{BANNED_NOTIFICATION}"
if s_info in await manager.running_servers():
LOGGER.info("Wait until the new node initialization completes or fails.")
await server_log.wait_for(f"init - (Startup failed:|Scylla version .* initialization completed)|{BANNED_NOTIFICATION}", timeout=120)
await server_log.wait_for("init - (Startup failed:|Scylla version .* initialization completed)", timeout=120)
if await server_log.grep(STARTUP_FAILED_PATTERN):
if await server_log.grep("init - Startup failed:"):
LOGGER.info("Check that the new node is dead.")
expected_statuses = [psutil.STATUS_DEAD]
else:
@@ -219,7 +216,7 @@ async def test_random_failures(manager: ManagerClient,
else:
if s_info in await manager.running_servers():
LOGGER.info("The new node is dead. Check if it failed to startup.")
assert await server_log.grep(STARTUP_FAILED_PATTERN)
assert await server_log.grep("init - Startup failed:")
await manager.server_stop(server_id=s_info.server_id) # remove the node from the list of running servers
LOGGER.info("Try to remove the dead new node from the cluster.")

View File

@@ -26,7 +26,6 @@ skip_in_release:
- test_raft_cluster_features
- test_cluster_features
- dtest/limits_test
- dtest/schema_management_test
skip_in_debug:
- test_shutdown_hang
- test_replace

View File

@@ -146,13 +146,13 @@ async def test_joining_old_node_fails(manager: ManagerClient) -> None:
# Try to add a node that doesn't support the feature - should fail
new_server_info = await manager.server_add(start=False, property_file=servers[0].property_file())
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
# Try to replace with a node that doesn't support the feature - should fail
await manager.server_stop_gracefully(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=False, use_host_id=False)
new_server_info = await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[0].property_file())
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
@pytest.mark.asyncio

View File

@@ -131,7 +131,7 @@ async def test_major_compaction_flush_all_tables(manager: ManagerClient, compact
await manager.api.keyspace_compaction(server.ip_addr, ks, cf)
flush_log = await log.grep("Forcing new commitlog segment and flushing all tables", from_mark=mark)
assert len(flush_log) == (2 if expect_all_table_flush else 0)
assert len(flush_log) == (1 if expect_all_table_flush else 0)
# all tables should be flushed the first time unless compaction_flush_all_tables_before_major_seconds == 0
await check_all_table_flush_in_major_compaction(compaction_flush_all_tables_before_major_seconds != 0)

View File

@@ -67,7 +67,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
logger.info(f"Removing node {servers[0]} using {servers[1]}")
await manager.remove_node(servers[1].server_id, servers[0].server_id)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
await check_token_ring_and_group0_consistency(manager)
servers = servers[1:]
logger.info("Checking results of the background writes")

View File

@@ -74,7 +74,7 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
logger.info(f"Removing node {servers[0]} using {servers[1]}")
await manager.remove_node(servers[1].server_id, servers[0].server_id)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
await check_token_ring_and_group0_consistency(manager)
servers = servers[1:]
logger.info("Checking results of the background writes")

View File

@@ -40,8 +40,16 @@ struct test_pinger: public direct_failure_detector::pinger {
co_return;
}
// Simulate a blocking ping that only returns when aborted.
co_await sleep_abortable(std::chrono::hours(1), as);
promise<> p;
auto f = p.get_future();
auto sub = as.subscribe([&, p = std::move(p)] () mutable noexcept {
p.set_value();
});
if (!sub) {
throw abort_requested_exception{};
}
co_await std::move(f);
throw abort_requested_exception{};
}, as);
co_return ret;
}

View File

@@ -64,7 +64,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
for server in servers:
await manager.api.disable_autocompaction(server.ip_addr, ks)
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text", " WITH speculative_retry = 'NONE'") as cf:
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)