Compare commits
24 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb0d8a38f1 | ||
|
|
12787302bf | ||
|
|
f65db4e8eb | ||
|
|
df2ac0f257 | ||
|
|
093e97a539 | ||
|
|
fa6e5d0754 | ||
|
|
08518b2c12 | ||
|
|
2a75b1374e | ||
|
|
2cb9bb8f3a | ||
|
|
f1d63d014c | ||
|
|
33f7bc28da | ||
|
|
f831ca5ab5 | ||
|
|
1fe0509a9b | ||
|
|
e7d76fd8f3 | ||
|
|
700853740d | ||
|
|
3c5dd5e5ae | ||
|
|
5971b2ad97 | ||
|
|
f89315d02f | ||
|
|
d5c205194b | ||
|
|
6ad10b141a | ||
|
|
8cf8e6c87d | ||
|
|
1642c686c2 | ||
|
|
9431826c52 | ||
|
|
ba6fabfc88 |
@@ -3157,7 +3157,10 @@ static bool must_have_tokens(service::node_state nst) {
|
||||
// A decommissioning node doesn't have tokens at the end, they are
|
||||
// removed during transition to the left_token_ring state.
|
||||
case service::node_state::decommissioning: return false;
|
||||
case service::node_state::removing: return true;
|
||||
// A removing node might or might not have tokens depending on whether
|
||||
// REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled. To support both
|
||||
// cases, we allow removing nodes to not have tokens.
|
||||
case service::node_state::removing: return false;
|
||||
case service::node_state::rebuilding: return true;
|
||||
case service::node_state::normal: return true;
|
||||
case service::node_state::left: return false;
|
||||
|
||||
@@ -1043,6 +1043,8 @@ The following modes are available:
|
||||
* - ``immediate``
|
||||
- Tombstone GC is immediately performed. There is no wait time or repair requirement. This mode is useful for a table that uses the TWCS compaction strategy with no user deletes. After data is expired after TTL, ScyllaDB can perform compaction to drop the expired data immediately.
|
||||
|
||||
.. warning:: The ``repair`` mode is not supported for :term:`Colocated Tables <Colocated Table>` in this version.
|
||||
|
||||
.. _cql-per-table-tablet-options:
|
||||
|
||||
Per-table tablet options
|
||||
|
||||
@@ -86,6 +86,7 @@ stateDiagram-v2
|
||||
de_left_token_ring --> [*]
|
||||
}
|
||||
state removing {
|
||||
re_left_token_ring : left_token_ring
|
||||
re_tablet_draining : tablet_draining
|
||||
re_tablet_migration : tablet_migration
|
||||
re_write_both_read_old : write_both_read_old
|
||||
@@ -98,7 +99,8 @@ stateDiagram-v2
|
||||
re_tablet_draining --> re_write_both_read_old
|
||||
re_write_both_read_old --> re_write_both_read_new: streaming completed
|
||||
re_write_both_read_old --> re_rollback_to_normal: rollback
|
||||
re_write_both_read_new --> [*]
|
||||
re_write_both_read_new --> re_left_token_ring
|
||||
re_left_token_ring --> [*]
|
||||
}
|
||||
rebuilding --> normal: streaming completed
|
||||
decommissioning --> left: operation succeeded
|
||||
@@ -122,9 +124,10 @@ Note that these are not all states, as there are other states specific to tablet
|
||||
Writes to vnodes-based tables are going to both new and old replicas (new replicas means calculated according
|
||||
to modified token ring), reads are using old replicas.
|
||||
- `write_both_read_new` - as above, but reads are using new replicas.
|
||||
- `left_token_ring` - the decommissioning node left the token ring, but we still need to wait until other
|
||||
nodes observe it and stop sending writes to this node. Then, we tell the node to shut down and remove
|
||||
it from group 0. We also use this state to rollback a failed bootstrap or decommission.
|
||||
- `left_token_ring` - the decommissioning or removing node left the token ring, but we still need to wait until other
|
||||
nodes observe it and stop sending writes to this node. For decommission, we tell the node to shut down,
|
||||
then remove it from group 0. For removenode, the node is already down, so we skip the shutdown step.
|
||||
We also use this state to rollback a failed bootstrap or decommission.
|
||||
- `rollback_to_normal` - the decommission or removenode operation failed. Rollback the operation by
|
||||
moving the node we tried to decommission/remove back to the normal state.
|
||||
- `lock` - the topology stays in this state until externally changed (to null state), preventing topology
|
||||
@@ -141,7 +144,9 @@ reads that started before this point exist in the system. Finally we remove the
|
||||
transitioning state.
|
||||
|
||||
Decommission, removenode and replace work similarly, except they don't go through
|
||||
`commit_cdc_generation`.
|
||||
`commit_cdc_generation`. Both decommission and removenode go through the
|
||||
`left_token_ring` state to run a global barrier ensuring all nodes are aware
|
||||
of the topology change before the operation completes.
|
||||
|
||||
The state machine may also go only through the `commit_cdc_generation` state
|
||||
after getting a request from the user to create a new CDC generation if the
|
||||
|
||||
@@ -25,8 +25,7 @@ Getting Started
|
||||
:id: "getting-started"
|
||||
:class: my-panel
|
||||
|
||||
* `Install ScyllaDB (Binary Packages, Docker, or EC2) <https://www.scylladb.com/download/#core>`_ - Links to the ScyllaDB Download Center
|
||||
|
||||
* :doc:`Install ScyllaDB </getting-started/install-scylla/index/>`
|
||||
* :doc:`Configure ScyllaDB </getting-started/system-configuration/>`
|
||||
* :doc:`Run ScyllaDB in a Shared Environment </getting-started/scylla-in-a-shared-environment>`
|
||||
* :doc:`Create a ScyllaDB Cluster - Single Data Center (DC) </operating-scylla/procedures/cluster-management/create-cluster/>`
|
||||
|
||||
@@ -3,8 +3,7 @@
|
||||
ScyllaDB Housekeeping and how to disable it
|
||||
============================================
|
||||
|
||||
It is always recommended to run the latest version of ScyllaDB.
|
||||
The latest stable release version is always available from the `Download Center <https://www.scylladb.com/download/>`_.
|
||||
It is always recommended to run the latest stable version of ScyllaDB.
|
||||
|
||||
When you install ScyllaDB, it installs by default two services: **scylla-housekeeping-restart** and **scylla-housekeeping-daily**. These services check for the latest ScyllaDB version and prompt the user if they are using a version that is older than what is publicly available.
|
||||
Information about your ScyllaDB deployment, including the ScyllaDB version currently used, as well as unique user and server identifiers, are collected by a centralized service.
|
||||
|
||||
@@ -9,6 +9,8 @@ Running ``cluster repair`` on a **single node** synchronizes all data on all nod
|
||||
To synchronize all data in clusters that have both tablets-based and vnodes-based keyspaces, run :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair/>` on **all**
|
||||
of the nodes in the cluster, and :doc:`nodetool cluster repair </operating-scylla/nodetool-commands/cluster/repair/>` on **any** of the nodes in the cluster.
|
||||
|
||||
.. warning:: :term:`Colocated Tables <Colocated Table>` cannot be synchronized using cluster repair in this version.
|
||||
|
||||
To check if a keyspace enables tablets, use:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
@@ -202,3 +202,7 @@ Glossary
|
||||
The name comes from two basic operations, multiply (MU) and rotate (R), used in its inner loop.
|
||||
The MurmurHash3 version used in ScyllaDB originated from `Apache Cassandra <https://commons.apache.org/proper/commons-codec/apidocs/org/apache/commons/codec/digest/MurmurHash3.html>`_, and is **not** identical to the `official MurmurHash3 calculation <https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/utils/MurmurHash.java#L31-L33>`_. More `here <https://github.com/russss/murmur3-cassandra>`_.
|
||||
|
||||
Colocated Table
|
||||
An internal table of a special type in a :doc:`tablets </architecture/tablets>` enabled keyspace that is colocated with another base table, meaning it always has the same tablet replicas as the base table.
|
||||
Current types of colocated tables include CDC log tables, local indexes, and materialized views that have the same partition key as their base table.
|
||||
|
||||
|
||||
@@ -177,6 +177,7 @@ public:
|
||||
gms::feature driver_service_level { *this, "DRIVER_SERVICE_LEVEL"sv };
|
||||
gms::feature strongly_consistent_tables { *this, "STRONGLY_CONSISTENT_TABLES"sv };
|
||||
gms::feature client_routes { *this, "CLIENT_ROUTES"sv };
|
||||
gms::feature removenode_with_left_token_ring { *this, "REMOVENODE_WITH_LEFT_TOKEN_RING"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
@@ -2793,6 +2793,7 @@ future<> database::flush_all_tables() {
|
||||
});
|
||||
_all_tables_flushed_at = db_clock::now();
|
||||
co_await _commitlog->wait_for_pending_deletes();
|
||||
dblog.info("Forcing new commitlog segment and flushing all tables complete");
|
||||
}
|
||||
|
||||
future<db_clock::time_point> database::get_all_tables_flushed_at(sharded<database>& sharded_db) {
|
||||
|
||||
@@ -105,7 +105,7 @@ seastar::future<> service::client_routes_service::delete_client_routes_inner(con
|
||||
|
||||
seastar::future<> service::client_routes_service::set_client_routes(const std::vector<service::client_routes_service::client_route_entry>& route_entries) {
|
||||
return container().invoke_on(0, [route_entries = std::move(route_entries)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
return cr.with_retry([&cr, route_entries = std::move(route_entries)] () mutable {
|
||||
return cr.set_client_routes_inner(route_entries);
|
||||
});
|
||||
});
|
||||
@@ -113,7 +113,7 @@ seastar::future<> service::client_routes_service::set_client_routes(const std::v
|
||||
|
||||
seastar::future<> service::client_routes_service::delete_client_routes(const std::vector<service::client_routes_service::client_route_key>& route_keys) {
|
||||
return container().invoke_on(0, [route_keys = std::move(route_keys)] (service::client_routes_service& cr) -> future<> {
|
||||
return cr.with_retry([&] {
|
||||
return cr.with_retry([&cr, route_keys = std::move(route_keys)] () mutable {
|
||||
return cr.delete_client_routes_inner(route_keys);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -588,12 +588,16 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
}
|
||||
break;
|
||||
case node_state::decommissioning:
|
||||
// A decommissioning node loses its tokens when topology moves to left_token_ring.
|
||||
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
|
||||
break;
|
||||
}
|
||||
[[fallthrough]];
|
||||
case node_state::removing:
|
||||
// A decommissioning or removing node loses its tokens when topology moves to left_token_ring.
|
||||
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
|
||||
if (rs.state == node_state::removing && !_feature_service.removenode_with_left_token_ring) {
|
||||
on_internal_error(
|
||||
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
|
||||
// no need for double writes anymore since op failed
|
||||
co_await process_normal_node(id, host_id, ip, rs);
|
||||
|
||||
@@ -2672,6 +2672,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
while (utils::get_local_injector().enter("topology_coordinator_pause_after_streaming")) {
|
||||
co_await sleep_abortable(std::chrono::milliseconds(10), _as);
|
||||
}
|
||||
const bool removenode_with_left_token_ring = _feature_service.removenode_with_left_token_ring;
|
||||
auto node = get_node_to_work_on(std::move(guard));
|
||||
bool barrier_failed = false;
|
||||
// In this state writes goes to old and new replicas but reads start to be done from new replicas
|
||||
@@ -2726,7 +2727,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
break;
|
||||
case node_state::removing: {
|
||||
co_await utils::get_local_injector().inject("delay_node_removal", utils::wait_for_message(std::chrono::minutes(5)));
|
||||
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
|
||||
if (!removenode_with_left_token_ring) {
|
||||
node = retake_node(co_await remove_from_group0(std::move(node.guard), node.id), node.id);
|
||||
}
|
||||
}
|
||||
[[fallthrough]];
|
||||
case node_state::decommissioning: {
|
||||
@@ -2734,7 +2737,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
node_state next_state;
|
||||
utils::chunked_vector<canonical_mutation> muts;
|
||||
muts.reserve(2);
|
||||
if (node.rs->state == node_state::decommissioning) {
|
||||
if (removenode_with_left_token_ring || node.rs->state == node_state::decommissioning) {
|
||||
// Both decommission and removenode go through left_token_ring state
|
||||
// to ensure a global barrier is executed before the request is marked as done.
|
||||
// This ensures all nodes have observed the topology change.
|
||||
next_state = node.rs->state;
|
||||
builder.set_transition_state(topology::transition_state::left_token_ring);
|
||||
} else {
|
||||
@@ -2809,6 +2815,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
case topology::transition_state::left_token_ring: {
|
||||
auto node = get_node_to_work_on(std::move(guard));
|
||||
|
||||
// Need to be captured as the node variable might become invalid (e.g. moved out) at particular points.
|
||||
const auto node_rs_state = node.rs->state;
|
||||
|
||||
const bool is_removenode = node_rs_state == node_state::removing;
|
||||
|
||||
if (is_removenode && !_feature_service.removenode_with_left_token_ring) {
|
||||
on_internal_error(
|
||||
rtlogger, "removenode operation can only enter the left_token_ring state when REMOVENODE_WITH_LEFT_TOKEN_RING feature is enabled");
|
||||
}
|
||||
|
||||
auto finish_left_token_ring_transition = [&](node_to_work_on& node) -> future<> {
|
||||
// Remove the node from group0 here - in general, it won't be able to leave on its own
|
||||
// because we'll ban it as soon as we tell it to shut down.
|
||||
@@ -2828,9 +2844,16 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
muts.push_back(builder.build());
|
||||
co_await remove_view_build_statuses_on_left_node(muts, node.guard, node.id);
|
||||
co_await db::view::view_builder::generate_mutations_on_node_left(_db, _sys_ks, node.guard.write_timestamp(), locator::host_id(node.id.uuid()), muts);
|
||||
auto str = node.rs->state == node_state::decommissioning
|
||||
? ::format("finished decommissioning node {}", node.id)
|
||||
: ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
|
||||
auto str = std::invoke([&]() {
|
||||
switch (node_rs_state) {
|
||||
case node_state::decommissioning:
|
||||
return ::format("finished decommissioning node {}", node.id);
|
||||
case node_state::removing:
|
||||
return ::format("finished removing node {}", node.id);
|
||||
default:
|
||||
return ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
|
||||
}
|
||||
});
|
||||
co_await update_topology_state(take_guard(std::move(node)), std::move(muts), std::move(str));
|
||||
};
|
||||
|
||||
@@ -2843,6 +2866,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
|
||||
if (node.id == _raft.id()) {
|
||||
// Removed node must be dead, so it shouldn't enter here (it can't coordinate its own removal).
|
||||
if (is_removenode) {
|
||||
on_internal_error(rtlogger, "removenode operation cannot be coordinated by the removed node itself");
|
||||
}
|
||||
|
||||
// Someone else needs to coordinate the rest of the decommission process,
|
||||
// because the decommissioning node is going to shut down in the middle of this state.
|
||||
rtlogger.info("coordinator is decommissioning; giving up leadership");
|
||||
@@ -2856,8 +2884,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
bool barrier_failed = false;
|
||||
// Wait until other nodes observe the new token ring and stop sending writes to this node.
|
||||
auto excluded_nodes = get_excluded_nodes_for_topology_request(node);
|
||||
try {
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes_for_topology_request(node)), node.id);
|
||||
// Removed node is added to ignored nodes, so it should be automatically excluded.
|
||||
if (is_removenode && !excluded_nodes.contains(node.id)) {
|
||||
on_internal_error(rtlogger, "removenode operation must have the removed node in excluded_nodes");
|
||||
}
|
||||
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), std::move(excluded_nodes)), node.id);
|
||||
} catch (term_changed_error&) {
|
||||
throw;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
@@ -2874,15 +2907,17 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
|
||||
if (barrier_failed) {
|
||||
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
|
||||
// If barrier above failed it means there may be unfinished writes to a decommissioned node,
|
||||
// or some nodes might not have observed the new topology yet (one purpose of the barrier
|
||||
// is to make sure all nodes observed the new topology before completing the request).
|
||||
// Lets wait for the ring delay for those writes to complete and new topology to propagate
|
||||
// before continuing.
|
||||
co_await sleep_abortable(_ring_delay, _as);
|
||||
node = retake_node(co_await start_operation(), node.id);
|
||||
}
|
||||
|
||||
// Make decommissioning node a non voter before reporting operation completion below.
|
||||
// Otherwise the decommissioned node may see the completion and exit before it is removed from
|
||||
// Make decommissioning/removed node a non voter before reporting operation completion below.
|
||||
// Otherwise the node may see the completion and exit before it is removed from
|
||||
// the config at which point the removal from the config will hang if the cluster had only two
|
||||
// nodes before the decommission.
|
||||
co_await _voter_handler.on_node_removed(node.id, _as);
|
||||
@@ -2893,7 +2928,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state");
|
||||
|
||||
// Tell the node to shut down.
|
||||
// For decommission/rollback: Tell the node to shut down.
|
||||
// This is done to improve user experience when there are no failures.
|
||||
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
|
||||
// so there's no guarantee that it would learn about entering that state even if it was still
|
||||
@@ -2902,15 +2937,19 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// There is the possibility that the node will never get the message
|
||||
// and decommission will hang on that node.
|
||||
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
|
||||
//
|
||||
// For removenode: The node is already dead, no need to send shutdown command.
|
||||
auto node_id = node.id;
|
||||
bool shutdown_failed = false;
|
||||
try {
|
||||
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
|
||||
} catch (...) {
|
||||
rtlogger.warn("failed to tell node {} to shut down - it may hang."
|
||||
" It's safe to shut it down manually now. (Exception: {})",
|
||||
node.id, std::current_exception());
|
||||
shutdown_failed = true;
|
||||
if (!is_removenode) {
|
||||
try {
|
||||
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
|
||||
} catch (...) {
|
||||
rtlogger.warn("failed to tell node {} to shut down - it may hang."
|
||||
" It's safe to shut it down manually now. (Exception: {})",
|
||||
node.id, std::current_exception());
|
||||
shutdown_failed = true;
|
||||
}
|
||||
}
|
||||
if (shutdown_failed) {
|
||||
node = retake_node(co_await start_operation(), node_id);
|
||||
|
||||
@@ -604,18 +604,14 @@ async def test_driver_service_creation_failure(manager: ManagerClient) -> None:
|
||||
service_level_names = [sl.service_level for sl in service_levels]
|
||||
assert "driver" not in service_level_names
|
||||
|
||||
def get_processed_tasks_for_group(metrics, group):
|
||||
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
|
||||
if res is None:
|
||||
return 0
|
||||
return res
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def _verify_tasks_processed_metrics(manager, server, used_group, unused_group, func):
|
||||
number_of_requests = 1000
|
||||
number_of_requests = 3000
|
||||
|
||||
def get_processed_tasks_for_group(metrics, group):
|
||||
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
|
||||
logger.info(f"group={group}, tasks_processed={res}")
|
||||
|
||||
if res is None:
|
||||
return 0
|
||||
return res
|
||||
@@ -627,8 +623,10 @@ async def _verify_tasks_processed_metrics(manager, server, used_group, unused_gr
|
||||
await asyncio.gather(*[asyncio.to_thread(func) for i in range(number_of_requests)])
|
||||
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
assert get_processed_tasks_for_group(metrics, used_group) - initial_tasks_processed_by_used_group > number_of_requests
|
||||
assert get_processed_tasks_for_group(metrics, unused_group) - initial_tasks_processed_by_unused_group < number_of_requests
|
||||
tasks_processed_by_used_group = get_processed_tasks_for_group(metrics, used_group)
|
||||
tasks_processed_by_unused_group = get_processed_tasks_for_group(metrics, unused_group)
|
||||
assert tasks_processed_by_used_group - initial_tasks_processed_by_used_group > number_of_requests
|
||||
assert tasks_processed_by_unused_group - initial_tasks_processed_by_unused_group < number_of_requests
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_driver_service_level_not_used_for_user_queries(manager: ManagerClient) -> None:
|
||||
|
||||
@@ -52,6 +52,18 @@ KNOWN_LOG_LEVELS = {
|
||||
"OFF": "info",
|
||||
}
|
||||
|
||||
# Captures the aggregate metric before the "[READ ..., WRITE ...]" block.
|
||||
STRESS_SUMMARY_PATTERN = re.compile(r'^\s*([\d\.\,]+\d?)\s*\[.*')
|
||||
|
||||
# Extracts the READ metric number inside the "[READ ..., WRITE ...]" block.
|
||||
STRESS_READ_PATTERN = re.compile(r'.*READ:\s*([\d\.\,]+\d?)[^\d].*')
|
||||
|
||||
# Extracts the WRITE metric number inside the "[READ ..., WRITE ...]" block.
|
||||
STRESS_WRITE_PATTERN = re.compile(r'.*WRITE:\s*([\d\.\,]+\d?)[^\d].*')
|
||||
|
||||
# Splits a "key : value" line into key and value.
|
||||
STRESS_KEY_VALUE_PATTERN = re.compile(r'^\s*([^:]+)\s*:\s*(\S.*)\s*$')
|
||||
|
||||
|
||||
class NodeError(Exception):
|
||||
def __init__(self, msg: str, process: int | None = None):
|
||||
@@ -528,6 +540,15 @@ class ScyllaNode:
|
||||
return self.cluster.manager.server_get_workdir(server_id=self.server_id)
|
||||
|
||||
def stress(self, stress_options: list[str], **kwargs):
|
||||
"""
|
||||
Run `cassandra-stress` against this node.
|
||||
This method does not do any result parsing.
|
||||
|
||||
:param stress_options: List of options to pass to `cassandra-stress`.
|
||||
:param kwargs: Additional arguments to pass to `subprocess.Popen()`.
|
||||
:return: Named tuple with `stdout`, `stderr`, and `rc` (return code).
|
||||
"""
|
||||
|
||||
cmd_args = ["cassandra-stress"] + stress_options
|
||||
|
||||
if not any(opt in cmd_args for opt in ("-d", "-node", "-cloudconf")):
|
||||
@@ -549,6 +570,73 @@ class ScyllaNode:
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
|
||||
def _set_stress_val(self, key, val, res):
|
||||
"""
|
||||
Normalize a stress result string and populate aggregate/read/write metrics.
|
||||
|
||||
Removes comma-thousands separators from numbers, converts to float,
|
||||
stores the aggregate metric under `key`.
|
||||
If the value contains a "[READ ..., WRITE ...]" block, also stores the
|
||||
read and write metrics under `key:read` and `key:write`.
|
||||
|
||||
:param key: The metric name
|
||||
:param val: The metric value string
|
||||
:param res: The dictionary to populate
|
||||
"""
|
||||
|
||||
def parse_num(s):
|
||||
return float(s.replace(',', ''))
|
||||
|
||||
if "[" in val:
|
||||
p = STRESS_SUMMARY_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key] = parse_num(m.group(1))
|
||||
p = STRESS_READ_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key + ":read"] = parse_num(m.group(1))
|
||||
p = STRESS_WRITE_PATTERN
|
||||
m = p.match(val)
|
||||
if m:
|
||||
res[key + ":write"] = parse_num(m.group(1))
|
||||
else:
|
||||
try:
|
||||
res[key] = parse_num(val)
|
||||
except ValueError:
|
||||
res[key] = val
|
||||
|
||||
|
||||
def stress_object(self, stress_options=None, ignore_errors=None, **kwargs):
|
||||
"""
|
||||
Run stress test and return results as a structured metrics dictionary.
|
||||
|
||||
Runs `stress()`, finds the `Results:` section in `stdout`, and then
|
||||
processes each `key : value` line, putting it into a dictionary.
|
||||
|
||||
:param stress_options: List of stress options to pass to `stress()`.
|
||||
:param ignore_errors: Deprecated (no effect).
|
||||
:param kwargs: Additional arguments to pass to `stress()`.
|
||||
:return: Dictionary of stress test results.
|
||||
"""
|
||||
if ignore_errors:
|
||||
self.warning("passing `ignore_errors` to stress_object() is deprecated")
|
||||
ret = self.stress(stress_options, **kwargs)
|
||||
p = STRESS_KEY_VALUE_PATTERN
|
||||
res = {}
|
||||
start = False
|
||||
for line in (s.strip() for s in ret.stdout.splitlines()):
|
||||
if start:
|
||||
m = p.match(line)
|
||||
if m:
|
||||
self._set_stress_val(m.group(1).strip().lower(), m.group(2).strip(), res)
|
||||
else:
|
||||
if line == 'Results:':
|
||||
start = True
|
||||
return res
|
||||
|
||||
|
||||
def flush(self, ks: str | None = None, table: str | None = None, **kwargs) -> None:
|
||||
cmd = ["flush"]
|
||||
if ks:
|
||||
|
||||
690
test/cluster/dtest/schema_management_test.py
Normal file
690
test/cluster/dtest/schema_management_test.py
Normal file
@@ -0,0 +1,690 @@
|
||||
#
|
||||
# Copyright (C) 2015-present The Apache Software Foundation
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import string
|
||||
import threading
|
||||
import time
|
||||
from concurrent import futures
|
||||
from typing import NamedTuple
|
||||
|
||||
import pytest
|
||||
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
|
||||
from cassandra.concurrent import execute_concurrent_with_args
|
||||
from cassandra.query import SimpleStatement, dict_factory
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from dtest_class import Tester, create_cf, create_ks, read_barrier
|
||||
from tools.assertions import assert_all, assert_invalid
|
||||
from tools.cluster_topology import generate_cluster_topology
|
||||
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class TestSchemaManagement(Tester):
|
||||
def prepare(self, racks_num: int, has_config: bool = True):
|
||||
cluster = self.cluster
|
||||
cluster_topology = generate_cluster_topology(rack_num=racks_num)
|
||||
|
||||
if has_config:
|
||||
config = {
|
||||
"ring_delay_ms": 5000,
|
||||
}
|
||||
cluster.set_configuration_options(values=config)
|
||||
|
||||
cluster.populate(cluster_topology)
|
||||
cluster.start(wait_other_notice=True)
|
||||
|
||||
return cluster
|
||||
|
||||
|
||||
def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
logger.debug("Creating schema...")
|
||||
create_ks(session, "ks", 3)
|
||||
session.execute(
|
||||
"""
|
||||
CREATE TABLE users (
|
||||
id int,
|
||||
firstname text,
|
||||
lastname text,
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
"""
|
||||
)
|
||||
|
||||
insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
|
||||
insert_statement.consistency_level = ConsistencyLevel.ALL
|
||||
session.execute(insert_statement, [0])
|
||||
|
||||
logger.debug("Altering schema")
|
||||
session.execute("ALTER TABLE users WITH comment = 'updated'")
|
||||
|
||||
logger.debug("Restarting node2")
|
||||
node2.stop(gently=True)
|
||||
node2.start(wait_for_binary_proto=True)
|
||||
|
||||
logger.debug("Restarting node3")
|
||||
node3.stop(gently=True)
|
||||
node3.start(wait_for_binary_proto=True, wait_other_notice=True)
|
||||
|
||||
n_partitions = 20
|
||||
for i in range(n_partitions):
|
||||
session.execute(insert_statement, [i])
|
||||
|
||||
rows = session.execute("SELECT * FROM users")
|
||||
res = sorted(rows)
|
||||
assert len(res) == n_partitions
|
||||
for i in range(n_partitions):
|
||||
expected = [i, "A", "B"]
|
||||
assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"
|
||||
|
||||
def test_dropping_keyspace_with_many_columns(self):
|
||||
"""
|
||||
Exploits https://github.com/scylladb/scylla/issues/1484
|
||||
"""
|
||||
cluster = self.prepare(racks_num=1, has_config=False)
|
||||
|
||||
node1 = cluster.nodelist()[0]
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
|
||||
for i in range(8):
|
||||
session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
|
||||
session.execute("drop keyspace testxyz")
|
||||
|
||||
for node in cluster.nodelist():
|
||||
s = self.patient_cql_connection(node)
|
||||
s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
|
||||
s.execute("drop keyspace testxyz")
|
||||
|
||||
def test_multiple_create_table_in_parallel(self):
|
||||
"""
|
||||
Run multiple create table statements via different nodes
|
||||
1. Create a cluster of 3 nodes
|
||||
2. Run create table with different table names in parallel - check all complete
|
||||
3. Run create table with the same table name in parallel - check if they complete
|
||||
"""
|
||||
logger.debug("1. Create a cluster of 3 nodes")
|
||||
nodes_count = 3
|
||||
cluster = self.prepare(racks_num=nodes_count)
|
||||
sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
|
||||
ks = "ks"
|
||||
create_ks(sessions[0], ks, nodes_count)
|
||||
|
||||
def create_table(session, table_name):
|
||||
create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
|
||||
logger.debug(f"create_statement {create_statement}")
|
||||
session.execute(create_statement)
|
||||
|
||||
logger.debug("2. Run create table with different table names in parallel - check all complete")
|
||||
step2_tables = [f"t{i}" for i in range(nodes_count)]
|
||||
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
|
||||
list(executor.map(create_table, sessions, step2_tables))
|
||||
|
||||
for table in step2_tables:
|
||||
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
|
||||
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
|
||||
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
|
||||
|
||||
logger.debug("3. Run create table with the same table name in parallel - check if they complete")
|
||||
step3_table = "test"
|
||||
step3_tables = [step3_table for i in range(nodes_count)]
|
||||
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
|
||||
res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
|
||||
for res_future in res_futures:
|
||||
try:
|
||||
res_future.result()
|
||||
except AlreadyExists as e:
|
||||
logger.info(f"expected cassandra.AlreadyExists error {e}")
|
||||
|
||||
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
|
||||
sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
|
||||
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
|
||||
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
|
||||
|
||||
@pytest.mark.parametrize("case", ("write", "read", "mixed"))
|
||||
def test_alter_table_in_parallel_to_read_and_write(self, case):
|
||||
"""
|
||||
Create a table and write into while altering the table
|
||||
1. Create a cluster of 3 nodes and populate a table
|
||||
2. Run write/read/read_and_write" statement in a loop
|
||||
3. Alter table while inserts are running
|
||||
"""
|
||||
logger.debug("1. Create a cluster of 3 nodes and populate a table")
|
||||
cluster = self.prepare(racks_num=3)
|
||||
col_number = 20
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
session = self.patient_exclusive_cql_connection(node1)
|
||||
|
||||
def run_stress(stress_type, col=col_number - 2):
|
||||
node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])
|
||||
|
||||
logger.debug("Populate")
|
||||
run_stress("write", col_number)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
logger.debug(f"2. Run {case} statement in a loop")
|
||||
statement_future = executor.submit(functools.partial(run_stress, case))
|
||||
|
||||
logger.debug(f"let's {case} statement work some time")
|
||||
time.sleep(2)
|
||||
|
||||
logger.debug("3. Alter table while inserts are running")
|
||||
alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
|
||||
logger.debug(f"alter_statement {alter_statement}")
|
||||
alter_result = session.execute(alter_statement)
|
||||
logger.debug(alter_result.all())
|
||||
|
||||
logger.debug(f"wait till {case} statement finished")
|
||||
statement_future.result()
|
||||
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
|
||||
assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"
|
||||
|
||||
logger.debug("read and check data")
|
||||
run_stress("read")
|
||||
|
||||
@pytest.mark.skip("unimplemented")
|
||||
def commitlog_replays_after_schema_change(self):
|
||||
"""
|
||||
Commitlog can be replayed even though schema has been changed
|
||||
1. Create a table and insert data
|
||||
2. Alter table
|
||||
3. Kill node
|
||||
4. Boot node and verify that commitlog have been replayed and that all data is restored
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
|
||||
def test_update_schema_while_node_is_killed(self, case):
|
||||
"""
|
||||
Check that a node that is killed durring a table creation/alter/drop is able to rejoin and to synch on schema
|
||||
"""
|
||||
|
||||
logger.debug("1. Create a cluster and insert data")
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
def create_table_case():
|
||||
try:
|
||||
logger.debug("Creating table")
|
||||
create_c1c2_table(session)
|
||||
logger.debug("Populating")
|
||||
insert_c1c2(session, n=10)
|
||||
except AlreadyExists:
|
||||
# the CQL command can be called multiple time case of retries
|
||||
pass
|
||||
|
||||
def alter_table_case():
|
||||
try:
|
||||
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
|
||||
except InvalidRequest as exc:
|
||||
# the CQL command can be called multiple time case of retries
|
||||
assert "Invalid column name c3" in str(exc)
|
||||
|
||||
def drop_table_case():
|
||||
try:
|
||||
session.execute("DROP TABLE cf;", timeout=180)
|
||||
except InvalidRequest as exc:
|
||||
# the CQL command can be called multiple time case of retries
|
||||
assert "Cannot drop non existing table" in str(exc)
|
||||
|
||||
logger.debug("Creating keyspace")
|
||||
create_ks(session, "ks", 3)
|
||||
if case != "create_table":
|
||||
create_table_case()
|
||||
|
||||
case_map = {
|
||||
"create_table": create_table_case,
|
||||
"alter_table": alter_table_case,
|
||||
"drop_table": drop_table_case,
|
||||
}
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
logger.debug(f"2. kill node during {case}")
|
||||
kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
|
||||
case_map[case]()
|
||||
kill_node_future.result()
|
||||
|
||||
logger.debug("3. Start the stopped node2")
|
||||
node2.start(wait_for_binary_proto=True)
|
||||
|
||||
session = self.patient_exclusive_cql_connection(node2)
|
||||
read_barrier(session)
|
||||
|
||||
def create_or_alter_table_expected_result(col_mun):
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
|
||||
assert len(rows_to_list(rows)[0]) == col_mun, f"Expected {col_mun} columns but got rows:{rows} instead"
|
||||
for key in range(10):
|
||||
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)
|
||||
|
||||
expected_case_result_map = {
|
||||
"create_table": functools.partial(create_or_alter_table_expected_result, 3),
|
||||
"alter_table": functools.partial(create_or_alter_table_expected_result, 4),
|
||||
"drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
|
||||
}
|
||||
logger.debug("verify that commitlog has been replayed and that all data is restored")
|
||||
expected_case_result_map[case]()
|
||||
|
||||
@pytest.mark.parametrize("is_gently_stop", [True, False])
|
||||
def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
|
||||
"""
|
||||
Nodes rejoining the cluster synch on schema changes
|
||||
1. Create a cluster and insert data
|
||||
2. Stop a node
|
||||
3. Alter table
|
||||
4. Insert additional data
|
||||
5. Start the stopped node
|
||||
6. Verify the stopped node synchs on the updated schema
|
||||
"""
|
||||
|
||||
logger.debug("1. Create a cluster and insert data")
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
logger.debug("Creating schema")
|
||||
create_ks(session, "ks", 3)
|
||||
create_c1c2_table(session)
|
||||
create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})
|
||||
|
||||
logger.debug("Populating")
|
||||
insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)
|
||||
|
||||
logger.debug("2 Stop a node1")
|
||||
node1.stop(gently=is_gently_stop, wait_other_notice=True)
|
||||
|
||||
logger.debug("3 Alter table")
|
||||
session = self.patient_cql_connection(node2)
|
||||
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
|
||||
|
||||
logger.debug("4 Insert additional data")
|
||||
session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))
|
||||
|
||||
logger.debug("5. Start the stopped node1")
|
||||
node1.start(wait_for_binary_proto=True)
|
||||
|
||||
logger.debug("6. Verify the stopped node synchs on the updated schema")
|
||||
session = self.patient_exclusive_cql_connection(node1)
|
||||
read_barrier(session)
|
||||
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
|
||||
expected = [["test", "test", "test", "test"]]
|
||||
assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
|
||||
for key in range(10):
|
||||
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)
|
||||
|
||||
def test_reads_schema_recreated_while_node_down(self):
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
logger.debug("Creating schema")
|
||||
create_ks(session, "ks", 3)
|
||||
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
|
||||
|
||||
logger.debug("Populating")
|
||||
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
|
||||
|
||||
logger.debug("Stopping node2")
|
||||
node2.stop(gently=True)
|
||||
|
||||
logger.debug("Re-creating schema")
|
||||
session.execute("DROP TABLE cf;")
|
||||
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")
|
||||
|
||||
logger.debug("Restarting node2")
|
||||
node2.start(wait_for_binary_proto=True)
|
||||
session2 = self.patient_cql_connection(node2)
|
||||
read_barrier(session2)
|
||||
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
|
||||
assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"
|
||||
|
||||
def test_writes_schema_recreated_while_node_down(self):
|
||||
cluster = self.prepare(racks_num=3)
|
||||
|
||||
[node1, node2, node3] = cluster.nodelist()
|
||||
|
||||
session = self.patient_cql_connection(node1)
|
||||
|
||||
logger.debug("Creating schema")
|
||||
create_ks(session, "ks", 3)
|
||||
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
|
||||
|
||||
logger.debug("Populating")
|
||||
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
|
||||
|
||||
logger.debug("Stopping node2")
|
||||
node2.stop(gently=True, wait_other_notice=True)
|
||||
|
||||
logger.debug("Re-creating schema")
|
||||
session.execute("DROP TABLE cf;")
|
||||
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
|
||||
|
||||
logger.debug("Restarting node2")
|
||||
node2.start(wait_for_binary_proto=True)
|
||||
session2 = self.patient_cql_connection(node2)
|
||||
read_barrier(session2)
|
||||
|
||||
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))
|
||||
|
||||
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
|
||||
expected = [[2, "2"]]
|
||||
assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"
|
||||
|
||||
|
||||
class TestLargePartitionAlterSchema(Tester):
|
||||
# Issue scylladb/scylla: #5135:
|
||||
#
|
||||
# Issue: Cache reads may miss some writes if schema alter followed by a read happened concurrently with preempted
|
||||
# partition entry update
|
||||
# Affects only tables with multi-row partitions, which are the only ones that can experience the update of partition
|
||||
# entry being preempted.
|
||||
#
|
||||
# The scenario in which the problem could have happened has to involve:
|
||||
# - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
|
||||
# - appending writes to the partition (not overwrites)
|
||||
# - scans of the partition
|
||||
# - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
|
||||
# column lands in the middle (in alphabetical order) of the old column set.
|
||||
#
|
||||
# Memtable flush has to happen after a schema alter concurrently with a read.
|
||||
#
|
||||
# The bug could result in cache corruption which manifests as some past writes being missing (not visible to reads).
|
||||
|
||||
PARTITIONS = 50
|
||||
STRING_VALUE = string.ascii_lowercase
|
||||
|
||||
def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
|
||||
if not self.cluster.nodelist():
|
||||
self.cluster.populate(cluster_topology)
|
||||
self.cluster.start(wait_other_notice=True)
|
||||
|
||||
node1 = self.cluster.nodelist()[0]
|
||||
session = self.patient_cql_connection(node=node1)
|
||||
self.create_schema(session=session, rf=rf)
|
||||
|
||||
return session
|
||||
|
||||
def create_schema(self, session, rf):
|
||||
logger.debug("Creating schema")
|
||||
create_ks(session=session, name="ks", rf=rf)
|
||||
|
||||
session.execute(
|
||||
"""
|
||||
CREATE TABLE lp_table (
|
||||
pk int,
|
||||
ck1 int,
|
||||
val1 text,
|
||||
val2 text,
|
||||
PRIMARY KEY (pk, ck1)
|
||||
);
|
||||
"""
|
||||
)
|
||||
|
||||
def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event = None):
|
||||
ck = ck_start
|
||||
def _populate_loop():
|
||||
nonlocal ck
|
||||
while True:
|
||||
if stop_populating is not None and stop_populating.is_set():
|
||||
return
|
||||
if ck_end is not None and ck >= ck_end:
|
||||
return
|
||||
for pk in range(self.PARTITIONS):
|
||||
row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
|
||||
data.append(row)
|
||||
yield tuple(row)
|
||||
ck += 1
|
||||
|
||||
records_written = ck - ck_start
|
||||
|
||||
logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")
|
||||
|
||||
parameters = _populate_loop()
|
||||
|
||||
stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")
|
||||
|
||||
execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)
|
||||
logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
|
||||
return data
|
||||
|
||||
def read(self, session, ck_max, stop_reading: threading.Event = None):
|
||||
def _read_loop():
|
||||
while True:
|
||||
for ck in range(ck_max):
|
||||
for pk in range(self.PARTITIONS):
|
||||
if stop_reading is not None and stop_reading.is_set():
|
||||
return
|
||||
session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
|
||||
if stop_reading is None:
|
||||
return
|
||||
|
||||
logger.debug(f"Start reading..")
|
||||
_read_loop()
|
||||
logger.debug(f"Finish reading..")
|
||||
|
||||
def add_column(self, session, column_name, column_type):
|
||||
logger.debug(f"Add {column_name} column")
|
||||
session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")
|
||||
|
||||
def drop_column(self, session, column_name):
|
||||
logger.debug(f"Drop {column_name} column")
|
||||
session.execute(f"ALTER TABLE lp_table DROP {column_name}")
|
||||
|
||||
def test_large_partition_with_add_column(self):
|
||||
cluster_topology = generate_cluster_topology()
|
||||
session = self.prepare(cluster_topology, rf=1)
|
||||
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
|
||||
|
||||
threads = []
|
||||
timeout = 300
|
||||
ck_end = 5000
|
||||
if self.cluster.scylla_mode == "debug":
|
||||
timeout = 900
|
||||
ck_end = 500
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
stop_populating = threading.Event()
|
||||
stop_reading = threading.Event()
|
||||
# Insert new rows in background
|
||||
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
|
||||
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
|
||||
# Wait for running load
|
||||
time.sleep(10)
|
||||
self.add_column(session, "new_clmn", "int")
|
||||
|
||||
# Memtable flush has to happen after a schema alter concurrently with a read
|
||||
logger.debug("Flush data")
|
||||
self.cluster.nodelist()[0].flush()
|
||||
|
||||
# Stop populating and reading soon after flush
|
||||
time.sleep(1)
|
||||
logger.debug("Stop populating and reading")
|
||||
stop_populating.set()
|
||||
stop_reading.set()
|
||||
|
||||
for future in futures.as_completed(threads, timeout=timeout):
|
||||
try:
|
||||
future.result()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
pytest.fail(f"Generated an exception: {exc}")
|
||||
|
||||
# Add 'null' values for the new column `new_clmn` in the expected data
|
||||
for i, _ in enumerate(data):
|
||||
data[i].append(None)
|
||||
|
||||
assert_all(session, f"select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)
|
||||
|
||||
def test_large_partition_with_drop_column(self):
|
||||
cluster_topology = generate_cluster_topology()
|
||||
session = self.prepare(cluster_topology, rf=1)
|
||||
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
|
||||
|
||||
threads = []
|
||||
timeout = 300
|
||||
ck_end = 5000
|
||||
if self.cluster.scylla_mode == "debug":
|
||||
timeout = 900
|
||||
ck_end = 500
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
stop_populating = threading.Event()
|
||||
stop_reading = threading.Event()
|
||||
# Insert new rows in background
|
||||
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
|
||||
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
|
||||
# Wait for running load
|
||||
time.sleep(10)
|
||||
self.drop_column(session=session, column_name="val1")
|
||||
|
||||
# Memtable flush has to happen after a schema alter concurrently with a read
|
||||
logger.debug("Flush data")
|
||||
self.cluster.nodelist()[0].flush()
|
||||
|
||||
# Stop populating and reading soon after flush
|
||||
time.sleep(1)
|
||||
logger.debug("Stop populating and reading")
|
||||
stop_populating.set()
|
||||
stop_reading.set()
|
||||
|
||||
result = []
|
||||
for future in futures.as_completed(threads, timeout=timeout):
|
||||
try:
|
||||
result.append(future.result())
|
||||
except Exception as exc: # noqa: BLE001
|
||||
# "Unknown identifier val1" is expected error
|
||||
if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
|
||||
pytest.fail(f"Generated an exception: {exc}")
|
||||
|
||||
|
||||
class HistoryVerifier:
|
||||
def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
|
||||
"""
|
||||
Initialize parameters for further verification of schema history.
|
||||
:param table_name: table thats we change it's schema and verify schema history accordingly.
|
||||
"""
|
||||
|
||||
self.table_name = table_name
|
||||
self.keyspace_name = keyspace_name
|
||||
self.versions = []
|
||||
self.versions_dict = {}
|
||||
self.query = ""
|
||||
|
||||
def verify(self, session, expected_current_diff, expected_prev_diff, query):
|
||||
"""
|
||||
Verify current schema history entry by comparing to previous schema entry.
|
||||
:param session: python cql session
|
||||
:param expected_current_diff: difference of current schema from previous schema
|
||||
:param expected_prev_diff: difference of previous schema from current schema
|
||||
:param query: The query that created new schema
|
||||
"""
|
||||
|
||||
def get_table_id(session, keyspace_name, table_name):
|
||||
assert keyspace_name, f"Input kesyspcase should have value, keyspace_name={keyspace_name}"
|
||||
assert table_name, f"Input table_name should have value, table_name={table_name}"
|
||||
query = "select keyspace_name,table_name,id from system_schema.tables"
|
||||
query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
|
||||
current_rows = session.execute(query).current_rows
|
||||
assert len(current_rows) == 1, f"Not found table description, ks={keyspace_name} table_name={table_name}"
|
||||
res = current_rows[0]
|
||||
return res["id"]
|
||||
|
||||
def read_schema_history_table(session, cf_id):
|
||||
"""
|
||||
read system.scylla_table_schema_history and verify current version diff from previous vesion
|
||||
:param session: python cql session
|
||||
:param cf_id: uuid of the table we changed it's schema
|
||||
"""
|
||||
|
||||
query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
|
||||
res = session.execute(query).current_rows
|
||||
new_versions = list({
|
||||
entry["schema_version"]
|
||||
for entry in res
|
||||
if str(entry["schema_version"]) not in self.versions
|
||||
})
|
||||
msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
|
||||
assert len(new_versions) == 1, msg
|
||||
current_version = str(new_versions[0])
|
||||
logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
|
||||
columns_list = (
|
||||
{"column_name": entry["column_name"], "type": entry["type"]}
|
||||
for entry in res
|
||||
if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
|
||||
)
|
||||
self.versions_dict[current_version] = {}
|
||||
for item in columns_list:
|
||||
self.versions_dict[current_version][item["column_name"]] = item["type"]
|
||||
|
||||
self.versions.append(current_version)
|
||||
if len(self.versions) > 1:
|
||||
current_id = self.versions[-1]
|
||||
previous_id = self.versions[-2]
|
||||
set_current = set(self.versions_dict[current_id].items())
|
||||
set_previous = set(self.versions_dict[previous_id].items())
|
||||
current_diff = set_current - set_previous
|
||||
previous_diff = set_previous - set_current
|
||||
msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
|
||||
msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
|
||||
if current_diff != expected_current_diff:
|
||||
logger.debug(msg1 + msg2)
|
||||
assert current_diff == expected_current_diff, msg1 + msg2
|
||||
msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
|
||||
assert previous_diff == expected_prev_diff, msg1 + msg2
|
||||
|
||||
self.query = query
|
||||
cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
|
||||
read_schema_history_table(session, cf_id)
|
||||
|
||||
|
||||
class DDL(NamedTuple):
|
||||
ddl_command: str
|
||||
expected_current_diff: set | None
|
||||
expected_prev_diff: set | None
|
||||
|
||||
|
||||
class TestSchemaHistory(Tester):
|
||||
def prepare(self):
|
||||
cluster = self.cluster
|
||||
# in case support tablets and rf-rack-valid-keyspaces
|
||||
# create cluster with 3 racks with 1 node in each rack
|
||||
cluster_topology = generate_cluster_topology(rack_num=3)
|
||||
rf = 3
|
||||
cluster.populate(cluster_topology).start(wait_other_notice=True)
|
||||
self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
|
||||
create_ks(self.session, "lwt_load_ks", rf)
|
||||
|
||||
def test_schema_history_alter_table(self):
|
||||
"""test schema history changes following alter table cql commands"""
|
||||
self.prepare()
|
||||
verifier = HistoryVerifier(table_name="table2")
|
||||
queries_and_expected_diffs = [
|
||||
DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
|
||||
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
|
||||
]
|
||||
for ddl in queries_and_expected_diffs:
|
||||
self.session.execute(ddl.ddl_command)
|
||||
verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)
|
||||
@@ -218,6 +218,18 @@ def assert_row_count_in_select_less(
|
||||
assert count < max_rows_expected, f'Expected a row count < of {max_rows_expected} in query "{query}", but got {count}'
|
||||
|
||||
|
||||
def assert_length_equal(object_with_length, expected_length):
|
||||
"""
|
||||
Assert an object has a specific length.
|
||||
@param object_with_length The object whose length will be checked
|
||||
@param expected_length The expected length of the object
|
||||
|
||||
Examples:
|
||||
assert_length_equal(res, nb_counter)
|
||||
"""
|
||||
assert len(object_with_length) == expected_length, f"Expected {object_with_length} to have length {expected_length}, but instead is of length {len(object_with_length)}"
|
||||
|
||||
|
||||
def assert_lists_equal_ignoring_order(list1, list2, sort_key=None):
|
||||
"""
|
||||
asserts that the contents of the two provided lists are equal
|
||||
|
||||
@@ -14,6 +14,7 @@ from cassandra.query import SimpleStatement
|
||||
from cassandra.concurrent import execute_concurrent_with_args
|
||||
|
||||
from test.cluster.dtest.dtest_class import create_cf
|
||||
from test.cluster.dtest.tools import assertions
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -51,6 +52,27 @@ def insert_c1c2( # noqa: PLR0913
|
||||
execute_concurrent_with_args(session, statement, [[f"k{k}"] for k in keys], concurrency=concurrency)
|
||||
|
||||
|
||||
def query_c1c2( # noqa: PLR0913
|
||||
session,
|
||||
key,
|
||||
consistency=ConsistencyLevel.QUORUM,
|
||||
tolerate_missing=False,
|
||||
must_be_missing=False,
|
||||
c1_value="value1",
|
||||
c2_value="value2",
|
||||
ks="ks",
|
||||
cf="cf",
|
||||
):
|
||||
query = SimpleStatement(f"SELECT c1, c2 FROM {ks}.{cf} WHERE key='k{key}'", consistency_level=consistency)
|
||||
rows = list(session.execute(query))
|
||||
if not tolerate_missing and not must_be_missing:
|
||||
assertions.assert_length_equal(rows, 1)
|
||||
res = rows[0]
|
||||
assert len(res) == 2 and res[0] == c1_value and res[1] == c2_value, res
|
||||
if must_be_missing:
|
||||
assertions.assert_length_equal(rows, 0)
|
||||
|
||||
|
||||
def rows_to_list(rows):
|
||||
new_list = [list(row) for row in rows]
|
||||
return new_list
|
||||
|
||||
@@ -181,11 +181,14 @@ async def test_random_failures(manager: ManagerClient,
|
||||
LOGGER.info("Found following message in the coordinator's log:\n\t%s", matches[-1][0])
|
||||
await manager.server_stop(server_id=s_info.server_id)
|
||||
|
||||
BANNED_NOTIFICATION = "received notification of being banned from the cluster from"
|
||||
STARTUP_FAILED_PATTERN = f"init - Startup failed:|{BANNED_NOTIFICATION}"
|
||||
|
||||
if s_info in await manager.running_servers():
|
||||
LOGGER.info("Wait until the new node initialization completes or fails.")
|
||||
await server_log.wait_for("init - (Startup failed:|Scylla version .* initialization completed)", timeout=120)
|
||||
await server_log.wait_for(f"init - (Startup failed:|Scylla version .* initialization completed)|{BANNED_NOTIFICATION}", timeout=120)
|
||||
|
||||
if await server_log.grep("init - Startup failed:"):
|
||||
if await server_log.grep(STARTUP_FAILED_PATTERN):
|
||||
LOGGER.info("Check that the new node is dead.")
|
||||
expected_statuses = [psutil.STATUS_DEAD]
|
||||
else:
|
||||
@@ -216,7 +219,7 @@ async def test_random_failures(manager: ManagerClient,
|
||||
else:
|
||||
if s_info in await manager.running_servers():
|
||||
LOGGER.info("The new node is dead. Check if it failed to startup.")
|
||||
assert await server_log.grep("init - Startup failed:")
|
||||
assert await server_log.grep(STARTUP_FAILED_PATTERN)
|
||||
await manager.server_stop(server_id=s_info.server_id) # remove the node from the list of running servers
|
||||
|
||||
LOGGER.info("Try to remove the dead new node from the cluster.")
|
||||
|
||||
@@ -26,6 +26,7 @@ skip_in_release:
|
||||
- test_raft_cluster_features
|
||||
- test_cluster_features
|
||||
- dtest/limits_test
|
||||
- dtest/schema_management_test
|
||||
skip_in_debug:
|
||||
- test_shutdown_hang
|
||||
- test_replace
|
||||
|
||||
@@ -146,13 +146,13 @@ async def test_joining_old_node_fails(manager: ManagerClient) -> None:
|
||||
|
||||
# Try to add a node that doesn't support the feature - should fail
|
||||
new_server_info = await manager.server_add(start=False, property_file=servers[0].property_file())
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
|
||||
|
||||
# Try to replace with a node that doesn't support the feature - should fail
|
||||
await manager.server_stop_gracefully(servers[0].server_id)
|
||||
replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=False, use_host_id=False)
|
||||
new_server_info = await manager.server_add(start=False, replace_cfg=replace_cfg, property_file=servers[0].property_file())
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed")
|
||||
await manager.server_start(new_server_info.server_id, expected_error="Feature check failed|received notification of being banned from the cluster from")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -131,7 +131,7 @@ async def test_major_compaction_flush_all_tables(manager: ManagerClient, compact
|
||||
await manager.api.keyspace_compaction(server.ip_addr, ks, cf)
|
||||
|
||||
flush_log = await log.grep("Forcing new commitlog segment and flushing all tables", from_mark=mark)
|
||||
assert len(flush_log) == (1 if expect_all_table_flush else 0)
|
||||
assert len(flush_log) == (2 if expect_all_table_flush else 0)
|
||||
|
||||
# all tables should be flushed the first time unless compaction_flush_all_tables_before_major_seconds == 0
|
||||
await check_all_table_flush_in_major_compaction(compaction_flush_all_tables_before_major_seconds != 0)
|
||||
|
||||
@@ -67,7 +67,7 @@ async def test_topology_ops(request, manager: ManagerClient, tablets_enabled: bo
|
||||
|
||||
logger.info(f"Removing node {servers[0]} using {servers[1]}")
|
||||
await manager.remove_node(servers[1].server_id, servers[0].server_id)
|
||||
await check_token_ring_and_group0_consistency(manager)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
servers = servers[1:]
|
||||
|
||||
logger.info("Checking results of the background writes")
|
||||
|
||||
@@ -74,7 +74,7 @@ async def test_topology_ops_encrypted(request, manager: ManagerClient, tablets_e
|
||||
|
||||
logger.info(f"Removing node {servers[0]} using {servers[1]}")
|
||||
await manager.remove_node(servers[1].server_id, servers[0].server_id)
|
||||
await check_token_ring_and_group0_consistency(manager)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
servers = servers[1:]
|
||||
|
||||
logger.info("Checking results of the background writes")
|
||||
|
||||
@@ -40,16 +40,8 @@ struct test_pinger: public direct_failure_detector::pinger {
|
||||
co_return;
|
||||
}
|
||||
|
||||
promise<> p;
|
||||
auto f = p.get_future();
|
||||
auto sub = as.subscribe([&, p = std::move(p)] () mutable noexcept {
|
||||
p.set_value();
|
||||
});
|
||||
if (!sub) {
|
||||
throw abort_requested_exception{};
|
||||
}
|
||||
co_await std::move(f);
|
||||
throw abort_requested_exception{};
|
||||
// Simulate a blocking ping that only returns when aborted.
|
||||
co_await sleep_abortable(std::chrono::hours(1), as);
|
||||
}, as);
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
@@ -64,7 +64,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
for server in servers:
|
||||
await manager.api.disable_autocompaction(server.ip_addr, ks)
|
||||
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text", " WITH speculative_retry = 'NONE'") as cf:
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
|
||||
Reference in New Issue
Block a user