mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 11:30:36 +00:00
Compare commits
3 Commits
ykaul/skip
...
copilot/su
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae0208e35c | ||
|
|
b418e7a489 | ||
|
|
2c5727753a |
4
.github/workflows/trigger-scylla-ci.yaml
vendored
4
.github/workflows/trigger-scylla-ci.yaml
vendored
@@ -1,6 +1,6 @@
|
||||
name: Trigger Scylla CI Route
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
permissions: {}
|
||||
|
||||
on:
|
||||
issue_comment:
|
||||
|
||||
3
.github/workflows/trigger_jenkins.yaml
vendored
3
.github/workflows/trigger_jenkins.yaml
vendored
@@ -1,8 +1,5 @@
|
||||
name: Trigger next gating
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
|
||||
@@ -699,17 +699,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// for such a size.
|
||||
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
|
||||
}
|
||||
// Check the concurrency limit early, before acquiring memory and
|
||||
// reading the request body, to avoid piling up memory from excess
|
||||
// requests that will be rejected anyway. This mirrors the CQL
|
||||
// transport which also checks concurrency before memory acquisition
|
||||
// (transport/server.cc).
|
||||
if (_pending_requests.get_count() >= _max_concurrent_requests) {
|
||||
_executor._stats.requests_shed++;
|
||||
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
|
||||
}
|
||||
_pending_requests.enter();
|
||||
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
|
||||
// JSON parsing can allocate up to roughly 2x the size of the raw
|
||||
// document, + a couple of bytes for maintenance.
|
||||
// If the Content-Length of the request is not available, we assume
|
||||
@@ -771,6 +760,12 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
_executor._stats.unsupported_operations++;
|
||||
co_return api_error::unknown_operation(fmt::format("Unsupported operation {}", op));
|
||||
}
|
||||
if (_pending_requests.get_count() >= _max_concurrent_requests) {
|
||||
_executor._stats.requests_shed++;
|
||||
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
|
||||
}
|
||||
_pending_requests.enter();
|
||||
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
|
||||
executor::client_state client_state(service::client_state::external_tag(),
|
||||
_auth_service, &_sl_controller, _timeout_config.current_values(), req->get_client_address());
|
||||
if (!username.empty()) {
|
||||
|
||||
@@ -583,7 +583,8 @@ sstable_format: ms
|
||||
audit: "table"
|
||||
#
|
||||
# List of statement categories that should be audited.
|
||||
audit_categories: "DCL,DDL,AUTH,ADMIN"
|
||||
# Possible categories are: QUERY, DML, DCL, DDL, AUTH, ADMIN
|
||||
audit_categories: "DCL,AUTH,ADMIN"
|
||||
#
|
||||
# List of tables that should be audited.
|
||||
# audit_tables: "<keyspace_name>.<table_name>,<keyspace_name>.<table_name>"
|
||||
|
||||
@@ -1582,7 +1582,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"\tnone : No auditing enabled.\n"
|
||||
"\tsyslog : Audit messages sent to Syslog.\n"
|
||||
"\ttable : Audit messages written to column family named audit.audit_log.\n")
|
||||
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,DDL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
|
||||
, audit_categories(this, "audit_categories", liveness::LiveUpdate, value_status::Used, "DCL,AUTH,ADMIN", "Comma separated list of operation categories that should be audited.")
|
||||
, audit_tables(this, "audit_tables", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of table names (<keyspace>.<table>) that will be audited.")
|
||||
, audit_keyspaces(this, "audit_keyspaces", liveness::LiveUpdate, value_status::Used, "", "Comma separated list of keyspaces that will be audited. All tables in those keyspaces will be audited")
|
||||
, audit_unix_socket_path(this, "audit_unix_socket_path", value_status::Used, "/dev/log", "The path to the unix socket used for writing to syslog. Only applicable when audit is set to syslog.")
|
||||
|
||||
28
dist/common/scripts/scylla_swap_setup
vendored
28
dist/common/scripts/scylla_swap_setup
vendored
@@ -9,7 +9,6 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shlex
|
||||
import argparse
|
||||
import psutil
|
||||
from pathlib import Path
|
||||
@@ -104,41 +103,16 @@ if __name__ == '__main__':
|
||||
run('dd if=/dev/zero of={} bs=1M count={}'.format(swapfile, swapsize_mb), shell=True, check=True)
|
||||
swapfile.chmod(0o600)
|
||||
run('mkswap -f {}'.format(swapfile), shell=True, check=True)
|
||||
|
||||
mount_point = find_mount_point(swap_directory)
|
||||
mount_unit = out(f'systemd-escape -p --suffix=mount {shlex.quote(str(mount_point))}')
|
||||
|
||||
# Add DefaultDependencies=no to the swap unit to avoid getting the default
|
||||
# Before=swap.target dependency. We apply this to all clouds, but the
|
||||
# requirement came from Azure:
|
||||
#
|
||||
# On Azure, the swap directory is on the Azure ephemeral disk (mounted on /mnt).
|
||||
# However, cloud-init makes this mount (i.e., the mnt.mount unit) depend on
|
||||
# the network (After=network-online.target). By extension, this means that
|
||||
# the swap unit depends on the network. If we didn't use DefaultDependencies=no,
|
||||
# then the swap unit would be part of the swap.target which other services
|
||||
# assume to be a local boot target, so we would end up with dependency cycles
|
||||
# such as:
|
||||
#
|
||||
# swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target -> network.target -> systemd-resolved.service -> tmp.mount -> swap.target
|
||||
#
|
||||
# By removing the automatic Before=swap.target, the swap unit is no longer
|
||||
# part of swap.target, avoiding such cycles. The swap will still be
|
||||
# activated via WantedBy=multi-user.target.
|
||||
unit_data = '''
|
||||
[Unit]
|
||||
Description=swapfile
|
||||
DefaultDependencies=no
|
||||
After={}
|
||||
Conflicts=umount.target
|
||||
Before=umount.target
|
||||
|
||||
[Swap]
|
||||
What={}
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
'''[1:-1].format(mount_unit, swapfile)
|
||||
'''[1:-1].format(swapfile)
|
||||
with swapunit.open('w') as f:
|
||||
f.write(unit_data)
|
||||
systemd_unit.reload()
|
||||
|
||||
@@ -727,12 +727,7 @@ public:
|
||||
|
||||
// now we need one page more to be able to save one for next lap
|
||||
auto fill_size = align_up(buf1.size(), block_size) + block_size - buf1.size();
|
||||
// If the underlying stream is already at EOF (e.g. buf1 came from
|
||||
// cached _next while the previous read_exactly drained the source),
|
||||
// skip the read_exactly call — it would return empty anyway.
|
||||
auto buf2 = _input.eof()
|
||||
? temporary_buffer<char>()
|
||||
: co_await _input.read_exactly(fill_size);
|
||||
auto buf2 = co_await _input.read_exactly(fill_size);
|
||||
|
||||
temporary_buffer<char> output(buf1.size() + buf2.size());
|
||||
|
||||
|
||||
@@ -42,14 +42,7 @@ void everywhere_replication_strategy::validate_options(const gms::feature_servic
|
||||
|
||||
sstring everywhere_replication_strategy::sanity_check_read_replicas(const effective_replication_map& erm, const host_id_vector_replica_set& read_replicas) const {
|
||||
const auto replication_factor = erm.get_replication_factor();
|
||||
if (const auto& topo_info = erm.get_token_metadata().get_topology_change_info(); topo_info && topo_info->read_new) {
|
||||
if (read_replicas.size() > replication_factor + 1) {
|
||||
return seastar::format(
|
||||
"everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, "
|
||||
"cannot be higher than replication factor {} + 1 during the 'read from new replicas' stage of a topology change",
|
||||
read_replicas.size(), replication_factor);
|
||||
}
|
||||
} else if (read_replicas.size() > replication_factor) {
|
||||
if (read_replicas.size() > replication_factor) {
|
||||
return seastar::format("everywhere_replication_strategy: the number of replicas for everywhere_replication_strategy is {}, cannot be higher than replication factor {}", read_replicas.size(), replication_factor);
|
||||
}
|
||||
return {};
|
||||
|
||||
32
pgo/pgo.py
32
pgo/pgo.py
@@ -15,7 +15,6 @@ from typing import Any, Optional
|
||||
import asyncio
|
||||
import contextlib
|
||||
import glob
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -365,14 +364,12 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
|
||||
llvm_profile_file = f"{addr}-%m.profraw"
|
||||
scylla_workdir = f"{addr}"
|
||||
logfile = f"{addr}.log"
|
||||
socket = maintenance_socket_path(cluster_workdir, addr)
|
||||
command = [
|
||||
"env",
|
||||
f"LLVM_PROFILE_FILE={llvm_profile_file}",
|
||||
f"SCYLLA_HOME={os.path.realpath(os.getcwd())}", # We assume that the script has Scylla's `conf/` as its filesystem neighbour.
|
||||
os.path.realpath(executable),
|
||||
f"--workdir={scylla_workdir}",
|
||||
f"--maintenance-socket={socket}",
|
||||
"--ring-delay-ms=0",
|
||||
"--developer-mode=yes",
|
||||
"--memory=1G",
|
||||
@@ -394,7 +391,6 @@ async def start_node(executable: PathLike, cluster_workdir: PathLike, addr: str,
|
||||
f"--authenticator=PasswordAuthenticator",
|
||||
f"--authorizer=CassandraAuthorizer",
|
||||
] + list(extra_opts)
|
||||
training_logger.info(f"Using maintenance socket {socket}")
|
||||
return await run(['bash', '-c', fr"""exec {shlex.join(command)} >{q(logfile)} 2>&1"""], cwd=cluster_workdir)
|
||||
|
||||
async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optional[list[str]], workdir: PathLike, cluster_name: str, extra_opts: list[str]) -> list[Process]:
|
||||
@@ -437,25 +433,16 @@ async def start_cluster(executable: PathLike, addrs: list[str], cpusets: Optiona
|
||||
procs.append(proc)
|
||||
await wait_for_node(proc, addrs[i], timeout)
|
||||
except:
|
||||
await stop_cluster(procs, addrs, cluster_workdir=workdir)
|
||||
await stop_cluster(procs, addrs)
|
||||
raise
|
||||
return procs
|
||||
|
||||
async def stop_cluster(procs: list[Process], addrs: list[str], cluster_workdir: PathLike) -> None:
|
||||
async def stop_cluster(procs: list[Process], addrs: list[str]) -> None:
|
||||
"""Stops a Scylla cluster started with start_cluster().
|
||||
Doesn't return until all nodes exit, even if stop_cluster() is cancelled.
|
||||
|
||||
"""
|
||||
await clean_gather(*[cancel_process(p, timeout=60) for p in procs])
|
||||
_cleanup_short_sockets(cluster_workdir, addrs)
|
||||
|
||||
def _cleanup_short_sockets(cluster_workdir: PathLike, addrs: list[str]) -> None:
|
||||
"""Remove short maintenance socket files created in /tmp."""
|
||||
for addr in addrs:
|
||||
try:
|
||||
os.unlink(maintenance_socket_path(cluster_workdir, addr))
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
async def wait_for_port(addr: str, port: int) -> None:
|
||||
await bash(fr'until printf "" >>/dev/tcp/{addr}/{port}; do sleep 0.1; done 2>/dev/null')
|
||||
@@ -466,17 +453,12 @@ async def merge_profraw(directory: PathLike) -> None:
|
||||
await bash(fr"llvm-profdata merge {q(directory)}/*.profraw -output {q(directory)}/prof.profdata")
|
||||
|
||||
def maintenance_socket_path(cluster_workdir: PathLike, addr: str) -> str:
|
||||
"""Return the maintenance socket path for a node.
|
||||
"""Returns the absolute path of the maintenance socket for a given node.
|
||||
|
||||
Returns a short deterministic path in /tmp (derived from an MD5 hash of
|
||||
the natural ``<cluster_workdir>/<addr>/cql.m`` path) to stay within the
|
||||
Unix domain socket length limit.
|
||||
The same path is passed to Scylla via ``--maintenance-socket`` in
|
||||
``start_node()``.
|
||||
With ``maintenance_socket: workdir`` in scylla.yaml the socket lives at
|
||||
``<node-workdir>/cql.m``, i.e. ``<cluster_workdir>/<addr>/cql.m``.
|
||||
"""
|
||||
natural = os.path.realpath(f"{cluster_workdir}/{addr}/cql.m")
|
||||
path_hash = hashlib.md5(natural.encode()).hexdigest()[:12]
|
||||
return os.path.join(tempfile.gettempdir(), f'pgo-{path_hash}.m')
|
||||
return os.path.realpath(f"{cluster_workdir}/{addr}/cql.m")
|
||||
|
||||
async def setup_cassandra_user(workdir: PathLike, addr: str) -> None:
|
||||
"""Create the ``cassandra`` superuser via the maintenance socket.
|
||||
@@ -543,7 +525,7 @@ async def with_cluster(executable: PathLike, workdir: PathLike, cpusets: Optiona
|
||||
yield addrs, procs
|
||||
finally:
|
||||
training_logger.info(f"Stopping the cluster in {workdir}")
|
||||
await stop_cluster(procs, addrs, cluster_workdir=workdir)
|
||||
await stop_cluster(procs, addrs)
|
||||
training_logger.info(f"Stopped the cluster in {workdir}")
|
||||
|
||||
################################################################################
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:54662978b9ce4a6e25790b1b0a5099e6063173ffa95a399a6287cf474376ed09
|
||||
size 6595952
|
||||
oid sha256:e59fe56eac435fd03c2f0d7dfc11c6998d7c0750e1851535575497dd13d96015
|
||||
size 6505524
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:0cf44ea1fb2ae20de45d26fe8095054e60cb8700cddcb2fd79ef79705484b18a
|
||||
size 6603780
|
||||
oid sha256:34a0955d2c5a88e18ddab0f1df085e10a17e14129c3e21de91e4f27ef949b6c4
|
||||
size 6502668
|
||||
|
||||
23
raft/fsm.cc
23
raft/fsm.cc
@@ -1098,8 +1098,7 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
|
||||
|
||||
// Make sure that only a leader or a node that is part of the config can request read barrier
|
||||
// Nodes outside of the config may never get the data, so they will not be able to read it.
|
||||
follower_progress* opt_progress = leader_state().tracker.find(requester);
|
||||
if (requester != _my_id && opt_progress == nullptr) {
|
||||
if (requester != _my_id && leader_state().tracker.find(requester) == nullptr) {
|
||||
throw std::runtime_error(fmt::format("Read barrier requested by a node outside of the configuration {}", requester));
|
||||
}
|
||||
|
||||
@@ -1110,23 +1109,19 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
|
||||
return {};
|
||||
}
|
||||
|
||||
// Optimization for read barriers requested on non-voters. A non-voter doesn't receive the read_quorum message, so
|
||||
// it might update its commit index only after another leader tick, which would slow down wait_for_apply() at the
|
||||
// end of the read barrier. Prevent that by replicating to the non-voting requester here.
|
||||
if (requester != _my_id && opt_progress->commit_idx < _commit_idx && opt_progress->match_idx == _log.last_idx()
|
||||
&& !opt_progress->can_vote) {
|
||||
logger.trace("start_read_barrier[{}]: replicate to {} because follower commit_idx={} < commit_idx={}, "
|
||||
"follower match_idx={} == last_idx={}, and follower can_vote={}",
|
||||
_my_id, requester, opt_progress->commit_idx, _commit_idx, opt_progress->match_idx,
|
||||
_log.last_idx(), opt_progress->can_vote);
|
||||
replicate_to(*opt_progress, true);
|
||||
}
|
||||
|
||||
read_id id = next_read_id();
|
||||
logger.trace("start_read_barrier[{}] starting read barrier with id {}", _my_id, id);
|
||||
return std::make_pair(id, _commit_idx);
|
||||
}
|
||||
|
||||
void fsm::maybe_update_commit_idx_for_read(index_t read_idx) {
|
||||
// read_idx from the leader might not be replicated to the local node yet.
|
||||
const bool in_local_log = read_idx <= _log.last_idx();
|
||||
if (in_local_log && log_term_for(read_idx) == get_current_term()) {
|
||||
advance_commit_idx(read_idx);
|
||||
}
|
||||
}
|
||||
|
||||
void fsm::stop() {
|
||||
if (is_leader()) {
|
||||
// Become follower to stop accepting requests
|
||||
|
||||
@@ -480,6 +480,15 @@ public:
|
||||
|
||||
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
|
||||
|
||||
// Update the commit index to the read index (a read barrier result from the leader) if the local entry with the
|
||||
// read index belongs to the current term.
|
||||
//
|
||||
// Satisfying the condition above guarantees that the local log matches the current leader's log up to the read
|
||||
// index (the Log Matching Property), so the current leader won't drop the local entry with the read index.
|
||||
// Moreover, this entry has been committed by the leader, so future leaders also won't drop it (the Leader
|
||||
// Completeness Property). Hence, updating the commit index is safe.
|
||||
void maybe_update_commit_idx_for_read(index_t read_idx);
|
||||
|
||||
size_t in_memory_log_size() const {
|
||||
return _log.in_memory_size();
|
||||
}
|
||||
|
||||
@@ -1109,18 +1109,6 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
|
||||
// case.
|
||||
co_await _persistence->store_term_and_vote(batch.term_and_vote->first, batch.term_and_vote->second);
|
||||
_stats.store_term_and_vote++;
|
||||
|
||||
// When the term advances, any in-flight snapshot transfers
|
||||
// belong to an outdated term: the progress tracker has been
|
||||
// reset in become_leader() or we are now a follower.
|
||||
// Abort them before we dispatch this batch's messages, which
|
||||
// may start fresh transfers for the new term.
|
||||
//
|
||||
// A vote may also change independently of the term (e.g. a
|
||||
// follower voting for a candidate at the same term), but in
|
||||
// that case there are no in-flight transfers and the abort
|
||||
// is a no-op.
|
||||
abort_snapshot_transfers();
|
||||
}
|
||||
|
||||
if (batch.snp) {
|
||||
@@ -1230,6 +1218,8 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
|
||||
// quickly) stop happening (we're outside the config after all).
|
||||
co_await _apply_entries.push_eventually(removed_from_config{});
|
||||
}
|
||||
// request aborts of snapshot transfers
|
||||
abort_snapshot_transfers();
|
||||
// abort all read barriers
|
||||
for (auto& r : _reads) {
|
||||
r.promise.set_value(not_a_leader{_fsm->current_leader()});
|
||||
@@ -1571,6 +1561,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
read_idx = std::get<index_t>(res);
|
||||
_fsm->maybe_update_commit_idx_for_read(read_idx);
|
||||
co_return stop_iteration::yes;
|
||||
});
|
||||
|
||||
|
||||
@@ -1622,14 +1622,14 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
|
||||
size_t next_file_id = 0;
|
||||
for (auto file_id : found_file_ids) {
|
||||
if (file_id != next_file_id) {
|
||||
throw std::runtime_error(fmt::format("Missing log segment file(s) detected during recovery: file {} missing", _file_mgr.get_file_path(next_file_id).string()));
|
||||
throw std::runtime_error(fmt::format("Missing log segment file(s) detected during recovery: file {} missing", _file_mgr.get_file_path(next_file_id)));
|
||||
}
|
||||
next_file_id++;
|
||||
}
|
||||
|
||||
// populate index from all segments. keep the latest record for each key.
|
||||
for (auto file_id : found_file_ids) {
|
||||
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size());
|
||||
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id), (file_id + 1) * 100 / found_file_ids.size());
|
||||
co_await max_concurrent_for_each(segments_in_file(file_id), 32,
|
||||
[this, &db] (log_segment_id seg_id) {
|
||||
return recover_segment(db, seg_id);
|
||||
|
||||
@@ -4860,14 +4860,13 @@ table::query(schema_ptr query_schema,
|
||||
}
|
||||
|
||||
std::optional<full_position> last_pos;
|
||||
if (querier_opt) {
|
||||
if (querier_opt->current_position()) {
|
||||
last_pos.emplace(*querier_opt->current_position());
|
||||
}
|
||||
if (!saved_querier || (!querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
|
||||
co_await querier_opt->close();
|
||||
querier_opt = {};
|
||||
}
|
||||
if (querier_opt && querier_opt->current_position()) {
|
||||
last_pos.emplace(*querier_opt->current_position());
|
||||
}
|
||||
|
||||
if (!saved_querier || (querier_opt && !querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
|
||||
co_await querier_opt->close();
|
||||
querier_opt = {};
|
||||
}
|
||||
if (saved_querier) {
|
||||
*saved_querier = std::move(querier_opt);
|
||||
|
||||
@@ -87,11 +87,6 @@ target_include_directories(wasmtime_bindings
|
||||
target_link_libraries(wasmtime_bindings
|
||||
INTERFACE Rust::rust_combined)
|
||||
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
||||
# The PCH from scylla-precompiled-header is compiled with Seastar's compile
|
||||
# flags, including sanitizer flags in Debug/Sanitize modes. Any target reusing
|
||||
# this PCH must have matching compile options, otherwise the compiler rejects
|
||||
# the PCH due to flag mismatch (e.g., -fsanitize=address).
|
||||
target_link_libraries(wasmtime_bindings PRIVATE Seastar::seastar)
|
||||
target_precompile_headers(wasmtime_bindings REUSE_FROM scylla-precompiled-header)
|
||||
endif()
|
||||
|
||||
@@ -113,6 +108,5 @@ target_include_directories(inc
|
||||
target_link_libraries(inc
|
||||
INTERFACE Rust::rust_combined)
|
||||
if (Scylla_USE_PRECOMPILED_HEADER_USE)
|
||||
target_link_libraries(inc PRIVATE Seastar::seastar)
|
||||
target_precompile_headers(inc REUSE_FROM scylla-precompiled-header)
|
||||
endif()
|
||||
|
||||
@@ -538,7 +538,6 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
|
||||
group0_id = g0_info.group0_id;
|
||||
raft::server_address my_addr{my_id, {}};
|
||||
|
||||
bool starting_server_as_follower = false;
|
||||
if (server == nullptr) {
|
||||
// This is the first time discovery is run. Create and start a Raft server for group 0 on this node.
|
||||
raft::configuration initial_configuration;
|
||||
@@ -566,7 +565,6 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
|
||||
// trigger an empty snapshot transfer.
|
||||
nontrivial_snapshot = true;
|
||||
} else {
|
||||
starting_server_as_follower = true;
|
||||
co_await handshaker->pre_server_start(g0_info);
|
||||
}
|
||||
|
||||
@@ -593,9 +591,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
|
||||
}
|
||||
|
||||
SCYLLA_ASSERT(server);
|
||||
co_await utils::get_local_injector().inject("join_group0_pause_before_config_check",
|
||||
utils::wait_for_message(std::chrono::minutes{5}));
|
||||
if (!starting_server_as_follower && server->get_configuration().contains(my_id)) {
|
||||
if (server->get_configuration().contains(my_id)) {
|
||||
// True if we started a new group or completed a configuration change initiated earlier.
|
||||
group0_log.info("server {} already in group 0 (id {}) as {}", my_id, group0_id,
|
||||
server->get_configuration().can_vote(my_id)? "voter" : "non-voter");
|
||||
|
||||
@@ -910,7 +910,7 @@ future<> storage_service::merge_topology_snapshot(raft_snapshot snp) {
|
||||
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
|
||||
} else {
|
||||
co_await for_each_split_mutation(std::move(mut), max_size, [&] (mutation m) -> future<> {
|
||||
frozen_muts_to_apply.push_back(co_await freeze_gently(m));
|
||||
frozen_muts_to_apply.push_back(co_await freeze_gently(mut));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,9 +239,11 @@ public:
|
||||
// The i-th element corresponds to the i-th entry in _entries.
|
||||
// Can be smaller than _entries. If _entries[i] doesn't have a matching element in _promoted_indexes then
|
||||
// that entry doesn't have a promoted index.
|
||||
// It's not chunked, because promoted index is present only when there are large partitions in the page,
|
||||
// which also means the page will have typically only 1 entry due to summary:data_file size ratio.
|
||||
// Kept separately to avoid paying for storage cost in pages where no entry has a promoted index,
|
||||
// which is typical in workloads with small partitions.
|
||||
lsa::chunked_managed_vector<promoted_index> _promoted_indexes;
|
||||
managed_vector<promoted_index> _promoted_indexes;
|
||||
public:
|
||||
partition_index_page() = default;
|
||||
partition_index_page(partition_index_page&&) noexcept = default;
|
||||
|
||||
75
test.py
75
test.py
@@ -11,7 +11,6 @@ from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import math
|
||||
import shlex
|
||||
import textwrap
|
||||
from random import randint
|
||||
@@ -74,51 +73,6 @@ PYTEST_RUNNER_DIRECTORIES = [
|
||||
|
||||
launch_time = time.monotonic()
|
||||
|
||||
class ThreadsCalculator:
|
||||
"""
|
||||
The ThreadsCalculator class calculates the number of jobs that can be run concurrently based on system
|
||||
memory and CPU constraints. It allows resource reservation and configurable parameters for
|
||||
flexible job scheduling in various modes, such as `debug`.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
modes: list[str],
|
||||
min_system_memory_reserve: float = 5e9,
|
||||
max_system_memory_reserve: float = 8e9,
|
||||
system_memory_reserve_fraction = 16,
|
||||
max_test_memory: float = 5e9,
|
||||
test_memory_fraction: float = 8.0,
|
||||
debug_test_memory_multiplier: float = 1.5,
|
||||
debug_cpus_per_test_job=1.5,
|
||||
non_debug_cpus_per_test_job: float =1.0,
|
||||
non_debug_max_test_memory: float = 4e9
|
||||
):
|
||||
sys_mem = int(os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES"))
|
||||
test_mem = min(sys_mem / test_memory_fraction, max_test_memory)
|
||||
if "debug" in modes:
|
||||
test_mem *= debug_test_memory_multiplier
|
||||
system_memory_reserve = int(min(
|
||||
max(sys_mem / system_memory_reserve_fraction, min_system_memory_reserve),
|
||||
max_system_memory_reserve,
|
||||
))
|
||||
available_mem = max(0, sys_mem - system_memory_reserve)
|
||||
is_debug = "debug" in modes
|
||||
test_mem = min(
|
||||
sys_mem / test_memory_fraction,
|
||||
max_test_memory if is_debug else non_debug_max_test_memory,
|
||||
)
|
||||
if is_debug:
|
||||
test_mem *= debug_test_memory_multiplier
|
||||
self.cpus_per_test_job = (
|
||||
debug_cpus_per_test_job if is_debug else non_debug_cpus_per_test_job
|
||||
)
|
||||
self.default_num_jobs_mem = max(1, int(available_mem // test_mem))
|
||||
|
||||
def get_number_of_threads(self, nr_cpus: int) -> int:
|
||||
default_num_jobs_cpu = max(1, math.ceil(nr_cpus / self.cpus_per_test_job))
|
||||
return min(self.default_num_jobs_mem, default_num_jobs_cpu)
|
||||
|
||||
|
||||
|
||||
class TabularConsoleOutput:
|
||||
"""Print test progress to the console"""
|
||||
@@ -227,7 +181,7 @@ def parse_cmd_line() -> argparse.Namespace:
|
||||
help="Run only tests for given build mode(s)")
|
||||
parser.add_argument('--repeat', action="store", default="1", type=int,
|
||||
help="number of times to repeat test execution")
|
||||
parser.add_argument('--timeout', action="store", default="3600", type=int,
|
||||
parser.add_argument('--timeout', action="store", default="24000", type=int,
|
||||
help="timeout value for single test execution")
|
||||
parser.add_argument('--session-timeout', action="store", default="24000", type=int,
|
||||
help="timeout value for test.py/pytest session execution")
|
||||
@@ -319,13 +273,6 @@ def parse_cmd_line() -> argparse.Namespace:
|
||||
if args.skip_patterns and args.k:
|
||||
parser.error(palette.fail('arguments --skip and -k are mutually exclusive, please use only one of them'))
|
||||
|
||||
if not args.modes:
|
||||
try:
|
||||
args.modes = get_configured_modes()
|
||||
except Exception:
|
||||
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
|
||||
raise
|
||||
|
||||
if not args.jobs:
|
||||
if not args.cpus:
|
||||
nr_cpus = multiprocessing.cpu_count()
|
||||
@@ -333,7 +280,19 @@ def parse_cmd_line() -> argparse.Namespace:
|
||||
nr_cpus = int(subprocess.check_output(
|
||||
['taskset', '-c', args.cpus, 'python3', '-c',
|
||||
'import os; print(len(os.sched_getaffinity(0)))']))
|
||||
args.jobs = ThreadsCalculator(args.modes).get_number_of_threads(nr_cpus)
|
||||
|
||||
cpus_per_test_job = 1
|
||||
sysmem = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
|
||||
testmem = 6e9 if os.sysconf('SC_PAGE_SIZE') > 4096 else 2e9
|
||||
default_num_jobs_mem = ((sysmem - 4e9) // testmem)
|
||||
args.jobs = min(default_num_jobs_mem, nr_cpus // cpus_per_test_job)
|
||||
|
||||
if not args.modes:
|
||||
try:
|
||||
args.modes = get_configured_modes()
|
||||
except Exception:
|
||||
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
|
||||
raise
|
||||
|
||||
if not args.coverage_modes and args.coverage:
|
||||
args.coverage_modes = list(args.modes)
|
||||
@@ -391,12 +350,16 @@ def run_pytest(options: argparse.Namespace) -> tuple[int, list[SimpleNamespace]]
|
||||
if options.list_tests:
|
||||
args.extend(['--collect-only', '--quiet', '--no-header'])
|
||||
else:
|
||||
threads = int(options.jobs)
|
||||
# debug mode is very CPU and memory hungry, so we need to lower the number of threads to be able to finish tests
|
||||
if 'debug' in options.modes:
|
||||
threads = int(threads * 0.5)
|
||||
args.extend([
|
||||
"--log-level=DEBUG", # Capture logs
|
||||
f'--junit-xml={junit_output_file}',
|
||||
"-rf",
|
||||
'--test-py-init',
|
||||
f'-n{options.jobs}',
|
||||
f'-n{threads}',
|
||||
f'--tmpdir={temp_dir}',
|
||||
f'--maxfail={options.max_failures}',
|
||||
f'--alluredir={report_dir / f"allure_{HOST_ID}"}',
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/manual_clock.hh>
|
||||
#include <seastar/core/timer.hh>
|
||||
#include <seastar/util/later.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/util/alloc_failure_injector.hh>
|
||||
@@ -290,17 +290,12 @@ SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
|
||||
m.set_expiring(id1);
|
||||
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
|
||||
m.barrier().get();
|
||||
promise<> shard0_timer_expired;
|
||||
timer<manual_clock> shard0_timer([&shard0_timer_expired] {
|
||||
shard0_timer_expired.set_value();
|
||||
});
|
||||
shard0_timer.arm(manual_clock::now() + expiration_time);
|
||||
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
|
||||
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
|
||||
manual_clock::advance(expiration_time);
|
||||
BOOST_CHECK(!m.find(id1));
|
||||
return smp::submit_to(0, []{}); // Ensure shard 0 notices timer is expired.
|
||||
}).get();
|
||||
shard0_timer_expired.get_future().get();
|
||||
BOOST_CHECK(!m.find(id1));
|
||||
|
||||
// Expiring entries are replicated
|
||||
|
||||
@@ -23,11 +23,8 @@
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/exception_utils.hh"
|
||||
#include "test/lib/limiting_data_source.hh"
|
||||
#include "utils/io-wrappers.hh"
|
||||
|
||||
#include <seastar/util/memory-data-source.hh>
|
||||
|
||||
using namespace encryption;
|
||||
|
||||
static tmpdir dir;
|
||||
@@ -598,113 +595,6 @@ SEASTAR_TEST_CASE(test_encrypted_data_source_simple) {
|
||||
co_await test_random_data_source(sizes);
|
||||
}
|
||||
|
||||
// Reproduces the production deadlock where encrypted SSTable component downloads
|
||||
// got stuck during restore. The encrypted_data_source::get() caches a block in
|
||||
// _next, then on the next call bypasses input_stream::read()'s _eof check and
|
||||
// calls input_stream::read_exactly() — which does NOT check _eof when _buf is
|
||||
// empty. This causes a second get() on the underlying source after EOS.
|
||||
//
|
||||
// In production the underlying source was chunked_download_source whose get()
|
||||
// hung forever. Here we simulate it with a strict source that fails the test.
|
||||
//
|
||||
// The fix belongs in seastar's input_stream::read_exactly(): check _eof before
|
||||
// calling _fd.get(), consistent with read(), read_up_to(), and consume().
|
||||
static future<> test_encrypted_source_copy(size_t plaintext_size) {
|
||||
testlog.info("test_encrypted_source_copy: plaintext_size={}", plaintext_size);
|
||||
|
||||
key_info info{"AES/CBC", 256};
|
||||
auto k = ::make_shared<symmetric_key>(info);
|
||||
|
||||
// Step 1: Encrypt the plaintext into memory buffers
|
||||
auto plaintext = generate_random<char>(plaintext_size);
|
||||
std::vector<temporary_buffer<char>> encrypted_bufs;
|
||||
{
|
||||
data_sink sink(make_encrypted_sink(create_memory_sink(encrypted_bufs), k));
|
||||
co_await sink.put(plaintext.clone());
|
||||
co_await sink.close();
|
||||
}
|
||||
|
||||
// Flatten encrypted buffers into a single contiguous buffer
|
||||
size_t encrypted_total = 0;
|
||||
for (const auto& b : encrypted_bufs) {
|
||||
encrypted_total += b.size();
|
||||
}
|
||||
temporary_buffer<char> encrypted(encrypted_total);
|
||||
size_t pos = 0;
|
||||
for (const auto& b : encrypted_bufs) {
|
||||
std::copy(b.begin(), b.end(), encrypted.get_write() + pos);
|
||||
pos += b.size();
|
||||
}
|
||||
|
||||
// Step 2: Create a data source from the encrypted data that fails on
|
||||
// post-EOS get() — simulating a source like chunked_download_source
|
||||
// that would hang forever in this situation.
|
||||
class strict_memory_source final : public limiting_data_source_impl {
|
||||
bool _eof = false;
|
||||
public:
|
||||
strict_memory_source(temporary_buffer<char> data, size_t chunk_size)
|
||||
: limiting_data_source_impl(
|
||||
data_source(std::make_unique<util::temporary_buffer_data_source>(std::move(data))),
|
||||
chunk_size) {}
|
||||
|
||||
future<temporary_buffer<char>> get() override {
|
||||
BOOST_REQUIRE_MESSAGE(!_eof,
|
||||
"get() called on source after it already returned EOS — "
|
||||
"this is the production deadlock: read_exactly() does not "
|
||||
"check _eof before calling _fd.get()");
|
||||
auto buf = co_await limiting_data_source_impl::get();
|
||||
_eof = buf.empty();
|
||||
co_return buf;
|
||||
}
|
||||
};
|
||||
|
||||
// Step 3: Wrap in encrypted_data_source and drain via consume() —
|
||||
// the exact code path used by seastar::copy() which is what
|
||||
// sstables_loader_helpers::download_sstable() calls.
|
||||
// Try multiple chunk sizes to hit different alignment scenarios.
|
||||
for (size_t chunk_size : {1ul, 7ul, 4096ul, 8192ul, encrypted_total, encrypted_total + 1}) {
|
||||
if (chunk_size == 0) continue;
|
||||
auto src = data_source(make_encrypted_source(
|
||||
data_source(std::make_unique<strict_memory_source>(encrypted.clone(), chunk_size)), k));
|
||||
auto in = input_stream<char>(std::move(src));
|
||||
|
||||
// consume() is what seastar::copy() uses internally. It calls
|
||||
// encrypted_data_source::get() via _fd.get() until EOF.
|
||||
size_t total_decrypted = 0;
|
||||
co_await in.consume([&total_decrypted](temporary_buffer<char> buf) {
|
||||
total_decrypted += buf.size();
|
||||
return make_ready_future<consumption_result<char>>(continue_consuming{});
|
||||
});
|
||||
co_await in.close();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(total_decrypted, plaintext_size);
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_encrypted_source_copy_8k) {
|
||||
co_await test_encrypted_source_copy(8192);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_encrypted_source_copy_4k) {
|
||||
co_await test_encrypted_source_copy(4096);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_encrypted_source_copy_small) {
|
||||
co_await test_encrypted_source_copy(100);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_encrypted_source_copy_12k) {
|
||||
co_await test_encrypted_source_copy(12288);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_encrypted_source_copy_unaligned) {
|
||||
co_await test_encrypted_source_copy(8193);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_encrypted_source_copy_1byte) {
|
||||
co_await test_encrypted_source_copy(1);
|
||||
}
|
||||
|
||||
|
||||
SEASTAR_TEST_CASE(test_encrypted_data_source_fuzzy) {
|
||||
std::mt19937_64 rand_gen(std::random_device{}());
|
||||
|
||||
@@ -1045,6 +1045,7 @@ validate_result_size(size_t i, schema_ptr schema, const utils::chunked_vector<mu
|
||||
|
||||
struct fuzzy_test_config {
|
||||
uint32_t seed;
|
||||
std::chrono::seconds timeout;
|
||||
unsigned concurrency;
|
||||
unsigned scans;
|
||||
};
|
||||
@@ -1076,9 +1077,6 @@ run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, sharded<replica::database>&
|
||||
testlog.debug("[scan#{}]: seed={}, is_stateful={}, prange={}, ckranges={}", i, seed, is_stateful, partition_range,
|
||||
partition_slice.default_row_ranges());
|
||||
|
||||
// Use a small max_size to force many pages per scan, stressing the
|
||||
// paging and result-merging logic. With the large row limit here,
|
||||
// the byte limit is typically the tighter bound.
|
||||
const auto [results, npages] = read_partitions_with_paged_scan(db, schema, 1000, 1024, is_stateful, partition_range, partition_slice);
|
||||
|
||||
const auto expected_partitions = slice_partitions(*schema, mutations, partition_index_range, partition_slice);
|
||||
@@ -1162,27 +1160,21 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
|
||||
std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 100), // range-tombstones
|
||||
#else
|
||||
// Keep these values moderate: with complex randomly-generated
|
||||
// schemas (deeply nested frozen collections/UDTs), large row
|
||||
// counts cause data generation and paged scanning to be very
|
||||
// slow, leading to CI timeouts. The test's value comes from
|
||||
// schema variety and paging correctness, not from sheer data
|
||||
// volume.
|
||||
std::uniform_int_distribution<size_t>(32, 64), // partitions
|
||||
std::uniform_int_distribution<size_t>(0, 200), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 200), // range-tombstones
|
||||
std::uniform_int_distribution<size_t>(0, 1000), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 1000), // range-tombstones
|
||||
#endif
|
||||
tests::default_timestamp_generator());
|
||||
|
||||
#if defined DEBUG
|
||||
auto cfg = fuzzy_test_config{seed, 1, 1};
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{8}, 1, 1};
|
||||
#elif defined DEVEL
|
||||
auto cfg = fuzzy_test_config{seed, 2, 4};
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 2, 4};
|
||||
#else
|
||||
auto cfg = fuzzy_test_config{seed, 4, 8};
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 4, 8};
|
||||
#endif
|
||||
|
||||
testlog.info("Running test workload with configuration: seed={}, concurrency={}, scans={}", cfg.seed,
|
||||
testlog.info("Running test workload with configuration: seed={}, timeout={}s, concurrency={}, scans={}", cfg.seed, cfg.timeout.count(),
|
||||
cfg.concurrency, cfg.scans);
|
||||
|
||||
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(tbl.schema), &compacted_frozen_mutations = tbl.compacted_frozen_mutations] {
|
||||
|
||||
@@ -906,13 +906,9 @@ SEASTAR_THREAD_TEST_CASE(test_timeout_is_applied_on_lookup) {
|
||||
BOOST_REQUIRE(entry.permit.timeout() == new_timeout);
|
||||
BOOST_REQUIRE(!entry.permit.get_abort_exception());
|
||||
|
||||
// Don't waste time retrying before the timeout is up
|
||||
sleep(ttl_timeout_test_timeout).get();
|
||||
|
||||
eventually_true([&entry] {
|
||||
return bool(entry.permit.get_abort_exception());
|
||||
});
|
||||
sleep(ttl_timeout_test_timeout * 2).get();
|
||||
|
||||
BOOST_REQUIRE(entry.permit.get_abort_exception());
|
||||
BOOST_REQUIRE_THROW(std::rethrow_exception(entry.permit.get_abort_exception()), seastar::named_semaphore_timed_out);
|
||||
}
|
||||
|
||||
|
||||
@@ -2644,10 +2644,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
|
||||
return rd;
|
||||
};
|
||||
|
||||
{
|
||||
memory::scoped_critical_alloc_section dfg;
|
||||
populate_range(cache, population_range);
|
||||
}
|
||||
populate_range(cache, population_range);
|
||||
auto rd1_v1 = assert_that(make_reader(population_range));
|
||||
mutation_reader_opt snap;
|
||||
auto close_snap = defer([&snap] {
|
||||
|
||||
@@ -257,44 +257,39 @@ async def manager(request: pytest.FixtureRequest,
|
||||
yield manager_client
|
||||
# `request.node.stash` contains a report stored in `pytest_runtest_makereport` from where we can retrieve
|
||||
# test failure.
|
||||
cluster_status = None
|
||||
found_errors = {}
|
||||
failed = False
|
||||
report = request.node.stash[PHASE_REPORT_KEY]
|
||||
failed = report.when == "call" and report.failed
|
||||
|
||||
# Check if the test has the check_nodes_for_errors marker
|
||||
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
|
||||
|
||||
failed_test_dir_path = None
|
||||
try:
|
||||
report = request.node.stash[PHASE_REPORT_KEY]
|
||||
failed = report.when == "call" and report.failed
|
||||
if failed or found_errors:
|
||||
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
|
||||
# all related logs in one dir.
|
||||
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
|
||||
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
|
||||
str.maketrans('[]', '()'))
|
||||
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Check if the test has the check_nodes_for_errors marker
|
||||
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
|
||||
if failed:
|
||||
await manager_client.gather_related_logs(
|
||||
failed_test_dir_path,
|
||||
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
|
||||
)
|
||||
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
|
||||
f.write(report.longreprtext)
|
||||
if request.config.getoption('artifacts_dir_url') is not None:
|
||||
# get the relative path to the tmpdir for the failed directory
|
||||
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
|
||||
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
|
||||
urllib.parse.quote(dir_path_relative))
|
||||
record_property("TEST_LOGS", full_url)
|
||||
|
||||
if failed or found_errors:
|
||||
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
|
||||
# all related logs in one dir.
|
||||
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
|
||||
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
|
||||
str.maketrans('[]', '()'))
|
||||
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
|
||||
cluster_status = await manager_client.after_test(test_case_name, not failed)
|
||||
await manager_client.stop() # Stop client session and close driver after each test
|
||||
|
||||
if failed:
|
||||
await manager_client.gather_related_logs(
|
||||
failed_test_dir_path,
|
||||
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
|
||||
)
|
||||
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
|
||||
f.write(report.longreprtext)
|
||||
if request.config.getoption('artifacts_dir_url') is not None:
|
||||
# get the relative path to the tmpdir for the failed directory
|
||||
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
|
||||
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
|
||||
urllib.parse.quote(dir_path_relative))
|
||||
record_property("TEST_LOGS", full_url)
|
||||
|
||||
cluster_status = await manager_client.after_test(test_case_name, not failed)
|
||||
finally:
|
||||
await manager_client.stop() # Stop client session and close driver after each test
|
||||
|
||||
if cluster_status is not None and cluster_status["server_broken"] and not failed:
|
||||
if cluster_status["server_broken"] and not failed:
|
||||
failed = True
|
||||
pytest.fail(
|
||||
f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"
|
||||
|
||||
@@ -10,7 +10,6 @@ import random
|
||||
import string
|
||||
import tempfile
|
||||
import time
|
||||
import threading
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from pprint import pformat
|
||||
|
||||
@@ -274,30 +273,6 @@ class TesterAlternator(BaseAlternator):
|
||||
logger.info("Testing and validating an update query using key condition expression")
|
||||
logger.info(f"ConditionExpression update of short circuit is: {conditional_update_short_circuit}")
|
||||
dc2_table.update_item(**conditional_update_short_circuit)
|
||||
# Wait for cross-DC replication to reach both live DC1 nodes
|
||||
# before stopping dc2_node. The LWT commit uses LOCAL_QUORUM,
|
||||
# which only guarantees DC2 persistence; replication to DC1 is
|
||||
# async background work. Without this wait, stopping dc2_node
|
||||
# can drop in-flight RPCs to DC1 while CAS mutations don't
|
||||
# store hints. We must confirm both live DC1 replicas have the
|
||||
# data so that the later ConsistentRead=True (LOCAL_QUORUM)
|
||||
# read on restarted node1 is guaranteed to succeed.
|
||||
# See https://scylladb.atlassian.net/browse/SCYLLADB-1267
|
||||
dc1_live_nodes = [
|
||||
node for node in self.cluster.nodelist()
|
||||
if node.data_center == node1.data_center and node.server_id != node1.server_id
|
||||
]
|
||||
dc1_live_tables = [self.get_table(table_name=TABLE_NAME, node=node) for node in dc1_live_nodes]
|
||||
wait_for(
|
||||
lambda: all(
|
||||
t.get_item(
|
||||
Key={self._table_primary_key: new_pk_val}, ConsistentRead=False
|
||||
).get("Item", {}).get("c") == 3
|
||||
for t in dc1_live_tables
|
||||
),
|
||||
timeout=60,
|
||||
text="Waiting for cross-DC replication of conditional update to both live DC1 nodes",
|
||||
)
|
||||
dc2_node.stop()
|
||||
node1.start()
|
||||
|
||||
@@ -506,33 +481,28 @@ class TesterAlternator(BaseAlternator):
|
||||
2) Issue Alternator 'heavy' requests concurrently (create-table)
|
||||
3) wait for RequestLimitExceeded error response.
|
||||
"""
|
||||
# Keep the limit low to avoid exhausting LSA memory on the 1GB test node
|
||||
# when multiple CreateTable requests (Raft + schema + flush) run concurrently.
|
||||
concurrent_requests_limit = 3
|
||||
concurrent_requests_limit = 5
|
||||
extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1}
|
||||
self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config)
|
||||
node1 = self.cluster.nodelist()[0]
|
||||
stop_workers = threading.Event()
|
||||
create_tables_threads = []
|
||||
for tables_num in range(concurrent_requests_limit * 5):
|
||||
create_tables_threads.append(self.run_create_table_thread())
|
||||
|
||||
def run_create_table_until_limited() -> None:
|
||||
while not stop_workers.is_set():
|
||||
try:
|
||||
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
|
||||
except Exception as error: # noqa: BLE001
|
||||
if "RequestLimitExceeded" in str(error):
|
||||
stop_workers.set()
|
||||
return
|
||||
raise
|
||||
@retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request")
|
||||
def wait_for_create_table_request_failure():
|
||||
try:
|
||||
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
|
||||
except Exception as error:
|
||||
if "RequestLimitExceeded" in error.args[0]:
|
||||
return
|
||||
raise
|
||||
raise ConcurrencyLimitNotExceededError
|
||||
|
||||
with ThreadPoolExecutor(max_workers=concurrent_requests_limit * 5) as executor:
|
||||
create_table_futures = [executor.submit(run_create_table_until_limited) for _ in range(concurrent_requests_limit * 5)]
|
||||
wait_for_create_table_request_failure()
|
||||
|
||||
if not stop_workers.wait(timeout=30):
|
||||
raise ConcurrencyLimitNotExceededError
|
||||
|
||||
stop_workers.set()
|
||||
for future in create_table_futures:
|
||||
future.result(timeout=60)
|
||||
for thread in create_tables_threads:
|
||||
thread.join()
|
||||
|
||||
@staticmethod
|
||||
def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1182,9 +1182,9 @@ class TestAuth(Tester):
|
||||
def get_session(self, node_idx=0, user=None, password=None, exclusive=True):
|
||||
node = self.cluster.nodelist()[node_idx]
|
||||
if exclusive:
|
||||
conn = self.patient_exclusive_cql_connection(node, user=user, password=password)
|
||||
conn = self.patient_exclusive_cql_connection(node, user=user, password=password, timeout=0.1)
|
||||
else:
|
||||
conn = self.patient_cql_connection(node, user=user, password=password)
|
||||
conn = self.patient_cql_connection(node, user=user, password=password, timeout=0.1)
|
||||
return conn
|
||||
|
||||
def assert_permissions_listed(self, expected, session, query, include_superuser=False):
|
||||
|
||||
@@ -199,7 +199,7 @@ class GSServer(GSFront):
|
||||
|
||||
def unpublish(self):
|
||||
for k in self.vars:
|
||||
v = self.oldvars.get(k)
|
||||
v = self.oldvars[k]
|
||||
if v:
|
||||
os.environ[k] = v
|
||||
elif os.environ.get(k):
|
||||
|
||||
@@ -215,11 +215,6 @@ async def test_node_ops_tasks_tree(manager: ManagerClient):
|
||||
servers, vt_ids = await check_remove_node_tasks_tree(manager, tm, module_name, servers, vt_ids)
|
||||
servers, vt_ids = await check_decommission_tasks_tree(manager, tm, module_name, servers, vt_ids)
|
||||
|
||||
# Reconnect the driver after topology changes (replace, removenode,
|
||||
# decommission) so that the new_test_keyspace cleanup can reach a
|
||||
# live node for DROP KEYSPACE.
|
||||
await manager.driver_connect()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_node_ops_tasks_ttl(manager: ManagerClient):
|
||||
"""Test node ops virtual tasks' ttl."""
|
||||
|
||||
@@ -1,70 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2026-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
import logging
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from test.cluster.util import get_current_group0_config
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import read_barrier
|
||||
from test.pylib.util import wait_for
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_bootstrap_with_quick_group0_join(manager: ManagerClient):
|
||||
"""Regression test for https://scylladb.atlassian.net/browse/SCYLLADB-959.
|
||||
|
||||
The bug was that when the bootstrapping node joined group0 before reaching
|
||||
post_server_start, it skipped post_server_start and thus hung forever.
|
||||
|
||||
The test simulates the scenario by starting the second node with the
|
||||
join_group0_pause_before_config_check injection. Without the fix, the
|
||||
startup times out.
|
||||
"""
|
||||
logger.info("Adding first server")
|
||||
s1 = await manager.server_add()
|
||||
|
||||
logger.info("Adding second server with join_group0_pause_before_config_check enabled")
|
||||
s2 = await manager.server_add(start=False, config={
|
||||
'error_injections_at_startup': ['join_group0_pause_before_config_check']
|
||||
})
|
||||
|
||||
logger.info(f"Starting {s2}")
|
||||
start_task = asyncio.create_task(manager.server_start(s2.server_id))
|
||||
|
||||
s2_log = await manager.server_open_log(s2.server_id)
|
||||
|
||||
await s2_log.wait_for("join_group0_pause_before_config_check: waiting for message", timeout=60)
|
||||
|
||||
s1_host_id = await manager.get_host_id(s1.server_id)
|
||||
s2_host_id = await manager.get_host_id(s2.server_id)
|
||||
|
||||
async def s2_in_group0_config_on_s1():
|
||||
config = await get_current_group0_config(manager, s1)
|
||||
ids = {m[0] for m in config}
|
||||
assert s1_host_id in ids # sanity check
|
||||
return True if s2_host_id in ids else None
|
||||
|
||||
# Note: we would like to wait for s2 to see itself in the group0 config, but we can't execute
|
||||
# get_current_group0_config for s2, as s2 doesn't handle CQL requests at this point. As a workaround, we wait for s1
|
||||
# to see s2 and then perform a read barrier on s2.
|
||||
logger.info(f"Waiting for {s1} to see {s2} in the group0 config")
|
||||
await wait_for(s2_in_group0_config_on_s1, deadline=time.time() + 60, period=0.1)
|
||||
|
||||
logger.info(f"Performing read barrier on {s2} to make sure it sees itself in the group0 config")
|
||||
await read_barrier(manager.api, s2.ip_addr)
|
||||
|
||||
logger.info(f"Unblocking {s2}")
|
||||
await manager.api.message_injection(s2.ip_addr, 'join_group0_pause_before_config_check')
|
||||
|
||||
logger.info(f"Waiting for {s2} to complete bootstrap")
|
||||
await asyncio.wait_for(start_task, timeout=60)
|
||||
@@ -44,7 +44,6 @@ run_in_dev:
|
||||
- dtest/bypass_cache_test
|
||||
- dtest/auth_roles_test
|
||||
- dtest/audit_test
|
||||
- audit/test_audit
|
||||
- dtest/commitlog_test
|
||||
- dtest/cfid_test
|
||||
- dtest/rebuild_test
|
||||
|
||||
@@ -177,7 +177,7 @@ async def _smoke_test(manager: ManagerClient, key_provider: KeyProviderFactory,
|
||||
# restart the cluster
|
||||
if restart:
|
||||
await restart(manager, servers, cfs)
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
else:
|
||||
await manager.rolling_restart(servers)
|
||||
for table_name in cfs:
|
||||
|
||||
@@ -438,7 +438,6 @@ async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: Scylla
|
||||
await wait_for(all_hosts_are_alive, deadline=time.time() + 60, period=0.1)
|
||||
logger.info(f"Upgrading {s.server_id}")
|
||||
await manager.server_change_version(s.server_id, scylla_binary)
|
||||
await manager.server_sees_others(s.server_id, 2, interval=60.0)
|
||||
|
||||
logger.info("Done upgrading servers")
|
||||
|
||||
|
||||
@@ -8,10 +8,7 @@ import asyncio
|
||||
import time
|
||||
import pytest
|
||||
import logging
|
||||
from functools import partial
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -19,26 +16,6 @@ logger = logging.getLogger(__name__)
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_crashed_node_substitution(manager: ManagerClient):
|
||||
"""Test that a node which crashed after starting gossip but before joining group0
|
||||
(an 'orphan' node) is eventually removed from gossip by the gossiper_orphan_remover_fiber.
|
||||
|
||||
The scenario:
|
||||
1. Start 3 nodes with the 'fast_orphan_removal_fiber' injection enabled. This freezes
|
||||
the gossiper_orphan_remover_fiber on each node before it enters its polling loop,
|
||||
so it cannot remove any orphan until explicitly unblocked.
|
||||
2. Start a 4th node with the 'crash_before_group0_join' injection enabled. This node
|
||||
starts gossip normally but blocks inside pre_server_start(), just before sending
|
||||
the join RPC to the topology coordinator. It never joins group0.
|
||||
3. Wait until the 4th node's gossip state has fully propagated to all 3 running peers,
|
||||
then trigger its crash via the injection. At this point all peers see it as an orphan:
|
||||
present in gossip but absent from the group0 topology.
|
||||
4. Assert the orphan is visible in gossip (live or down) on the surviving nodes.
|
||||
5. Unblock the gossiper_orphan_remover_fiber on all 3 nodes (via message_injection) and
|
||||
enable the 'speedup_orphan_removal' injection so the fiber removes the orphan immediately
|
||||
without waiting for the normal 60-second age threshold.
|
||||
6. Wait for the 'Finished to force remove node' log line confirming removal, then assert
|
||||
the orphan is no longer present in gossip.
|
||||
"""
|
||||
servers = await manager.servers_add(3, config={
|
||||
'error_injections_at_startup': ['fast_orphan_removal_fiber']
|
||||
})
|
||||
@@ -53,24 +30,10 @@ async def test_crashed_node_substitution(manager: ManagerClient):
|
||||
log = await manager.server_open_log(failed_server.server_id)
|
||||
await log.wait_for("finished do_send_ack2_msg")
|
||||
failed_id = await manager.get_host_id(failed_server.server_id)
|
||||
|
||||
# Wait until the failed server's gossip state has propagated to all running peers.
|
||||
# "finished do_send_ack2_msg" only guarantees that one peer completed a gossip round
|
||||
# with the failed server; other nodes learn about it only in subsequent gossip rounds.
|
||||
# Querying gossip before propagation completes would cause the assertion below to fail
|
||||
# because the orphan node would not yet appear as live or down on every peer.
|
||||
async def gossip_has_node(server: ServerInfo):
|
||||
live = await manager.api.client.get_json("/gossiper/endpoint/live", host=server.ip_addr)
|
||||
down = await manager.api.client.get_json("/gossiper/endpoint/down", host=server.ip_addr)
|
||||
return True if failed_server.ip_addr in live + down else None
|
||||
|
||||
for s in servers:
|
||||
await wait_for(partial(gossip_has_node, s), deadline=time.time() + 30)
|
||||
|
||||
await manager.api.message_injection(failed_server.ip_addr, 'crash_before_group0_join')
|
||||
|
||||
|
||||
await task
|
||||
|
||||
|
||||
live_eps = await manager.api.client.get_json("/gossiper/endpoint/live", host=servers[0].ip_addr)
|
||||
down_eps = await manager.api.client.get_json("/gossiper/endpoint/down", host=servers[0].ip_addr)
|
||||
|
||||
|
||||
@@ -17,9 +17,9 @@ from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
|
||||
from test.pylib.tablets import get_tablet_replicas
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
from test.pylib.util import gather_safely, wait_for
|
||||
from test.pylib.util import wait_for
|
||||
|
||||
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table
|
||||
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -51,42 +51,28 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
|
||||
node_count = 2
|
||||
cmdline = ["--logger-log-level", "hints_manager=trace"]
|
||||
servers = await manager.servers_add(node_count, cmdline=cmdline)
|
||||
|
||||
async def wait_for_hints_written(min_hint_count: int, timeout: int):
|
||||
async def aux():
|
||||
hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||
if hints_written >= min_hint_count:
|
||||
return True
|
||||
return None
|
||||
assert await wait_for(aux, time.time() + timeout)
|
||||
servers = await manager.servers_add(node_count)
|
||||
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||
uses_tablets = await keyspace_has_tablets(manager, ks)
|
||||
# If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
|
||||
# Otherwise, it could happen that all mutations would target servers[0] only, which would
|
||||
# ultimately lead to a test failure here. We rely on the assumption that mutations will be
|
||||
# distributed more or less uniformly!
|
||||
extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
|
||||
await manager.server_stop_gracefully(servers[1].server_id)
|
||||
table = f"{ks}.t"
|
||||
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
|
||||
|
||||
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||
await manager.server_stop_gracefully(servers[1].server_id)
|
||||
|
||||
stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
|
||||
stmt.consistency_level = ConsistencyLevel.ANY
|
||||
hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||
|
||||
# Some of the inserts will be targeted to the dead node.
|
||||
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
|
||||
await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
|
||||
# Some of the inserts will be targeted to the dead node.
|
||||
# The coordinator doesn't have live targets to send the write to, but it should write a hint.
|
||||
for i in range(100):
|
||||
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
|
||||
|
||||
# Verify hints are written
|
||||
await wait_for_hints_written(hints_before + 1, timeout=60)
|
||||
# Verify hints are written
|
||||
hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
|
||||
assert hints_after > hints_before
|
||||
|
||||
# For dropping the keyspace
|
||||
await manager.server_start(servers[1].server_id)
|
||||
# For dropping the keyspace
|
||||
await manager.server_start(servers[1].server_id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_limited_concurrency_of_writes(manager: ManagerClient):
|
||||
@@ -100,7 +86,7 @@ async def test_limited_concurrency_of_writes(manager: ManagerClient):
|
||||
})
|
||||
node2 = await manager.server_add()
|
||||
|
||||
cql = await manager.get_cql_exclusive(node1)
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}") as ks:
|
||||
table = f"{ks}.t"
|
||||
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
|
||||
|
||||
@@ -151,7 +151,7 @@ async def trigger_tablet_merge(manager, servers, logs):
|
||||
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
|
||||
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
|
||||
|
||||
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
|
||||
async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
|
||||
repaired_keys = set(range(0, nr_keys))
|
||||
unrepaired_keys = set()
|
||||
@@ -164,7 +164,7 @@ async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdlin
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_sstable_skipped_read_metrics(manager: ManagerClient):
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
||||
|
||||
await insert_keys(cql, ks, 0, 100)
|
||||
|
||||
@@ -274,7 +274,7 @@ async def test_tablet_incremental_repair_error(manager: ManagerClient):
|
||||
|
||||
async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline=['--logger-log-level', 'compaction=debug'])
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
token = -1
|
||||
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
@@ -335,7 +335,7 @@ async def test_tablet_incremental_repair_and_major(manager: ManagerClient):
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# Disable autocompaction
|
||||
for server in servers:
|
||||
@@ -381,7 +381,7 @@ async def test_tablet_incremental_repair_and_minor(manager: ManagerClient):
|
||||
|
||||
async def do_test_tablet_incremental_repair_with_split_and_merge(manager, do_split, do_merge):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
@@ -442,7 +442,7 @@ async def test_tablet_incremental_repair_with_merge(manager: ManagerClient):
|
||||
async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(manager: ManagerClient):
|
||||
nr_keys = 100
|
||||
cmdline = ["--hinted-handoff-enabled", "0"]
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
|
||||
await manager.server_stop_gracefully(servers[1].server_id)
|
||||
|
||||
@@ -466,7 +466,7 @@ async def test_tablet_incremental_repair_existing_and_repair_produced_sstable(ma
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
@@ -507,7 +507,7 @@ async def test_tablet_incremental_repair_merge_higher_repaired_at_number(manager
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_incremental_repair_merge_correct_repaired_at_number_after_merge(manager):
|
||||
nr_keys = 100
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
@@ -541,7 +541,7 @@ async def do_test_tablet_incremental_repair_merge_error(manager, error):
|
||||
nr_keys = 100
|
||||
# Make sure no data commit log replay after force server stop
|
||||
cmdline = ['--enable-commitlog', '0']
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await prepare_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys, cmdline)
|
||||
|
||||
# First repair
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental') # sstables_repaired_at 1
|
||||
@@ -587,7 +587,7 @@ async def test_tablet_incremental_repair_merge_error_in_merge_completion_fiber(m
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
||||
token = -1
|
||||
|
||||
sstables_repaired_at = 0
|
||||
@@ -632,7 +632,7 @@ async def test_tablet_repair_with_incremental_option(manager: ManagerClient):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_incremental_repair_tablet_time_metrics(manager: ManagerClient):
|
||||
servers, _, _, ks, _, _, _, _, _, token = await prepare_cluster_for_incremental_repair(manager)
|
||||
servers, _, _, ks, _, _, _, _, _, token = await preapre_cluster_for_incremental_repair(manager)
|
||||
time1 = 0
|
||||
time2 = 0
|
||||
|
||||
@@ -820,7 +820,7 @@ async def test_repair_sigsegv_with_diff_shard_count(manager: ManagerClient, use_
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manager: ManagerClient):
|
||||
cmdline = ['--logger-log-level', 'repair=debug']
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await prepare_cluster_for_incremental_repair(manager, cmdline=cmdline)
|
||||
servers, cql, hosts, ks, table_id, logs, _, _, _, _ = await preapre_cluster_for_incremental_repair(manager, cmdline=cmdline)
|
||||
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
|
||||
@@ -20,7 +20,6 @@ from cassandra.query import SimpleStatement
|
||||
from test.pylib.async_cql import _wrap_future
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables, TextType, Column
|
||||
from test.pylib.rest_client import read_barrier
|
||||
from test.pylib.util import unique_name
|
||||
from test.cluster.conftest import cluster_con
|
||||
|
||||
@@ -404,7 +403,6 @@ async def test_arbiter_dc_rf_rack_valid_keyspaces(manager: ManagerClient):
|
||||
for task in [*valid_keyspaces, *invalid_keyspaces]:
|
||||
_ = tg.create_task(task)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager: ManagerClient):
|
||||
"""
|
||||
This test verifies that starting a Scylla node fails when there's an RF-rack-invalid keyspace.
|
||||
@@ -466,50 +464,22 @@ async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces(manager:
|
||||
for rfs, tablets in valid_keyspaces:
|
||||
_ = tg.create_task(create_keyspace(rfs, tablets))
|
||||
|
||||
# Precondition: s1 has rf_rack_valid_keyspaces set to false.
|
||||
# Postcondition: s1 still has rf_rack_valid_keyspaces set to false.
|
||||
await manager.server_stop_gracefully(s1.server_id)
|
||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
||||
|
||||
async def try_fail(rfs: List[int], dc: str, rf: int, rack_count: int):
|
||||
running_servers = await manager.running_servers()
|
||||
should_start = s1.server_id not in [server.server_id for server in running_servers]
|
||||
if should_start:
|
||||
await manager.server_start(s1.server_id)
|
||||
|
||||
ks = await create_keyspace(rfs, True)
|
||||
# We need to wait for the new schema to propagate.
|
||||
# Otherwise, it's not clear when the mutation
|
||||
# corresponding to the created keyspace will
|
||||
# arrive at server 1.
|
||||
# It could happen only after the node performs
|
||||
# the check upon start-up, effectively leading
|
||||
# to a successful start-up, which we don't want.
|
||||
# For more context, see issue: SCYLLADB-1137.
|
||||
await read_barrier(manager.api, s1.ip_addr)
|
||||
|
||||
await manager.server_stop_gracefully(s1.server_id)
|
||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
||||
|
||||
err = f"The keyspace '{ks}' is required to be RF-rack-valid. " \
|
||||
f"That condition is violated for DC '{dc}': RF={rf} vs. rack count={rack_count}."
|
||||
await manager.server_start(s1.server_id, expected_error=err)
|
||||
_ = await manager.server_start(s1.server_id, expected_error=err)
|
||||
await cql.run_async(f"DROP KEYSPACE {ks}")
|
||||
|
||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "false")
|
||||
|
||||
# Test RF-rack-invalid keyspaces.
|
||||
await try_fail([2, 0], "dc1", 2, 3)
|
||||
await try_fail([3, 2], "dc2", 2, 1)
|
||||
await try_fail([4, 1], "dc1", 4, 3)
|
||||
|
||||
# We need to perform a read barrier on the node to make
|
||||
# sure that it processes the last DROP KEYSPACE.
|
||||
# Otherwise, the node could think the RF-rack-invalid
|
||||
# keyspace still exists.
|
||||
await manager.server_start(s1.server_id)
|
||||
await read_barrier(manager.api, s1.ip_addr)
|
||||
await manager.server_stop_gracefully(s1.server_id)
|
||||
|
||||
await manager.server_update_config(s1.server_id, "rf_rack_valid_keyspaces", "true")
|
||||
await manager.server_start(s1.server_id)
|
||||
_ = await manager.server_start(s1.server_id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_startup_with_keyspaces_violating_rf_rack_valid_keyspaces_but_not_enforced(manager: ManagerClient):
|
||||
|
||||
@@ -23,25 +23,10 @@ from test.cluster.object_store.conftest import format_tuples
|
||||
from test.cluster.object_store.test_backup import topo, take_snapshot, do_test_streaming_scopes
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.rest_client import read_barrier
|
||||
from test.pylib.util import unique_name, wait_for
|
||||
from test.pylib.util import unique_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def wait_for_upload_dir_empty(upload_dir, timeout=30):
|
||||
'''
|
||||
Wait until the upload directory is empty with a timeout.
|
||||
SSTable unlinking is asynchronous and in rare situations, it can happen
|
||||
that not all sstables are deleted from the upload dir immediately after refresh is done.
|
||||
'''
|
||||
deadline = time.time() + timeout
|
||||
async def check_empty():
|
||||
files = os.listdir(upload_dir)
|
||||
if not files:
|
||||
return True
|
||||
return None
|
||||
await wait_for(check_empty, deadline, period=0.5)
|
||||
|
||||
class SSTablesOnLocalStorage:
|
||||
def __init__(self):
|
||||
self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
|
||||
@@ -168,8 +153,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
||||
|
||||
for s in servers:
|
||||
cf_dir = dirs[s.server_id]["cf_dir"]
|
||||
upload_dir = os.path.join(cf_dir, 'upload')
|
||||
assert os.path.exists(upload_dir)
|
||||
await wait_for_upload_dir_empty(upload_dir)
|
||||
files = os.listdir(os.path.join(cf_dir, 'upload'))
|
||||
assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
|
||||
|
||||
shutil.rmtree(tmpbackup)
|
||||
|
||||
@@ -11,6 +11,7 @@ from test.cluster.util import check_token_ring_and_group0_consistency, new_test_
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@pytest.mark.asyncio
|
||||
@@ -52,7 +53,7 @@ async def test_cleanup_stop(manager: ManagerClient):
|
||||
await s0_log.wait_for('sstable_cleanup_wait: waiting', from_mark=s0_mark)
|
||||
|
||||
stop_cleanup = asyncio.create_task(manager.api.stop_compaction(servers[0].ip_addr, "CLEANUP"))
|
||||
await asyncio.sleep(1)
|
||||
time.sleep(1)
|
||||
|
||||
await manager.api.message_injection(servers[0].ip_addr, "sstable_cleanup_wait")
|
||||
await stop_cleanup
|
||||
|
||||
@@ -2279,7 +2279,7 @@ async def test_split_stopped_on_shutdown(manager: ManagerClient):
|
||||
|
||||
shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))
|
||||
|
||||
await log.wait_for('Stopping.*ongoing compactions', from_mark=log_mark)
|
||||
await log.wait_for('Stopping.*ongoing compactions')
|
||||
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
|
||||
|
||||
await log.wait_for('storage_service_drain_wait: waiting', from_mark=log_mark)
|
||||
|
||||
@@ -196,7 +196,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
|
||||
tombstone_mark = datetime.now(timezone.utc)
|
||||
|
||||
# test #2: the tombstones are not cleaned up when one node is down
|
||||
with pytest.raises(AssertionError, match="timed out"):
|
||||
with pytest.raises(AssertionError, match="Deadline exceeded"):
|
||||
# waiting for shorter time (5s normally enough for a successful case, we expect the timeout here)
|
||||
await verify_tombstone_gc(tombstone_mark, timeout=5)
|
||||
|
||||
@@ -249,7 +249,7 @@ async def test_group0_tombstone_gc(manager: ManagerClient):
|
||||
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# test #4a: the tombstones are not cleaned up after both live nodes join the new group0
|
||||
with pytest.raises(AssertionError, match="timed out"):
|
||||
with pytest.raises(AssertionError, match="Deadline exceeded"):
|
||||
await verify_tombstone_gc(tombstone_mark, timeout=5)
|
||||
|
||||
await manager.remove_node(servers[0].server_id, down_server.server_id)
|
||||
|
||||
@@ -165,7 +165,7 @@ async def wait_for_cdc_generations_publishing(cql: Session, hosts: list[Host], d
|
||||
unpublished_generations = topo_res[0].unpublished_cdc_generations
|
||||
return unpublished_generations is None or len(unpublished_generations) == 0 or None
|
||||
|
||||
await wait_for(all_generations_published, deadline=deadline)
|
||||
await wait_for(all_generations_published, deadline=deadline, period=1.0)
|
||||
|
||||
|
||||
async def check_system_topology_and_cdc_generations_v3_consistency(manager: ManagerClient, live_hosts: list[Host], cqls: Optional[list[Session]] = None, ignored_hosts: list[Host] = []):
|
||||
@@ -470,17 +470,6 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
|
||||
await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")
|
||||
|
||||
|
||||
async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
|
||||
"""
|
||||
Checks whether the given keyspace uses tablets.
|
||||
Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
|
||||
"""
|
||||
cql = manager.get_cql()
|
||||
rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
|
||||
rows = list(rows_iter)
|
||||
return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
|
||||
|
||||
|
||||
async def get_raft_log_size(cql, host) -> int:
|
||||
query = "select count(\"index\") from system.raft"
|
||||
return (await cql.run_async(query, host=host))[0][0]
|
||||
|
||||
@@ -7,42 +7,54 @@
|
||||
*/
|
||||
|
||||
#include "limiting_data_source.hh"
|
||||
#include <seastar/core/iostream.hh>
|
||||
#include <seastar/core/temporary_buffer.hh>
|
||||
#include <cstdint>
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
future<temporary_buffer<char>> limiting_data_source_impl::do_get() {
|
||||
uint64_t size = std::min(_limit, _buf.size());
|
||||
auto res = _buf.share(0, size);
|
||||
_buf.trim_front(size);
|
||||
return make_ready_future<temporary_buffer<char>>(std::move(res));
|
||||
}
|
||||
class limiting_data_source_impl final : public data_source_impl {
|
||||
data_source _src;
|
||||
size_t _limit;
|
||||
temporary_buffer<char> _buf;
|
||||
future<temporary_buffer<char>> do_get() {
|
||||
uint64_t size = std::min(_limit, _buf.size());
|
||||
auto res = _buf.share(0, size);
|
||||
_buf.trim_front(size);
|
||||
return make_ready_future<temporary_buffer<char>>(std::move(res));
|
||||
}
|
||||
public:
|
||||
limiting_data_source_impl(data_source&& src, size_t limit)
|
||||
: _src(std::move(src))
|
||||
, _limit(limit)
|
||||
{}
|
||||
|
||||
limiting_data_source_impl::limiting_data_source_impl(data_source&& src, size_t limit) : _src(std::move(src)), _limit(limit) {
|
||||
}
|
||||
limiting_data_source_impl(limiting_data_source_impl&&) noexcept = default;
|
||||
limiting_data_source_impl& operator=(limiting_data_source_impl&&) noexcept = default;
|
||||
|
||||
future<temporary_buffer<char>> limiting_data_source_impl::get() {
|
||||
if (_buf.empty()) {
|
||||
virtual future<temporary_buffer<char>> get() override {
|
||||
if (_buf.empty()) {
|
||||
_buf.release();
|
||||
return _src.get().then([this] (auto&& buf) {
|
||||
_buf = std::move(buf);
|
||||
return do_get();
|
||||
});
|
||||
}
|
||||
return do_get();
|
||||
}
|
||||
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
|
||||
if (n < _buf.size()) {
|
||||
_buf.trim_front(n);
|
||||
return do_get();
|
||||
}
|
||||
n -= _buf.size();
|
||||
_buf.release();
|
||||
return _src.get().then([this](auto&& buf) {
|
||||
return _src.skip(n).then([this] (auto&& buf) {
|
||||
_buf = std::move(buf);
|
||||
return do_get();
|
||||
});
|
||||
}
|
||||
return do_get();
|
||||
}
|
||||
|
||||
future<temporary_buffer<char>> limiting_data_source_impl::skip(uint64_t n) {
|
||||
if (n < _buf.size()) {
|
||||
_buf.trim_front(n);
|
||||
return do_get();
|
||||
}
|
||||
n -= _buf.size();
|
||||
_buf.release();
|
||||
return _src.skip(n).then([this](auto&& buf) {
|
||||
_buf = std::move(buf);
|
||||
return do_get();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
data_source make_limiting_data_source(data_source&& src, size_t limit) {
|
||||
return data_source{std::make_unique<limiting_data_source_impl>(std::move(src), limit)};
|
||||
|
||||
@@ -8,25 +8,13 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/iostream.hh>
|
||||
#include <seastar/core/temporary_buffer.hh>
|
||||
#include <stddef.h>
|
||||
|
||||
namespace seastar {
|
||||
|
||||
class limiting_data_source_impl : public seastar::data_source_impl {
|
||||
seastar::data_source _src;
|
||||
size_t _limit;
|
||||
seastar::temporary_buffer<char> _buf;
|
||||
seastar::future<seastar::temporary_buffer<char>> do_get();
|
||||
class data_source;
|
||||
|
||||
public:
|
||||
limiting_data_source_impl(seastar::data_source&& src, size_t limit);
|
||||
|
||||
limiting_data_source_impl(limiting_data_source_impl&&) noexcept = default;
|
||||
limiting_data_source_impl& operator=(limiting_data_source_impl&&) noexcept = default;
|
||||
|
||||
seastar::future<seastar::temporary_buffer<char>> get() override;
|
||||
seastar::future<seastar::temporary_buffer<char>> skip(uint64_t n) override;
|
||||
};
|
||||
}
|
||||
|
||||
/// \brief Creates an data_source from another data_source but returns its data in chunks not bigger than a given limit
|
||||
///
|
||||
|
||||
@@ -271,21 +271,10 @@ future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_
|
||||
// arbitrary timeout of 120s for the server to make some output. Very generous.
|
||||
// but since we (maybe) run docker, and might need to pull image, this can take
|
||||
// some time if we're unlucky.
|
||||
auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
|
||||
for (auto* f : {&f1, &f2}) {
|
||||
if (f->failed()) {
|
||||
try {
|
||||
f->get();
|
||||
} catch (in_use&) {
|
||||
retry = true;
|
||||
p = std::current_exception();
|
||||
} catch (...) {
|
||||
if (!p) {
|
||||
p = std::current_exception();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
|
||||
} catch (in_use&) {
|
||||
retry = true;
|
||||
p = std::current_exception();
|
||||
} catch (...) {
|
||||
p = std::current_exception();
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
import os
|
||||
import random
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
@@ -60,14 +59,7 @@ async def server_address(request, testpy_test: None|Test):
|
||||
ip = await testpy_test.suite.hosts.lease_host()
|
||||
else:
|
||||
ip = f"127.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
|
||||
# Ask the OS to pick a free port by binding to port 0. This avoids
|
||||
# collisions with ports still in TIME_WAIT from a previous test module
|
||||
# that used the same IP. SO_REUSEADDR is set on the probe socket so it
|
||||
# can reclaim a TIME_WAIT port itself
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.bind((ip, 0))
|
||||
port = s.getsockname()[1]
|
||||
port = random.randint(10000, 65535)
|
||||
yield ServerAddress(ip, port)
|
||||
if testpy_test is not None:
|
||||
await testpy_test.suite.hosts.release_host(ip)
|
||||
|
||||
@@ -257,7 +257,7 @@ async def run_server(ip, port):
|
||||
|
||||
runner = aiohttp.web.AppRunner(app)
|
||||
await runner.setup()
|
||||
site = aiohttp.web.TCPSite(runner, ip, port, reuse_address=True, reuse_port=True)
|
||||
site = aiohttp.web.TCPSite(runner, ip, port)
|
||||
await site.start()
|
||||
|
||||
try:
|
||||
|
||||
@@ -4,18 +4,14 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import logging
|
||||
import shutil
|
||||
import itertools
|
||||
import asyncio
|
||||
import pathlib
|
||||
import re
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Callable
|
||||
|
||||
logger = logging.getLogger("DockerizedServer")
|
||||
|
||||
class DockerizedServer:
|
||||
"""class for running an external dockerized service image, typically mock server"""
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
@@ -41,7 +37,6 @@ class DockerizedServer:
|
||||
self.port = None
|
||||
self.proc = None
|
||||
self.service_port = port
|
||||
self.echo_thread = None
|
||||
|
||||
async def start(self):
|
||||
"""Starts docker image on a random port"""
|
||||
@@ -50,107 +45,77 @@ class DockerizedServer:
|
||||
if exe is not None)).resolve()
|
||||
sid = f"{os.getpid()}-{DockerizedServer.newid()}"
|
||||
name = f'{self.logfilenamebase}-{sid}'
|
||||
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
|
||||
self.logfile = logfilename.open("wb")
|
||||
|
||||
docker_args = self.docker_args(self.host, self.service_port)
|
||||
image_args = self.image_args(self.host, self.service_port)
|
||||
|
||||
args = [exe, "run", "--name", name, "--rm" ]
|
||||
if self.service_port is None:
|
||||
args = args + ["-P"]
|
||||
else:
|
||||
args = args + ["-p", str(self.service_port)]
|
||||
|
||||
args = args + docker_args + [self.image] + image_args
|
||||
|
||||
# This seems weird, using the blocking IO subprocess.
|
||||
# However, we want to use a pipe reader so we can push the
|
||||
# output into the test log (because we are bad at propagating
|
||||
# log files etc from CI)
|
||||
# But the pipe reader needs to read until EOF, otherwise the
|
||||
# docker process will eventually hang. So we can't await a
|
||||
# coroutine.
|
||||
# We _can_, sort of, use pool.create_task(...) to send a coro
|
||||
# to the background, and use a signal for waiting, like here,
|
||||
# thus ensuring the coro runs forever, sort of... However,
|
||||
# this currently breaks, probably due to some part of the
|
||||
# machinery/tests that don't async fully, causing us to not
|
||||
# process the log, and thus hand/fail, bla bla.
|
||||
# The solution is to make the process synced, and use a
|
||||
# background thread (execution pool) for the processing.
|
||||
# This way we know the pipe reader will not suddenly get
|
||||
# blocked at inconvinient times.
|
||||
proc = subprocess.Popen(args, stderr=subprocess.PIPE)
|
||||
loop = asyncio.get_running_loop()
|
||||
ready_fut = loop.create_future()
|
||||
|
||||
def process_io():
|
||||
f = ready_fut
|
||||
try:
|
||||
while True:
|
||||
data = proc.stderr.readline()
|
||||
if not data:
|
||||
if f:
|
||||
loop.call_soon_threadsafe(f.set_exception, RuntimeError("Log EOF"))
|
||||
logger.debug("EOF received")
|
||||
break
|
||||
line = data.decode()
|
||||
self.logfile.write(data)
|
||||
logger.debug(line)
|
||||
if f and self.is_success_line(line, self.service_port):
|
||||
logger.info('Got start message: %s', line)
|
||||
loop.call_soon_threadsafe(f.set_result, True)
|
||||
f = None
|
||||
if f and self.is_failure_line(line, self.service_port):
|
||||
logger.info('Got fail message: %s', line)
|
||||
loop.call_soon_threadsafe(f.set_result, False)
|
||||
f = None
|
||||
except Exception as e:
|
||||
logger.error("Exception in log processing: %s", e)
|
||||
if f:
|
||||
loop.call_soon_threadsafe(f.set_exception, e)
|
||||
|
||||
self.echo_thread = loop.run_in_executor(None, process_io)
|
||||
ok = await ready_fut
|
||||
if not ok:
|
||||
self.logfile.close()
|
||||
proc.kill()
|
||||
proc.wait()
|
||||
raise RuntimeError("Could not parse expected launch message from container")
|
||||
|
||||
check_proc = await asyncio.create_subprocess_exec(exe
|
||||
, *["container", "port", name]
|
||||
, stdout=asyncio.subprocess.PIPE
|
||||
)
|
||||
while True:
|
||||
data = await check_proc.stdout.readline()
|
||||
if not data:
|
||||
break
|
||||
s = data.decode()
|
||||
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
|
||||
if m:
|
||||
self.port = int(m.group(1))
|
||||
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
|
||||
self.logfile = logfilename.open("wb")
|
||||
|
||||
await check_proc.wait()
|
||||
if not self.port:
|
||||
proc.kill()
|
||||
proc.wait()
|
||||
raise RuntimeError("Could not query port from container")
|
||||
self.proc = proc
|
||||
docker_args = self.docker_args(self.host, self.service_port)
|
||||
image_args = self.image_args(self.host, self.service_port)
|
||||
|
||||
args = ["run", "--name", name, "--rm" ]
|
||||
if self.service_port is None:
|
||||
args = args + ["-P"]
|
||||
else:
|
||||
args = args + ["-p", str(self.service_port)]
|
||||
|
||||
args = args + docker_args + [self.image] + image_args
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(exe, *args, stderr=self.logfile)
|
||||
failed = False
|
||||
|
||||
# In any sane world we would just pipe stderr to a pipe and launch a background
|
||||
# task to just readline from there to both check the start message as well as
|
||||
# add it to the log (preferrably via logger).
|
||||
# This works fine when doing this in a standalone python script.
|
||||
# However, for some reason, when run in a pytest fixture, the pipe will fill up,
|
||||
# without or reader waking up and doing anyhing, and for any test longer than very
|
||||
# short, we will fill the stderr buffer and hang.
|
||||
# I cannot figure out how to get around this, so we workaround it
|
||||
# instead by directing stderr to a log file, and simply repeatedly
|
||||
# try to read the info from this file until we are happy.
|
||||
async with asyncio.timeout(120):
|
||||
done = False
|
||||
while not done and not failed:
|
||||
with logfilename.open("r") as f:
|
||||
for line in f:
|
||||
if self.is_success_line(line, self.service_port):
|
||||
print(f'Got start message: {line}')
|
||||
done = True
|
||||
break
|
||||
if self.is_failure_line(line, self.service_port):
|
||||
print(f'Got fail message: {line}')
|
||||
failed = True
|
||||
break
|
||||
|
||||
if failed:
|
||||
self.logfile.close()
|
||||
await proc.wait()
|
||||
continue
|
||||
|
||||
check_proc = await asyncio.create_subprocess_exec(exe
|
||||
, *["container", "port", name]
|
||||
, stdout=asyncio.subprocess.PIPE
|
||||
)
|
||||
while True:
|
||||
data = await check_proc.stdout.readline()
|
||||
if not data:
|
||||
break
|
||||
s = data.decode()
|
||||
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
|
||||
if m:
|
||||
self.port = int(m.group(1))
|
||||
|
||||
await check_proc.wait()
|
||||
if not self.port:
|
||||
proc.kill()
|
||||
raise RuntimeError("Could not query port from container")
|
||||
self.proc = proc
|
||||
break
|
||||
|
||||
async def stop(self):
|
||||
"""Stops docker image"""
|
||||
if self.proc:
|
||||
logger.debug("Stopping docker process")
|
||||
self.proc.terminate()
|
||||
self.proc.wait()
|
||||
self.proc = None
|
||||
if self.echo_thread:
|
||||
logger.debug("Waiting for IO thread")
|
||||
await self.echo_thread
|
||||
self.echo_thread = None
|
||||
await self.proc.wait()
|
||||
if self.logfile:
|
||||
logger.debug("Closing log file")
|
||||
self.logfile.close()
|
||||
self.logfile = None
|
||||
|
||||
@@ -60,7 +60,6 @@ class ManagerClient:
|
||||
self.con_gen = con_gen
|
||||
self.ccluster: Optional[CassandraCluster] = None
|
||||
self.cql: Optional[CassandraSession] = None
|
||||
self.exclusive_clusters: List[CassandraCluster] = []
|
||||
# A client for communicating with ScyllaClusterManager (server)
|
||||
self.sock_path = sock_path
|
||||
self.client_for_asyncio_loop = {asyncio.get_running_loop(): UnixRESTClient(sock_path)}
|
||||
@@ -114,9 +113,6 @@ class ManagerClient:
|
||||
|
||||
def driver_close(self) -> None:
|
||||
"""Disconnect from cluster"""
|
||||
for cluster in self.exclusive_clusters:
|
||||
cluster.shutdown()
|
||||
self.exclusive_clusters.clear()
|
||||
if self.ccluster is not None:
|
||||
logger.debug("shutting down driver")
|
||||
safe_driver_shutdown(self.ccluster)
|
||||
@@ -138,12 +134,9 @@ class ManagerClient:
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time() + 60)
|
||||
return cql, hosts
|
||||
|
||||
async def get_cql_exclusive(self, server: ServerInfo, auth_provider: Optional[AuthProvider] = None):
|
||||
cluster = self.con_gen([server.ip_addr], self.port, self.use_ssl,
|
||||
auth_provider if auth_provider else self.auth_provider,
|
||||
WhiteListRoundRobinPolicy([server.ip_addr]))
|
||||
self.exclusive_clusters.append(cluster)
|
||||
cql = cluster.connect()
|
||||
async def get_cql_exclusive(self, server: ServerInfo):
|
||||
cql = self.con_gen([server.ip_addr], self.port, self.use_ssl, self.auth_provider,
|
||||
WhiteListRoundRobinPolicy([server.ip_addr])).connect()
|
||||
await wait_for_cql_and_get_hosts(cql, [server], time() + 60)
|
||||
return cql
|
||||
|
||||
|
||||
@@ -747,8 +747,6 @@ class ScyllaServer:
|
||||
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
|
||||
self.notify_socket.bind(str(self.notify_socket_path))
|
||||
self._received_serving = False
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def poll_status(s: socket.socket, f: asyncio.Future, logger: Union[logging.Logger, logging.LoggerAdapter]):
|
||||
# Try to read all available messages from the socket
|
||||
while True:
|
||||
@@ -758,7 +756,7 @@ class ScyllaServer:
|
||||
message = data.decode('utf-8', errors='replace')
|
||||
if 'STATUS=serving' in message:
|
||||
logger.debug("Received sd_notify 'serving' message")
|
||||
loop.call_soon_threadsafe(f.set_result, True)
|
||||
f.set_result(True)
|
||||
return
|
||||
if 'STATUS=entering maintenance mode' in message:
|
||||
logger.debug("Receive sd_notify 'entering maintenance mode'")
|
||||
@@ -768,9 +766,9 @@ class ScyllaServer:
|
||||
except Exception as e:
|
||||
logger.debug("Error reading from notify socket: %s", e)
|
||||
break
|
||||
loop.call_soon_threadsafe(f.set_result, False)
|
||||
f.set_result(False)
|
||||
|
||||
self.serving_signal = loop.create_future()
|
||||
self.serving_signal = asyncio.get_running_loop().create_future()
|
||||
t = threading.Thread(target=poll_status, args=[self.notify_socket, self.serving_signal, self.logger], daemon=True)
|
||||
t.start()
|
||||
|
||||
@@ -894,6 +892,7 @@ class ScyllaServer:
|
||||
return
|
||||
await report_error("the node startup failed, but the log file doesn't contain the expected error")
|
||||
await report_error("failed to start the node")
|
||||
self.logger.info("Wait me %s expect %s is %s", self.server_id, expected_server_up_state, server_up_state)
|
||||
if await self.try_get_host_id(api):
|
||||
if server_up_state == ServerUpState.PROCESS_STARTED:
|
||||
server_up_state = ServerUpState.HOST_ID_QUERIED
|
||||
@@ -1395,11 +1394,7 @@ class ScyllaCluster:
|
||||
f"the test must drop all keyspaces it creates.")
|
||||
for server in itertools.chain(self.running.values(), self.stopped.values()):
|
||||
server.write_log_marker(f"------ Ending test {name} ------\n")
|
||||
# Only close log files when the cluster is dirty (will be destroyed).
|
||||
# If the cluster is clean and will be reused, keep the log file open
|
||||
# so that write_log_marker() and take_log_savepoint() work in the
|
||||
# next test's before_test().
|
||||
if self.is_dirty and not server.log_file.closed:
|
||||
if not server.log_file.closed:
|
||||
server.log_file.close()
|
||||
|
||||
async def server_stop(self, server_id: ServerNum, gracefully: bool) -> None:
|
||||
|
||||
@@ -56,39 +56,15 @@ def unique_name(unique_name_prefix = 'test_'):
|
||||
async def wait_for(
|
||||
pred: Callable[[], Awaitable[Optional[T]]],
|
||||
deadline: float,
|
||||
period: float = 0.1,
|
||||
period: float = 1,
|
||||
before_retry: Optional[Callable[[], Any]] = None,
|
||||
backoff_factor: float = 1.5,
|
||||
max_period: float = 1.0,
|
||||
label: Optional[str] = None) -> T:
|
||||
tag = label or getattr(pred, '__name__', 'unlabeled')
|
||||
start = time.time()
|
||||
retries = 0
|
||||
last_exception: Exception | None = None
|
||||
backoff_factor: float = 1,
|
||||
max_period: float = None) -> T:
|
||||
while True:
|
||||
elapsed = time.time() - start
|
||||
if time.time() >= deadline:
|
||||
timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
|
||||
if last_exception is not None:
|
||||
timeout_msg += (
|
||||
f"; last exception: {type(last_exception).__name__}: {last_exception}"
|
||||
)
|
||||
raise AssertionError(timeout_msg) from last_exception
|
||||
raise AssertionError(timeout_msg)
|
||||
|
||||
try:
|
||||
res = await pred()
|
||||
last_exception = None
|
||||
except Exception as exc:
|
||||
res = None
|
||||
last_exception = exc
|
||||
|
||||
assert(time.time() < deadline), "Deadline exceeded, failing test."
|
||||
res = await pred()
|
||||
if res is not None:
|
||||
if retries > 0:
|
||||
logger.debug(f"wait_for({tag}) completed "
|
||||
f"in {elapsed:.2f}s ({retries} retries)")
|
||||
return res
|
||||
retries += 1
|
||||
await asyncio.sleep(period)
|
||||
period *= backoff_factor
|
||||
if max_period is not None:
|
||||
@@ -297,14 +273,14 @@ async def wait_for_view_v1(cql: Session, name: str, node_count: int, timeout: in
|
||||
done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
|
||||
return done[0][0] == node_count or None
|
||||
deadline = time.time() + timeout
|
||||
await wait_for(view_is_built, deadline, label=f"view_v1_{name}")
|
||||
await wait_for(view_is_built, deadline)
|
||||
|
||||
async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
|
||||
async def view_is_built():
|
||||
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
|
||||
return done[0][0] == node_count or None
|
||||
deadline = time.time() + timeout
|
||||
await wait_for(view_is_built, deadline, label=f"view_{name}")
|
||||
await wait_for(view_is_built, deadline)
|
||||
|
||||
|
||||
async def wait_for_first_completed(coros: list[Coroutine], timeout: int|None = None):
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
"""
|
||||
GDB helper functions for `scylla_gdb` tests.
|
||||
They should be loaded to GDB by "-x {dir}/gdb_utils.py}",
|
||||
when loaded, they can be run in gdb e.g. `$get_sstables()`
|
||||
when loaded, they can be run in gdb e.g. `python get_sstables()`
|
||||
|
||||
Depends on helper functions injected to GDB by `scylla-gdb.py` script.
|
||||
(sharded, for_each_table, seastar_lw_shared_ptr, find_sstables, find_vptrs, resolve,
|
||||
@@ -15,65 +15,39 @@ import gdb
|
||||
import uuid
|
||||
|
||||
|
||||
class get_schema(gdb.Function):
|
||||
"""Finds and returns a schema pointer."""
|
||||
|
||||
def __init__(self):
|
||||
super(get_schema, self).__init__('get_schema')
|
||||
|
||||
def invoke(self):
|
||||
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
|
||||
table = next(for_each_table(db))
|
||||
return seastar_lw_shared_ptr(table['_schema']).get()
|
||||
def get_schema():
|
||||
"""Execute GDB commands to get schema information."""
|
||||
db = sharded(gdb.parse_and_eval('::debug::the_database')).local()
|
||||
table = next(for_each_table(db))
|
||||
ptr = seastar_lw_shared_ptr(table['_schema']).get()
|
||||
print('schema=', ptr)
|
||||
|
||||
|
||||
class get_sstable(gdb.Function):
|
||||
"""Finds and returns an sstable pointer."""
|
||||
|
||||
def __init__(self):
|
||||
super(get_sstable, self).__init__('get_sstable')
|
||||
|
||||
def invoke(self):
|
||||
return next(find_sstables())
|
||||
def get_sstables():
|
||||
"""Execute GDB commands to get sstables information."""
|
||||
sst = next(find_sstables())
|
||||
print(f"sst=(sstables::sstable *)", sst)
|
||||
|
||||
|
||||
class get_task(gdb.Function):
|
||||
def get_task():
|
||||
"""
|
||||
Finds and returns a Scylla fiber task.
|
||||
Some commands need a task to work on. The following fixture finds one.
|
||||
Because we stopped Scylla while it was idle, we don't expect to find
|
||||
any ready task with get_local_tasks(), but we can find one with a
|
||||
find_vptrs() loop. I noticed that a nice one (with multiple tasks chained
|
||||
to it for "scylla fiber") is one from http_server::do_accept_one.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(get_task, self).__init__('get_task')
|
||||
|
||||
def invoke(self):
|
||||
for obj_addr, vtable_addr in find_vptrs():
|
||||
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
|
||||
if name and 'do_accept_one' in name:
|
||||
return obj_addr.cast(gdb.lookup_type('uintptr_t'))
|
||||
for obj_addr, vtable_addr in find_vptrs():
|
||||
name = resolve(vtable_addr, startswith='vtable for seastar::continuation')
|
||||
if name and 'do_accept_one' in name:
|
||||
print(f"task={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
|
||||
break
|
||||
|
||||
|
||||
class get_coroutine(gdb.Function):
|
||||
"""
|
||||
Finds and returns a coroutine frame.
|
||||
Prints COROUTINE_NOT_FOUND if the coroutine is not present.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(get_coroutine, self).__init__('get_coroutine')
|
||||
|
||||
def invoke(self):
|
||||
target = 'service::topology_coordinator::run() [clone .resume]'
|
||||
for obj_addr, vtable_addr in find_vptrs():
|
||||
name = resolve(vtable_addr)
|
||||
if name and name.strip() == target:
|
||||
return obj_addr.cast(gdb.lookup_type('uintptr_t'))
|
||||
print("COROUTINE_NOT_FOUND")
|
||||
|
||||
|
||||
# Register the functions in GDB
|
||||
get_schema()
|
||||
get_sstable()
|
||||
get_task()
|
||||
get_coroutine()
|
||||
def get_coroutine():
|
||||
"""Similar to get_task(), but looks for a coroutine frame."""
|
||||
target = 'service::topology_coordinator::run() [clone .resume]'
|
||||
for obj_addr, vtable_addr in find_vptrs():
|
||||
name = resolve(vtable_addr)
|
||||
if name and name.strip() == target:
|
||||
print(f"coroutine_config={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")
|
||||
|
||||
@@ -7,6 +7,7 @@ Each only checks that the command does not fail - but not what it does or return
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import re
|
||||
|
||||
from test.scylla_gdb.conftest import execute_gdb_command
|
||||
|
||||
@@ -22,6 +23,20 @@ pytestmark = [
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def schema(gdb_cmd):
|
||||
"""
|
||||
Returns pointer to schema of the first table it finds
|
||||
Even without any user tables, we will always have system tables.
|
||||
"""
|
||||
result = execute_gdb_command(gdb_cmd, full_command="python get_schema()").stdout
|
||||
match = re.search(r"schema=\s*(0x[0-9a-fA-F]+)", result)
|
||||
schema_pointer = match.group(1) if match else None
|
||||
|
||||
return schema_pointer
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"command",
|
||||
[
|
||||
@@ -30,17 +45,21 @@ pytestmark = [
|
||||
"schema (const schema *)", # `schema` requires type-casted pointer
|
||||
],
|
||||
)
|
||||
def test_schema(gdb_cmd, command):
|
||||
result = execute_gdb_command(gdb_cmd, f"{command} $get_schema()")
|
||||
def test_schema(gdb_cmd, command, schema):
|
||||
assert schema, "Failed to find schema of any table"
|
||||
|
||||
result = execute_gdb_command(gdb_cmd, f"{command} {schema}")
|
||||
assert result.returncode == 0, (
|
||||
f"GDB command {command} failed. stdout: {result.stdout} stderr: {result.stderr}"
|
||||
)
|
||||
|
||||
|
||||
def test_generate_object_graph(gdb_cmd, request):
|
||||
def test_generate_object_graph(gdb_cmd, schema, request):
|
||||
assert schema, "Failed to find schema of any table"
|
||||
|
||||
tmpdir = request.config.getoption("--tmpdir")
|
||||
result = execute_gdb_command(
|
||||
gdb_cmd, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 $get_schema()"
|
||||
gdb_cmd, f"generate-object-graph -o {tmpdir}/og.dot -d 2 -t 10 {schema}"
|
||||
)
|
||||
assert result.returncode == 0, (
|
||||
f"GDB command `generate-object-graph` failed. stdout: {result.stdout} stderr: {result.stderr}"
|
||||
|
||||
@@ -7,6 +7,7 @@ Each only checks that the command does not fail - but not what it does or return
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import re
|
||||
|
||||
from test.scylla_gdb.conftest import execute_gdb_command
|
||||
|
||||
@@ -23,6 +24,16 @@ pytestmark = [
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def sstable(gdb_cmd):
|
||||
"""Finds sstable"""
|
||||
result = execute_gdb_command(gdb_cmd, full_command="python get_sstables()").stdout
|
||||
match = re.search(r"(\(sstables::sstable \*\) 0x)([0-9a-f]+)", result)
|
||||
sstable_pointer = match.group(0).strip() if match else None
|
||||
|
||||
return sstable_pointer
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"command",
|
||||
[
|
||||
@@ -30,8 +41,10 @@ pytestmark = [
|
||||
"sstable-index-cache",
|
||||
],
|
||||
)
|
||||
def test_sstable(gdb_cmd, command):
|
||||
result = execute_gdb_command(gdb_cmd, f"{command} $get_sstable()")
|
||||
def test_sstable(gdb_cmd, command, sstable):
|
||||
assert sstable, "No sstable was found"
|
||||
|
||||
result = execute_gdb_command(gdb_cmd, f"{command} {sstable}")
|
||||
assert result.returncode == 0, (
|
||||
f"GDB command {command} failed. stdout: {result.stdout} stderr: {result.stderr}"
|
||||
)
|
||||
|
||||
@@ -6,6 +6,8 @@ Tests for commands, that need a some task to work on.
|
||||
Each only checks that the command does not fail - but not what it does or returns.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from test.scylla_gdb.conftest import execute_gdb_command
|
||||
@@ -23,25 +25,59 @@ pytestmark = [
|
||||
]
|
||||
|
||||
|
||||
def test_coroutine_frame(gdb_cmd):
|
||||
@pytest.fixture(scope="module")
|
||||
def task(gdb_cmd):
|
||||
"""
|
||||
Finds a Scylla fiber task using a `find_vptrs()` loop.
|
||||
|
||||
Since Scylla is fresh‑booted, `get_local_tasks()` returns nothing.
|
||||
Nevertheless, a `find_vptrs()` scan can still discover the first task
|
||||
skeleton created by `http_server::do_accept_one` (often the earliest
|
||||
“Scylla fiber” to appear).
|
||||
"""
|
||||
result = execute_gdb_command(gdb_cmd, full_command="python get_task()").stdout
|
||||
match = re.search(r"task=(\d+)", result)
|
||||
task = match.group(1) if match else None
|
||||
return task
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def coroutine_task(gdb_cmd, scylla_server):
|
||||
"""
|
||||
Finds a coroutine task, similar to the `task` fixture.
|
||||
|
||||
This fixture executes the `coroutine_config` script in GDB to locate a
|
||||
specific coroutine task.
|
||||
"""
|
||||
result = execute_gdb_command(gdb_cmd, full_command="python get_coroutine()").stdout
|
||||
match = re.search(r"coroutine_config=\s*(.*)", result)
|
||||
if not match:
|
||||
# See https://github.com/scylladb/scylladb/issues/22501
|
||||
pytest.skip("Failed to find coroutine task. Skipping test.")
|
||||
|
||||
return match.group(1).strip()
|
||||
|
||||
|
||||
def test_coroutine_frame(gdb_cmd, coroutine_task):
|
||||
"""
|
||||
Offsets the pointer by two words to shift from the outer coroutine frame
|
||||
to the inner `seastar::task`, as required by `$coro_frame`, which expects
|
||||
a `seastar::task*`.
|
||||
"""
|
||||
assert coroutine_task, "No coroutine task was found"
|
||||
|
||||
result = execute_gdb_command(
|
||||
gdb_cmd, full_command="p *$coro_frame($get_coroutine() + 16)"
|
||||
gdb_cmd, full_command=f"p *$coro_frame({coroutine_task} + 16)"
|
||||
)
|
||||
if "COROUTINE_NOT_FOUND" in result.stdout:
|
||||
# See https://github.com/scylladb/scylladb/issues/22501
|
||||
pytest.skip("Failed to find coroutine task. Skipping test.")
|
||||
assert result.returncode == 0, (
|
||||
f"GDB command `coro_frame` failed. stdout: {result.stdout} stderr: {result.stderr}"
|
||||
)
|
||||
|
||||
|
||||
def test_fiber(gdb_cmd):
|
||||
result = execute_gdb_command(gdb_cmd, "fiber $get_task()")
|
||||
def test_fiber(gdb_cmd, task):
|
||||
assert task, f"No task was found using `find_vptrs()`"
|
||||
|
||||
result = execute_gdb_command(gdb_cmd, f"fiber {task}")
|
||||
assert result.returncode == 0, (
|
||||
f"GDB command `fiber` failed. stdout: {result.stdout} stderr: {result.stderr}"
|
||||
)
|
||||
|
||||
@@ -200,13 +200,13 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
|
||||
_metrics.add_group("tracing_keyspace_helper", {
|
||||
sm::make_counter("tracing_errors", [this] { return _stats.tracing_errors; },
|
||||
sm::description("Counts a number of errors during writing to a system_traces keyspace. "
|
||||
"One error may cause one or more tracing records to be lost.")).set_skip_when_empty(),
|
||||
"One error may cause one or more tracing records to be lost.")),
|
||||
|
||||
sm::make_counter("bad_column_family_errors", [this] { return _stats.bad_column_family_errors; },
|
||||
sm::description("Counts a number of times write failed due to one of the tables in the system_traces keyspace has an incompatible schema. "
|
||||
"One error may result one or more tracing records to be lost. "
|
||||
"Non-zero value indicates that the administrator has to take immediate steps to fix the corresponding schema. "
|
||||
"The appropriate error message will be printed in the syslog.")).set_skip_when_empty(),
|
||||
"The appropriate error message will be printed in the syslog.")),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -39,17 +39,17 @@ tracing::tracing(sstring tracing_backend_helper_class_name)
|
||||
_metrics.add_group("tracing", {
|
||||
sm::make_counter("dropped_sessions", stats.dropped_sessions,
|
||||
sm::description("Counts a number of dropped sessions due to too many pending sessions/records. "
|
||||
"High value indicates that backend is saturated with the rate with which new tracing records are created.")).set_skip_when_empty(),
|
||||
"High value indicates that backend is saturated with the rate with which new tracing records are created.")),
|
||||
|
||||
sm::make_counter("dropped_records", stats.dropped_records,
|
||||
sm::description("Counts a number of dropped records due to too many pending records. "
|
||||
"High value indicates that backend is saturated with the rate with which new tracing records are created.")).set_skip_when_empty(),
|
||||
"High value indicates that backend is saturated with the rate with which new tracing records are created.")),
|
||||
|
||||
sm::make_counter("trace_records_count", stats.trace_records_count,
|
||||
sm::description("This metric is a rate of tracing records generation.")),
|
||||
|
||||
sm::make_counter("trace_errors", stats.trace_errors,
|
||||
sm::description("Counts a number of trace records dropped due to an error (e.g. OOM).")).set_skip_when_empty(),
|
||||
sm::description("Counts a number of trace records dropped due to an error (e.g. OOM).")),
|
||||
|
||||
sm::make_gauge("active_sessions", _active_sessions,
|
||||
sm::description("Holds a number of a currently active tracing sessions.")),
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include "init.hh"
|
||||
#include "supervisor.hh"
|
||||
#include "directories.hh"
|
||||
|
||||
Reference in New Issue
Block a user