Compare commits

...

9 Commits

Author SHA1 Message Date
Tomasz Grabiec
66439bb753 Merge 'load_balancer: apply balance threshold to intranode shard balancing' from Ferenc Szili
- Fix intranode shard balancing to respect the size-based balance threshold, preventing unnecessary migrations when load difference between shards is negligible
- Add a regression test that verifies the threshold is respected for intranode balancing

The intranode shard balancing loop stopped only when the algorithm exhausted the migration candidates or when a migration would go against convergence (i.e., it would increase imbalance instead of decreasing it). This caused unnecessary tablet migrations for negligible imbalances (e.g., a 0.78% difference between shards).

The inter-node balancer already uses `is_balanced()` to stop when the relative load difference is within the configured `size_based_balance_threshold`, but this check was missing from the intranode path.

Apply the same `is_balanced()` threshold check that is already used for inter-node balancing to the intranode convergence loop. When the relative load difference between the most-loaded and least-loaded shards on a node is within the threshold, the balancer now stops without issuing further migrations.
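
As a minimal sketch of that check (the function name and the exact formula are assumptions based on this description, not the actual C++ implementation shown in the diff below):

```
# Sketch, assuming a relative-difference definition of "balanced":
def is_balanced(min_load: float, max_load: float, threshold: float = 0.01) -> bool:
    """True when the relative load difference is within the threshold."""
    if max_load == 0:
        return True  # nothing to balance
    return (max_load - min_load) / max_load <= threshold

# 257 vs 255 equally-sized tablets: (257 - 255) / 257 ≈ 0.78% -> balanced, stop
assert is_balanced(255.0, 257.0)
# 307 vs 205 tablets: (307 - 205) / 307 ≈ 33% -> keep migrating
assert not is_balanced(205.0, 307.0)
```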

The test creates a single node with 2 shards and 512 tablets:
1. **Balanced scenario** (257 vs 255 tablets, same size): relative diff = 0.78% < 1% threshold → verifies no intranode migration is emitted
2. **Unbalanced scenario** (307 vs 205 tablets, same size): relative diff = 33% >> 1% threshold → verifies intranode migration IS emitted

Fixes: SCYLLADB-1775

This is a performance improvement that reduces the number of intranode migrations issued. It needs to be backported to the versions with size-based load balancing: 2026.1 and 2026.2.

Closes scylladb/scylladb#29756

* github.com:scylladb/scylladb:
  test: add test for intranode balance threshold in size-based mode
  tablet_allocator: apply balance threshold to intranode shard balancing
2026-05-13 13:09:52 +02:00
Piotr Smaron
0fcae72530 test: bootstrap tombstone gc repair cluster sequentially
Avoid concurrent topology changes in the tombstone GC repair setup, where debug-mode nodes running hinted handoff and materialized view startup work can time out while applying Raft entries before the test starts.

Keep the sequential path opt-in so unrelated repair tests still exercise concurrent bootstrap behavior.

Closes scylladb/scylladb#29829
2026-05-13 13:58:44 +03:00
Nadav Har'El
51c35c05e2 test/cqlpy: teach run-cassandra to use Docker
The test/cqlpy/run-cassandra script makes it quite easy to run test/cqlpy
tests against Cassandra, which is important for checking compatibility.

Unfortunately, because modern Linux distributions like Fedora do not have
either Cassandra or the old version of Java that it needs, the user needs
to download those manually. This is fairly easy, and explained in detail
in test/cqlpy/README.md, but nevertheless is a non-trivial manual step.

So this patch adds an even simpler alternative: the "--docker" option,
which tells the script to run the official Cassandra Docker image,
complete with the version of Java that Cassandra prefers - the user does
not need to download or install Cassandra or Java. The image is
efficiently cached by Docker, so running run-cassandra again doesn't
need to download it again. Moreover, trying several different versions
of Cassandra only needs to download and store the shared parts (base
image and Java) once.

test/cqlpy/run-cassandra --docker test_file.py::test_function

By default, this runs the latest Cassandra 5 release. You can also use
"--docker=4" to get the latest Cassandra 4 release, "--docker=3.11"
to get the latest Cassandra 3.11 patch release, or "--docker=3.11.1"
to get a specific patch release.

In addition to the "--docker" option, this patch also introduces a
second option, "--java-docker", which takes *only* Java from docker,
but runs your locally installed Cassandra (to which you should point
with the CASSANDRA environment variable, as before). This option can
be useful if your host does not have a suitable version of Java, but
you want to run a locally-installed or locally-modified version of
Cassandra. The "--java-docker" option defaults to Java 11; to use a
different version, pass for example "--java-docker=17".
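
Concretely (using the tarball path from the README example below):

export CASSANDRA=/tmp/apache-cassandra-4.1.4/bin/cassandra
test/cqlpy/run-cassandra --java-docker=17 test_file.py::test_function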

Fixes #25826.

Closes scylladb/scylladb#29860
2026-05-13 11:57:18 +02:00
Patryk Jędrzejczak
3f2ff5a13f Merge 'Remove raft_group0::finish_setup_after_join' from Gleb Natapov
The function does nothing useful now.

No backport needed. Removes code.

Closes scylladb/scylladb#29828

* https://github.com/scylladb/scylladb:
  raft_group0: remove finish_setup_after_join function
  raft_group0: fix indentation after the last change
  raft_group: drop unneeded checks
2026-05-13 10:53:37 +02:00
Ferenc Szili
6856f51097 test: add test for intranode balance threshold in size-based mode
Verify that the load balancer does not issue intranode migrations when
the load difference between shards is within the size_based_balance_threshold,
and that it does issue migrations when the difference exceeds the threshold.
2026-05-12 10:34:25 +02:00
Ferenc Szili
aaead10e5d tablet_allocator: apply balance threshold to intranode shard balancing
The intranode shard balancing loop only stopped when the most-loaded
and least-loaded shard were the same (src == dst), meaning it would
keep issuing migrations until the load difference reached exactly 0.
This caused unnecessary migrations for negligible imbalances.

Apply the same is_balanced() threshold check that is already used for
inter-node balancing, so that intranode migrations stop when the
relative load difference between shards is within the configured
size_based_balance_threshold (default 1%).
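
A self-contained toy model of the loop with the new exit condition (the
shard/migration model here is invented for illustration; the real
balancer is the C++ code in the diff below and migrates whole tablets):

```
# Toy model: shard "load" is an integer, one migration moves one unit.
def balance_node(shard_loads: list[int], threshold: float = 0.01) -> int:
    """Return the number of intranode migrations issued."""
    migrations = 0
    while True:
        src = max(range(len(shard_loads)), key=lambda s: shard_loads[s])
        dst = min(range(len(shard_loads)), key=lambda s: shard_loads[s])
        max_load, min_load = shard_loads[src], shard_loads[dst]
        # New exit condition: stop on src == dst OR when within the
        # threshold, instead of balancing down to a zero difference.
        if src == dst or (max_load - min_load) / max_load <= threshold:
            return migrations
        shard_loads[src] -= 1
        shard_loads[dst] += 1
        migrations += 1

assert balance_node([257, 255]) == 0  # 0.78% diff: already balanced
assert balance_node([307, 205]) > 0   # 33% diff: migrations issued
```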
2026-05-12 10:34:16 +02:00
Gleb Natapov
c3d2f0bde9 raft_group0: remove finish_setup_after_join function
The only thing it still did was make a bootstrapping node become a voter
in case the cluster does not support the limited voters feature. But that
feature was introduced in 2025.2, and direct upgrade from 2025.1 to a
version newer than 2026.1 is not supported. Even if such an upgrade is
done, the removed code takes effect only during bootstrap, not during
regular boot.

Also remove the upgrade test, since after this patch suppressing the
feature on the first boot would no longer behave correctly.
2026-05-11 15:38:36 +03:00
Gleb Natapov
5213aee99f raft_group0: fix indentation after the last change 2026-05-11 11:56:26 +03:00
Gleb Natapov
5f7f72fa50 raft_group: drop unneeded checks 2026-05-11 11:55:39 +03:00
10 changed files with 356 additions and 137 deletions

View File

@@ -724,44 +724,6 @@ future<> raft_group0::setup_group0(
co_await sys_ks.save_group0_upgrade_state("use_post_raft_procedures");
}
future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
if (joined_group0()) {
group0_log.info("finish_setup_after_join: group 0 ID present, loading server info.");
auto my_id = load_my_id();
if (!_raft_gr.group0().get_configuration().can_vote(my_id)) {
if (!_feat.group0_limited_voters) {
// limited voters feature not enabled yet
// - need to become a voter in here
group0_log.info("finish_setup_after_join: becoming a voter in the group 0 configuration...");
// Just bootstrapped and joined as non-voter. Become a voter.
auto pause_shutdown = _shutdown_gate.hold();
raft::server_address my_addr{my_id, {}};
co_await run_op_with_retry(_abort_source, [this, my_addr]() -> future<operation_result> {
try {
co_await _raft_gr.group0().modify_config({{my_addr, raft::is_voter::yes}}, {}, &_abort_source);
} catch (const raft::commit_status_unknown& e) {
group0_log.info("finish_setup_after_join({}): modify_config returned \"{}\", retrying", my_addr, e);
co_return operation_result::failure;
}
co_return operation_result::success;
}, "finish_setup_after_join->modify_config", {});
group0_log.info("finish_setup_after_join: became a group 0 voter.");
}
// No need to run `upgrade_to_group0()` since we must have bootstrapped with Raft
// (that's the only way to join as non-voter today).
co_return;
}
} else {
// We're either upgrading or in recovery mode.
}
if (!_feat.supports_raft_cluster_mgmt) {
throw std::runtime_error("finish_setup_after_join: SUPPORTS_RAFT feature not yet enabled, but was expected to be enabled at this point."
" If you are trying to upgrade a node pf a cluster that is not using Raft yet, this is no longer supported.");
}
}
bool raft_group0::is_member(raft::server_id id, bool include_voters_only) {
if (!joined_group0()) {
on_internal_error(group0_log, "called is_member before we joined group 0");

View File

@@ -198,13 +198,6 @@ public:
//
future<> setup_group0_if_exist(db::system_keyspace&, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
// Call at the end of the startup procedure, after the node entered NORMAL state.
// `setup_group0()` must have finished earlier.
//
// If the node has just bootstrapped, causes the group 0 server to become a voter.
//
future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
// Check whether the given Raft server is a member of group 0 configuration
// according to our current knowledge.
//

View File

@@ -1727,8 +1727,6 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
throw std::runtime_error(err);
}
co_await _group0->finish_setup_after_join(*this, _qp, _migration_manager.local());
// Initializes monitor only after updating local topology.
start_tablet_split_monitor();

View File

@@ -3214,7 +3214,16 @@ public:
             // Convergence check
             // When in shuffle mode, exit condition is guaranteed by running out of candidates or by load limit.
-            if (!shuffle && src == dst) {
+            auto node_is_balanced = [&] {
+                auto min_load = node_load.shard_load(dst);
+                auto max_load = node_load.shard_load(src);
+                // We can't compute accurate load without disk capacity, so stop balancing this node
+                if (!min_load || !max_load) {
+                    return true;
+                }
+                return is_balanced(*min_load, *max_load);
+            };
+            if (!shuffle && (src == dst || node_is_balanced())) {
                 lblogger.debug("Node {} is balanced", host);
                 break;
             }

View File

@@ -5913,6 +5913,125 @@ SEASTAR_THREAD_TEST_CASE(test_drain_node_without_capacity) {
}).get();
}
// Verifies that the intranode shard balancer respects the size-based balance threshold
// and does not issue migrations when the load difference between shards is within the threshold.
// Without the threshold check, the balancer would keep issuing migrations of small tablets
// from the slightly-heavier shard until load difference reaches exactly 0, or it can't find
// a tablet to migrate because all migrations would go against convergence.
SEASTAR_THREAD_TEST_CASE(test_intranode_balance_threshold) {
auto cfg = tablet_cql_test_config();
// Set the balance threshold explicitly
cfg.db_config->size_based_balance_threshold_percentage.set(1.0);
do_with_cql_env_thread([] (auto& e) {
logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug);
topology_builder topo(e);
// Single node with 2 shards, RF=1. This isolates intranode balancing.
const unsigned shard_count = 2;
auto host1 = topo.add_node(node_state::normal, shard_count);
// Set up capacity for size-based balancing mode.
// Each shard has 100GB capacity, so the node has 200GB total.
const uint64_t shard_capacity = 100UL * 1024UL * 1024UL * 1024UL;
const uint64_t node_capacity = shard_capacity * shard_count;
topo.get_shared_load_stats().set_capacity(host1, node_capacity);
// Create a table with 512 tablets.
// Place 257 on shard 0 and 255 on shard 1, all with the same size.
// This gives:
// Relative load diff = (257 - 255) / 257 = 0.78% < 1% threshold
// But each individual tablet passes the per-tablet convergence check:
// 257*S > 255*S + S => 257 > 256 => true
// So without the threshold fix, the balancer issues a migration.
// With the fix, it recognizes the node is balanced and stops.
const size_t tablet_count = 512;
size_t tablets_on_shard0 = 257;
size_t tablets_on_shard1 = 255;
const uint64_t tablet_size = 100UL * 1024UL * 1024UL; // 100MB each
auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, tablet_count);
auto table1 = add_table(e, ks_name).get();
auto& stm = e.shared_token_metadata().local();
auto& load_stats = topo.get_shared_load_stats();
auto mutate_tmap = [&] (tablet_metadata& tmeta) -> future<> {
tablet_map tmap(tablet_count);
auto tid = tmap.first_tablet();
for (size_t i = 0; i < tablets_on_shard0; ++i) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set { tablet_replica {host1, 0} }
});
if (i < tablets_on_shard0 - 1) {
tid = *tmap.next_tablet(tid);
}
}
for (size_t i = 0; i < tablets_on_shard1; ++i) {
tid = *tmap.next_tablet(tid);
tmap.set_tablet(tid, tablet_info {
tablet_replica_set { tablet_replica {host1, 1} }
});
}
tmeta.set_tablet_map(table1, std::move(tmap));
co_return;
};
mutate_tablets(e, mutate_tmap);
// Set uniform per-tablet sizes
auto& tmap = stm.get()->tablets().get_tablet_map(table1);
tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
locator::range_based_tablet_id rb_tid {table1, tmap.get_token_range(tid)};
load_stats.set_tablet_size(host1, rb_tid, tablet_size);
return make_ready_future<>();
}).get();
load_stats.set_size(table1, tablet_count * tablet_size);
// Call the balancer once and verify NO intranode migrations in the plan.
auto& talloc = e.get_tablet_allocator().local();
auto& topology = e.get_topology_state_machine().local()._topology;
auto& sys_ks = e.get_system_keyspace().local();
{
auto plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks, load_stats.get()).get();
for (auto&& mig : plan.migrations()) {
BOOST_REQUIRE_MESSAGE(mig.kind != tablet_transition_kind::intranode_migration,
"Unexpected intranode migration when load difference is within threshold");
}
}
// Now create a clearly unbalanced scenario: 307 vs 205 tablets.
// Relative diff = (307 - 205) / 307 = 33% >> 1% threshold
tablets_on_shard0 = 307;
tablets_on_shard1 = 205;
mutate_tablets(e, mutate_tmap);
// Update tablet sizes for new layout
auto& tmap2 = stm.get()->tablets().get_tablet_map(table1);
tmap2.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
locator::range_based_tablet_id rb_tid {table1, tmap2.get_token_range(tid)};
load_stats.set_tablet_size(host1, rb_tid, tablet_size);
return make_ready_future<>();
}).get();
// Call balancer once - this time intranode migrations SHOULD be emitted.
{
auto plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks, load_stats.get()).get();
bool saw_intranode_migration = false;
for (auto&& mig : plan.migrations()) {
if (mig.kind == tablet_transition_kind::intranode_migration) {
saw_intranode_migration = true;
break;
}
}
BOOST_REQUIRE_MESSAGE(saw_intranode_migration,
"Expected intranode migration when load difference exceeds threshold");
}
}, cfg).get();
}
SEASTAR_THREAD_TEST_CASE(test_tablet_range_splitter) {
simple_schema ss;

View File

@@ -1200,9 +1200,11 @@ async def _setup_tombstone_gc_cluster(manager, *, tablets=2, extra_cmdline=None)
     cmdline = ['--logger-log-level', 'repair=debug']
     if extra_cmdline:
         cmdline += extra_cmdline
+    # These tests enable hinted handoff and materialized views, which make debug-mode
+    # concurrent bootstrap occasionally exceed the topology timeout before the test starts.
     servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(
         manager, nr_keys=0, cmdline=cmdline, tablets=tablets,
-        disable_flush_cache_time=True)
+        disable_flush_cache_time=True, sequential_server_add=True)
     # Lower propagation_delay to 0 so gc_before = repair_time, making tombstones
     # GC-eligible immediately after a successful repair rather than 1h later.
     await cql.run_async(

View File

@@ -106,82 +106,6 @@ async def test_raft_voters_multidc_kill_dc(
await read_barrier(manager.api, dc_servers[1][0].ip_addr)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_raft_limited_voters_upgrade(manager: ManagerClient):
"""
Test that the limited voters feature works correctly during the upgrade.
The scenario being tested here is that we first have a cluster that doesn't
have the limited voters feature enabled, and then we enable it during the
upgrade.
We first upgrade all nodes except the coordinator, and then the coordinator
itself. When the feature is not available on the coordinator, and would be
enabled on the other nodes, the other nodes wouldn't become voters until the
coordinator is upgraded as well. But that couldn't be done because the
coordinator would be the last voter, thus the majority would be lost.
Therefore we use the feature flags and only enable the feature after all
nodes are upgraded (all nodes report that they support the feature).
Arrange:
- create a 7-node cluster with the limited voters feature disabled
(more nodes than the voters limit of 5)
Act:
- upgrade all nodes in the random order (the topology coordinator at random place)
Assert:
- the feature has been enabled and the cluster works correctly
(i.e. has enough voters to e.g. add a new server)
"""
# Arrange: Create a 3-node cluster with the limited voters feature disabled
cfg = {
"error_injections_at_startup": [
{
"name": "suppress_features",
"value": "GROUP0_LIMITED_VOTERS"
},
]
}
servers = await manager.servers_add(7, config=cfg)
# Check that all servers are voters (the feature is disabled)
num_voters = await get_number_of_voters(manager, servers[0])
assert num_voters == len(
servers), f"The number of voters should be equal to the number of servers (but is {num_voters})"
# Shuffle the servers for random order of servers and topology coordinator
# (the topology coordinator being at the random place)
random.shuffle(servers)
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
marks = [await log.mark() for log in logs]
# Act: Perform a rolling restart with the feature enabled to upgrade all the servers
# (topology coordinator at a random place).
async def upgrade_server(srv):
await manager.server_update_config(srv.server_id, "error_injections_at_startup", [])
logging.info('Upgrading all nodes in this order: %s', [srv.server_id for srv in servers])
await manager.rolling_restart(servers, with_down=upgrade_server)
# Assert: Verify that the feature has been enabled and the majority has not been lost
# (we can add another server to the cluster)
logging.info('Waiting for the GROUP0_LIMITED_VOTERS feature to be enabled')
await asyncio.gather(*(log.wait_for("Feature GROUP0_LIMITED_VOTERS is enabled", from_mark=mark, timeout=60)
for log, mark in zip(logs, marks)))
logging.info('Adding a new server to the cluster')
await manager.server_add()
num_voters = await get_number_of_voters(manager, servers[0])
assert num_voters == GROUP0_VOTERS_LIMIT, f"The number of voters should be limited to {GROUP0_VOTERS_LIMIT} (but is {num_voters})"
@pytest.mark.asyncio
async def test_raft_limited_voters_retain_coordinator(manager: ManagerClient):
"""

View File

@@ -254,6 +254,52 @@ simple as `dnf install java-11`. On Fedora 42, to install java-11 alongside
the system's default Java you need to ask to install it from Fedora 41:
`dnf install --releasever=41 java-11-openjdk-headless.x86_64`
Alternatively, if you cannot or do not want to install an older Java on your
system, run-cassandra can run Cassandra (and Java) entirely from Docker:
**`--docker[=CASSANDRA_VERSION]`** (recommended): uses the official
`cassandra` Docker image, which bundles both Cassandra and the right Java
version. No local Cassandra installation is required at all. The per-test
configuration and data are written to a subdirectory of `$TMPDIR` (defaulting
to `/tmp`) that is bind-mounted into the container. This is also efficient:
the official `cassandra:4.1` and `cassandra:5` images share their Ubuntu and
Java base layers, so those are downloaded only once regardless of how many
Cassandra versions you test.
```
test/cqlpy/run-cassandra --docker # default is latest 5 release
test/cqlpy/run-cassandra --docker=4.1 # latest 4.1 patch release
test/cqlpy/run-cassandra --docker=4.1.11 # specific patch release
test/cqlpy/run-cassandra --docker=3.11
```
A small number of tests invoke `nodetool` (set via the `NODETOOL` environment
variable). When using `--docker` there is no local `nodetool` binary, so you
need to supply one - we haven't yet implemented the ability to use the one in
Docker. For now, you'll need to point `NODETOOL` at the `nodetool` script from
a downloaded Cassandra tarball (see _Precompiled Cassandra_ below) - you do
not need to run that Cassandra, you only need the script:
```
export NODETOOL=/tmp/apache-cassandra-5.0.3/bin/nodetool
test/cqlpy/run-cassandra --docker test_file.py
```
**`--java-docker[=JAVA_VERSION]`**: a lighter alternative for when you have a
local Cassandra installation but lack a suitable Java version on the host.
The local Cassandra installation is bind-mounted into a Docker container that
provides the requested Java version, and both the Cassandra startup script and
Java run inside that container:
```
export CASSANDRA=/tmp/apache-cassandra-4.1.4/bin/cassandra
test/cqlpy/run-cassandra --java-docker # defaults to Java 11
test/cqlpy/run-cassandra --java-docker=11
```
In both cases Docker must be installed and running. On the first run, Docker
pulls the required image automatically and caches it for subsequent runs.
## Precompiled Cassandra
The easiest way to get Cassandra is to get a pre-compiled tar.
Go to [Cassandra's download page](https://cassandra.apache.org/_/download.html)

View File

@@ -9,19 +9,57 @@
import sys
import os
import argparse
import shutil
import subprocess
import re
import run # run.py in this directory
# Parse options specific to run-cassandra from sys.argv and build a filtered
# argument list to pass to pytest (which doesn't know about these options).
# --docker[=CASSANDRA_VERSION]: run both Cassandra and Java from
# Docker using the official 'cassandra' image (e.g., cassandra:4.1 or
# cassandra:5). No local Cassandra installation is needed. The per-test
# configuration/data directory (a subdirectory of $TMPDIR) is bind-mounted
# so Cassandra inside the container reads the config we write and stores
# data on the host. CASSANDRA_VERSION defaults to 5.
# --java-docker[=JAVA_VERSION]: run a locally-installed Cassandra using Java
# from Docker. The local Cassandra installation is bind-mounted into a
# Docker container that provides the requested Java version, and the
# Cassandra startup script runs inside that container. JAVA_VERSION
# defaults to 11.
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--docker', default=None)
parser.add_argument('--java-docker', default=None)
# Expand bare --docker / --java-docker (without =) to their defaults. We can't
# do this with argparse directly, because it would consume the next positional
# argument as a value, and cause "run-cassandra --docker test_file.py" to think
# that "test_file.py" is the Docker version.
argv = ['--docker=5' if a == '--docker' else
'--java-docker=11' if a == '--java-docker' else a
for a in sys.argv[1:]]
args, pytest_argv = parser.parse_known_args(argv)
if args.java_docker and args.docker:
print("Error: --java-docker and --docker cannot be used together. "
"Use --docker to run both Cassandra and Java from Docker, "
"or --java-docker to run only Java from Docker with a local Cassandra.")
exit(1)
def find_cassandra():
# When running Cassandra via Docker the image already contains the
# cassandra binary, so no local installation is needed.
if args.docker:
return None
# By default, we assume 'cassandra' is in the user's path. A specific
# cassandra script can be chosen by setting the CASSANDRA variable.
cassandra = os.getenv('CASSANDRA', 'cassandra')
cassandra_path = shutil.which(cassandra)
     if cassandra_path is None:
-        print("Error: Can't find {}. Please set the CASSANDRA environment variable to the path of the Cassandra startup script.".format(cassandra))
+        print("Error: Can't find {}. Please set the CASSANDRA environment "
+              "variable to the path of the Cassandra startup script, or use "
+              "'--docker' to automatically get Cassandra and Java from Docker.".format(cassandra))
         exit(1)
     return cassandra_path
@@ -47,6 +85,9 @@ def java_major_version(java):
return major
def find_java():
# If the user requested Docker-based Java (either option), skip the local search entirely.
if args.java_docker or args.docker:
return None
# Look for the Java in one of several places known to host the Java
# executable, and return the first one that works and has the appropriate
# version. The first attempt is just "java" in the path, which is
@@ -117,10 +158,12 @@ def run_cassandra_cmd(pid, dir):
}
     # By default, Cassandra's startup script runs "java". We can override this
     # choice with the JAVA_HOME environment variable based on the Java we
-    # found earlier in find_java().
-    if java and java.startswith('/'):
-        env['JAVA_HOME'] = os.path.dirname(os.path.dirname(java))
-        print('JAVA_HOME: ' + env['JAVA_HOME'])
+    # found earlier in find_java(). In docker modes, java comes from the
+    # container image, so no JAVA_HOME override is needed.
+    if not args.docker and not args.java_docker:
+        if java and java.startswith('/'):
+            env['JAVA_HOME'] = os.path.dirname(os.path.dirname(java))
+            print('JAVA_HOME: ' + env['JAVA_HOME'])
# On JVM 11, Cassandra requires a bunch of configuration options in
# conf/jvm11-server.options, or it fails loading classes because of JPMS.
# The following options were copied from Cassandra's jvm11-server.options.
@@ -177,10 +220,125 @@ def run_cassandra_cmd(pid, dir):
'--add-opens java.base/java.net=ALL-UNNAMED\n'
'--add-opens java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED\n'
,file=f)
# Cassandra 4.x's cassandra.in.sh greps two separate files: a base
# "jvm-server.options" (common to all JVM versions) and then a
# version-specific "jvm11-server.options" or "jvm17-server.options".
# We put all our options into the version-specific files above, so the
# base file can be empty. Cassandra 3.x uses "jvm.options" instead.
# All must exist or the startup script prints grep warnings.
open(os.path.join(confdir, 'jvm-server.options'), 'w').close()
open(os.path.join(confdir, 'jvm.options'), 'w').close()
# Current versions of Cassandra 5 refuse to run on Java 21
# without this environment variable.
env['CASSANDRA_JDK_UNSUPPORTED'] = 'true'
-    return ([cassandra, '-f'], env)
+    if args.docker:
+        return run_cassandra_cmd_docker(ip, dir, env)
+    elif args.java_docker:
+        return run_cassandra_cmd_java_docker(ip, dir, env)
+    else:
+        return ([cassandra, '-f'], env)
# Generate command line and environment variables to run Cassandra from the
# official Docker image, which bundles both Cassandra and the appropriate Java
# version. No local Cassandra installation is needed. The per-test temporary
# directory 'dir' (a subdirectory of $TMPDIR) is bind-mounted so the config we
# wrote above is visible inside the container at the same paths.
def run_cassandra_cmd_docker(ip, dir, env):
docker = shutil.which('docker')
if not docker:
print('Error: docker executable not found in PATH.')
exit(1)
del env['CASSANDRA_INCLUDE']
del env['CASSANDRA_HOME']
# The official cassandra Docker image's entrypoint overrides
# listen_address in cassandra.yaml with the container's detected IP
# (via "hostname -i"), which with --network host resolves to the
# host's primary interface rather than the loopback alias we assigned.
# Pass CASSANDRA_LISTEN_ADDRESS so the entrypoint uses the right IP.
env['CASSANDRA_LISTEN_ADDRESS'] = ip
image = f'cassandra:{args.docker}'
print(f'Running Cassandra {args.docker} from Docker (image: {image})')
# We could just run docker directly and the image will be downloaded
# automatically, but we want a nice "Pulling Docker image..." message
# while waiting, and also to print the Cassandra and Java versions
# after the wait, so we do something more elaborate here:
if subprocess.call([docker, 'image', 'inspect', image],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) != 0:
print(f'Pulling Docker image {image}...')
try:
subprocess.check_call([docker, 'pull', '-q', image],
stdout=subprocess.DEVNULL)
except subprocess.CalledProcessError:
print(f'Error: Failed to pull Docker image {image} for Cassandra {args.docker!r}. '
f'Try versions like 5, 5.1 or 5.1.1.')
exit(1)
image_env = subprocess.check_output(
[docker, 'image', 'inspect', '--format',
'{{range .Config.Env}}{{println .}}{{end}}',
image], stderr=subprocess.DEVNULL).decode('UTF-8')
cassandra_version = next((line.split('=', 1)[1] for line in image_env.splitlines()
if line.startswith('CASSANDRA_VERSION=')), '(unknown)')
jv_out = subprocess.check_output(
[docker, 'run', '--rm', image, 'java', '-version'],
stderr=subprocess.STDOUT).decode('UTF-8')
m = re.search(r'"([^"]+)"', jv_out)
java_version = m.group(1) if m else '(unknown)'
print(f'Docker Cassandra version: {cassandra_version}, Java: {java_version}')
# The official cassandra image's default CMD is 'cassandra -f', which
# is exactly what we want.
docker_cmd = [docker, 'run', '--rm', '--network', 'host',
'--user', f'{os.getuid()}:{os.getgid()}',
'--security-opt', 'label=disable',
'-v', f'{dir}:{dir}']
for k, v in env.items():
docker_cmd += ['-e', f'{k}={v}']
docker_cmd += [image]
return (docker_cmd, {})
# Generate command line and environment variables to run the user's local
# Cassandra inside Docker containing the requested Java version.
# The local Cassandra installation is bind-mounted at its own absolute path so
# that the startup script finds its JARs and config at the same paths it
# would expect on the host. We use realpath() so the mount point inside the
# container is unambiguous (no symlinks).
def run_cassandra_cmd_java_docker(ip, dir, env):
cassandra_real = os.path.realpath(cassandra)
cassandra_home = os.path.dirname(os.path.dirname(cassandra_real))
image = f'eclipse-temurin:{args.java_docker}-jre'
print(f'Running Cassandra with Docker Java {args.java_docker} (image: {image})')
docker = shutil.which('docker')
if not docker:
print('Error: docker executable not found in PATH.')
exit(1)
# We could just run docker directly and the image will be downloaded
# automatically, but we want a nice "Pulling Docker image..." message
# while waiting, and also to print the Java version after the wait,
# so we do something more elaborate here:
if subprocess.call([docker, 'image', 'inspect', image],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) != 0:
print(f'Pulling Docker image {image}...')
try:
subprocess.check_call([docker, 'pull', '-q', image])
except subprocess.CalledProcessError:
print(f'Error: Failed to pull Docker image {image} for Java {args.java_docker!r}. '
f'Try versions like 11, or 17.')
exit(1)
java_version = subprocess.check_output(
[docker, 'image', 'inspect', '--format',
'{{range .Config.Env}}{{println .}}{{end}}',
image], stderr=subprocess.DEVNULL).decode('UTF-8')
java_version = next((line.split('=',1)[1] for line in java_version.splitlines()
if line.startswith('JAVA_VERSION=')), '(unknown)')
print(f'Docker Java version: {java_version}')
docker_cmd = [docker, 'run', '--rm', '--network', 'host',
'--user', f'{os.getuid()}:{os.getgid()}',
'--security-opt', 'label=disable',
'-v', f'{dir}:{dir}',
'-v', f'{cassandra_home}:{cassandra_home}:ro']
for k, v in env.items():
docker_cmd += ['-e', f'{k}={v}']
docker_cmd += [image, cassandra_real, '-f']
return (docker_cmd, {})
# Same as run_cassandra_cmd, just use SSL encryption for the CQL port (same
# port number as default - replacing the unencrypted server).
@@ -201,9 +359,10 @@ def run_cassandra_ssl_cmd(pid, dir):
     # The command and environment variables to run Cassandra are the same,
     return (cmd, env)
-print('Cassandra is: ' + cassandra + '.')
+if cassandra:
+    print(f'Cassandra is: {cassandra}.')
-if '--ssl' in sys.argv:
+if '--ssl' in pytest_argv:
     cmd = run_cassandra_ssl_cmd
     check_cql = run.check_ssl_cql
 else:
@@ -214,7 +373,7 @@ pid = run.run_with_temporary_dir(cmd)
 ip = run.pid_to_ip(pid)
 run.wait_for_services(pid, [lambda: check_cql(ip)])
-success = run.run_pytest(sys.path[0], ['-o', 'xfail_strict=false', '--host', ip] + sys.argv[1:])
+success = run.run_pytest(sys.path[0], ['-o', 'xfail_strict=false', '--host', ip] + pytest_argv)
 run.summary = 'Cassandra tests pass' if success else 'Cassandra tests failure'

View File

@@ -64,7 +64,9 @@ async def load_tablet_repair_task_infos(cql, host, table_id):
     return repair_task_infos
-async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fast_stats_refresh = True, nr_keys = 256, disable_flush_cache_time = False, cmdline = None) -> (list[ServerInfo], CassandraSession, list[Host], str, str):
+async def create_table_insert_data_for_repair(manager, rf=3, tablets=8, fast_stats_refresh=True, nr_keys=256,
+                                              disable_flush_cache_time=False, cmdline=None,
+                                              sequential_server_add=False) -> tuple[list[ServerInfo], CassandraSession, list[Host], str, str]:
     assert rf <= 3, "A keyspace with RF > 3 will be RF-rack-invalid if there are fewer racks than the RF"
     if fast_stats_refresh:
@@ -73,8 +75,13 @@ async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fas
         config = {}
     if disable_flush_cache_time:
         config.update({'repair_hints_batchlog_flush_cache_time_in_ms': 0})
-    servers = await manager.servers_add(3, config=config, cmdline=cmdline,
-                                        property_file=[{"dc": "dc1", "rack": f"r{i % rf}"} for i in range(rf)])
+    property_files = [{"dc": "dc1", "rack": f"r{i % rf}"} for i in range(3)]
+    if sequential_server_add:
+        servers = []
+        for property_file in property_files:
+            servers.append(await manager.server_add(config=config, cmdline=cmdline, property_file=property_file))
+    else:
+        servers = await manager.servers_add(len(property_files), config=config, cmdline=cmdline, property_file=property_files)
     cql = manager.get_cql()
     ks = await create_new_test_keyspace(cql, "WITH replication = {{'class': 'NetworkTopologyStrategy', "
                                         "'replication_factor': {}}} AND tablets = {{'initial': {}}};".format(rf, tablets))