Mirror of https://github.com/scylladb/scylladb.git

Comparing SCYLLADB-1...master (9 commits):

- 66439bb753
- 0fcae72530
- 51c35c05e2
- 3f2ff5a13f
- 6856f51097
- aaead10e5d
- c3d2f0bde9
- 5213aee99f
- 5f7f72fa50
```diff
@@ -724,44 +724,6 @@ future<> raft_group0::setup_group0(
     co_await sys_ks.save_group0_upgrade_state("use_post_raft_procedures");
 }
 
-future<> raft_group0::finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm) {
-    if (joined_group0()) {
-        group0_log.info("finish_setup_after_join: group 0 ID present, loading server info.");
-        auto my_id = load_my_id();
-        if (!_raft_gr.group0().get_configuration().can_vote(my_id)) {
-            if (!_feat.group0_limited_voters) {
-                // limited voters feature not enabled yet
-                // - need to become a voter in here
-                group0_log.info("finish_setup_after_join: becoming a voter in the group 0 configuration...");
-                // Just bootstrapped and joined as non-voter. Become a voter.
-                auto pause_shutdown = _shutdown_gate.hold();
-                raft::server_address my_addr{my_id, {}};
-                co_await run_op_with_retry(_abort_source, [this, my_addr]() -> future<operation_result> {
-                    try {
-                        co_await _raft_gr.group0().modify_config({{my_addr, raft::is_voter::yes}}, {}, &_abort_source);
-                    } catch (const raft::commit_status_unknown& e) {
-                        group0_log.info("finish_setup_after_join({}): modify_config returned \"{}\", retrying", my_addr, e);
-                        co_return operation_result::failure;
-                    }
-                    co_return operation_result::success;
-                }, "finish_setup_after_join->modify_config", {});
-                group0_log.info("finish_setup_after_join: became a group 0 voter.");
-            }
-
-            // No need to run `upgrade_to_group0()` since we must have bootstrapped with Raft
-            // (that's the only way to join as non-voter today).
-            co_return;
-        }
-    } else {
-        // We're either upgrading or in recovery mode.
-    }
-
-    if (!_feat.supports_raft_cluster_mgmt) {
-        throw std::runtime_error("finish_setup_after_join: SUPPORTS_RAFT feature not yet enabled, but was expected to be enabled at this point."
-            " If you are trying to upgrade a node of a cluster that is not using Raft yet, this is no longer supported.");
-    }
-}
-
 bool raft_group0::is_member(raft::server_id id, bool include_voters_only) {
     if (!joined_group0()) {
         on_internal_error(group0_log, "called is_member before we joined group 0");
```
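The removed code relied on `run_op_with_retry` to keep retrying `modify_config` while the Raft commit status was unknown. As context, a minimal sketch of that retry pattern (illustrative Python, not the ScyllaDB API; the helper and exception names are stand-ins for the C++ `run_op_with_retry`/`operation_result` above):

```python
import asyncio

class CommitStatusUnknown(Exception):
    """Stand-in for raft::commit_status_unknown."""

async def run_op_with_retry(op, delay=0.1):
    # Keep invoking 'op' until it reports success. 'op' returns True on
    # success and False when the operation should be retried, mirroring
    # operation_result::success / operation_result::failure above.
    while not await op():
        await asyncio.sleep(delay)

async def become_voter(modify_config):
    async def op():
        try:
            await modify_config()  # may raise CommitStatusUnknown
        except CommitStatusUnknown:
            return False           # retry: outcome of the config change is unknown
        return True
    await run_op_with_retry(op)
```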
```diff
@@ -198,13 +198,6 @@ public:
     //
     future<> setup_group0_if_exist(db::system_keyspace&, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
 
-    // Call at the end of the startup procedure, after the node entered NORMAL state.
-    // `setup_group0()` must have finished earlier.
-    //
-    // If the node has just bootstrapped, causes the group 0 server to become a voter.
-    //
-    future<> finish_setup_after_join(service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm);
-
     // Check whether the given Raft server is a member of group 0 configuration
     // according to our current knowledge.
    //
```
```diff
@@ -1727,8 +1727,6 @@ future<> storage_service::join_topology(sharded<service::storage_proxy>& proxy,
         throw std::runtime_error(err);
     }
 
-    co_await _group0->finish_setup_after_join(*this, _qp, _migration_manager.local());
-
     // Initializes monitor only after updating local topology.
     start_tablet_split_monitor();
 
```
```diff
@@ -3214,7 +3214,16 @@ public:
         // Convergence check
 
         // When in shuffle mode, exit condition is guaranteed by running out of candidates or by load limit.
-        if (!shuffle && src == dst) {
+        auto node_is_balanced = [&] {
+            auto min_load = node_load.shard_load(dst);
+            auto max_load = node_load.shard_load(src);
+            // We can't compute accurate load without disk capacity, so stop balancing this node
+            if (!min_load || !max_load) {
+                return true;
+            }
+            return is_balanced(*min_load, *max_load);
+        };
+        if (!shuffle && (src == dst || node_is_balanced())) {
             lblogger.debug("Node {} is balanced", host);
             break;
         }
```
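`is_balanced()` itself is not part of this hunk; a minimal sketch of the size-based threshold semantics it presumably implements (illustrative Python, names assumed; the threshold would come from `size_based_balance_threshold_percentage`, and the arithmetic mirrors the test below):

```python
def is_balanced(min_load: float, max_load: float, threshold_pct: float = 1.0) -> bool:
    # A node counts as balanced when the relative difference between its
    # most and least loaded shards is within the configured percentage.
    if max_load == 0:
        return True  # nothing on the node to move
    relative_diff = (max_load - min_load) / max_load * 100.0
    # e.g. shards holding 257 vs 255 equal-size tablets:
    # (257 - 255) / 257 = 0.78% < 1% threshold => balanced
    return relative_diff <= threshold_pct
```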
```diff
@@ -5913,6 +5913,125 @@ SEASTAR_THREAD_TEST_CASE(test_drain_node_without_capacity) {
     }).get();
 }
 
+// Verifies that the intranode shard balancer respects the size-based balance threshold
+// and does not issue migrations when the load difference between shards is within the threshold.
+// Without the threshold check, the balancer would keep issuing migrations of small tablets
+// from the slightly-heavier shard until load difference reaches exactly 0, or it can't find
+// a tablet to migrate because all migrations would go against convergence.
+SEASTAR_THREAD_TEST_CASE(test_intranode_balance_threshold) {
+    auto cfg = tablet_cql_test_config();
+    // Set the balance threshold explicitly
+    cfg.db_config->size_based_balance_threshold_percentage.set(1.0);
+    do_with_cql_env_thread([] (auto& e) {
+        logging::logger_registry().set_logger_level("load_balancer", logging::log_level::debug);
+
+        topology_builder topo(e);
+
+        // Single node with 2 shards, RF=1. This isolates intranode balancing.
+        const unsigned shard_count = 2;
+        auto host1 = topo.add_node(node_state::normal, shard_count);
+
+        // Set up capacity for size-based balancing mode.
+        // Each shard has 100GB capacity, so the node has 200GB total.
+        const uint64_t shard_capacity = 100UL * 1024UL * 1024UL * 1024UL;
+        const uint64_t node_capacity = shard_capacity * shard_count;
+        topo.get_shared_load_stats().set_capacity(host1, node_capacity);
+
+        // Create a table with 512 tablets.
+        // Place 257 on shard 0 and 255 on shard 1, all with the same size.
+        // This gives:
+        //   Relative load diff = (257 - 255) / 257 = 0.78% < 1% threshold
+        // But each individual tablet passes the per-tablet convergence check:
+        //   257*S > 255*S + S  =>  257 > 256  =>  true
+        // So without the threshold fix, the balancer issues a migration.
+        // With the fix, it recognizes the node is balanced and stops.
+        const size_t tablet_count = 512;
+        size_t tablets_on_shard0 = 257;
+        size_t tablets_on_shard1 = 255;
+        const uint64_t tablet_size = 100UL * 1024UL * 1024UL; // 100MB each
+
+        auto ks_name = add_keyspace(e, {{topo.dc(), 1}}, tablet_count);
+        auto table1 = add_table(e, ks_name).get();
+
+        auto& stm = e.shared_token_metadata().local();
+        auto& load_stats = topo.get_shared_load_stats();
+
+        auto mutate_tmap = [&] (tablet_metadata& tmeta) -> future<> {
+            tablet_map tmap(tablet_count);
+            auto tid = tmap.first_tablet();
+            for (size_t i = 0; i < tablets_on_shard0; ++i) {
+                tmap.set_tablet(tid, tablet_info {
+                    tablet_replica_set { tablet_replica {host1, 0} }
+                });
+                if (i < tablets_on_shard0 - 1) {
+                    tid = *tmap.next_tablet(tid);
+                }
+            }
+            for (size_t i = 0; i < tablets_on_shard1; ++i) {
+                tid = *tmap.next_tablet(tid);
+                tmap.set_tablet(tid, tablet_info {
+                    tablet_replica_set { tablet_replica {host1, 1} }
+                });
+            }
+            tmeta.set_tablet_map(table1, std::move(tmap));
+            co_return;
+        };
+
+        mutate_tablets(e, mutate_tmap);
+
+        // Set uniform per-tablet sizes
+        auto& tmap = stm.get()->tablets().get_tablet_map(table1);
+        tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
+            locator::range_based_tablet_id rb_tid {table1, tmap.get_token_range(tid)};
+            load_stats.set_tablet_size(host1, rb_tid, tablet_size);
+            return make_ready_future<>();
+        }).get();
+        load_stats.set_size(table1, tablet_count * tablet_size);
+
+        // Call the balancer once and verify NO intranode migrations in the plan.
+        auto& talloc = e.get_tablet_allocator().local();
+        auto& topology = e.get_topology_state_machine().local()._topology;
+        auto& sys_ks = e.get_system_keyspace().local();
+
+        {
+            auto plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks, load_stats.get()).get();
+            for (auto&& mig : plan.migrations()) {
+                BOOST_REQUIRE_MESSAGE(mig.kind != tablet_transition_kind::intranode_migration,
+                    "Unexpected intranode migration when load difference is within threshold");
+            }
+        }
+
+        // Now create a clearly unbalanced scenario: 307 vs 205 tablets.
+        //   Relative diff = (307 - 205) / 307 = 33% >> 1% threshold
+        tablets_on_shard0 = 307;
+        tablets_on_shard1 = 205;
+
+        mutate_tablets(e, mutate_tmap);
+
+        // Update tablet sizes for new layout
+        auto& tmap2 = stm.get()->tablets().get_tablet_map(table1);
+        tmap2.for_each_tablet([&] (tablet_id tid, const tablet_info& tinfo) {
+            locator::range_based_tablet_id rb_tid {table1, tmap2.get_token_range(tid)};
+            load_stats.set_tablet_size(host1, rb_tid, tablet_size);
+            return make_ready_future<>();
+        }).get();
+
+        // Call balancer once - this time intranode migrations SHOULD be emitted.
+        {
+            auto plan = talloc.balance_tablets(stm.get(), &topology, &sys_ks, load_stats.get()).get();
+            bool saw_intranode_migration = false;
+            for (auto&& mig : plan.migrations()) {
+                if (mig.kind == tablet_transition_kind::intranode_migration) {
+                    saw_intranode_migration = true;
+                    break;
+                }
+            }
+            BOOST_REQUIRE_MESSAGE(saw_intranode_migration,
+                "Expected intranode migration when load difference exceeds threshold");
+        }
+    }, cfg).get();
+}
+
 SEASTAR_THREAD_TEST_CASE(test_tablet_range_splitter) {
     simple_schema ss;
```
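The arithmetic in the test's comments can be checked standalone (a quick illustrative verification, not part of the test suite):

```python
S = 100 * 1024 * 1024  # uniform tablet size used by the test (100MB)

# Near-balanced layout: relative diff below the 1% threshold...
assert (257 - 255) / 257 < 0.01
# ...yet each individual migration still passes the per-tablet
# convergence check 257*S > 255*S + S, i.e. 257 > 256:
assert 257 * S > 255 * S + S

# Clearly unbalanced layout: ~33% relative diff, well above 1%.
assert (307 - 205) / 307 > 0.01
```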
```diff
@@ -1200,9 +1200,11 @@ async def _setup_tombstone_gc_cluster(manager, *, tablets=2, extra_cmdline=None)
     cmdline = ['--logger-log-level', 'repair=debug']
     if extra_cmdline:
         cmdline += extra_cmdline
+    # These tests enable hinted handoff and materialized views, which make debug-mode
+    # concurrent bootstrap occasionally exceed the topology timeout before the test starts.
     servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(
         manager, nr_keys=0, cmdline=cmdline, tablets=tablets,
-        disable_flush_cache_time=True)
+        disable_flush_cache_time=True, sequential_server_add=True)
     # Lower propagation_delay to 0 so gc_before = repair_time, making tombstones
     # GC-eligible immediately after a successful repair rather than 1h later.
     await cql.run_async(
```
```diff
@@ -106,82 +106,6 @@ async def test_raft_voters_multidc_kill_dc(
     await read_barrier(manager.api, dc_servers[1][0].ip_addr)
 
 
-@pytest.mark.asyncio
-@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
-async def test_raft_limited_voters_upgrade(manager: ManagerClient):
-    """
-    Test that the limited voters feature works correctly during the upgrade.
-
-    The scenario being tested here is that we first have a cluster that doesn't
-    have the limited voters feature enabled, and then we enable it during the
-    upgrade.
-
-    We first upgrade all nodes except the coordinator, and then the coordinator
-    itself. When the feature is not available on the coordinator, and would be
-    enabled on the other nodes, the other nodes wouldn't become voters until the
-    coordinator is upgraded as well. But that couldn't be done because the
-    coordinator would be the last voter, thus the majority would be lost.
-
-    Therefore we use the feature flags and only enable the feature after all
-    nodes are upgraded (all nodes report that they support the feature).
-
-    Arrange:
-      - create a 7-node cluster with the limited voters feature disabled
-        (more nodes than the voters limit of 5)
-    Act:
-      - upgrade all nodes in random order (the topology coordinator at a random place)
-    Assert:
-      - the feature has been enabled and the cluster works correctly
-        (i.e. has enough voters to e.g. add a new server)
-    """
-
-    # Arrange: Create a 7-node cluster with the limited voters feature disabled
-
-    cfg = {
-        "error_injections_at_startup": [
-            {
-                "name": "suppress_features",
-                "value": "GROUP0_LIMITED_VOTERS"
-            },
-        ]
-    }
-    servers = await manager.servers_add(7, config=cfg)
-
-    # Check that all servers are voters (the feature is disabled)
-    num_voters = await get_number_of_voters(manager, servers[0])
-    assert num_voters == len(
-        servers), f"The number of voters should be equal to the number of servers (but is {num_voters})"
-
-    # Shuffle the servers for a random order of servers and topology coordinator
-    # (the topology coordinator being at a random place)
-    random.shuffle(servers)
-
-    logs = [await manager.server_open_log(srv.server_id) for srv in servers]
-    marks = [await log.mark() for log in logs]
-
-    # Act: Perform a rolling restart with the feature enabled to upgrade all the servers
-    # (topology coordinator at a random place).
-
-    async def upgrade_server(srv):
-        await manager.server_update_config(srv.server_id, "error_injections_at_startup", [])
-
-    logging.info('Upgrading all nodes in this order: %s', [srv.server_id for srv in servers])
-    await manager.rolling_restart(servers, with_down=upgrade_server)
-
-    # Assert: Verify that the feature has been enabled and the majority has not been lost
-    # (we can add another server to the cluster)
-
-    logging.info('Waiting for the GROUP0_LIMITED_VOTERS feature to be enabled')
-    await asyncio.gather(*(log.wait_for("Feature GROUP0_LIMITED_VOTERS is enabled", from_mark=mark, timeout=60)
-                           for log, mark in zip(logs, marks)))
-
-    logging.info('Adding a new server to the cluster')
-    await manager.server_add()
-
-    num_voters = await get_number_of_voters(manager, servers[0])
-    assert num_voters == GROUP0_VOTERS_LIMIT, f"The number of voters should be limited to {GROUP0_VOTERS_LIMIT} (but is {num_voters})"
-
-
 @pytest.mark.asyncio
 async def test_raft_limited_voters_retain_coordinator(manager: ManagerClient):
     """
```
````diff
@@ -254,6 +254,52 @@ simple as `dnf install java-11`. On Fedora 42, to install java-11 alongside
 the system's default Java you need to ask to install it from Fedora 41:
 `dnf install --releasever=41 java-11-openjdk-headless.x86_64`
 
+Alternatively, if you cannot or do not want to install an older Java on your
+system, run-cassandra can run Cassandra (and Java) entirely from Docker:
+
+**`--docker[=CASSANDRA_VERSION]`** (recommended): uses the official
+`cassandra` Docker image, which bundles both Cassandra and the right Java
+version. No local Cassandra installation is required at all. The per-test
+configuration and data are written to a subdirectory of `$TMPDIR` (defaulting
+to `/tmp`) that is bind-mounted into the container. This is also efficient:
+the official `cassandra:4.1` and `cassandra:5` images share their Ubuntu and
+Java base layers, so those are downloaded only once regardless of how many
+Cassandra versions you test.
+
+```
+test/cqlpy/run-cassandra --docker          # default is latest 5 release
+test/cqlpy/run-cassandra --docker=4.1      # latest 4.1 patch release
+test/cqlpy/run-cassandra --docker=4.1.11   # specific patch release
+test/cqlpy/run-cassandra --docker=3.11
+```
+
+A small number of tests invoke `nodetool` (set via the `NODETOOL` environment
+variable). When using `--docker` there is no local `nodetool` binary, so you
+need to supply one - we haven't yet implemented the ability to use the one in
+Docker. For now, you'll need to point `NODETOOL` at the `nodetool` script from
+a downloaded Cassandra tarball (see _Precompiled Cassandra_ below) - you do
+not need to run that Cassandra, you only need the script:
+
+```
+export NODETOOL=/tmp/apache-cassandra-5.0.3/bin/nodetool
+test/cqlpy/run-cassandra --docker test_file.py
+```
+
+**`--java-docker[=JAVA_VERSION]`**: a lighter alternative for when you have a
+local Cassandra installation but lack a suitable Java version on the host.
+The local Cassandra installation is bind-mounted into a Docker container that
+provides the requested Java version, and both the Cassandra startup script and
+Java run inside that container:
+
+```
+export CASSANDRA=/tmp/apache-cassandra-4.1.4/bin/cassandra
+test/cqlpy/run-cassandra --java-docker     # defaults to Java 11
+test/cqlpy/run-cassandra --java-docker=11
+```
+
+In both cases Docker must be installed and running. On the first run, Docker
+pulls the required image automatically and caches it for subsequent runs.
+
 ## Precompiled Cassandra
 The easiest way to get Cassandra is to get a pre-compiled tar.
 Go to [Cassandra's download page](https://cassandra.apache.org/_/download.html)
````
```diff
@@ -9,19 +9,57 @@
 
 import sys
 import os
+import argparse
 import shutil
 import subprocess
+import re
 
 import run # run.py in this directory
 
+# Parse options specific to run-cassandra from sys.argv and build a filtered
+# argument list to pass to pytest (which doesn't know about these options).
+# --docker[=CASSANDRA_VERSION]: run both Cassandra and Java from
+# Docker using the official 'cassandra' image (e.g., cassandra:4.1 or
+# cassandra:5). No local Cassandra installation is needed. The per-test
+# configuration/data directory (a subdirectory of $TMPDIR) is bind-mounted
+# so Cassandra inside the container reads the config we write and stores
+# data on the host. CASSANDRA_VERSION defaults to 5.
+# --java-docker[=JAVA_VERSION]: run a locally-installed Cassandra using Java
+# from Docker. The local Cassandra installation is bind-mounted into a
+# Docker container that provides the requested Java version, and the
+# Cassandra startup script runs inside that container. JAVA_VERSION
+# defaults to 11.
+parser = argparse.ArgumentParser(add_help=False)
+parser.add_argument('--docker', default=None)
+parser.add_argument('--java-docker', default=None)
+# Expand bare --docker / --java-docker (without =) to their defaults. We can't
+# do this with argparse directly, because it would consume the next positional
+# argument as a value, and cause "run-cassandra --docker test_file.py" to think
+# that "test_file.py" is the Docker version.
+argv = ['--docker=5' if a == '--docker' else
+        '--java-docker=11' if a == '--java-docker' else a
+        for a in sys.argv[1:]]
+args, pytest_argv = parser.parse_known_args(argv)
+
+if args.java_docker and args.docker:
+    print("Error: --java-docker and --docker cannot be used together. "
+          "Use --docker to run both Cassandra and Java from Docker, "
+          "or --java-docker to run only Java from Docker with a local Cassandra.")
+    exit(1)
+
 def find_cassandra():
+    # When running Cassandra via Docker the image already contains the
+    # cassandra binary, so no local installation is needed.
+    if args.docker:
+        return None
     # By default, we assume 'cassandra' is in the user's path. A specific
     # cassandra script can be chosen by setting the CASSANDRA variable.
     cassandra = os.getenv('CASSANDRA', 'cassandra')
     cassandra_path = shutil.which(cassandra)
     if cassandra_path is None:
-        print("Error: Can't find {}. Please set the CASSANDRA environment variable to the path of the Cassandra startup script.".format(cassandra))
+        print("Error: Can't find {}. Please set the CASSANDRA environment "
+              "variable to the path of the Cassandra startup script, or use "
+              "'--docker' to automatically get Cassandra and Java from Docker.".format(cassandra))
         exit(1)
     return cassandra_path
```
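To see why the pre-expansion of bare flags matters, here is a standalone sketch of the parsing behavior (the file name is illustrative):

```python
import argparse

parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--docker', default=None)
parser.add_argument('--java-docker', default=None)

# Without expansion, argparse would parse ['--docker', 'test_file.py']
# as --docker=test_file.py. With the expansion, the positional argument
# survives into the leftover list passed on to pytest:
raw = ['--docker', 'test_file.py']
argv = ['--docker=5' if a == '--docker' else
        '--java-docker=11' if a == '--java-docker' else a
        for a in raw]
args, pytest_argv = parser.parse_known_args(argv)
assert args.docker == '5'
assert pytest_argv == ['test_file.py']
```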
```diff
@@ -47,6 +85,9 @@ def java_major_version(java):
     return major
 
 def find_java():
+    # If the user requested Docker-based Java (either option), skip the local search entirely.
+    if args.java_docker or args.docker:
+        return None
     # Look for the Java in one of several places known to host the Java
     # executable, and return the first one that works and has the appropriate
     # version. The first attempt is just "java" in the path, which is
```
```diff
@@ -117,10 +158,12 @@ def run_cassandra_cmd(pid, dir):
     }
     # By default, Cassandra's startup script runs "java". We can override this
     # choice with the JAVA_HOME environment variable based on the Java we
-    # found earlier in find_java().
-    if java and java.startswith('/'):
-        env['JAVA_HOME'] = os.path.dirname(os.path.dirname(java))
-        print('JAVA_HOME: ' + env['JAVA_HOME'])
+    # found earlier in find_java(). In docker modes, java comes from the
+    # container image, so no JAVA_HOME override is needed.
+    if not args.docker and not args.java_docker:
+        if java and java.startswith('/'):
+            env['JAVA_HOME'] = os.path.dirname(os.path.dirname(java))
+            print('JAVA_HOME: ' + env['JAVA_HOME'])
     # On JVM 11, Cassandra requires a bunch of configuration options in
     # conf/jvm11-server.options, or it fails loading classes because of JPMS.
     # The following options were copied from Cassandra's jvm11-server.options.
```
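The JAVA_HOME derivation above simply takes the grandparent directory of the `java` binary; a one-line illustration (the path is hypothetical):

```python
import os

java = '/usr/lib/jvm/java-11/bin/java'  # hypothetical absolute path found by find_java()
assert os.path.dirname(os.path.dirname(java)) == '/usr/lib/jvm/java-11'
```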
```diff
@@ -177,10 +220,125 @@ def run_cassandra_cmd(pid, dir):
           '--add-opens java.base/java.net=ALL-UNNAMED\n'
           '--add-opens java.rmi/sun.rmi.transport.tcp=ALL-UNNAMED\n'
           ,file=f)
     # Cassandra 4.x's cassandra.in.sh greps two separate files: a base
     # "jvm-server.options" (common to all JVM versions) and then a
     # version-specific "jvm11-server.options" or "jvm17-server.options".
     # We put all our options into the version-specific files above, so the
     # base file can be empty. Cassandra 3.x uses "jvm.options" instead.
     # All must exist or the startup script prints grep warnings.
     open(os.path.join(confdir, 'jvm-server.options'), 'w').close()
     open(os.path.join(confdir, 'jvm.options'), 'w').close()
     # Current versions of Cassandra 5 refuse to run on Java 21
     # without this environment variable.
     env['CASSANDRA_JDK_UNSUPPORTED'] = 'true'
-    return ([cassandra, '-f'], env)
+    if args.docker:
+        return run_cassandra_cmd_docker(ip, dir, env)
+    elif args.java_docker:
+        return run_cassandra_cmd_java_docker(ip, dir, env)
+    else:
+        return ([cassandra, '-f'], env)
+
+# Generate command line and environment variables to run Cassandra from the
+# official Docker image, which bundles both Cassandra and the appropriate Java
+# version. No local Cassandra installation is needed. The per-test temporary
+# directory 'dir' (a subdirectory of $TMPDIR) is bind-mounted so the config we
+# wrote above is visible inside the container at the same paths.
+def run_cassandra_cmd_docker(ip, dir, env):
+    docker = shutil.which('docker')
+    if not docker:
+        print('Error: docker executable not found in PATH.')
+        exit(1)
+    del env['CASSANDRA_INCLUDE']
+    del env['CASSANDRA_HOME']
+    # The official cassandra Docker image's entrypoint overrides
+    # listen_address in cassandra.yaml with the container's detected IP
+    # (via "hostname -i"), which with --network host resolves to the
+    # host's primary interface rather than the loopback alias we assigned.
+    # Pass CASSANDRA_LISTEN_ADDRESS so the entrypoint uses the right IP.
+    env['CASSANDRA_LISTEN_ADDRESS'] = ip
+    image = f'cassandra:{args.docker}'
+    print(f'Running Cassandra {args.docker} from Docker (image: {image})')
+    # We could just run docker directly and the image will be downloaded
+    # automatically, but we want a nice "Pulling Docker image..." message
+    # while waiting, and also to print the Cassandra and Java versions
+    # after the wait, so we do something more elaborate here:
+    if subprocess.call([docker, 'image', 'inspect', image],
+                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) != 0:
+        print(f'Pulling Docker image {image}...')
+        try:
+            subprocess.check_call([docker, 'pull', '-q', image],
+                                  stdout=subprocess.DEVNULL)
+        except subprocess.CalledProcessError:
+            print(f'Error: Failed to pull Docker image {image} for Cassandra {args.docker!r}. '
+                  f'Try versions like 5, 5.1 or 5.1.1.')
+            exit(1)
+    image_env = subprocess.check_output(
+        [docker, 'image', 'inspect', '--format',
+         '{{range .Config.Env}}{{println .}}{{end}}',
+         image], stderr=subprocess.DEVNULL).decode('UTF-8')
+    cassandra_version = next((line.split('=', 1)[1] for line in image_env.splitlines()
+                              if line.startswith('CASSANDRA_VERSION=')), '(unknown)')
+    jv_out = subprocess.check_output(
+        [docker, 'run', '--rm', image, 'java', '-version'],
+        stderr=subprocess.STDOUT).decode('UTF-8')
+    m = re.search(r'"([^"]+)"', jv_out)
+    java_version = m.group(1) if m else '(unknown)'
+    print(f'Docker Cassandra version: {cassandra_version}, Java: {java_version}')
+    # The official cassandra image's default CMD is 'cassandra -f', which
+    # is exactly what we want.
+    docker_cmd = [docker, 'run', '--rm', '--network', 'host',
+                  '--user', f'{os.getuid()}:{os.getgid()}',
+                  '--security-opt', 'label=disable',
+                  '-v', f'{dir}:{dir}']
+    for k, v in env.items():
+        docker_cmd += ['-e', f'{k}={v}']
+    docker_cmd += [image]
+    return (docker_cmd, {})
+
+# Generate command line and environment variables to run the user's local
+# Cassandra inside a Docker container providing the requested Java version.
+# The local Cassandra installation is bind-mounted at its own absolute path so
+# that the startup script finds its JARs and config at the same paths it
+# would expect on the host. We use realpath() so the mount point inside the
+# container is unambiguous (no symlinks).
+def run_cassandra_cmd_java_docker(ip, dir, env):
+    cassandra_real = os.path.realpath(cassandra)
+    cassandra_home = os.path.dirname(os.path.dirname(cassandra_real))
+    image = f'eclipse-temurin:{args.java_docker}-jre'
+    print(f'Running Cassandra with Docker Java {args.java_docker} (image: {image})')
+    docker = shutil.which('docker')
+    if not docker:
+        print('Error: docker executable not found in PATH.')
+        exit(1)
+    # We could just run docker directly and the image will be downloaded
+    # automatically, but we want a nice "Pulling Docker image..." message
+    # while waiting, and also to print the Java version after the wait,
+    # so we do something more elaborate here:
+    if subprocess.call([docker, 'image', 'inspect', image],
+                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) != 0:
+        print(f'Pulling Docker image {image}...')
+        try:
+            subprocess.check_call([docker, 'pull', '-q', image])
+        except subprocess.CalledProcessError:
+            print(f'Error: Failed to pull Docker image {image} for Java {args.java_docker!r}. '
+                  f'Try versions like 11 or 17.')
+            exit(1)
+    java_version = subprocess.check_output(
+        [docker, 'image', 'inspect', '--format',
+         '{{range .Config.Env}}{{println .}}{{end}}',
+         image], stderr=subprocess.DEVNULL).decode('UTF-8')
+    java_version = next((line.split('=', 1)[1] for line in java_version.splitlines()
+                         if line.startswith('JAVA_VERSION=')), '(unknown)')
+    print(f'Docker Java version: {java_version}')
+    docker_cmd = [docker, 'run', '--rm', '--network', 'host',
+                  '--user', f'{os.getuid()}:{os.getgid()}',
+                  '--security-opt', 'label=disable',
+                  '-v', f'{dir}:{dir}',
+                  '-v', f'{cassandra_home}:{cassandra_home}:ro']
+    for k, v in env.items():
+        docker_cmd += ['-e', f'{k}={v}']
+    docker_cmd += [image, cassandra_real, '-f']
+    return (docker_cmd, {})
+
 # Same as run_cassandra_cmd, just use SSL encryption for the CQL port (same
 # port number as default - replacing the unencrypted server).
```
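For a concrete picture of what `run_cassandra_cmd_docker()` assembles, a standalone sketch with made-up values (the real `ip` and `dir` come from run.py's temporary-directory machinery):

```python
import os

env = {'CASSANDRA_LISTEN_ADDRESS': '127.0.0.2'}  # hypothetical loopback alias
dir = '/tmp/run-cassandra-demo'                  # hypothetical per-test directory

# Mirrors the assembly in run_cassandra_cmd_docker(): host networking,
# the caller's uid/gid, the per-test dir bind-mounted at the same path,
# and every env entry forwarded as a -e flag.
docker_cmd = ['docker', 'run', '--rm', '--network', 'host',
              '--user', f'{os.getuid()}:{os.getgid()}',
              '--security-opt', 'label=disable',
              '-v', f'{dir}:{dir}']
for k, v in env.items():
    docker_cmd += ['-e', f'{k}={v}']
docker_cmd += ['cassandra:5']
print(' '.join(docker_cmd))
```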
```diff
@@ -201,9 +359,10 @@ def run_cassandra_ssl_cmd(pid, dir):
     # The command and environment variables to run Cassandra are the same,
     return (cmd, env)
 
-print('Cassandra is: ' + cassandra + '.')
+if cassandra:
+    print(f'Cassandra is: {cassandra}.')
 
-if '--ssl' in sys.argv:
+if '--ssl' in pytest_argv:
     cmd = run_cassandra_ssl_cmd
     check_cql = run.check_ssl_cql
 else:
```
```diff
@@ -214,7 +373,7 @@ pid = run.run_with_temporary_dir(cmd)
 ip = run.pid_to_ip(pid)
 
 run.wait_for_services(pid, [lambda: check_cql(ip)])
-success = run.run_pytest(sys.path[0], ['-o', 'xfail_strict=false', '--host', ip] + sys.argv[1:])
+success = run.run_pytest(sys.path[0], ['-o', 'xfail_strict=false', '--host', ip] + pytest_argv)
 
 run.summary = 'Cassandra tests pass' if success else 'Cassandra tests failure'
```
```diff
@@ -64,7 +64,9 @@ async def load_tablet_repair_task_infos(cql, host, table_id):
 
     return repair_task_infos
 
-async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fast_stats_refresh = True, nr_keys = 256, disable_flush_cache_time = False, cmdline = None) -> (list[ServerInfo], CassandraSession, list[Host], str, str):
+async def create_table_insert_data_for_repair(manager, rf=3, tablets=8, fast_stats_refresh=True, nr_keys=256,
+                                              disable_flush_cache_time=False, cmdline=None,
+                                              sequential_server_add=False) -> tuple[list[ServerInfo], CassandraSession, list[Host], str, str]:
     assert rf <= 3, "A keyspace with RF > 3 will be RF-rack-invalid if there are fewer racks than the RF"
 
     if fast_stats_refresh:
```
```diff
@@ -73,8 +75,13 @@ async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fas
         config = {}
     if disable_flush_cache_time:
         config.update({'repair_hints_batchlog_flush_cache_time_in_ms': 0})
-    servers = await manager.servers_add(3, config=config, cmdline=cmdline,
-                                        property_file=[{"dc": "dc1", "rack": f"r{i % rf}"} for i in range(rf)])
+    property_files = [{"dc": "dc1", "rack": f"r{i % rf}"} for i in range(3)]
+    if sequential_server_add:
+        servers = []
+        for property_file in property_files:
+            servers.append(await manager.server_add(config=config, cmdline=cmdline, property_file=property_file))
+    else:
+        servers = await manager.servers_add(len(property_files), config=config, cmdline=cmdline, property_file=property_files)
     cql = manager.get_cql()
     ks = await create_new_test_keyspace(cql, "WITH replication = {{'class': 'NetworkTopologyStrategy', "
                                         "'replication_factor': {}}} AND tablets = {{'initial': {}}};".format(rf, tablets))
```