mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-19 16:15:07 +00:00
Compare commits
2 Commits
ykaul/skip
...
SCYLLADB-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ce2a2a479 | ||
|
|
2cdd178379 |
@@ -699,17 +699,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
// for such a size.
|
||||
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
|
||||
}
|
||||
// Check the concurrency limit early, before acquiring memory and
|
||||
// reading the request body, to avoid piling up memory from excess
|
||||
// requests that will be rejected anyway. This mirrors the CQL
|
||||
// transport which also checks concurrency before memory acquisition
|
||||
// (transport/server.cc).
|
||||
if (_pending_requests.get_count() >= _max_concurrent_requests) {
|
||||
_executor._stats.requests_shed++;
|
||||
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
|
||||
}
|
||||
_pending_requests.enter();
|
||||
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
|
||||
// JSON parsing can allocate up to roughly 2x the size of the raw
|
||||
// document, + a couple of bytes for maintenance.
|
||||
// If the Content-Length of the request is not available, we assume
|
||||
@@ -771,6 +760,12 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
|
||||
_executor._stats.unsupported_operations++;
|
||||
co_return api_error::unknown_operation(fmt::format("Unsupported operation {}", op));
|
||||
}
|
||||
if (_pending_requests.get_count() >= _max_concurrent_requests) {
|
||||
_executor._stats.requests_shed++;
|
||||
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
|
||||
}
|
||||
_pending_requests.enter();
|
||||
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
|
||||
executor::client_state client_state(service::client_state::external_tag(),
|
||||
_auth_service, &_sl_controller, _timeout_config.current_values(), req->get_client_address());
|
||||
if (!username.empty()) {
|
||||
|
||||
@@ -143,6 +143,15 @@ public:
|
||||
return value_type();
|
||||
}
|
||||
|
||||
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
|
||||
cache_value_ptr vp = _cache.find(key.key());
|
||||
if (!vp) {
|
||||
return false;
|
||||
}
|
||||
(*vp)->update_result_metadata_id(std::move(metadata_id));
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename Pred>
|
||||
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
|
||||
void remove_if(Pred&& pred) {
|
||||
|
||||
@@ -260,6 +260,10 @@ public:
|
||||
return _prepared_cache.find(key);
|
||||
}
|
||||
|
||||
bool update_prepared_result_metadata_id(const prepared_cache_key_type& key, cql_metadata_id_type metadata_id) {
|
||||
return _prepared_cache.update_result_metadata_id(key, std::move(metadata_id));
|
||||
}
|
||||
|
||||
inline
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_prepared(
|
||||
|
||||
@@ -52,6 +52,7 @@ public:
|
||||
std::vector<sstring> warnings;
|
||||
private:
|
||||
cql_metadata_id_type _metadata_id;
|
||||
bool _result_metadata_is_empty;
|
||||
|
||||
public:
|
||||
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
|
||||
@@ -71,6 +72,15 @@ public:
|
||||
void calculate_metadata_id();
|
||||
|
||||
cql_metadata_id_type get_metadata_id() const;
|
||||
|
||||
bool result_metadata_is_empty() const {
|
||||
return _result_metadata_is_empty;
|
||||
}
|
||||
|
||||
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
|
||||
_metadata_id = std::move(metadata_id);
|
||||
_result_metadata_is_empty = false;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
|
||||
, partition_key_bind_indices(std::move(partition_key_bind_indices))
|
||||
, warnings(std::move(warnings))
|
||||
, _metadata_id(bytes{})
|
||||
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
|
||||
{
|
||||
statement->set_audit_info(std::move(audit_info));
|
||||
}
|
||||
|
||||
28
dist/common/scripts/scylla_swap_setup
vendored
28
dist/common/scripts/scylla_swap_setup
vendored
@@ -9,7 +9,6 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shlex
|
||||
import argparse
|
||||
import psutil
|
||||
from pathlib import Path
|
||||
@@ -104,41 +103,16 @@ if __name__ == '__main__':
|
||||
run('dd if=/dev/zero of={} bs=1M count={}'.format(swapfile, swapsize_mb), shell=True, check=True)
|
||||
swapfile.chmod(0o600)
|
||||
run('mkswap -f {}'.format(swapfile), shell=True, check=True)
|
||||
|
||||
mount_point = find_mount_point(swap_directory)
|
||||
mount_unit = out(f'systemd-escape -p --suffix=mount {shlex.quote(str(mount_point))}')
|
||||
|
||||
# Add DefaultDependencies=no to the swap unit to avoid getting the default
|
||||
# Before=swap.target dependency. We apply this to all clouds, but the
|
||||
# requirement came from Azure:
|
||||
#
|
||||
# On Azure, the swap directory is on the Azure ephemeral disk (mounted on /mnt).
|
||||
# However, cloud-init makes this mount (i.e., the mnt.mount unit) depend on
|
||||
# the network (After=network-online.target). By extension, this means that
|
||||
# the swap unit depends on the network. If we didn't use DefaultDependencies=no,
|
||||
# then the swap unit would be part of the swap.target which other services
|
||||
# assume to be a local boot target, so we would end up with dependency cycles
|
||||
# such as:
|
||||
#
|
||||
# swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target -> network.target -> systemd-resolved.service -> tmp.mount -> swap.target
|
||||
#
|
||||
# By removing the automatic Before=swap.target, the swap unit is no longer
|
||||
# part of swap.target, avoiding such cycles. The swap will still be
|
||||
# activated via WantedBy=multi-user.target.
|
||||
unit_data = '''
|
||||
[Unit]
|
||||
Description=swapfile
|
||||
DefaultDependencies=no
|
||||
After={}
|
||||
Conflicts=umount.target
|
||||
Before=umount.target
|
||||
|
||||
[Swap]
|
||||
What={}
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
'''[1:-1].format(mount_unit, swapfile)
|
||||
'''[1:-1].format(swapfile)
|
||||
with swapunit.open('w') as f:
|
||||
f.write(unit_data)
|
||||
systemd_unit.reload()
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:54662978b9ce4a6e25790b1b0a5099e6063173ffa95a399a6287cf474376ed09
|
||||
size 6595952
|
||||
oid sha256:e59fe56eac435fd03c2f0d7dfc11c6998d7c0750e1851535575497dd13d96015
|
||||
size 6505524
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:0cf44ea1fb2ae20de45d26fe8095054e60cb8700cddcb2fd79ef79705484b18a
|
||||
size 6603780
|
||||
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
|
||||
size 6598648
|
||||
|
||||
23
raft/fsm.cc
23
raft/fsm.cc
@@ -1098,8 +1098,7 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
|
||||
|
||||
// Make sure that only a leader or a node that is part of the config can request read barrier
|
||||
// Nodes outside of the config may never get the data, so they will not be able to read it.
|
||||
follower_progress* opt_progress = leader_state().tracker.find(requester);
|
||||
if (requester != _my_id && opt_progress == nullptr) {
|
||||
if (requester != _my_id && leader_state().tracker.find(requester) == nullptr) {
|
||||
throw std::runtime_error(fmt::format("Read barrier requested by a node outside of the configuration {}", requester));
|
||||
}
|
||||
|
||||
@@ -1110,23 +1109,19 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
|
||||
return {};
|
||||
}
|
||||
|
||||
// Optimization for read barriers requested on non-voters. A non-voter doesn't receive the read_quorum message, so
|
||||
// it might update its commit index only after another leader tick, which would slow down wait_for_apply() at the
|
||||
// end of the read barrier. Prevent that by replicating to the non-voting requester here.
|
||||
if (requester != _my_id && opt_progress->commit_idx < _commit_idx && opt_progress->match_idx == _log.last_idx()
|
||||
&& !opt_progress->can_vote) {
|
||||
logger.trace("start_read_barrier[{}]: replicate to {} because follower commit_idx={} < commit_idx={}, "
|
||||
"follower match_idx={} == last_idx={}, and follower can_vote={}",
|
||||
_my_id, requester, opt_progress->commit_idx, _commit_idx, opt_progress->match_idx,
|
||||
_log.last_idx(), opt_progress->can_vote);
|
||||
replicate_to(*opt_progress, true);
|
||||
}
|
||||
|
||||
read_id id = next_read_id();
|
||||
logger.trace("start_read_barrier[{}] starting read barrier with id {}", _my_id, id);
|
||||
return std::make_pair(id, _commit_idx);
|
||||
}
|
||||
|
||||
void fsm::maybe_update_commit_idx_for_read(index_t read_idx) {
|
||||
// read_idx from the leader might not be replicated to the local node yet.
|
||||
const bool in_local_log = read_idx <= _log.last_idx();
|
||||
if (in_local_log && log_term_for(read_idx) == get_current_term()) {
|
||||
advance_commit_idx(read_idx);
|
||||
}
|
||||
}
|
||||
|
||||
void fsm::stop() {
|
||||
if (is_leader()) {
|
||||
// Become follower to stop accepting requests
|
||||
|
||||
@@ -480,6 +480,15 @@ public:
|
||||
|
||||
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
|
||||
|
||||
// Update the commit index to the read index (a read barrier result from the leader) if the local entry with the
|
||||
// read index belongs to the current term.
|
||||
//
|
||||
// Satisfying the condition above guarantees that the local log matches the current leader's log up to the read
|
||||
// index (the Log Matching Property), so the current leader won't drop the local entry with the read index.
|
||||
// Moreover, this entry has been committed by the leader, so future leaders also won't drop it (the Leader
|
||||
// Completeness Property). Hence, updating the commit index is safe.
|
||||
void maybe_update_commit_idx_for_read(index_t read_idx);
|
||||
|
||||
size_t in_memory_log_size() const {
|
||||
return _log.in_memory_size();
|
||||
}
|
||||
|
||||
@@ -1571,6 +1571,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
read_idx = std::get<index_t>(res);
|
||||
_fsm->maybe_update_commit_idx_for_read(read_idx);
|
||||
co_return stop_iteration::yes;
|
||||
});
|
||||
|
||||
|
||||
@@ -538,7 +538,6 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
|
||||
group0_id = g0_info.group0_id;
|
||||
raft::server_address my_addr{my_id, {}};
|
||||
|
||||
bool starting_server_as_follower = false;
|
||||
if (server == nullptr) {
|
||||
// This is the first time discovery is run. Create and start a Raft server for group 0 on this node.
|
||||
raft::configuration initial_configuration;
|
||||
@@ -566,7 +565,6 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
|
||||
// trigger an empty snapshot transfer.
|
||||
nontrivial_snapshot = true;
|
||||
} else {
|
||||
starting_server_as_follower = true;
|
||||
co_await handshaker->pre_server_start(g0_info);
|
||||
}
|
||||
|
||||
@@ -593,9 +591,7 @@ future<> raft_group0::join_group0(std::vector<gms::inet_address> seeds, shared_p
|
||||
}
|
||||
|
||||
SCYLLA_ASSERT(server);
|
||||
co_await utils::get_local_injector().inject("join_group0_pause_before_config_check",
|
||||
utils::wait_for_message(std::chrono::minutes{5}));
|
||||
if (!starting_server_as_follower && server->get_configuration().contains(my_id)) {
|
||||
if (server->get_configuration().contains(my_id)) {
|
||||
// True if we started a new group or completed a configuration change initiated earlier.
|
||||
group0_log.info("server {} already in group 0 (id {}) as {}", my_id, group0_id,
|
||||
server->get_configuration().can_vote(my_id)? "voter" : "non-voter");
|
||||
|
||||
@@ -239,9 +239,11 @@ public:
|
||||
// The i-th element corresponds to the i-th entry in _entries.
|
||||
// Can be smaller than _entries. If _entries[i] doesn't have a matching element in _promoted_indexes then
|
||||
// that entry doesn't have a promoted index.
|
||||
// It's not chunked, because promoted index is present only when there are large partitions in the page,
|
||||
// which also means the page will have typically only 1 entry due to summary:data_file size ratio.
|
||||
// Kept separately to avoid paying for storage cost in pages where no entry has a promoted index,
|
||||
// which is typical in workloads with small partitions.
|
||||
lsa::chunked_managed_vector<promoted_index> _promoted_indexes;
|
||||
managed_vector<promoted_index> _promoted_indexes;
|
||||
public:
|
||||
partition_index_page() = default;
|
||||
partition_index_page(partition_index_page&&) noexcept = default;
|
||||
|
||||
73
test.py
73
test.py
@@ -11,7 +11,6 @@ from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import math
|
||||
import shlex
|
||||
import textwrap
|
||||
from random import randint
|
||||
@@ -74,51 +73,6 @@ PYTEST_RUNNER_DIRECTORIES = [
|
||||
|
||||
launch_time = time.monotonic()
|
||||
|
||||
class ThreadsCalculator:
|
||||
"""
|
||||
The ThreadsCalculator class calculates the number of jobs that can be run concurrently based on system
|
||||
memory and CPU constraints. It allows resource reservation and configurable parameters for
|
||||
flexible job scheduling in various modes, such as `debug`.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
modes: list[str],
|
||||
min_system_memory_reserve: float = 5e9,
|
||||
max_system_memory_reserve: float = 8e9,
|
||||
system_memory_reserve_fraction = 16,
|
||||
max_test_memory: float = 5e9,
|
||||
test_memory_fraction: float = 8.0,
|
||||
debug_test_memory_multiplier: float = 1.5,
|
||||
debug_cpus_per_test_job=1.5,
|
||||
non_debug_cpus_per_test_job: float =1.0,
|
||||
non_debug_max_test_memory: float = 4e9
|
||||
):
|
||||
sys_mem = int(os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES"))
|
||||
test_mem = min(sys_mem / test_memory_fraction, max_test_memory)
|
||||
if "debug" in modes:
|
||||
test_mem *= debug_test_memory_multiplier
|
||||
system_memory_reserve = int(min(
|
||||
max(sys_mem / system_memory_reserve_fraction, min_system_memory_reserve),
|
||||
max_system_memory_reserve,
|
||||
))
|
||||
available_mem = max(0, sys_mem - system_memory_reserve)
|
||||
is_debug = "debug" in modes
|
||||
test_mem = min(
|
||||
sys_mem / test_memory_fraction,
|
||||
max_test_memory if is_debug else non_debug_max_test_memory,
|
||||
)
|
||||
if is_debug:
|
||||
test_mem *= debug_test_memory_multiplier
|
||||
self.cpus_per_test_job = (
|
||||
debug_cpus_per_test_job if is_debug else non_debug_cpus_per_test_job
|
||||
)
|
||||
self.default_num_jobs_mem = max(1, int(available_mem // test_mem))
|
||||
|
||||
def get_number_of_threads(self, nr_cpus: int) -> int:
|
||||
default_num_jobs_cpu = max(1, math.ceil(nr_cpus / self.cpus_per_test_job))
|
||||
return min(self.default_num_jobs_mem, default_num_jobs_cpu)
|
||||
|
||||
|
||||
|
||||
class TabularConsoleOutput:
|
||||
"""Print test progress to the console"""
|
||||
@@ -319,13 +273,6 @@ def parse_cmd_line() -> argparse.Namespace:
|
||||
if args.skip_patterns and args.k:
|
||||
parser.error(palette.fail('arguments --skip and -k are mutually exclusive, please use only one of them'))
|
||||
|
||||
if not args.modes:
|
||||
try:
|
||||
args.modes = get_configured_modes()
|
||||
except Exception:
|
||||
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
|
||||
raise
|
||||
|
||||
if not args.jobs:
|
||||
if not args.cpus:
|
||||
nr_cpus = multiprocessing.cpu_count()
|
||||
@@ -333,7 +280,19 @@ def parse_cmd_line() -> argparse.Namespace:
|
||||
nr_cpus = int(subprocess.check_output(
|
||||
['taskset', '-c', args.cpus, 'python3', '-c',
|
||||
'import os; print(len(os.sched_getaffinity(0)))']))
|
||||
args.jobs = ThreadsCalculator(args.modes).get_number_of_threads(nr_cpus)
|
||||
|
||||
cpus_per_test_job = 1
|
||||
sysmem = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
|
||||
testmem = 6e9 if os.sysconf('SC_PAGE_SIZE') > 4096 else 2e9
|
||||
default_num_jobs_mem = ((sysmem - 4e9) // testmem)
|
||||
args.jobs = min(default_num_jobs_mem, nr_cpus // cpus_per_test_job)
|
||||
|
||||
if not args.modes:
|
||||
try:
|
||||
args.modes = get_configured_modes()
|
||||
except Exception:
|
||||
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
|
||||
raise
|
||||
|
||||
if not args.coverage_modes and args.coverage:
|
||||
args.coverage_modes = list(args.modes)
|
||||
@@ -391,12 +350,16 @@ def run_pytest(options: argparse.Namespace) -> tuple[int, list[SimpleNamespace]]
|
||||
if options.list_tests:
|
||||
args.extend(['--collect-only', '--quiet', '--no-header'])
|
||||
else:
|
||||
threads = int(options.jobs)
|
||||
# debug mode is very CPU and memory hungry, so we need to lower the number of threads to be able to finish tests
|
||||
if 'debug' in options.modes:
|
||||
threads = int(threads * 0.5)
|
||||
args.extend([
|
||||
"--log-level=DEBUG", # Capture logs
|
||||
f'--junit-xml={junit_output_file}',
|
||||
"-rf",
|
||||
'--test-py-init',
|
||||
f'-n{options.jobs}',
|
||||
f'-n{threads}',
|
||||
f'--tmpdir={temp_dir}',
|
||||
f'--maxfail={options.max_failures}',
|
||||
f'--alluredir={report_dir / f"allure_{HOST_ID}"}',
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/manual_clock.hh>
|
||||
#include <seastar/core/timer.hh>
|
||||
#include <seastar/util/later.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/util/alloc_failure_injector.hh>
|
||||
@@ -290,17 +290,12 @@ SEASTAR_THREAD_TEST_CASE(test_address_map_replication) {
|
||||
m.set_expiring(id1);
|
||||
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
|
||||
m.barrier().get();
|
||||
promise<> shard0_timer_expired;
|
||||
timer<manual_clock> shard0_timer([&shard0_timer_expired] {
|
||||
shard0_timer_expired.set_value();
|
||||
});
|
||||
shard0_timer.arm(manual_clock::now() + expiration_time);
|
||||
m_svc.invoke_on(1, [] (address_map_t<manual_clock>& m) {
|
||||
BOOST_CHECK(m.find(id1) && *m.find(id1) == addr1);
|
||||
manual_clock::advance(expiration_time);
|
||||
BOOST_CHECK(!m.find(id1));
|
||||
return smp::submit_to(0, []{}); // Ensure shard 0 notices timer is expired.
|
||||
}).get();
|
||||
shard0_timer_expired.get_future().get();
|
||||
BOOST_CHECK(!m.find(id1));
|
||||
|
||||
// Expiring entries are replicated
|
||||
|
||||
@@ -1045,6 +1045,7 @@ validate_result_size(size_t i, schema_ptr schema, const utils::chunked_vector<mu
|
||||
|
||||
struct fuzzy_test_config {
|
||||
uint32_t seed;
|
||||
std::chrono::seconds timeout;
|
||||
unsigned concurrency;
|
||||
unsigned scans;
|
||||
};
|
||||
@@ -1076,9 +1077,6 @@ run_fuzzy_test_scan(size_t i, fuzzy_test_config cfg, sharded<replica::database>&
|
||||
testlog.debug("[scan#{}]: seed={}, is_stateful={}, prange={}, ckranges={}", i, seed, is_stateful, partition_range,
|
||||
partition_slice.default_row_ranges());
|
||||
|
||||
// Use a small max_size to force many pages per scan, stressing the
|
||||
// paging and result-merging logic. With the large row limit here,
|
||||
// the byte limit is typically the tighter bound.
|
||||
const auto [results, npages] = read_partitions_with_paged_scan(db, schema, 1000, 1024, is_stateful, partition_range, partition_slice);
|
||||
|
||||
const auto expected_partitions = slice_partitions(*schema, mutations, partition_index_range, partition_slice);
|
||||
@@ -1162,27 +1160,21 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
|
||||
std::uniform_int_distribution<size_t>(0, 100), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 100), // range-tombstones
|
||||
#else
|
||||
// Keep these values moderate: with complex randomly-generated
|
||||
// schemas (deeply nested frozen collections/UDTs), large row
|
||||
// counts cause data generation and paged scanning to be very
|
||||
// slow, leading to CI timeouts. The test's value comes from
|
||||
// schema variety and paging correctness, not from sheer data
|
||||
// volume.
|
||||
std::uniform_int_distribution<size_t>(32, 64), // partitions
|
||||
std::uniform_int_distribution<size_t>(0, 200), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 200), // range-tombstones
|
||||
std::uniform_int_distribution<size_t>(0, 1000), // clustering-rows
|
||||
std::uniform_int_distribution<size_t>(0, 1000), // range-tombstones
|
||||
#endif
|
||||
tests::default_timestamp_generator());
|
||||
|
||||
#if defined DEBUG
|
||||
auto cfg = fuzzy_test_config{seed, 1, 1};
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{8}, 1, 1};
|
||||
#elif defined DEVEL
|
||||
auto cfg = fuzzy_test_config{seed, 2, 4};
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 2, 4};
|
||||
#else
|
||||
auto cfg = fuzzy_test_config{seed, 4, 8};
|
||||
auto cfg = fuzzy_test_config{seed, std::chrono::seconds{2}, 4, 8};
|
||||
#endif
|
||||
|
||||
testlog.info("Running test workload with configuration: seed={}, concurrency={}, scans={}", cfg.seed,
|
||||
testlog.info("Running test workload with configuration: seed={}, timeout={}s, concurrency={}, scans={}", cfg.seed, cfg.timeout.count(),
|
||||
cfg.concurrency, cfg.scans);
|
||||
|
||||
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(tbl.schema), &compacted_frozen_mutations = tbl.compacted_frozen_mutations] {
|
||||
|
||||
@@ -906,13 +906,9 @@ SEASTAR_THREAD_TEST_CASE(test_timeout_is_applied_on_lookup) {
|
||||
BOOST_REQUIRE(entry.permit.timeout() == new_timeout);
|
||||
BOOST_REQUIRE(!entry.permit.get_abort_exception());
|
||||
|
||||
// Don't waste time retrying before the timeout is up
|
||||
sleep(ttl_timeout_test_timeout).get();
|
||||
|
||||
eventually_true([&entry] {
|
||||
return bool(entry.permit.get_abort_exception());
|
||||
});
|
||||
sleep(ttl_timeout_test_timeout * 2).get();
|
||||
|
||||
BOOST_REQUIRE(entry.permit.get_abort_exception());
|
||||
BOOST_REQUIRE_THROW(std::rethrow_exception(entry.permit.get_abort_exception()), seastar::named_semaphore_timed_out);
|
||||
}
|
||||
|
||||
|
||||
@@ -2644,10 +2644,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
|
||||
return rd;
|
||||
};
|
||||
|
||||
{
|
||||
memory::scoped_critical_alloc_section dfg;
|
||||
populate_range(cache, population_range);
|
||||
}
|
||||
populate_range(cache, population_range);
|
||||
auto rd1_v1 = assert_that(make_reader(population_range));
|
||||
mutation_reader_opt snap;
|
||||
auto close_snap = defer([&snap] {
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "test/lib/exception_utils.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "transport/response.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(schema_change_test)
|
||||
|
||||
@@ -701,6 +702,16 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
|
||||
return cql3::metadata{columns_specification}.calculate_metadata_id();
|
||||
}
|
||||
|
||||
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
|
||||
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
|
||||
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
|
||||
columns_specification.reserve(columns.size());
|
||||
for (const auto& column : columns) {
|
||||
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
|
||||
}
|
||||
return columns_specification;
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
|
||||
const auto c = std::make_pair("id", uuid_type);
|
||||
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
|
||||
@@ -751,6 +762,39 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
|
||||
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
|
||||
auto empty_metadata_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
|
||||
auto stale_response_metadata_id = empty_metadata_id;
|
||||
auto columns_specification = make_columns_specification({{"role", utf8_type}});
|
||||
cql3::metadata rows_metadata(columns_specification);
|
||||
auto rows_metadata_id = rows_metadata.calculate_metadata_id();
|
||||
|
||||
cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
|
||||
resp.write(rows_metadata, cql_transport::cql_metadata_id_wrapper(
|
||||
std::move(empty_metadata_id),
|
||||
std::move(stale_response_metadata_id)), true);
|
||||
|
||||
auto body_stream = std::move(resp).extract_body();
|
||||
auto body = body_stream.linearize();
|
||||
const auto* ptr = reinterpret_cast<const char*>(body.begin());
|
||||
|
||||
const auto flags_mask = read_be<int32_t>(ptr);
|
||||
ptr += sizeof(int32_t);
|
||||
const auto flags = cql3::metadata::flag_enum_set::from_mask(flags_mask);
|
||||
BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
|
||||
BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());
|
||||
|
||||
const auto column_count = read_be<int32_t>(ptr);
|
||||
ptr += sizeof(int32_t);
|
||||
BOOST_REQUIRE_EQUAL(column_count, 1);
|
||||
|
||||
const auto metadata_id_length = read_be<uint16_t>(ptr);
|
||||
ptr += sizeof(uint16_t);
|
||||
BOOST_REQUIRE_EQUAL(metadata_id_length, rows_metadata_id._metadata_id.size());
|
||||
BOOST_REQUIRE(std::equal(rows_metadata_id._metadata_id.begin(), rows_metadata_id._metadata_id.end(),
|
||||
reinterpret_cast<const bytes::value_type*>(ptr)));
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
|
||||
|
||||
auto compute_metadata_id_for_type = [&](
|
||||
|
||||
328
test/cluster/auth_cluster/test_prepared_metadata_promotion.py
Normal file
328
test/cluster/auth_cluster/test_prepared_metadata_promotion.py
Normal file
@@ -0,0 +1,328 @@
|
||||
#
|
||||
# Copyright (C) 2026-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import dataclasses
|
||||
import hashlib
|
||||
import socket
|
||||
import struct
|
||||
|
||||
import pytest
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error
|
||||
from test.pylib.util import unique_name
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Minimal raw CQL v4 socket helpers with SCYLLA_USE_METADATA_ID extension.
|
||||
#
|
||||
# The standard Python driver never negotiates SCYLLA_USE_METADATA_ID and
|
||||
# therefore never includes result_metadata_id in EXECUTE requests for
|
||||
# protocol v4. In CQL v5 result_metadata_id exchange is mandatory and
|
||||
# built into the wire format; until Scylla implements v5, this extension
|
||||
# provides the same semantics on v4. The helpers below implement just
|
||||
# enough of the CQL wire protocol to exercise the server-side prepared
|
||||
# metadata promotion path introduced for v5 compatibility.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# CQL opcodes
|
||||
_OP_STARTUP = 0x01
|
||||
_OP_AUTH_RESPONSE = 0x0F
|
||||
_OP_PREPARE = 0x09
|
||||
_OP_EXECUTE = 0x0A
|
||||
_OP_READY = 0x02
|
||||
_OP_AUTHENTICATE = 0x03
|
||||
_OP_RESULT = 0x08
|
||||
_OP_AUTH_SUCCESS = 0x10
|
||||
|
||||
# RESULT kind codes
|
||||
_RESULT_KIND_ROWS = 0x00000002
|
||||
_RESULT_KIND_PREPARED = 0x00000004
|
||||
|
||||
# Rows metadata flags (bit positions in the uint32 flags field)
|
||||
_META_NO_METADATA = 1 << 2
|
||||
_META_METADATA_CHANGED = 1 << 3
|
||||
|
||||
# EXECUTE options flags (1-byte field in CQL v4)
|
||||
_FLAG_SKIP_METADATA = 0x02
|
||||
|
||||
_FRAME_HEADER_SIZE = 9 # version(1)+flags(1)+stream(2)+opcode(1)+length(4)
|
||||
_CQL_VERSION = "3.0.0"
|
||||
_DEFAULT_CONSISTENCY = 0x0006 # LOCAL_QUORUM
|
||||
|
||||
|
||||
def _pack_short(v: int) -> bytes:
|
||||
return struct.pack(">H", v)
|
||||
|
||||
|
||||
def _pack_int(v: int) -> bytes:
|
||||
return struct.pack(">I", v)
|
||||
|
||||
|
||||
def _short_bytes(b: bytes) -> bytes:
|
||||
"""CQL [short bytes]: uint16 length prefix + payload."""
|
||||
return _pack_short(len(b)) + b
|
||||
|
||||
|
||||
def _long_string(s: str) -> bytes:
|
||||
"""CQL [long string]: uint32 length prefix + UTF-8 bytes."""
|
||||
b = s.encode()
|
||||
return _pack_int(len(b)) + b
|
||||
|
||||
|
||||
def _string_map(d: dict[str, str]) -> bytes:
|
||||
"""CQL [string map]: uint16 count + (uint16-prefixed-string, uint16-prefixed-string)*."""
|
||||
out = _pack_short(len(d))
|
||||
for k, v in d.items():
|
||||
out += _short_bytes(k.encode())
|
||||
out += _short_bytes(v.encode())
|
||||
return out
|
||||
|
||||
|
||||
def _frame(opcode: int, body: bytes, stream: int) -> bytes:
|
||||
"""Build a CQL v4 request frame."""
|
||||
return struct.pack(">BBHBI", 0x04, 0x00, stream, opcode, len(body)) + body
|
||||
|
||||
|
||||
def _recv_frame(sock: socket.socket) -> tuple[int, int, bytes]:
|
||||
"""Read one CQL v4 response frame; return (stream, opcode, body)."""
|
||||
header = b""
|
||||
while len(header) < _FRAME_HEADER_SIZE:
|
||||
chunk = sock.recv(_FRAME_HEADER_SIZE - len(header))
|
||||
assert chunk, "Connection closed while reading frame header"
|
||||
header += chunk
|
||||
_version, _flags = struct.unpack(">BB", header[0:2])
|
||||
stream = struct.unpack(">H", header[2:4])[0]
|
||||
opcode = header[4]
|
||||
length = struct.unpack(">I", header[5:9])[0]
|
||||
body = b""
|
||||
while len(body) < length:
|
||||
chunk = sock.recv(length - len(body))
|
||||
assert chunk, "Connection closed while reading frame body"
|
||||
body += chunk
|
||||
return stream, opcode, body
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class ExecuteResult:
|
||||
"""Parsed outcome of a ROWS EXECUTE response."""
|
||||
|
||||
metadata_changed: bool
|
||||
no_metadata: bool
|
||||
column_count: int
|
||||
result_metadata_id: bytes | None
|
||||
|
||||
|
||||
def _cql_connect(host: str, port: int, username: str, password: str) -> socket.socket:
|
||||
"""
|
||||
Open a raw TCP socket to *host*:*port* and perform the CQL v4 handshake,
|
||||
negotiating the SCYLLA_USE_METADATA_ID extension so that result_metadata_id
|
||||
is exchanged on the wire — identical to the mandatory CQL v5 behaviour.
|
||||
"""
|
||||
sock = socket.create_connection((host, port))
|
||||
stream = 1
|
||||
|
||||
# STARTUP with SCYLLA_USE_METADATA_ID enables the v5-style metadata_id
|
||||
# exchange for this v4 connection.
|
||||
startup_opts = {"CQL_VERSION": _CQL_VERSION, "SCYLLA_USE_METADATA_ID": ""}
|
||||
sock.sendall(_frame(_OP_STARTUP, _string_map(startup_opts), stream))
|
||||
_, opcode, payload = _recv_frame(sock)
|
||||
|
||||
if opcode == _OP_READY:
|
||||
return sock
|
||||
|
||||
assert opcode == _OP_AUTHENTICATE, (
|
||||
f"Expected AUTHENTICATE(0x{_OP_AUTHENTICATE:02x}), got 0x{opcode:02x}"
|
||||
)
|
||||
|
||||
# PlainText SASL token: NUL + username + NUL + password
|
||||
creds = b"\x00" + username.encode() + b"\x00" + password.encode()
|
||||
stream += 1
|
||||
sock.sendall(_frame(_OP_AUTH_RESPONSE, _short_bytes(creds), stream))
|
||||
_, auth_op, _ = _recv_frame(sock)
|
||||
assert auth_op == _OP_AUTH_SUCCESS, f"Authentication failed: opcode=0x{auth_op:02x}"
|
||||
return sock
|
||||
|
||||
|
||||
def _cql_prepare(sock: socket.socket, stream: int, query: str) -> bytes:
|
||||
"""PREPARE *query* and return the server-assigned query_id."""
|
||||
sock.sendall(_frame(_OP_PREPARE, _long_string(query), stream))
|
||||
_, opcode, payload = _recv_frame(sock)
|
||||
assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"
|
||||
|
||||
pos = 0
|
||||
kind = struct.unpack(">I", payload[pos : pos + 4])[0]
|
||||
pos += 4
|
||||
assert kind == _RESULT_KIND_PREPARED, f"Expected PREPARED kind, got {kind}"
|
||||
|
||||
id_len = struct.unpack(">H", payload[pos : pos + 2])[0]
|
||||
pos += 2
|
||||
return bytes(payload[pos : pos + id_len])
|
||||
|
||||
|
||||
def _cql_execute_with_metadata_id(
|
||||
sock: socket.socket,
|
||||
stream: int,
|
||||
query_id: bytes,
|
||||
result_metadata_id: bytes,
|
||||
consistency: int = _DEFAULT_CONSISTENCY,
|
||||
) -> ExecuteResult:
|
||||
"""
|
||||
Send EXECUTE carrying *result_metadata_id* on the wire.
|
||||
|
||||
With SCYLLA_USE_METADATA_ID active the server reads result_metadata_id
|
||||
immediately after query_id (before the options block), mirroring CQL v5
|
||||
wire format. SKIP_METADATA is set so a normal response returns no column
|
||||
specs; only the METADATA_CHANGED promotion path returns actual metadata.
|
||||
"""
|
||||
# options block: [consistency: uint16][flags: byte]
|
||||
options = struct.pack(">HB", consistency, _FLAG_SKIP_METADATA)
|
||||
body = _short_bytes(query_id) + _short_bytes(result_metadata_id) + options
|
||||
sock.sendall(_frame(_OP_EXECUTE, body, stream))
|
||||
_, opcode, payload = _recv_frame(sock)
|
||||
assert opcode == _OP_RESULT, f"Expected RESULT, got 0x{opcode:02x}"
|
||||
|
||||
pos = 0
|
||||
kind = struct.unpack(">I", payload[pos : pos + 4])[0]
|
||||
pos += 4
|
||||
assert kind == _RESULT_KIND_ROWS, f"Expected ROWS kind, got {kind}"
|
||||
|
||||
meta_flags = struct.unpack(">I", payload[pos : pos + 4])[0]
|
||||
pos += 4
|
||||
column_count = struct.unpack(">I", payload[pos : pos + 4])[0]
|
||||
pos += 4
|
||||
|
||||
metadata_changed = bool(meta_flags & _META_METADATA_CHANGED)
|
||||
no_metadata = bool(meta_flags & _META_NO_METADATA)
|
||||
|
||||
response_metadata_id: bytes | None = None
|
||||
if metadata_changed:
|
||||
id_len = struct.unpack(">H", payload[pos : pos + 2])[0]
|
||||
pos += 2
|
||||
response_metadata_id = bytes(payload[pos : pos + id_len])
|
||||
|
||||
return ExecuteResult(
|
||||
metadata_changed=metadata_changed,
|
||||
no_metadata=no_metadata,
|
||||
column_count=column_count,
|
||||
result_metadata_id=response_metadata_id,
|
||||
)
|
||||
|
||||
|
||||
def _prepare_and_execute(
|
||||
host: str, query: str, stale_metadata_id: bytes
|
||||
) -> ExecuteResult:
|
||||
"""
|
||||
Open a raw socket connection (negotiating SCYLLA_USE_METADATA_ID), prepare
|
||||
*query*, execute it with *stale_metadata_id*, and return the parsed result.
|
||||
Intended to be called via ``asyncio.to_thread`` to avoid blocking the event loop.
|
||||
"""
|
||||
sock = _cql_connect(host, 9042, "cassandra", "cassandra")
|
||||
try:
|
||||
stream = 1
|
||||
stream += 1
|
||||
query_id = _cql_prepare(sock, stream, query)
|
||||
stream += 1
|
||||
return _cql_execute_with_metadata_id(sock, stream, query_id, stale_metadata_id)
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_roles_of_prepared_metadata_promotion(
|
||||
manager: ManagerClient,
|
||||
) -> None:
|
||||
"""Verify that the server promotes the prepared metadata_id for statements
|
||||
whose PREPARE response carries empty result metadata (NO_METADATA).
|
||||
|
||||
``LIST ROLES OF <role>`` is such a statement: at PREPARE time the server
|
||||
does not know the result set schema because the statement implementation
|
||||
builds the metadata dynamically at execute time. The server therefore
|
||||
returns the metadata_id of empty metadata in the PREPARE response.
|
||||
|
||||
When the client later sends EXECUTE with SKIP_METADATA and the stale
|
||||
empty metadata_id, the server should detect the mismatch (the actual rows
|
||||
have real metadata) and respond with a ``METADATA_CHANGED`` result that
|
||||
carries the real metadata_id so the client can update its cache. This is
|
||||
the behaviour mandated by CQL v5; on CQL v4 it is exercised via the
|
||||
SCYLLA_USE_METADATA_ID Scylla protocol extension which enables the same
|
||||
wire-level exchange.
|
||||
"""
|
||||
server = await manager.server_add(config=auth_config)
|
||||
cql, _ = await manager.get_ready_cql([server])
|
||||
|
||||
role = "r" + unique_name()
|
||||
await cql.run_async(f"CREATE ROLE {role}")
|
||||
|
||||
# Any non-empty bytes that differ from the real metadata_id serves as the
|
||||
# "stale" cache entry the client would send after a PREPARE that returned
|
||||
# empty metadata.
|
||||
stale_metadata_id = hashlib.sha256(b"").digest()[:16]
|
||||
|
||||
result = await asyncio.to_thread(
|
||||
_prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}", stale_metadata_id
|
||||
)
|
||||
|
||||
assert result.metadata_changed, (
|
||||
f"expected EXECUTE for LIST ROLES OF {role} to return METADATA_CHANGED "
|
||||
f"after PREPARE returned an empty result_metadata_id"
|
||||
)
|
||||
assert not result.no_metadata, (
|
||||
f"expected EXECUTE for LIST ROLES OF {role} to not have NO_METADATA flag "
|
||||
f"when METADATA_CHANGED is set"
|
||||
)
|
||||
assert result.result_metadata_id is not None, (
|
||||
f"expected EXECUTE for LIST ROLES OF {role} to return a result_metadata_id "
|
||||
f"alongside METADATA_CHANGED"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(
|
||||
mode="release", reason="error injection is disabled in release mode"
|
||||
)
|
||||
async def test_list_roles_of_prepared_metadata_promotion_suppressed_by_injection(
|
||||
manager: ManagerClient,
|
||||
) -> None:
|
||||
"""Verify that the ``skip_rows_metadata_changed_response`` error injection
|
||||
suppresses the metadata promotion, leaving the response with NO_METADATA
|
||||
and without METADATA_CHANGED.
|
||||
|
||||
This is the negative/regression counterpart of
|
||||
``test_list_roles_of_prepared_metadata_promotion``: it confirms that the
|
||||
happy-path test is not a false positive by showing that the promotion can
|
||||
be disabled, and that the injection point itself works correctly.
|
||||
"""
|
||||
server = await manager.server_add(config=auth_config)
|
||||
cql, _ = await manager.get_ready_cql([server])
|
||||
|
||||
role = "r" + unique_name()
|
||||
await cql.run_async(f"CREATE ROLE {role}")
|
||||
|
||||
stale_metadata_id = hashlib.sha256(b"").digest()[:16]
|
||||
|
||||
async with inject_error(
|
||||
manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"
|
||||
):
|
||||
async with inject_error(
|
||||
manager.api, server.ip_addr, "skip_rows_metadata_changed_response"
|
||||
):
|
||||
result = await asyncio.to_thread(
|
||||
_prepare_and_execute,
|
||||
server.ip_addr,
|
||||
f"LIST ROLES OF {role}",
|
||||
stale_metadata_id,
|
||||
)
|
||||
|
||||
assert not result.metadata_changed, (
|
||||
f"expected injected EXECUTE for LIST ROLES OF {role} to suppress "
|
||||
f"METADATA_CHANGED, but the flag was set"
|
||||
)
|
||||
assert result.no_metadata, (
|
||||
f"expected injected EXECUTE for LIST ROLES OF {role} to keep the "
|
||||
f"stale NO_METADATA path, but no_metadata flag was not set"
|
||||
)
|
||||
@@ -257,44 +257,39 @@ async def manager(request: pytest.FixtureRequest,
|
||||
yield manager_client
|
||||
# `request.node.stash` contains a report stored in `pytest_runtest_makereport` from where we can retrieve
|
||||
# test failure.
|
||||
cluster_status = None
|
||||
found_errors = {}
|
||||
failed = False
|
||||
report = request.node.stash[PHASE_REPORT_KEY]
|
||||
failed = report.when == "call" and report.failed
|
||||
|
||||
# Check if the test has the check_nodes_for_errors marker
|
||||
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
|
||||
|
||||
failed_test_dir_path = None
|
||||
try:
|
||||
report = request.node.stash[PHASE_REPORT_KEY]
|
||||
failed = report.when == "call" and report.failed
|
||||
if failed or found_errors:
|
||||
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
|
||||
# all related logs in one dir.
|
||||
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
|
||||
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
|
||||
str.maketrans('[]', '()'))
|
||||
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Check if the test has the check_nodes_for_errors marker
|
||||
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
|
||||
if failed:
|
||||
await manager_client.gather_related_logs(
|
||||
failed_test_dir_path,
|
||||
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
|
||||
)
|
||||
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
|
||||
f.write(report.longreprtext)
|
||||
if request.config.getoption('artifacts_dir_url') is not None:
|
||||
# get the relative path to the tmpdir for the failed directory
|
||||
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
|
||||
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
|
||||
urllib.parse.quote(dir_path_relative))
|
||||
record_property("TEST_LOGS", full_url)
|
||||
|
||||
if failed or found_errors:
|
||||
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
|
||||
# all related logs in one dir.
|
||||
# Then add property to the XML report with the path to the directory, so it can be visible in Jenkins
|
||||
failed_test_dir_path = testpy_test.suite.log_dir / "failed_test" / test_case_name.translate(
|
||||
str.maketrans('[]', '()'))
|
||||
failed_test_dir_path.mkdir(parents=True, exist_ok=True)
|
||||
cluster_status = await manager_client.after_test(test_case_name, not failed)
|
||||
await manager_client.stop() # Stop client session and close driver after each test
|
||||
|
||||
if failed:
|
||||
await manager_client.gather_related_logs(
|
||||
failed_test_dir_path,
|
||||
{'pytest.log': test_log, 'test_py.log': test_py_log_test}
|
||||
)
|
||||
with open(failed_test_dir_path / "stacktrace.txt", "w") as f:
|
||||
f.write(report.longreprtext)
|
||||
if request.config.getoption('artifacts_dir_url') is not None:
|
||||
# get the relative path to the tmpdir for the failed directory
|
||||
dir_path_relative = f"{failed_test_dir_path.as_posix()[failed_test_dir_path.as_posix().find('testlog'):]}"
|
||||
full_url = urllib.parse.urljoin(request.config.getoption('artifacts_dir_url') + '/',
|
||||
urllib.parse.quote(dir_path_relative))
|
||||
record_property("TEST_LOGS", full_url)
|
||||
|
||||
cluster_status = await manager_client.after_test(test_case_name, not failed)
|
||||
finally:
|
||||
await manager_client.stop() # Stop client session and close driver after each test
|
||||
|
||||
if cluster_status is not None and cluster_status["server_broken"] and not failed:
|
||||
if cluster_status["server_broken"] and not failed:
|
||||
failed = True
|
||||
pytest.fail(
|
||||
f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"
|
||||
|
||||
@@ -10,7 +10,6 @@ import random
|
||||
import string
|
||||
import tempfile
|
||||
import time
|
||||
import threading
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from pprint import pformat
|
||||
|
||||
@@ -274,30 +273,6 @@ class TesterAlternator(BaseAlternator):
|
||||
logger.info("Testing and validating an update query using key condition expression")
|
||||
logger.info(f"ConditionExpression update of short circuit is: {conditional_update_short_circuit}")
|
||||
dc2_table.update_item(**conditional_update_short_circuit)
|
||||
# Wait for cross-DC replication to reach both live DC1 nodes
|
||||
# before stopping dc2_node. The LWT commit uses LOCAL_QUORUM,
|
||||
# which only guarantees DC2 persistence; replication to DC1 is
|
||||
# async background work. Without this wait, stopping dc2_node
|
||||
# can drop in-flight RPCs to DC1 while CAS mutations don't
|
||||
# store hints. We must confirm both live DC1 replicas have the
|
||||
# data so that the later ConsistentRead=True (LOCAL_QUORUM)
|
||||
# read on restarted node1 is guaranteed to succeed.
|
||||
# See https://scylladb.atlassian.net/browse/SCYLLADB-1267
|
||||
dc1_live_nodes = [
|
||||
node for node in self.cluster.nodelist()
|
||||
if node.data_center == node1.data_center and node.server_id != node1.server_id
|
||||
]
|
||||
dc1_live_tables = [self.get_table(table_name=TABLE_NAME, node=node) for node in dc1_live_nodes]
|
||||
wait_for(
|
||||
lambda: all(
|
||||
t.get_item(
|
||||
Key={self._table_primary_key: new_pk_val}, ConsistentRead=False
|
||||
).get("Item", {}).get("c") == 3
|
||||
for t in dc1_live_tables
|
||||
),
|
||||
timeout=60,
|
||||
text="Waiting for cross-DC replication of conditional update to both live DC1 nodes",
|
||||
)
|
||||
dc2_node.stop()
|
||||
node1.start()
|
||||
|
||||
@@ -506,33 +481,28 @@ class TesterAlternator(BaseAlternator):
|
||||
2) Issue Alternator 'heavy' requests concurrently (create-table)
|
||||
3) wait for RequestLimitExceeded error response.
|
||||
"""
|
||||
# Keep the limit low to avoid exhausting LSA memory on the 1GB test node
|
||||
# when multiple CreateTable requests (Raft + schema + flush) run concurrently.
|
||||
concurrent_requests_limit = 3
|
||||
concurrent_requests_limit = 5
|
||||
extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1}
|
||||
self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config)
|
||||
node1 = self.cluster.nodelist()[0]
|
||||
stop_workers = threading.Event()
|
||||
create_tables_threads = []
|
||||
for tables_num in range(concurrent_requests_limit * 5):
|
||||
create_tables_threads.append(self.run_create_table_thread())
|
||||
|
||||
def run_create_table_until_limited() -> None:
|
||||
while not stop_workers.is_set():
|
||||
try:
|
||||
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
|
||||
except Exception as error: # noqa: BLE001
|
||||
if "RequestLimitExceeded" in str(error):
|
||||
stop_workers.set()
|
||||
return
|
||||
raise
|
||||
@retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request")
|
||||
def wait_for_create_table_request_failure():
|
||||
try:
|
||||
self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
|
||||
except Exception as error:
|
||||
if "RequestLimitExceeded" in error.args[0]:
|
||||
return
|
||||
raise
|
||||
raise ConcurrencyLimitNotExceededError
|
||||
|
||||
with ThreadPoolExecutor(max_workers=concurrent_requests_limit * 5) as executor:
|
||||
create_table_futures = [executor.submit(run_create_table_until_limited) for _ in range(concurrent_requests_limit * 5)]
|
||||
wait_for_create_table_request_failure()
|
||||
|
||||
if not stop_workers.wait(timeout=30):
|
||||
raise ConcurrencyLimitNotExceededError
|
||||
|
||||
stop_workers.set()
|
||||
for future in create_table_futures:
|
||||
future.result(timeout=60)
|
||||
for thread in create_tables_threads:
|
||||
thread.join()
|
||||
|
||||
@staticmethod
|
||||
def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):
|
||||
|
||||
@@ -1182,9 +1182,9 @@ class TestAuth(Tester):
|
||||
def get_session(self, node_idx=0, user=None, password=None, exclusive=True):
|
||||
node = self.cluster.nodelist()[node_idx]
|
||||
if exclusive:
|
||||
conn = self.patient_exclusive_cql_connection(node, user=user, password=password)
|
||||
conn = self.patient_exclusive_cql_connection(node, user=user, password=password, timeout=0.1)
|
||||
else:
|
||||
conn = self.patient_cql_connection(node, user=user, password=password)
|
||||
conn = self.patient_cql_connection(node, user=user, password=password, timeout=0.1)
|
||||
return conn
|
||||
|
||||
def assert_permissions_listed(self, expected, session, query, include_superuser=False):
|
||||
|
||||
@@ -199,7 +199,7 @@ class GSServer(GSFront):
|
||||
|
||||
def unpublish(self):
|
||||
for k in self.vars:
|
||||
v = self.oldvars.get(k)
|
||||
v = self.oldvars[k]
|
||||
if v:
|
||||
os.environ[k] = v
|
||||
elif os.environ.get(k):
|
||||
|
||||
@@ -215,11 +215,6 @@ async def test_node_ops_tasks_tree(manager: ManagerClient):
|
||||
servers, vt_ids = await check_remove_node_tasks_tree(manager, tm, module_name, servers, vt_ids)
|
||||
servers, vt_ids = await check_decommission_tasks_tree(manager, tm, module_name, servers, vt_ids)
|
||||
|
||||
# Reconnect the driver after topology changes (replace, removenode,
|
||||
# decommission) so that the new_test_keyspace cleanup can reach a
|
||||
# live node for DROP KEYSPACE.
|
||||
await manager.driver_connect()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_node_ops_tasks_ttl(manager: ManagerClient):
|
||||
"""Test node ops virtual tasks' ttl."""
|
||||
|
||||
@@ -17,7 +17,6 @@ import socket
|
||||
import socketserver
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from collections import namedtuple
|
||||
from contextlib import contextmanager
|
||||
@@ -34,9 +33,9 @@ from test.cluster.dtest.dtest_class import create_ks, wait_for
|
||||
from test.cluster.dtest.tools.assertions import assert_invalid
|
||||
from test.cluster.dtest.tools.data import rows_to_list, run_in_parallel
|
||||
|
||||
from test.cluster.test_config import wait_for_config
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import read_barrier
|
||||
from test.pylib.util import wait_for as wait_for_async
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -114,10 +113,11 @@ class AuditTester:
|
||||
for k in AUTH_CONFIG:
|
||||
await self.manager.server_remove_config_option(srv.server_id, k)
|
||||
|
||||
# Remove absent keys so the server reverts to compiled-in defaults.
|
||||
for k in absent_keys:
|
||||
await self.manager.server_remove_config_option(srv.server_id, k)
|
||||
|
||||
if needs_restart:
|
||||
# Remove absent keys so the server reverts to compiled-in defaults.
|
||||
for k in absent_keys:
|
||||
await self.manager.server_remove_config_option(srv.server_id, k)
|
||||
await self.manager.server_stop_gracefully(srv.server_id)
|
||||
full_cfg = self._build_server_config(needed, enable_compact_storage, user)
|
||||
await self.manager.server_update_config(srv.server_id, config_options=full_cfg)
|
||||
@@ -127,17 +127,10 @@ class AuditTester:
|
||||
# Server stays up — only push live-updatable keys.
|
||||
live_cfg = {k: v for k, v in needed.items() if k in LIVE_AUDIT_KEYS}
|
||||
live_cfg["enable_create_table_with_compact_storage"] = enable_compact_storage
|
||||
log_file = await self.manager.server_open_log(srv.server_id)
|
||||
# Each remove/update sends a SIGHUP. Wait for each one's
|
||||
# "completed re-reading configuration file" before the next
|
||||
# so we never match a stale message.
|
||||
for k in absent_keys:
|
||||
from_mark = await log_file.mark()
|
||||
await self.manager.server_remove_config_option(srv.server_id, k)
|
||||
await log_file.wait_for(r"completed re-reading configuration file", from_mark=from_mark, timeout=60)
|
||||
from_mark = await log_file.mark()
|
||||
await self.manager.server_update_config(srv.server_id, config_options=live_cfg)
|
||||
await log_file.wait_for(r"completed re-reading configuration file", from_mark=from_mark, timeout=60)
|
||||
for key in LIVE_AUDIT_KEYS:
|
||||
if key in live_cfg:
|
||||
await wait_for_config(self.manager, srv, key, live_cfg[key])
|
||||
|
||||
async def _start_fresh_servers(self, needed: dict[str, str],
|
||||
enable_compact_storage: bool,
|
||||
@@ -352,7 +345,7 @@ class UnixSockerListener:
|
||||
elif data != "Initializing syslog audit backend.":
|
||||
self.server.parent_instance.lines.append(data)
|
||||
|
||||
class UnixDatagramServer(socketserver.UnixDatagramServer):
|
||||
class UnixDatagramServer(socketserver.ThreadingUnixDatagramServer):
|
||||
def __init__(self, socket_path, handler, parent_instance, lock):
|
||||
self.parent_instance = parent_instance
|
||||
self.mutex = lock
|
||||
@@ -1349,13 +1342,7 @@ class CQLAuditTester(AuditTester):
|
||||
conn = await self.manager.get_cql_exclusive(srv)
|
||||
stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1) VALUES (1000, 1000)", consistency_level=ConsistencyLevel.THREE)
|
||||
conn.execute(stmt)
|
||||
# The audit log entry may not be visible immediately after the
|
||||
# insert, so retry with exponential backoff until it appears.
|
||||
audit_node_ips = await wait_for_async(
|
||||
lambda: self.get_audit_partitions_for_operation(session, stmt.query_string),
|
||||
deadline=time.time() + 10,
|
||||
period=0.05,
|
||||
label=f"audit entry for node {index}")
|
||||
audit_node_ips = await self.get_audit_partitions_for_operation(session, stmt.query_string)
|
||||
node_to_audit_nodes[index] = set(audit_node_ips)
|
||||
|
||||
all_addresses = set(srv.ip_addr for srv in servers)
|
||||
|
||||
@@ -1,70 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2026-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
import logging
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from test.cluster.util import get_current_group0_config
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import read_barrier
|
||||
from test.pylib.util import wait_for
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_bootstrap_with_quick_group0_join(manager: ManagerClient):
|
||||
"""Regression test for https://scylladb.atlassian.net/browse/SCYLLADB-959.
|
||||
|
||||
The bug was that when the bootstrapping node joined group0 before reaching
|
||||
post_server_start, it skipped post_server_start and thus hung forever.
|
||||
|
||||
The test simulates the scenario by starting the second node with the
|
||||
join_group0_pause_before_config_check injection. Without the fix, the
|
||||
startup times out.
|
||||
"""
|
||||
logger.info("Adding first server")
|
||||
s1 = await manager.server_add()
|
||||
|
||||
logger.info("Adding second server with join_group0_pause_before_config_check enabled")
|
||||
s2 = await manager.server_add(start=False, config={
|
||||
'error_injections_at_startup': ['join_group0_pause_before_config_check']
|
||||
})
|
||||
|
||||
logger.info(f"Starting {s2}")
|
||||
start_task = asyncio.create_task(manager.server_start(s2.server_id))
|
||||
|
||||
s2_log = await manager.server_open_log(s2.server_id)
|
||||
|
||||
await s2_log.wait_for("join_group0_pause_before_config_check: waiting for message", timeout=60)
|
||||
|
||||
s1_host_id = await manager.get_host_id(s1.server_id)
|
||||
s2_host_id = await manager.get_host_id(s2.server_id)
|
||||
|
||||
async def s2_in_group0_config_on_s1():
|
||||
config = await get_current_group0_config(manager, s1)
|
||||
ids = {m[0] for m in config}
|
||||
assert s1_host_id in ids # sanity check
|
||||
return True if s2_host_id in ids else None
|
||||
|
||||
# Note: we would like to wait for s2 to see itself in the group0 config, but we can't execute
|
||||
# get_current_group0_config for s2, as s2 doesn't handle CQL requests at this point. As a workaround, we wait for s1
|
||||
# to see s2 and then perform a read barrier on s2.
|
||||
logger.info(f"Waiting for {s1} to see {s2} in the group0 config")
|
||||
await wait_for(s2_in_group0_config_on_s1, deadline=time.time() + 60, period=0.1)
|
||||
|
||||
logger.info(f"Performing read barrier on {s2} to make sure it sees itself in the group0 config")
|
||||
await read_barrier(manager.api, s2.ip_addr)
|
||||
|
||||
logger.info(f"Unblocking {s2}")
|
||||
await manager.api.message_injection(s2.ip_addr, 'join_group0_pause_before_config_check')
|
||||
|
||||
logger.info(f"Waiting for {s2} to complete bootstrap")
|
||||
await asyncio.wait_for(start_task, timeout=60)
|
||||
@@ -100,7 +100,7 @@ async def test_limited_concurrency_of_writes(manager: ManagerClient):
|
||||
})
|
||||
node2 = await manager.server_add()
|
||||
|
||||
cql = await manager.get_cql_exclusive(node1)
|
||||
cql = manager.get_cql()
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}") as ks:
|
||||
table = f"{ks}.t"
|
||||
await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
|
||||
|
||||
@@ -11,6 +11,7 @@ from test.cluster.util import check_token_ring_and_group0_consistency, new_test_
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@pytest.mark.asyncio
|
||||
@@ -52,7 +53,7 @@ async def test_cleanup_stop(manager: ManagerClient):
|
||||
await s0_log.wait_for('sstable_cleanup_wait: waiting', from_mark=s0_mark)
|
||||
|
||||
stop_cleanup = asyncio.create_task(manager.api.stop_compaction(servers[0].ip_addr, "CLEANUP"))
|
||||
await asyncio.sleep(1)
|
||||
time.sleep(1)
|
||||
|
||||
await manager.api.message_injection(servers[0].ip_addr, "sstable_cleanup_wait")
|
||||
await stop_cleanup
|
||||
|
||||
@@ -2279,7 +2279,7 @@ async def test_split_stopped_on_shutdown(manager: ManagerClient):
|
||||
|
||||
shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))
|
||||
|
||||
await log.wait_for('Stopping.*ongoing compactions', from_mark=log_mark)
|
||||
await log.wait_for('Stopping.*ongoing compactions')
|
||||
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
|
||||
|
||||
await log.wait_for('storage_service_drain_wait: waiting', from_mark=log_mark)
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
import os
|
||||
import random
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
@@ -60,14 +59,7 @@ async def server_address(request, testpy_test: None|Test):
|
||||
ip = await testpy_test.suite.hosts.lease_host()
|
||||
else:
|
||||
ip = f"127.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
|
||||
# Ask the OS to pick a free port by binding to port 0. This avoids
|
||||
# collisions with ports still in TIME_WAIT from a previous test module
|
||||
# that used the same IP. SO_REUSEADDR is set on the probe socket so it
|
||||
# can reclaim a TIME_WAIT port itself
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
s.bind((ip, 0))
|
||||
port = s.getsockname()[1]
|
||||
port = random.randint(10000, 65535)
|
||||
yield ServerAddress(ip, port)
|
||||
if testpy_test is not None:
|
||||
await testpy_test.suite.hosts.release_host(ip)
|
||||
|
||||
@@ -257,7 +257,7 @@ async def run_server(ip, port):
|
||||
|
||||
runner = aiohttp.web.AppRunner(app)
|
||||
await runner.setup()
|
||||
site = aiohttp.web.TCPSite(runner, ip, port, reuse_address=True, reuse_port=True)
|
||||
site = aiohttp.web.TCPSite(runner, ip, port)
|
||||
await site.start()
|
||||
|
||||
try:
|
||||
|
||||
@@ -4,18 +4,14 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#
|
||||
|
||||
import logging
|
||||
import shutil
|
||||
import itertools
|
||||
import asyncio
|
||||
import pathlib
|
||||
import re
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Callable
|
||||
|
||||
logger = logging.getLogger("DockerizedServer")
|
||||
|
||||
class DockerizedServer:
|
||||
"""class for running an external dockerized service image, typically mock server"""
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
@@ -41,7 +37,6 @@ class DockerizedServer:
|
||||
self.port = None
|
||||
self.proc = None
|
||||
self.service_port = port
|
||||
self.echo_thread = None
|
||||
|
||||
async def start(self):
|
||||
"""Starts docker image on a random port"""
|
||||
@@ -50,107 +45,77 @@ class DockerizedServer:
|
||||
if exe is not None)).resolve()
|
||||
sid = f"{os.getpid()}-{DockerizedServer.newid()}"
|
||||
name = f'{self.logfilenamebase}-{sid}'
|
||||
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
|
||||
self.logfile = logfilename.open("wb")
|
||||
|
||||
docker_args = self.docker_args(self.host, self.service_port)
|
||||
image_args = self.image_args(self.host, self.service_port)
|
||||
|
||||
args = [exe, "run", "--name", name, "--rm" ]
|
||||
if self.service_port is None:
|
||||
args = args + ["-P"]
|
||||
else:
|
||||
args = args + ["-p", str(self.service_port)]
|
||||
|
||||
args = args + docker_args + [self.image] + image_args
|
||||
|
||||
# This seems weird, using the blocking IO subprocess.
|
||||
# However, we want to use a pipe reader so we can push the
|
||||
# output into the test log (because we are bad at propagating
|
||||
# log files etc from CI)
|
||||
# But the pipe reader needs to read until EOF, otherwise the
|
||||
# docker process will eventually hang. So we can't await a
|
||||
# coroutine.
|
||||
# We _can_, sort of, use pool.create_task(...) to send a coro
|
||||
# to the background, and use a signal for waiting, like here,
|
||||
# thus ensuring the coro runs forever, sort of... However,
|
||||
# this currently breaks, probably due to some part of the
|
||||
# machinery/tests that don't async fully, causing us to not
|
||||
# process the log, and thus hand/fail, bla bla.
|
||||
# The solution is to make the process synced, and use a
|
||||
# background thread (execution pool) for the processing.
|
||||
# This way we know the pipe reader will not suddenly get
|
||||
# blocked at inconvinient times.
|
||||
proc = subprocess.Popen(args, stderr=subprocess.PIPE)
|
||||
loop = asyncio.get_running_loop()
|
||||
ready_fut = loop.create_future()
|
||||
|
||||
def process_io():
|
||||
f = ready_fut
|
||||
try:
|
||||
while True:
|
||||
data = proc.stderr.readline()
|
||||
if not data:
|
||||
if f:
|
||||
loop.call_soon_threadsafe(f.set_exception, RuntimeError("Log EOF"))
|
||||
logger.debug("EOF received")
|
||||
break
|
||||
line = data.decode()
|
||||
self.logfile.write(data)
|
||||
logger.debug(line)
|
||||
if f and self.is_success_line(line, self.service_port):
|
||||
logger.info('Got start message: %s', line)
|
||||
loop.call_soon_threadsafe(f.set_result, True)
|
||||
f = None
|
||||
if f and self.is_failure_line(line, self.service_port):
|
||||
logger.info('Got fail message: %s', line)
|
||||
loop.call_soon_threadsafe(f.set_result, False)
|
||||
f = None
|
||||
except Exception as e:
|
||||
logger.error("Exception in log processing: %s", e)
|
||||
if f:
|
||||
loop.call_soon_threadsafe(f.set_exception, e)
|
||||
|
||||
self.echo_thread = loop.run_in_executor(None, process_io)
|
||||
ok = await ready_fut
|
||||
if not ok:
|
||||
self.logfile.close()
|
||||
proc.kill()
|
||||
proc.wait()
|
||||
raise RuntimeError("Could not parse expected launch message from container")
|
||||
|
||||
check_proc = await asyncio.create_subprocess_exec(exe
|
||||
, *["container", "port", name]
|
||||
, stdout=asyncio.subprocess.PIPE
|
||||
)
|
||||
while True:
|
||||
data = await check_proc.stdout.readline()
|
||||
if not data:
|
||||
break
|
||||
s = data.decode()
|
||||
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
|
||||
if m:
|
||||
self.port = int(m.group(1))
|
||||
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
|
||||
self.logfile = logfilename.open("wb")
|
||||
|
||||
await check_proc.wait()
|
||||
if not self.port:
|
||||
proc.kill()
|
||||
proc.wait()
|
||||
raise RuntimeError("Could not query port from container")
|
||||
self.proc = proc
|
||||
docker_args = self.docker_args(self.host, self.service_port)
|
||||
image_args = self.image_args(self.host, self.service_port)
|
||||
|
||||
args = ["run", "--name", name, "--rm" ]
|
||||
if self.service_port is None:
|
||||
args = args + ["-P"]
|
||||
else:
|
||||
args = args + ["-p", str(self.service_port)]
|
||||
|
||||
args = args + docker_args + [self.image] + image_args
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(exe, *args, stderr=self.logfile)
|
||||
failed = False
|
||||
|
||||
# In any sane world we would just pipe stderr to a pipe and launch a background
|
||||
# task to just readline from there to both check the start message as well as
|
||||
# add it to the log (preferrably via logger).
|
||||
# This works fine when doing this in a standalone python script.
|
||||
# However, for some reason, when run in a pytest fixture, the pipe will fill up,
|
||||
# without or reader waking up and doing anyhing, and for any test longer than very
|
||||
# short, we will fill the stderr buffer and hang.
|
||||
# I cannot figure out how to get around this, so we workaround it
|
||||
# instead by directing stderr to a log file, and simply repeatedly
|
||||
# try to read the info from this file until we are happy.
|
||||
async with asyncio.timeout(120):
|
||||
done = False
|
||||
while not done and not failed:
|
||||
with logfilename.open("r") as f:
|
||||
for line in f:
|
||||
if self.is_success_line(line, self.service_port):
|
||||
print(f'Got start message: {line}')
|
||||
done = True
|
||||
break
|
||||
if self.is_failure_line(line, self.service_port):
|
||||
print(f'Got fail message: {line}')
|
||||
failed = True
|
||||
break
|
||||
|
||||
if failed:
|
||||
self.logfile.close()
|
||||
await proc.wait()
|
||||
continue
|
||||
|
||||
check_proc = await asyncio.create_subprocess_exec(exe
|
||||
, *["container", "port", name]
|
||||
, stdout=asyncio.subprocess.PIPE
|
||||
)
|
||||
while True:
|
||||
data = await check_proc.stdout.readline()
|
||||
if not data:
|
||||
break
|
||||
s = data.decode()
|
||||
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
|
||||
if m:
|
||||
self.port = int(m.group(1))
|
||||
|
||||
await check_proc.wait()
|
||||
if not self.port:
|
||||
proc.kill()
|
||||
raise RuntimeError("Could not query port from container")
|
||||
self.proc = proc
|
||||
break
|
||||
|
||||
async def stop(self):
|
||||
"""Stops docker image"""
|
||||
if self.proc:
|
||||
logger.debug("Stopping docker process")
|
||||
self.proc.terminate()
|
||||
self.proc.wait()
|
||||
self.proc = None
|
||||
if self.echo_thread:
|
||||
logger.debug("Waiting for IO thread")
|
||||
await self.echo_thread
|
||||
self.echo_thread = None
|
||||
await self.proc.wait()
|
||||
if self.logfile:
|
||||
logger.debug("Closing log file")
|
||||
self.logfile.close()
|
||||
self.logfile = None
|
||||
|
||||
@@ -747,8 +747,6 @@ class ScyllaServer:
|
||||
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
|
||||
self.notify_socket.bind(str(self.notify_socket_path))
|
||||
self._received_serving = False
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def poll_status(s: socket.socket, f: asyncio.Future, logger: Union[logging.Logger, logging.LoggerAdapter]):
|
||||
# Try to read all available messages from the socket
|
||||
while True:
|
||||
@@ -758,7 +756,7 @@ class ScyllaServer:
|
||||
message = data.decode('utf-8', errors='replace')
|
||||
if 'STATUS=serving' in message:
|
||||
logger.debug("Received sd_notify 'serving' message")
|
||||
loop.call_soon_threadsafe(f.set_result, True)
|
||||
f.set_result(True)
|
||||
return
|
||||
if 'STATUS=entering maintenance mode' in message:
|
||||
logger.debug("Receive sd_notify 'entering maintenance mode'")
|
||||
@@ -768,9 +766,9 @@ class ScyllaServer:
|
||||
except Exception as e:
|
||||
logger.debug("Error reading from notify socket: %s", e)
|
||||
break
|
||||
loop.call_soon_threadsafe(f.set_result, False)
|
||||
f.set_result(False)
|
||||
|
||||
self.serving_signal = loop.create_future()
|
||||
self.serving_signal = asyncio.get_running_loop().create_future()
|
||||
t = threading.Thread(target=poll_status, args=[self.notify_socket, self.serving_signal, self.logger], daemon=True)
|
||||
t.start()
|
||||
|
||||
@@ -894,6 +892,7 @@ class ScyllaServer:
|
||||
return
|
||||
await report_error("the node startup failed, but the log file doesn't contain the expected error")
|
||||
await report_error("failed to start the node")
|
||||
self.logger.info("Wait me %s expect %s is %s", self.server_id, expected_server_up_state, server_up_state)
|
||||
if await self.try_get_host_id(api):
|
||||
if server_up_state == ServerUpState.PROCESS_STARTED:
|
||||
server_up_state = ServerUpState.HOST_ID_QUERIED
|
||||
|
||||
@@ -64,25 +64,11 @@ async def wait_for(
|
||||
tag = label or getattr(pred, '__name__', 'unlabeled')
|
||||
start = time.time()
|
||||
retries = 0
|
||||
last_exception: Exception | None = None
|
||||
while True:
|
||||
elapsed = time.time() - start
|
||||
if time.time() >= deadline:
|
||||
timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
|
||||
if last_exception is not None:
|
||||
timeout_msg += (
|
||||
f"; last exception: {type(last_exception).__name__}: {last_exception}"
|
||||
)
|
||||
raise AssertionError(timeout_msg) from last_exception
|
||||
raise AssertionError(timeout_msg)
|
||||
|
||||
try:
|
||||
res = await pred()
|
||||
last_exception = None
|
||||
except Exception as exc:
|
||||
res = None
|
||||
last_exception = exc
|
||||
|
||||
assert time.time() < deadline, \
|
||||
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
|
||||
res = await pred()
|
||||
if res is not None:
|
||||
if retries > 0:
|
||||
logger.debug(f"wait_for({tag}) completed "
|
||||
|
||||
@@ -200,13 +200,13 @@ trace_keyspace_helper::trace_keyspace_helper(tracing& tr)
|
||||
_metrics.add_group("tracing_keyspace_helper", {
|
||||
sm::make_counter("tracing_errors", [this] { return _stats.tracing_errors; },
|
||||
sm::description("Counts a number of errors during writing to a system_traces keyspace. "
|
||||
"One error may cause one or more tracing records to be lost.")).set_skip_when_empty(),
|
||||
"One error may cause one or more tracing records to be lost.")),
|
||||
|
||||
sm::make_counter("bad_column_family_errors", [this] { return _stats.bad_column_family_errors; },
|
||||
sm::description("Counts a number of times write failed due to one of the tables in the system_traces keyspace has an incompatible schema. "
|
||||
"One error may result one or more tracing records to be lost. "
|
||||
"Non-zero value indicates that the administrator has to take immediate steps to fix the corresponding schema. "
|
||||
"The appropriate error message will be printed in the syslog.")).set_skip_when_empty(),
|
||||
"The appropriate error message will be printed in the syslog.")),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -39,17 +39,17 @@ tracing::tracing(sstring tracing_backend_helper_class_name)
|
||||
_metrics.add_group("tracing", {
|
||||
sm::make_counter("dropped_sessions", stats.dropped_sessions,
|
||||
sm::description("Counts a number of dropped sessions due to too many pending sessions/records. "
|
||||
"High value indicates that backend is saturated with the rate with which new tracing records are created.")).set_skip_when_empty(),
|
||||
"High value indicates that backend is saturated with the rate with which new tracing records are created.")),
|
||||
|
||||
sm::make_counter("dropped_records", stats.dropped_records,
|
||||
sm::description("Counts a number of dropped records due to too many pending records. "
|
||||
"High value indicates that backend is saturated with the rate with which new tracing records are created.")).set_skip_when_empty(),
|
||||
"High value indicates that backend is saturated with the rate with which new tracing records are created.")),
|
||||
|
||||
sm::make_counter("trace_records_count", stats.trace_records_count,
|
||||
sm::description("This metric is a rate of tracing records generation.")),
|
||||
|
||||
sm::make_counter("trace_errors", stats.trace_errors,
|
||||
sm::description("Counts a number of trace records dropped due to an error (e.g. OOM).")).set_skip_when_empty(),
|
||||
sm::description("Counts a number of trace records dropped due to an error (e.g. OOM).")),
|
||||
|
||||
sm::make_gauge("active_sessions", _active_sessions,
|
||||
sm::description("Holds a number of a currently active tracing sessions.")),
|
||||
|
||||
@@ -69,6 +69,7 @@
|
||||
#include "message/messaging_service.hh"
|
||||
#include "idl/forward_cql.dist.hh"
|
||||
#include "utils/bit_cast.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "utils/labels.hh"
|
||||
#include "utils/result.hh"
|
||||
#include "utils/reusable_buffer.hh"
|
||||
@@ -1633,13 +1634,26 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
|
||||
}
|
||||
|
||||
tracing::trace(trace_state, "Processing a statement");
|
||||
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
|
||||
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
|
||||
auto cache_key_for_metadata = cache_key;
|
||||
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, prepared, std::move(cache_key), needs_authorization)
|
||||
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id), &qp, cache_key = std::move(cache_key_for_metadata), prepared = std::move(prepared)] (auto msg) mutable {
|
||||
if (msg->move_to_shard()) {
|
||||
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce>(msg)));
|
||||
} else if (msg->is_exception()) {
|
||||
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
|
||||
} else {
|
||||
if (prepared->result_metadata_is_empty()
|
||||
&& metadata_id.has_request_metadata_id()
|
||||
&& !utils::get_local_injector().enter("skip_prepared_result_metadata_promotion")) {
|
||||
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
|
||||
auto rows_metadata_id = rows->rs().get_metadata().calculate_metadata_id();
|
||||
clogger.debug("prepared result metadata promotion: request_metadata_id_present={}, calculated_rows_metadata_id_size={}",
|
||||
metadata_id.has_request_metadata_id(), rows_metadata_id._metadata_id.size());
|
||||
qp.local().update_prepared_result_metadata_id(cache_key, rows_metadata_id);
|
||||
auto request_metadata_id = metadata_id.get_request_metadata_id();
|
||||
metadata_id = cql_metadata_id_wrapper(std::move(request_metadata_id), std::move(rows_metadata_id));
|
||||
}
|
||||
}
|
||||
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
|
||||
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
|
||||
}
|
||||
@@ -2507,9 +2521,16 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
|
||||
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
|
||||
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
|
||||
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
|
||||
flags.remove<cql3::metadata::flag::NO_METADATA>();
|
||||
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
|
||||
no_metadata = false;
|
||||
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
|
||||
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
|
||||
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
|
||||
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
|
||||
no_metadata, skip_rows_metadata_changed_response);
|
||||
if (!skip_rows_metadata_changed_response) {
|
||||
flags.remove<cql3::metadata::flag::NO_METADATA>();
|
||||
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
|
||||
no_metadata = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user