Mirror of https://github.com/scylladb/scylladb.git (synced 2026-04-27 20:05:10 +00:00)

Compare commits: scylla-202...next-2025.

29 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 2b20bd887b |  |
|  | 6c51548fd6 |  |
|  | e114fc8175 |  |
|  | dfd2507f0d |  |
|  | 04e33d69da |  |
|  | 5468cd49da |  |
|  | 52a3ed4312 |  |
|  | 816526842e |  |
|  | 73d7f9c1fe |  |
|  | 116a2f43ee |  |
|  | 5d1a8b91cd |  |
|  | c9ee67c85c |  |
|  | 04d8663052 |  |
|  | 72cd145990 |  |
|  | 41e2c2d1c4 |  |
|  | 2d1fdce790 |  |
|  | 1e0487bd57 |  |
|  | 18fc2eff31 |  |
|  | 340369a4d4 |  |
|  | 9041f70f34 |  |
|  | 7ed772866e |  |
|  | 7b97fe4a92 |  |
|  | ba3b7360e0 |  |
|  | 06e9ecab9b |  |
|  | c6d356e7cc |  |
|  | 30c2f03749 |  |
|  | 17075bf3f9 |  |
|  | f5111bfc9b |  |
|  | 6b5de6394b |  |
@@ -78,7 +78,7 @@ fi

 # Default scylla product/version tags
 PRODUCT=scylla
-VERSION=2025.4.7
+VERSION=2025.4.8

 if test -f version
 then
@@ -3331,7 +3331,11 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
     if (should_add_wcu) {
         rjson::add(ret, "ConsumedCapacity", std::move(consumed_capacity));
     }
-    _stats.api_operations.batch_write_item_latency.mark(std::chrono::steady_clock::now() - start_time);
+    auto duration = std::chrono::steady_clock::now() - start_time;
+    _stats.api_operations.batch_write_item_latency.mark(duration);
+    for (const auto& w : per_table_wcu) {
+        w.first->api_operations.batch_write_item_latency.mark(duration);
+    }
     co_return rjson::print(std::move(ret));
 }

@@ -4815,7 +4819,12 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
     if (!some_succeeded && eptr) {
         co_await coroutine::return_exception_ptr(std::move(eptr));
     }
-    _stats.api_operations.batch_get_item_latency.mark(std::chrono::steady_clock::now() - start_time);
+    auto duration = std::chrono::steady_clock::now() - start_time;
+    _stats.api_operations.batch_get_item_latency.mark(duration);
+    for (const table_requests& rs : requests) {
+        lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
+        per_table_stats->api_operations.batch_get_item_latency.mark(duration);
+    }
     if (is_big(response)) {
         co_return make_streamed(std::move(response));
     } else {
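Both batch hunks above apply the same micro-pattern: sample the clock once, then feed that single duration to the global histogram and to every per-table histogram touched by the batch, so all counters agree. A minimal standalone sketch of the pattern (plain C++; `latency_histogram` is an illustrative stand-in, not ScyllaDB's metrics type):

```cpp
#include <chrono>
#include <vector>

// Stand-in for a latency histogram; real code records the sample.
struct latency_histogram {
    void mark(std::chrono::steady_clock::duration) {}
};

void finish_batch(latency_histogram& global,
                  const std::vector<latency_histogram*>& per_table,
                  std::chrono::steady_clock::time_point start_time) {
    // Sample the clock once so global and per-table histograms agree.
    const auto duration = std::chrono::steady_clock::now() - start_time;
    global.mark(duration);
    for (auto* h : per_table) {
        h->mark(duration);
    }
}
```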
@@ -1119,7 +1119,10 @@ void compaction_manager::enable() {

     _compaction_submission_timer.cancel();
     _compaction_submission_timer.arm_periodic(periodic_compaction_submission_interval());
-    _waiting_reevalution = postponed_compactions_reevaluation();
+    if (_waiting_reevaluation) {
+        on_internal_error(cmlog, "postponed compactions reevaluation is already running when enabling compaction manager");
+    }
+    _waiting_reevaluation.emplace(postponed_compactions_reevaluation());
     cmlog.info("Enabled");
 }

@@ -1167,6 +1170,16 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept {
     _postponed_reevaluation.signal();
 }

+future<> compaction_manager::stop_postponed_compactions() noexcept {
+    auto waiting_reevaluation = std::exchange(_waiting_reevaluation, std::nullopt);
+    if (!waiting_reevaluation) {
+        return make_ready_future();
+    }
+    // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
+    reevaluate_postponed_compactions();
+    return std::move(*waiting_reevaluation);
+}
+
 void compaction_manager::postpone_compaction_for_table(compaction_group_view* t) {
     _postponed.insert(t);
 }
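The new stop function is built around `std::exchange` on an optional future: the slot is emptied before anything else happens, which makes shutdown idempotent (a second call finds `std::nullopt` and returns immediately). A standalone sketch of the same shape, with standard-C++ stand-ins for the seastar primitives (`std::future` instead of `future<>`, an atomic flag instead of the condition variable):

```cpp
#include <atomic>
#include <future>
#include <optional>
#include <utility>

class background_loop {
    std::atomic<bool> _stop{false};
    std::optional<std::future<void>> _waiting;  // mirrors _waiting_reevaluation
public:
    void start() {
        _waiting.emplace(std::async(std::launch::async, [this] {
            while (!_stop.load()) {
                // reevaluate postponed work, then block on a signal...
            }
        }));
    }
    // Idempotent: std::exchange empties the slot first, so a second
    // stop() finds std::nullopt and returns without waiting twice.
    void stop() {
        auto w = std::exchange(_waiting, std::nullopt);
        if (!w) {
            return;
        }
        _stop.store(true);  // the "signal" that lets the loop exit
        w->get();           // then wait for it to finish
    }
};
```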
@@ -1250,8 +1263,7 @@ future<> compaction_manager::drain() {
     _compaction_submission_timer.cancel();
     // Stop ongoing compactions, if the request has not been sent already and wait for them to stop.
     co_await stop_ongoing_compactions("drain");
-    // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber
-    reevaluate_postponed_compactions();
+    co_await stop_postponed_compactions();
     cmlog.info("Drained");
 }

@@ -1289,8 +1301,7 @@ future<> compaction_manager::really_do_stop() noexcept {
     if (!_tasks.empty()) {
         on_fatal_internal_error(cmlog, format("{} tasks still exist after being stopped", _tasks.size()));
     }
-    reevaluate_postponed_compactions();
-    co_await std::move(_waiting_reevalution);
+    co_await stop_postponed_compactions();
     co_await _sys_ks.close();
     _weight_tracker.clear();
     _compaction_submission_timer.cancel();
@@ -124,7 +124,7 @@ private:
     // a sstable from being compacted twice.
     std::unordered_set<sstables::shared_sstable> _compacting_sstables;

-    future<> _waiting_reevalution = make_ready_future<>();
+    std::optional<future<>> _waiting_reevaluation;
     condition_variable _postponed_reevaluation;
     // tables that wait for compaction but had its submission postponed due to ongoing compaction.
     std::unordered_set<compaction::compaction_group_view*> _postponed;
@@ -232,6 +232,7 @@ private:

     future<> postponed_compactions_reevaluation();
     void reevaluate_postponed_compactions() noexcept;
+    future<> stop_postponed_compactions() noexcept;
     // Postpone compaction for a table that couldn't be executed due to ongoing
     // similar-sized compaction.
     void postpone_compaction_for_table(compaction::compaction_group_view* t);
@@ -105,6 +105,7 @@ public:
     static const std::chrono::minutes entry_expiry;

     using key_type = prepared_cache_key_type;
+    using pinned_value_type = cache_value_ptr;
     using value_type = checked_weak_ptr;
     using statement_is_too_big = typename cache_type::entry_is_too_big;

@@ -116,9 +117,14 @@ public:
         : _cache(size, entry_expiry, logger)
     {}

+    template <typename LoadFunc>
+    future<pinned_value_type> get_pinned(const key_type& key, LoadFunc&& load) {
+        return _cache.get_ptr(key.key(), [load = std::forward<LoadFunc>(load)] (const cache_key_type&) { return load(); });
+    }
+
     template <typename LoadFunc>
     future<value_type> get(const key_type& key, LoadFunc&& load) {
-        return _cache.get_ptr(key.key(), [load = std::forward<LoadFunc>(load)] (const cache_key_type&) { return load(); }).then([] (cache_value_ptr v_ptr) {
+        return get_pinned(key, std::forward<LoadFunc>(load)).then([] (cache_value_ptr v_ptr) {
             return make_ready_future<value_type>((*v_ptr)->checked_weak_from_this());
         });
     }
@@ -682,7 +682,7 @@ future<::shared_ptr<cql_transport::messages::result_message::prepared>>
 query_processor::prepare(sstring query_string, const service::client_state& client_state, cql3::dialect d) {
     try {
         auto key = compute_id(query_string, client_state.get_raw_keyspace(), d);
-        auto prep_ptr = co_await _prepared_cache.get(key, [this, &query_string, &client_state, d] {
+        auto prep_entry = co_await _prepared_cache.get_pinned(key, [this, &query_string, &client_state, d] {
             auto prepared = get_statement(query_string, client_state, d);
             prepared->calculate_metadata_id();
             auto bound_terms = prepared->statement->get_bound_terms();
@@ -696,13 +696,13 @@ query_processor::prepare(sstring query_string, const service::client_state& clie
         return make_ready_future<std::unique_ptr<statements::prepared_statement>>(std::move(prepared));
     });

     const auto& warnings = prep_ptr->warnings;
-    const auto msg = ::make_shared<result_message::prepared::cql>(prepared_cache_key_type::cql_id(key), std::move(prep_ptr),
+    co_await utils::get_local_injector().inject(
+        "query_processor_prepare_wait_after_cache_get",
+        utils::wait_for_message(std::chrono::seconds(60)));
+
+    auto msg = ::make_shared<result_message::prepared::cql>(prepared_cache_key_type::cql_id(key), std::move(prep_entry),
         client_state.is_protocol_extension_set(cql_transport::cql_protocol_extension::LWT_ADD_METADATA_MARK));
     for (const auto& w : warnings) {
         msg->add_warning(w);
     }
-    co_return ::shared_ptr<cql_transport::messages::result_message::prepared>(std::move(msg));
+    co_return std::move(msg);
     } catch(typename prepared_statements_cache::statement_is_too_big&) {
         throw prepared_statement_is_too_big(query_string);
     }
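The `get()` to `get_pinned()` switch matters because the checked-weak pointer returned by `get()` can be invalidated while `prepare()` is suspended at a `co_await` (the window the new error injection widens on purpose). A toy illustration of why holding the pinned reference across the whole function closes that race, using standard C++ with `shared_ptr`/`weak_ptr` standing in for the cache's pinned and checked-weak pointers:

```cpp
#include <cassert>
#include <memory>

// Toy model, names illustrative: shared_ptr plays the pinned (strong)
// cache pointer, weak_ptr plays the invalidatable cache slot.
struct prepared_entry { int bound_terms = 0; };

std::weak_ptr<prepared_entry> cache_slot;

std::shared_ptr<prepared_entry> get_pinned() {
    auto p = cache_slot.lock();
    if (!p) {
        p = std::make_shared<prepared_entry>();  // "load" on cache miss
        cache_slot = p;
    }
    return p;  // strong reference: survives invalidation
}

int main() {
    auto pinned = get_pinned();
    cache_slot.reset();                // invalidation races with prepare()
    assert(pinned->bound_terms == 0);  // still safe: the pin keeps it alive
}
```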
@@ -1672,7 +1672,7 @@ auto fmt::formatter<db::error_injection_at_startup>::format(const db::error_inje

 auto fmt::formatter<db::object_storage_endpoint_param>::format(const db::object_storage_endpoint_param& e, fmt::format_context& ctx) const
     -> decltype(ctx.out()) {
-    return fmt::format_to(ctx.out(), "object_storage_endpoint_param{{}}", e.to_json_string());
+    return fmt::format_to(ctx.out(), "object_storage_endpoint_param{}", e.to_json_string());
 }

 namespace utils {
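For reference, the brace rule behind this one-character fix (and the similar missing-placeholder fix in the tablet error message further down): doubled braces are an escape that prints literal braces and consumes no argument, so the JSON string was silently dropped. A self-contained check with the fmt library:

```cpp
#include <fmt/core.h>
#include <cassert>

int main() {
    // "{{}}" is an escaped literal "{}": no placeholder, argument unused.
    assert(fmt::format("p{{}}", 42) == "p{}");
    // "{}" is a real placeholder: the argument is formatted.
    assert(fmt::format("p{}", 42) == "p42");
}
```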
@@ -1594,9 +1594,10 @@ future<stop_iteration> view_update_builder::on_results() {
         return should_stop_updates() ? stop() : advance_existings();
     }

-    // If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it
     if (_update && !_update->is_end_of_partition()) {
-        if (_update->is_clustering_row()) {
+        if (_update->is_range_tombstone_change()) {
+            _update_current_tombstone = _update->as_range_tombstone_change().tombstone();
+        } else if (_update->is_clustering_row()) {
             _update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
                 cr.apply(std::max(_update_partition_tombstone, _update_current_tombstone));
             });
dist/common/scripts/scylla_swap_setup (28 lines changed, vendored)
@@ -9,6 +9,7 @@

 import os
 import sys
+import shlex
 import argparse
 import psutil
 from pathlib import Path
@@ -103,16 +104,41 @@ if __name__ == '__main__':
         run('dd if=/dev/zero of={} bs=1M count={}'.format(swapfile, swapsize_mb), shell=True, check=True)
         swapfile.chmod(0o600)
         run('mkswap -f {}'.format(swapfile), shell=True, check=True)
+
+        mount_point = find_mount_point(swap_directory)
+        mount_unit = out(f'systemd-escape -p --suffix=mount {shlex.quote(str(mount_point))}')
+
+        # Add DefaultDependencies=no to the swap unit to avoid getting the default
+        # Before=swap.target dependency. We apply this to all clouds, but the
+        # requirement came from Azure:
+        #
+        # On Azure, the swap directory is on the Azure ephemeral disk (mounted on /mnt).
+        # However, cloud-init makes this mount (i.e., the mnt.mount unit) depend on
+        # the network (After=network-online.target). By extension, this means that
+        # the swap unit depends on the network. If we didn't use DefaultDependencies=no,
+        # then the swap unit would be part of the swap.target which other services
+        # assume to be a local boot target, so we would end up with dependency cycles
+        # such as:
+        #
+        # swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target -> network.target -> systemd-resolved.service -> tmp.mount -> swap.target
+        #
+        # By removing the automatic Before=swap.target, the swap unit is no longer
+        # part of swap.target, avoiding such cycles. The swap will still be
+        # activated via WantedBy=multi-user.target.
         unit_data = '''
 [Unit]
 Description=swapfile
+DefaultDependencies=no
+After={}
 Conflicts=umount.target
 Before=umount.target

 [Swap]
 What={}

 [Install]
 WantedBy=multi-user.target
-'''[1:-1].format(swapfile)
+'''[1:-1].format(mount_unit, swapfile)
         with swapunit.open('w') as f:
             f.write(unit_data)
         systemd_unit.reload()
@@ -727,7 +727,12 @@ public:

         // now we need one page more to be able to save one for next lap
         auto fill_size = align_up(buf1.size(), block_size) + block_size - buf1.size();
-        auto buf2 = co_await _input.read_exactly(fill_size);
+        // If the underlying stream is already at EOF (e.g. buf1 came from
+        // cached _next while the previous read_exactly drained the source),
+        // skip the read_exactly call — it would return empty anyway.
+        auto buf2 = _input.eof()
+            ? temporary_buffer<char>()
+            : co_await _input.read_exactly(fill_size);

         temporary_buffer<char> output(buf1.size() + buf2.size());

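A minimal model of the guard above (plain C++; `toy_stream` is a hypothetical stand-in for the seastar stream, not its API). The essential point is that the caller tests `eof()` before issuing another read, so a source that must never be polled after end-of-stream is left alone:

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Toy stream with seastar-like semantics. In production, calling
// read_exactly() after EOF polled a drained download source and hung;
// the guard below never touches the source once it signalled EOS.
struct toy_stream {
    std::string data;
    std::size_t pos = 0;
    bool eof() const { return pos >= data.size(); }
    std::string read_exactly(std::size_t n) {
        std::size_t take = std::min(n, data.size() - pos);
        std::string out = data.substr(pos, take);
        pos += take;
        return out;
    }
};

std::string read_more(toy_stream& in, std::size_t fill_size) {
    // Mirrors the fix: substitute an empty buffer at EOF instead of
    // issuing another read against an already-drained source.
    return in.eof() ? std::string{} : in.read_exactly(fill_size);
}
```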
@@ -1045,7 +1045,7 @@ future<seastar::shared_ptr<encryption_context>> register_extensions(const db::co
     // Since we are in pre-init phase, this should be safe.
     co_await smp::invoke_on_all([&opts, &exts] () mutable {
         auto& f = exts.schema_extensions().at(encryption_attribute);
-        for (auto& s : { db::system_keyspace::paxos(), db::system_keyspace::batchlog(), db::system_keyspace::dicts() }) {
+        for (auto& s : { db::system_keyspace::paxos(), db::system_keyspace::batchlog(), db::system_keyspace::dicts(), db::system_keyspace::raft() }) {
             exts.add_extension_to_schema(s, encryption_attribute, f(*opts));
         }
     });
@@ -38,7 +38,7 @@ This directory should have 700 permissions and belong to the scylla user)foo")
 R"foo(System information encryption settings

 If enabled, system tables that may contain sensitive information (system.batchlog,
-system.paxos), hints files and commit logs are encrypted with the
+system.paxos, system.raft), hints files and commit logs are encrypted with the
 encryption settings below.

 When enabling system table encryption on a node with existing data, run
@@ -437,7 +437,6 @@ void ldap_connection::poll_results() {
     const auto found = _msgid_to_promise.find(id);
     if (found == _msgid_to_promise.end()) {
         mylog.error("poll_results: got valid result for unregistered id {}, dropping it", id);
-        ldap_msgfree(result);
     } else {
         found->second.set_value(std::move(result_ptr));
         _msgid_to_promise.erase(found);
@@ -101,7 +101,7 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status_help
         .entity = "",
         .progress_units = "",
         .progress = tasks::task_manager::task::progress{},
-        .children = started ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{}
+        .children = started ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{}
     };
 }

@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:a0222f3a92cdaa1af5290e74fd86fb664db0f4d254af4c7abeab6c44ae991ecf
-size 6300272
+oid sha256:9e5a3737655db66bc8c963154c38462d3efcd444ff2efa2623e0ab15e49207f7
+size 6297128
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:aee13f1b8ce82992e0a5929c62baa3af28d2d0381c188d515538eb5664766903
-size 6311280
+oid sha256:71512bffe908b40a4a0d8fd89a6ee48e7c7e7093f88e3a59ece3de31a72bffa1
+size 6302908
@@ -1023,8 +1023,8 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
         on_internal_error_noexcept(rcslog,
             format("reader_concurrency_semaphore::signal(): semaphore {} detected resource leak, available {} exceeds initial {}", _name,
                 _resources, _initial_resources));
-        _resources.count = std::max(_resources.count, _initial_resources.count);
-        _resources.memory = std::max(_resources.memory, _initial_resources.memory);
+        _resources.count = std::min(_resources.count, _initial_resources.count);
+        _resources.memory = std::min(_resources.memory, _initial_resources.memory);
     }
     maybe_wake_execution_loop();
 }
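A worked example of why `min` is the correct clamp here: the branch fires when available resources *exceed* the initial capacity, so `max` keeps the inflated value and the leak persists, while `min` restores the configured cap. A self-contained check:

```cpp
#include <algorithm>
#include <cassert>

int main() {
    const int initial = 10;  // capacity the semaphore started with
    int available = 12;      // leak detected: available exceeds initial
    // Old code: max() keeps the inflated value, so the leak persists.
    assert(std::max(available, initial) == 12);
    // Fix: min() clamps available back to the configured cap.
    available = std::min(available, initial);
    assert(available == 10);
}
```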
@@ -2957,7 +2957,7 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(const locator:

     auto it = _storage_groups.find(group_id);
     if (it == _storage_groups.end()) {
-        throw std::runtime_error(format("Unable to find sibling tablet of id for table {}", group_id, table_id));
+        throw std::runtime_error(format("Unable to find sibling tablet of id {} for table {}", group_id, table_id));
     }
     auto& sg = it->second;
     sg->for_each_compaction_group([&new_sg, new_range, new_tid, group_id] (const compaction_group_ptr& cg) {
@@ -454,7 +454,7 @@ static future<cql3::untyped_result_set> do_execute_cql_with_timeout(sstring req,
     auto ps_ptr = qp.get_prepared(cache_key);
     if (!ps_ptr) {
         const auto msg_ptr = co_await qp.prepare(req, qs, cql3::internal_dialect());
-        ps_ptr = std::move(msg_ptr->get_prepared());
+        ps_ptr = msg_ptr->get_prepared();
         if (!ps_ptr) {
             on_internal_error(paxos_state::logger, "prepared statement is null");
         }
@@ -140,14 +140,19 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
     auto task_type = hint.get_task_type();
     auto tablet_id_opt = tablet_id_provided(task_type) ? std::make_optional(hint.get_tablet_id()) : std::nullopt;

-    size_t tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
+    const auto& tablets = _ss.get_token_metadata().tablets();
+    size_t tablet_count = tablets.has_tablet_map(table) ? tablets.get_tablet_map(table).tablet_count() : 0;
     auto res = co_await get_status_helper(id, std::move(hint));
     if (!res) {
         co_return std::nullopt;
     }

     tasks::tmlogger.info("tablet_virtual_task: wait until tablet operation is finished");
+    co_await utils::get_local_injector().inject("tablet_virtual_task_wait", utils::wait_for_message(60s));
     co_await _ss._topology_state_machine.event.wait([&] {
+        if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) {
+            return true;
+        }
         auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table);
         if (is_resize_task(task_type)) { // Resize task.
             return tmap.resize_task_info().tablet_task_id.uuid() != id.uuid();
@@ -161,6 +166,10 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
     });

     res->status.state = tasks::task_manager::task_state::done; // Failed repair task is retried.
+    if (!_ss.get_token_metadata().tablets().has_tablet_map(table)) {
+        res->status.end_time = db_clock::now();
+        co_return res->status;
+    }
     if (is_migration_task(task_type)) {
         auto& replicas = _ss.get_token_metadata().tablets().get_tablet_map(table).get_tablet_info(tablet_id_opt.value()).replicas;
         auto migration_failed = std::all_of(replicas.begin(), replicas.end(), [&] (const auto& replica) { return res->pending_replica.has_value() && replica != res->pending_replica.value(); });
@@ -168,9 +177,9 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
     } else if (is_resize_task(task_type)) {
         auto new_tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
         res->status.state = new_tablet_count == tablet_count ? tasks::task_manager::task_state::suspended : tasks::task_manager::task_state::done;
-        res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{};
+        res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{};
     } else {
-        res->status.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper()));
+        res->status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
     }
     res->status.end_time = db_clock::now(); // FIXME: Get precise end time.
     co_return res->status;
@@ -243,7 +252,15 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
     status_helper res;
     auto table = hint.get_table_id();
     auto task_type = hint.get_task_type();
-    auto schema = _ss._db.local().get_tables_metadata().get_table(table).schema();
+    auto table_ptr = _ss._db.local().get_tables_metadata().get_table_if_exists(table);
+    if (!table_ptr) {
+        co_return tasks::task_status {
+            .task_id = id,
+            .kind = tasks::task_kind::cluster,
+            .is_abortable = co_await is_abortable(std::move(hint)),
+        };
+    }
+    auto schema = table_ptr->schema();
     res.status = {
         .task_id = id,
         .kind = tasks::task_kind::cluster,
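The shape of this fix recurs across the tablet hunks: replace an accessor that assumes the entry exists with a find-style lookup plus an explicit null check, because the table can legitimately be dropped while the virtual task is being inspected. A minimal sketch of the pattern (hypothetical metadata map, not ScyllaDB's types):

```cpp
#include <memory>
#include <optional>
#include <unordered_map>

// Hypothetical metadata map, for illustration only.
struct table_meta { int id; };
using tables_metadata = std::unordered_map<int, std::shared_ptr<table_meta>>;

std::shared_ptr<table_meta> get_table_if_exists(const tables_metadata& t, int id) {
    auto it = t.find(id);
    return it == t.end() ? nullptr : it->second;
}

std::optional<int> describe_table(const tables_metadata& t, int id) {
    auto ptr = get_table_if_exists(t, id);
    if (!ptr) {
        return std::nullopt;  // table dropped concurrently: degrade gracefully
    }
    return ptr->id;
}
```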
@@ -276,7 +293,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
         }
         return make_ready_future();
     });
-    res.status.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper()));
+    res.status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
     } else if (is_migration_task(task_type)) { // Migration task.
         auto tablet_id = hint.get_tablet_id();
         res.pending_replica = tmap.get_tablet_transition_info(tablet_id)->pending_replica;
@@ -290,7 +307,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
         if (task_info.tablet_task_id.uuid() == id.uuid()) {
             update_status(task_info, res.status, sched_nr);
             res.status.state = tasks::task_manager::task_state::running;
-            res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{};
+            res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{};
             co_return res;
         }
     }
@@ -18,6 +18,13 @@ class service_permit {
     friend service_permit empty_service_permit();
 public:
     size_t count() const { return _permit ? _permit->count() : 0; };
+    // Merge additional semaphore units into this permit.
+    // Used to grow the permit after the actual resource cost is known.
+    void adopt(seastar::semaphore_units<>&& units) {
+        if (_permit) {
+            _permit->adopt(std::move(units));
+        }
+    }
 };

 inline service_permit make_service_permit(seastar::semaphore_units<>&& permit) {
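The idea behind `adopt()` is to start with a small reservation when a request arrives and grow the permit once the real cost is known, so that everything is released together when the permit goes away. A toy stand-in for seastar's `semaphore_units` showing just that mechanic:

```cpp
#include <cassert>

// Toy stand-in for semaphore_units: adopt() merges another allocation
// into this one so everything is released together at destruction.
class units {
    int _n;
public:
    explicit units(int n) : _n(n) {}
    void adopt(units&& other) { _n += other._n; other._n = 0; }
    int count() const { return _n; }
};

int main() {
    units permit(1);   // small reservation taken when the request arrives
    units extra(4);    // taken later, once the real cost is known
    permit.adopt(std::move(extra));
    assert(permit.count() == 5);
    assert(extra.count() == 0);  // moved-from: releases nothing
}
```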
@@ -28,6 +28,7 @@
 #include "locator/abstract_replication_strategy.hh"
 #include "message/messaging_service.hh"
 #include "service/storage_service.hh"
+#include "utils/error_injection.hh"

 #include <cfloat>
 #include <algorithm>
@@ -406,6 +407,9 @@ future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::
     size_t nr_sst_current = 0;

     while (!sstables.empty()) {
+        co_await utils::get_local_injector().inject("load_and_stream_before_streaming_batch",
+            utils::wait_for_message(60s));
+
         const size_t batch_sst_nr = std::min(16uz, sstables.size());
         auto sst_processed = sstables
             | std::views::reverse
@@ -576,6 +580,16 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
     // throughout its lifetime.
     auto erm = co_await await_topology_quiesced_and_get_erm(table_id);

+    // Obtain a phaser guard to prevent the table from being destroyed
+    // while streaming is in progress. table::stop() calls
+    // _pending_streams_phaser.close() which blocks until all outstanding
+    // stream_in_progress() guards are released, so holding this guard
+    // keeps the table alive for the entire streaming operation.
+    // find_column_family throws no_such_column_family if the table was
+    // already dropped before we got here.
+    auto& tbl = _db.local().find_column_family(table_id);
+    auto stream_guard = tbl.stream_in_progress();
+
     auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
         _messaging, _db.local(), table_id, std::move(erm), std::move(sstables),
         primary, unlink_sstables(unlink), scope);
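A compact model of the phaser/gate idea the comment describes, with standard-C++ primitives instead of seastar's gate: `enter()` hands out a guard, and `close()` blocks until every outstanding guard has been destroyed, which is exactly what lets `table::stop()` wait for in-flight streams. Names and structure here are illustrative only:

```cpp
#include <condition_variable>
#include <mutex>

// Toy phaser: close() blocks until every guard from enter() is gone.
class phaser {
    std::mutex _m;
    std::condition_variable _cv;
    int _count = 0;
public:
    struct guard {  // move-only in real code; copying would double-release
        phaser* p;
        ~guard() {
            std::lock_guard<std::mutex> l(p->_m);
            if (--p->_count == 0) {
                p->_cv.notify_all();
            }
        }
    };
    guard enter() {
        std::lock_guard<std::mutex> l(_m);
        ++_count;
        return {this};
    }
    void close() {  // analogous to table::stop()
        std::unique_lock<std::mutex> l(_m);
        _cv.wait(l, [&] { return _count == 0; });
    }
};
```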
@@ -74,7 +74,7 @@ future<bool> table_helper::try_prepare(bool fallback, cql3::query_processor& qp,
     auto& stmt = fallback ? _insert_cql_fallback.value() : _insert_cql;
     try {
         shared_ptr<cql_transport::messages::result_message::prepared> msg_ptr = co_await qp.prepare(stmt, qs.get_client_state(), dialect);
-        _prepared_stmt = std::move(msg_ptr->get_prepared());
+        _prepared_stmt = msg_ptr->get_prepared();
         shared_ptr<cql3::cql_statement> cql_stmt = _prepared_stmt->statement;
         _insert_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(cql_stmt);
         _is_fallback_stmt = fallback;
@@ -400,7 +400,7 @@ task_manager::virtual_task::impl::impl(module_ptr module) noexcept
     : _module(std::move(module))
 {}

-future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, std::function<bool(locator::host_id)> is_host_alive) {
+future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr) {
     auto ms = module->get_task_manager()._messaging;
     if (!ms) {
         auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id);
@@ -417,19 +417,18 @@ future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::g
         tmlogger.info("tasks_vt_get_children: waiting");
         co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{10});
     });
-    co_return co_await map_reduce(nodes, [ms, parent_id, is_host_alive = std::move(is_host_alive)] (auto host_id) -> future<utils::chunked_vector<task_identity>> {
-        if (is_host_alive(host_id)) {
-            return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) {
-                return resp | std::views::transform([host_id] (auto id) {
-                    return task_identity{
-                        .host_id = host_id,
-                        .task_id = id
-                    };
-                }) | std::ranges::to<utils::chunked_vector<task_identity>>();
-            });
-        } else {
-            return make_ready_future<utils::chunked_vector<task_identity>>();
-        }
+    co_return co_await map_reduce(nodes, [ms, parent_id] (auto host_id) -> future<utils::chunked_vector<task_identity>> {
+        return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) {
+            return resp | std::views::transform([host_id] (auto id) {
+                return task_identity{
+                    .host_id = host_id,
+                    .task_id = id
+                };
+            }) | std::ranges::to<utils::chunked_vector<task_identity>>();
+        }).handle_exception_type([host_id, parent_id] (const rpc::closed_error& ex) {
+            tmlogger.warn("Failed to get children of virtual task with id={} from node {}: {}", parent_id, host_id, ex);
+            return utils::chunked_vector<task_identity>{};
+        });
     }, utils::chunked_vector<task_identity>{}, [] (auto a, auto&& b) {
         std::move(b.begin(), b.end(), std::back_inserter(a));
         return a;
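The strategy change here is worth spelling out: instead of consulting a liveness oracle (the gossiper) before each RPC, the new code calls every node unconditionally and treats a closed connection as "this node contributes no children". A minimal sketch of that ask-then-recover shape (hypothetical RPC wrapper, not the tasks verbs API):

```cpp
#include <stdexcept>
#include <vector>

struct closed_error : std::runtime_error {
    closed_error() : std::runtime_error("connection closed") {}
};

// Stand-in for the RPC: throws if the peer is unreachable.
std::vector<int> fetch_children(int host) {
    if (host < 0) {
        throw closed_error{};
    }
    return {host, host + 1};
}

std::vector<int> fetch_children_or_empty(int host) {
    try {
        return fetch_children(host);
    } catch (const closed_error&) {
        return {};  // unreachable node contributes nothing; log and move on
    }
}
```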
@@ -19,6 +19,7 @@
 #include "db_clock.hh"
 #include "utils/log.hh"
 #include "locator/host_id.hh"
+#include "locator/token_metadata_fwd.hh"
 #include "schema/schema_fwd.hh"
 #include "tasks/types.hh"
 #include "utils/chunked_vector.hh"
@@ -281,7 +282,7 @@ public:
     impl& operator=(impl&&) = delete;
     virtual ~impl() = default;
 protected:
-    static future<utils::chunked_vector<task_identity>> get_children(module_ptr module, task_id parent_id, std::function<bool(locator::host_id)> is_host_alive);
+    static future<utils::chunked_vector<task_identity>> get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr);
 public:
     virtual task_group get_group() const noexcept = 0;
     // Returns std::nullopt if an operation with task_id isn't tracked by this virtual_task.
@@ -372,14 +372,17 @@ def test_streams_operations(test_table_s, dynamodbstreams, metrics):
 # to update latencies for one kind of operation (#17616, and compare #9406),
 # and to do that checking that ..._count increases for that op is enough.
 @contextmanager
-def check_sets_latency(metrics, operation_names):
+def check_sets_latency_by_metric(metrics, operation_names, metric_name):
     the_metrics = get_metrics(metrics)
-    saved_latency_count = { x: get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': x}, the_metrics) for x in operation_names }
+    saved_latency_count = { x: get_metric(metrics, f'{metric_name}_count', {'op': x}, the_metrics) for x in operation_names }
     yield
     the_metrics = get_metrics(metrics)
     for op in operation_names:
         # The total "count" on all shards should strictly increase
-        assert saved_latency_count[op] < get_metric(metrics, 'scylla_alternator_op_latency_count', {'op': op}, the_metrics)
+        assert saved_latency_count[op] < get_metric(metrics, f'{metric_name}_count', {'op': op}, the_metrics)
+
+def check_sets_latency(metrics, operation_names):
+    return check_sets_latency_by_metric(metrics, operation_names, 'scylla_alternator_op_latency')

 # Test latency metrics for PutItem, GetItem, DeleteItem, UpdateItem.
 # We can't check what exactly the latency is - just that it gets updated.
@@ -395,6 +398,18 @@ def test_item_latency(test_table_s, metrics):
         test_table_s.meta.client.batch_get_item(RequestItems = {
             test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})

+def test_item_latency_per_table(test_table_s, metrics):
+    with check_sets_latency_by_metric(metrics, ['DeleteItem', 'GetItem', 'PutItem', 'UpdateItem', 'BatchWriteItem', 'BatchGetItem'], 'scylla_alternator_table_op_latency'):
+        p = random_string()
+        test_table_s.put_item(Item={'p': p})
+        test_table_s.get_item(Key={'p': p})
+        test_table_s.delete_item(Key={'p': p})
+        test_table_s.update_item(Key={'p': p})
+        test_table_s.meta.client.batch_write_item(RequestItems = {
+            test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
+        test_table_s.meta.client.batch_get_item(RequestItems = {
+            test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
+
 # Test latency metrics for GetRecords. Other Streams-related operations -
 # ListStreams, DescribeStream, and GetShardIterator, have an operation
 # count (tested above) but do NOT currently have a latency histogram.
@@ -25,6 +25,7 @@
 #include "test/lib/exception_utils.hh"
+#include "utils/io-wrappers.hh"

 using namespace encryption;

 static tmpdir dir;
@@ -595,6 +596,123 @@ SEASTAR_TEST_CASE(test_encrypted_data_source_simple) {
     co_await test_random_data_source(sizes);
 }

+// Reproduces the production deadlock where encrypted SSTable component downloads
+// got stuck during restore. The encrypted_data_source::get() caches a block in
+// _next, then on the next call bypasses input_stream::read()'s _eof check and
+// calls input_stream::read_exactly() — which does NOT check _eof when _buf is
+// empty. This causes a second get() on the underlying source after EOS.
+//
+// In production the underlying source was chunked_download_source whose get()
+// hung forever. Here we simulate it with a strict source that fails the test.
+//
+// The fix belongs in seastar's input_stream::read_exactly(): check _eof before
+// calling _fd.get(), consistent with read(), read_up_to(), and consume().
+static future<> test_encrypted_source_copy(size_t plaintext_size) {
+    testlog.info("test_encrypted_source_copy: plaintext_size={}", plaintext_size);
+
+    key_info info{"AES/CBC", 256};
+    auto k = ::make_shared<symmetric_key>(info);
+
+    // Step 1: Encrypt the plaintext into memory buffers
+    auto plaintext = generate_random<char>(plaintext_size);
+    std::vector<temporary_buffer<char>> encrypted_bufs;
+    {
+        data_sink sink(make_encrypted_sink(create_memory_sink(encrypted_bufs), k));
+        co_await sink.put(plaintext.clone());
+        co_await sink.close();
+    }
+
+    // Flatten encrypted buffers into a single contiguous buffer
+    size_t encrypted_total = 0;
+    for (const auto& b : encrypted_bufs) {
+        encrypted_total += b.size();
+    }
+    temporary_buffer<char> encrypted(encrypted_total);
+    size_t pos = 0;
+    for (const auto& b : encrypted_bufs) {
+        std::copy(b.begin(), b.end(), encrypted.get_write() + pos);
+        pos += b.size();
+    }
+
+    // Step 2: Create a data source from the encrypted data that fails on
+    // post-EOS get() — simulating a source like chunked_download_source
+    // that would hang forever in this situation.
+    // A simple data source that serves data from a temporary_buffer in
+    // chunks of at most chunk_size, then returns EOF. It also asserts
+    // that get() is never called after EOF — simulating a source like
+    // chunked_download_source that would hang forever in that case.
+    class strict_memory_source final : public data_source_impl {
+        temporary_buffer<char> _data;
+        size_t _chunk_size;
+        bool _eof = false;
+    public:
+        strict_memory_source(temporary_buffer<char> data, size_t chunk_size)
+            : _data(std::move(data))
+            , _chunk_size(chunk_size) {}
+
+        future<temporary_buffer<char>> get() override {
+            BOOST_REQUIRE_MESSAGE(!_eof,
+                "get() called on source after it already returned EOS — "
+                "this is the production deadlock: read_exactly() does not "
+                "check _eof before calling _fd.get()");
+            if (_data.empty()) {
+                _eof = true;
+                co_return temporary_buffer<char>{};
+            }
+            auto n = std::min(_chunk_size, _data.size());
+            auto result = _data.share(0, n);
+            _data.trim_front(n);
+            co_return result;
+        }
+    };
+
+    // Step 3: Wrap in encrypted_data_source and drain via consume() —
+    // the exact code path used by seastar::copy() which is what
+    // sstables_loader_helpers::download_sstable() calls.
+    // Try multiple chunk sizes to hit different alignment scenarios.
+    for (size_t chunk_size : {1ul, 7ul, 4096ul, 8192ul, encrypted_total, encrypted_total + 1}) {
+        if (chunk_size == 0) continue;
+        auto src = data_source(make_encrypted_source(
+            data_source(std::make_unique<strict_memory_source>(encrypted.clone(), chunk_size)), k));
+        auto in = input_stream<char>(std::move(src));
+
+        // consume() is what seastar::copy() uses internally. It calls
+        // encrypted_data_source::get() via _fd.get() until EOF.
+        size_t total_decrypted = 0;
+        co_await in.consume([&total_decrypted](temporary_buffer<char> buf) {
+            total_decrypted += buf.size();
+            return make_ready_future<consumption_result<char>>(continue_consuming{});
+        });
+        co_await in.close();
+
+        BOOST_REQUIRE_EQUAL(total_decrypted, plaintext_size);
+    }
+}
+
+SEASTAR_TEST_CASE(test_encrypted_source_copy_8k) {
+    co_await test_encrypted_source_copy(8192);
+}
+
+SEASTAR_TEST_CASE(test_encrypted_source_copy_4k) {
+    co_await test_encrypted_source_copy(4096);
+}
+
+SEASTAR_TEST_CASE(test_encrypted_source_copy_small) {
+    co_await test_encrypted_source_copy(100);
+}
+
+SEASTAR_TEST_CASE(test_encrypted_source_copy_12k) {
+    co_await test_encrypted_source_copy(12288);
+}
+
+SEASTAR_TEST_CASE(test_encrypted_source_copy_unaligned) {
+    co_await test_encrypted_source_copy(8193);
+}
+
+SEASTAR_TEST_CASE(test_encrypted_source_copy_1byte) {
+    co_await test_encrypted_source_copy(1);
+}
+
 SEASTAR_TEST_CASE(test_encrypted_data_source_fuzzy) {
     std::mt19937_64 rand_gen(std::random_device{}());

@@ -38,6 +38,7 @@
 #include "test/lib/proc_utils.hh"
 #include "db/config.hh"
 #include "db/extensions.hh"
+#include "db/system_keyspace.hh"
 #include "db/commitlog/commitlog.hh"
 #include "db/commitlog/commitlog_replayer.hh"
 #include "init.hh"
@@ -794,6 +795,39 @@ static auto make_commitlog_config(const test_provider_args& args, const std::uno
     return std::make_tuple(cfg, ext);
 }

+SEASTAR_TEST_CASE(test_system_info_encryption_includes_raft_tables) {
+    tmpdir tmp;
+    auto sysdir = tmp.path() / "system_keys";
+    auto syskey = sysdir / "system" / "system_table_keytab";
+    auto yaml = fmt::format("system_key_directory: {}", sysdir.string());
+
+    co_await create_key_file(syskey, { { "AES/CBC/PKCSPadding", 128 }});
+
+    test_provider_args args{
+        .tmp = tmp,
+        .extra_yaml = yaml,
+    };
+
+    auto [cfg, ext] = make_commitlog_config(args, {});
+
+    co_await do_with_cql_env_thread(
+        [](cql_test_env& env) {
+            auto check_has_encryption = [&](schema_ptr s) {
+                auto it = s->extensions().find("scylla_encryption_options");
+                BOOST_REQUIRE_MESSAGE(it != s->extensions().end(),
+                    fmt::format("Expected encryption extension on {}.{}",
+                        s->ks_name(), s->cf_name()));
+                BOOST_REQUIRE_MESSAGE(!it->second->is_placeholder(),
+                    fmt::format("Encryption extension on {}.{} "
+                        "should not be a placeholder",
+                        s->ks_name(), s->cf_name()));
+            };
+
+            check_has_encryption(db::system_keyspace::raft());
+        },
+        cfg, {}, cql_test_init_configurables{ *ext });
+}
+
 static future<> test_encrypted_commitlog(const test_provider_args& args, std::unordered_map<std::string, std::string> scopts = {}) {
     fs::path clback = args.tmp.path() / "commitlog_back";

@@ -57,6 +57,20 @@ BOOST_AUTO_TEST_CASE(test_null_is_not_empty) {
     BOOST_REQUIRE(empty != null);
 }

+BOOST_AUTO_TEST_CASE(test_null_data_value_to_parsable_string) {
+    auto null_utf8 = data_value::make_null(utf8_type);
+    BOOST_REQUIRE_EQUAL(null_utf8.to_parsable_string(), "null");
+
+    auto null_int = data_value::make_null(int32_type);
+    BOOST_REQUIRE_EQUAL(null_int.to_parsable_string(), "null");
+
+    auto null_list = data_value::make_null(list_type_impl::get_instance(int32_type, true));
+    BOOST_REQUIRE_EQUAL(null_list.to_parsable_string(), "null");
+
+    auto null_map = data_value::make_null(map_type_impl::get_instance(utf8_type, int32_type, true));
+    BOOST_REQUIRE_EQUAL(null_map.to_parsable_string(), "null");
+}
+
 BOOST_AUTO_TEST_CASE(test_bytes_type_string_conversions) {
     BOOST_REQUIRE(bytes_type->equal(bytes_type->from_string("616263646566"), bytes_type->decompose(data_value(bytes{"abcdef"}))));
 }
@@ -9,6 +9,7 @@ import os
 import random
 import string
 import tempfile
+import threading
 from concurrent.futures.thread import ThreadPoolExecutor
 from pprint import pformat

@@ -476,24 +477,27 @@ class TesterAlternator(BaseAlternator):
         extra_config = {"max_concurrent_requests_per_shard": concurrent_requests_limit, "num_tokens": 1}
         self.prepare_dynamodb_cluster(num_of_nodes=1, extra_config=extra_config)
         node1 = self.cluster.nodelist()[0]
-        create_tables_threads = []
-        for tables_num in range(concurrent_requests_limit * 5):
-            create_tables_threads.append(self.run_create_table_thread())
+        stop_workers = threading.Event()

-        @retrying(num_attempts=150, sleep_time=0.2, allowed_exceptions=ConcurrencyLimitNotExceededError, message="Running create-table request")
-        def wait_for_create_table_request_failure():
-            try:
-                self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
-            except Exception as error:
-                if "RequestLimitExceeded" in error.args[0]:
-                    return
-                raise
-            raise ConcurrencyLimitNotExceededError
+        def run_create_table_until_limited() -> None:
+            while not stop_workers.is_set():
+                try:
+                    self.create_table(table_name=random_string(length=10), node=node1, wait_until_table_exists=False)
+                except Exception as error:  # noqa: BLE001
+                    if "RequestLimitExceeded" in str(error):
+                        stop_workers.set()
+                        return
+                    raise

-        wait_for_create_table_request_failure()
+        with ThreadPoolExecutor(max_workers=concurrent_requests_limit * 5) as executor:
+            create_table_futures = [executor.submit(run_create_table_until_limited) for _ in range(concurrent_requests_limit * 5)]

-        for thread in create_tables_threads:
-            thread.join()
+            if not stop_workers.wait(timeout=30):
+                raise ConcurrencyLimitNotExceededError
+
+            stop_workers.set()
+            for future in create_table_futures:
+                future.result(timeout=60)

     @staticmethod
     def _set_slow_query_logging_api(run_on_node: ScyllaNode, is_enable: bool = True, threshold: int | None = None):
@@ -255,27 +255,3 @@ async def test_node_ops_task_wait(manager: ManagerClient):

     await decommission_task
     await waiting_task
-
-@pytest.mark.asyncio
-async def test_get_children(manager: ManagerClient):
-    module_name = "node_ops"
-    tm = TaskManagerClient(manager.api)
-    servers = [await manager.server_add(cmdline=cmdline) for _ in range(2)]
-
-    injection = "tasks_vt_get_children"
-    handler = await inject_error_one_shot(manager.api, servers[0].ip_addr, injection)
-
-    log = await manager.server_open_log(servers[0].server_id)
-    mark = await log.mark()
-
-    bootstrap_task = [task for task in await tm.list_tasks(servers[0].ip_addr, module_name) if task.kind == "cluster"][0]
-
-    async def _decommission():
-        await log.wait_for('tasks_vt_get_children: waiting', from_mark=mark)
-        await manager.decommission_node(servers[1].server_id)
-        await handler.message()
-
-    async def _get_status():
-        await tm.get_task_status(servers[0].ip_addr, bootstrap_task.task_id)
-
-    await asyncio.gather(*(_decommission(), _get_status()))
@@ -96,6 +96,50 @@ async def test_tablet_repair_task(manager: ManagerClient):

     await asyncio.gather(repair_task(), check_and_abort_repair_task(manager, tm, servers, module_name, ks))

+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_tablet_repair_wait_with_table_drop(manager: ManagerClient):
+    module_name = "tablets"
+    tm = TaskManagerClient(manager.api)
+    injection = "tablet_virtual_task_wait"
+
+    cmdline = [
+        '--logger-log-level', 'debug_error_injection=debug',
+    ]
+    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, cmdline=cmdline)
+    assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
+
+    token = -1
+    await enable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
+    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, await_completion=False)
+
+    repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=ks)
+
+    task = repair_tasks[0]
+    assert task.scope == "table"
+    assert task.keyspace == ks
+    assert task.table == "test"
+    assert task.state in ["created", "running"]
+
+    log = await manager.server_open_log(servers[0].server_id)
+    mark = await log.mark()
+
+    await enable_injection(manager, [servers[0]], injection)
+
+    async def wait_for_task():
+        status_wait = await tm.wait_for_task(servers[0].ip_addr, task.task_id)
+        assert status_wait.state == "done"
+
+    async def drop_table():
+        await log.wait_for(f'"{injection}"', from_mark=mark)
+        await disable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
+        await manager.get_cql().run_async(f"DROP TABLE {ks}.test")
+        await manager.api.message_injection(servers[0].ip_addr, injection)
+
+    await asyncio.gather(wait_for_task(), drop_table())
+
+    await disable_injection(manager, servers, injection)
+
 async def check_repair_task_list(tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str):
     def get_task_with_id(repair_tasks, task_id):
         tasks_with_id1 = [task for task in repair_tasks if task.task_id == task_id]
@@ -17,10 +17,10 @@ from test.pylib.manager_client import ManagerClient
 from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
 from test.pylib.tablets import get_tablet_replicas
 from test.pylib.scylla_cluster import ReplaceConfig
-from test.pylib.util import wait_for
+from test.pylib.util import gather_safely, wait_for

 from test.cluster.conftest import skip_mode
-from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace
+from test.cluster.util import get_topology_coordinator, find_server_by_host_id, keyspace_has_tablets, new_test_keyspace, new_test_table


 logger = logging.getLogger(__name__)
@@ -52,28 +52,42 @@ async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_poi
 @pytest.mark.asyncio
 async def test_write_cl_any_to_dead_node_generates_hints(manager: ManagerClient):
     node_count = 2
-    servers = await manager.servers_add(node_count)
+    cmdline = ["--logger-log-level", "hints_manager=trace"]
+    servers = await manager.servers_add(node_count, cmdline=cmdline)
+
+    async def wait_for_hints_written(min_hint_count: int, timeout: int):
+        async def aux():
+            hints_written = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
+            if hints_written >= min_hint_count:
+                return True
+            return None
+        assert await wait_for(aux, time.time() + timeout)

     cql = manager.get_cql()
     async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
-        table = f"{ks}.t"
-        await cql.run_async(f"CREATE TABLE {table} (pk int primary key, v int)")
-
-        await manager.server_stop_gracefully(servers[1].server_id)
-
-        hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
-
-        # Some of the inserts will be targeted to the dead node.
-        # The coordinator doesn't have live targets to send the write to, but it should write a hint.
-        for i in range(100):
-            await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i+1})", consistency_level=ConsistencyLevel.ANY))
-
-        # Verify hints are written
-        hints_after = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
-        assert hints_after > hints_before
-
-        # For dropping the keyspace
-        await manager.server_start(servers[1].server_id)
+        uses_tablets = await keyspace_has_tablets(manager, ks)
+        # If the keyspace uses tablets, let's explicitly require the table to use multiple tablets.
+        # Otherwise, it could happen that all mutations would target servers[0] only, which would
+        # ultimately lead to a test failure here. We rely on the assumption that mutations will be
+        # distributed more or less uniformly!
+        extra_opts = "WITH tablets = {'min_tablet_count': 16}" if uses_tablets else ""
+        async with new_test_table(manager, ks, "pk int PRIMARY KEY, v int", extra_opts) as table:
+            await manager.server_stop_gracefully(servers[1].server_id)
+
+            hints_before = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "written")
+            stmt = cql.prepare(f"INSERT INTO {table} (pk, v) VALUES (?, ?)")
+            stmt.consistency_level = ConsistencyLevel.ANY
+
+            # Some of the inserts will be targeted to the dead node.
+            # The coordinator doesn't have live targets to send the write to, but it should write a hint.
+            await gather_safely(*[cql.run_async(stmt, (i, i + 1)) for i in range(100)])
+
+            # Verify hints are written
+            await wait_for_hints_written(hints_before + 1, timeout=60)
+
+            # For dropping the keyspace
+            await manager.server_start(servers[1].server_id)

 @pytest.mark.asyncio
 async def test_limited_concurrency_of_writes(manager: ManagerClient):
@@ -269,6 +283,18 @@ async def test_hints_consistency_during_replace(manager: ManagerClient):
         # Write 100 rows with CL=ANY. Some of the rows will only be stored as hints because of RF=1
         for i in range(100):
             await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i + 1})", consistency_level=ConsistencyLevel.ANY))

+        # Hint writes are fire-and-forget (store_hint() submits do_store_hint()
+        # asynchronously via a gate). Wait for all pending hint writes to complete
+        # before creating the sync point, otherwise it may capture a stale
+        # replay position and miss some hints.
+        async def no_pending_hint_writes():
+            size = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "size_of_hints_in_progress")
+            if size == 0:
+                return True
+            return None
+        await wait_for(no_pending_hint_writes, time.time() + 30)
+
         sync_point = await create_sync_point(manager.api.client, servers[0].ip_addr)

         await manager.server_add(replace_cfg=ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = True))
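The comment above captures a general quiescence pattern: background writes are fire-and-forget, so before taking a consistent snapshot (the sync point) you poll an in-progress gauge down to zero. The same idea in C++ terms, as a standalone sketch with an atomic gauge and a deadline:

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Poll a gauge of in-flight background writes; only proceed (e.g. take
// the sync point) once it reaches zero or the deadline expires.
bool wait_until_zero(const std::atomic<int>& in_progress,
                     std::chrono::steady_clock::duration timeout) {
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (in_progress.load() != 0) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // still pending; the caller decides what to do
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return true;  // quiesced: a snapshot taken now sees all writes
}
```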
@@ -44,10 +44,10 @@ def get_sstables(workdir, ks, table):
     sstables = glob.glob(base_pattern)
     return sstables

-async def get_sst_status(run, log):
-    sst_add = await log.grep(rf'.*Added sst.*for incremental repair')
-    sst_skip = await log.grep(rf'.*Skipped adding sst.*for incremental repair')
-    sst_mark = await log.grep(rf'.*Marking.*for incremental repair')
-    logging.info(f'{run=}: {sst_add=} {sst_skip=} {sst_mark=}');
+async def get_sst_status(run, log, from_mark=None):
+    sst_add = await log.grep(rf'.*Added sst.*for incremental repair', from_mark=from_mark)
+    sst_skip = await log.grep(rf'.*Skipped adding sst.*for incremental repair', from_mark=from_mark)
+    sst_mark = await log.grep(rf'.*Marking.*for incremental repair', from_mark=from_mark)
+    logging.info(f'{run=}: {len(sst_add)=} {len(sst_skip)=} {len(sst_mark)=}');
     return sst_add, sst_skip, sst_mark
@@ -331,10 +331,11 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
     servers, cql, hosts, ks, table_id, logs, repaired_keys, unrepaired_keys, current_key, token = await preapre_cluster_for_incremental_repair(manager, nr_keys)
     token = -1

+    marks = [await log.mark() for log in logs]
     await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
     # 1 add 0 skip 1 mark
-    for log in logs:
-        sst_add, sst_skip, sst_mark = await get_sst_status("First", log)
+    for log, mark in zip(logs, marks):
+        sst_add, sst_skip, sst_mark = await get_sst_status("First", log, from_mark=mark)
         assert len(sst_add) == 1
         assert len(sst_skip) == 0
         assert len(sst_mark) == 1
@@ -357,13 +358,14 @@ async def do_tablet_incremental_repair_and_ops(manager: ManagerClient, ops: str)
     else:
         assert False # Wrong ops

+    marks = [await log.mark() for log in logs]
     await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')

     # 1 add 1 skip 1 mark
-    for log in logs:
-        sst_add, sst_skip, sst_mark = await get_sst_status("Second", log)
-        assert len(sst_add) == 2
-        assert len(sst_mark) == 2
+    for log, mark in zip(logs, marks):
+        sst_add, sst_skip, sst_mark = await get_sst_status("Second", log, from_mark=mark)
+        assert len(sst_add) == 1
+        assert len(sst_mark) == 1
         assert len(sst_skip) == 1

 @pytest.mark.asyncio
test/cluster/test_prepare_race.py (new file, 65 lines)
@@ -0,0 +1,65 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

import asyncio
import pytest

from test.cluster.util import new_test_keyspace, new_test_table
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode="release", reason="error injections are not supported in release mode")
async def test_prepare_fails_if_cached_statement_is_invalidated_mid_prepare(manager: ManagerClient):
    server = await manager.server_add()
    cql = manager.get_cql()
    log = await manager.server_open_log(server.server_id)

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};") as ks:
        async with new_test_table(manager, ks, "pk int PRIMARY KEY") as table:
            query = f"SELECT * FROM {table} WHERE pk = ?"
            loop = asyncio.get_running_loop()
            await cql.run_async(f"INSERT INTO {table} (pk) VALUES (7)")
            await cql.run_async(f"INSERT INTO {table} (pk) VALUES (8)")

            handler = await inject_error_one_shot(manager.api, server.ip_addr, "query_processor_prepare_wait_after_cache_get")
            mark = await log.mark()
            prepare_future = loop.run_in_executor(None, lambda: cql.prepare(query))
            await log.wait_for("query_processor_prepare_wait_after_cache_get: waiting for message", from_mark=mark, timeout=60)

            # Trigger a metadata-only table schema update to invalidate prepared statements while PREPARE is paused.
            await cql.run_async(f"ALTER TABLE {table} WITH comment = 'invalidate-prepared-race'")

            await handler.message()
            done, _ = await asyncio.wait({prepare_future}, timeout=15)
            if not done:
                pytest.fail("Timed out waiting for PREPARE to complete after signaling injection")

            result = done.pop().result()
            print(f"PREPARE succeeded as expected: {result!r}")

            rows = cql.execute(result, [7])
            row = rows.one()
            assert row is not None and row.pk == 7

            # Invalidate prepared statements again, then execute the same prepared object.
            # The driver should transparently re-prepare and retry the execution.
            await cql.run_async(f"ALTER TABLE {table} WITH comment = 'invalidate-prepared-race-again'")

            reprepare_handler = await inject_error_one_shot(manager.api, server.ip_addr, "query_processor_prepare_wait_after_cache_get")
            reprepare_mark = await log.mark()
            execute_future = loop.run_in_executor(None, lambda: cql.execute(result, [8]))
            await log.wait_for("query_processor_prepare_wait_after_cache_get: waiting for message", from_mark=reprepare_mark, timeout=60)

            await reprepare_handler.message()
            execute_done, _ = await asyncio.wait({execute_future}, timeout=15)
            if not execute_done:
                pytest.fail("Timed out waiting for driver execute to finish after re-prepare signaling")

            retried_rows = execute_done.pop().result()
            retried_row = retried_rows.one()
            assert retried_row is not None and retried_row.pk == 8
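
The re-prepare comment above relies on driver-side retry behavior. A minimal sketch of that client loop, under assumed names (UnpreparedError and the retry helper are illustrative, not the python-driver API):

class UnpreparedError(Exception):
    """Placeholder for the server's 'unprepared statement' response."""

def execute_with_reprepare(session, stmt, params, max_retries=1):
    # On an "unprepared" response the server no longer recognizes the
    # statement id, so the client re-prepares the original query text
    # and retries the execution.
    for attempt in range(max_retries + 1):
        try:
            return session.execute(stmt, params)
        except UnpreparedError:
            if attempt == max_retries:
                raise
            stmt = session.prepare(stmt.query_string)
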
@@ -15,6 +15,7 @@ from test.pylib.manager_client import ManagerClient, ServerInfo
from test.pylib.rest_client import read_barrier, HTTPError
from test.pylib.scylla_cluster import ScyllaVersionDescription
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
from test.cluster.util import reconnect_driver
from cassandra.cluster import ConsistencyLevel
from cassandra.policies import FallthroughRetryPolicy
from cassandra.protocol import ServerError

@@ -162,6 +163,7 @@ async def test_upgrade_and_rollback(manager: ManagerClient, scylla_2025_1: Scyll
    )

    logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature")
    cql = await reconnect_driver(manager)
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    await asyncio.gather(*(wait_for_feature("SSTABLE_COMPRESSION_DICTS", cql, h, time.time() + 60) for h in hosts))

@@ -1,8 +1,16 @@
import asyncio
import logging
import os
import shutil

from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
from test.pylib.manager_client import ManagerClient
from test.pylib.util import unique_name
import pytest

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_drop_table_during_streaming_receiver_side(manager: ManagerClient):
    servers = [await manager.server_add(config={
@@ -24,3 +32,146 @@ async def test_drop_table_during_flush(manager: ManagerClient):
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
    await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")


@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_drop_table_during_load_and_stream(manager: ManagerClient):
    """Verify that dropping a table while load_and_stream is in progress
    does not crash. The stream_in_progress() phaser guard acquired in
    sstables_loader::load_and_stream keeps the table object alive until
    streaming completes, so table::stop() blocks until the guard is
    released, preventing a use-after-free on the replica::table&
    reference held by the streamer.

    Uses the 'load_and_stream_before_streaming_batch' error injection
    to pause load_and_stream inside the streaming loop (after the
    streamer is created and holds a replica::table& reference), then
    issues DROP TABLE concurrently and verifies that both operations
    complete gracefully.

    A single node is sufficient: load_and_stream streams SSTables to
    the natural replicas (the local node in this case) via RPC.
    """
    server = await manager.server_add()

    cql = manager.get_cql()

    ks = unique_name("ks_")
    cf = "test"

    await cql.run_async(
        f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}")
    try:
        await cql.run_async(f"CREATE TABLE {ks}.{cf} (pk int PRIMARY KEY, c int)")

        # Insert data and flush to create SSTables on disk.
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{cf} (pk, c) VALUES ({k}, {k})") for k in range(64)])
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # Take a snapshot so we have SSTables to copy into the upload dir.
        snap_name = unique_name("snap_")
        await manager.api.take_snapshot(server.ip_addr, ks, snap_name)

        # Copy snapshot SSTables into the upload directory.
        # load_and_stream will read these and stream them to replicas.
        workdir = await manager.server_get_workdir(server.server_id)
        cf_dir = os.listdir(f"{workdir}/data/{ks}")[0]
        cf_path = os.path.join(f"{workdir}/data/{ks}", cf_dir)
        upload_dir = os.path.join(cf_path, "upload")
        os.makedirs(upload_dir, exist_ok=True)

        snapshots_dir = os.path.join(cf_path, "snapshots", snap_name)
        exclude_list = ["manifest.json", "schema.cql"]
        for item in os.listdir(snapshots_dir):
            if item not in exclude_list:
                shutil.copy2(os.path.join(snapshots_dir, item), os.path.join(upload_dir, item))

        # Enable the injection that pauses load_and_stream inside the streaming
        # loop, after the streamer is created with a replica::table& reference.
        # one_shot=True: the test only needs one pause to demonstrate the race;
        # after firing once per shard the injection disables itself, avoiding
        # blocking on subsequent batches.
        await manager.api.enable_injection(server.ip_addr, "load_and_stream_before_streaming_batch", one_shot=True)
        server_log = await manager.server_open_log(server.server_id)
        log_mark = await server_log.mark()

        # Start load_and_stream in the background; it will pause at the injection.
        refresh_task = asyncio.create_task(
            manager.api.load_new_sstables(server.ip_addr, ks, cf, load_and_stream=True))

        # Wait until at least one shard hits the injection point.
        await server_log.wait_for("load_and_stream_before_streaming_batch: waiting for message", from_mark=log_mark)
        logger.info("load_and_stream paused at injection point")

        # Drop the table while streaming is paused. With the stream_in_progress
        # guard the DROP will block until the guard is released.
        drop_task = asyncio.ensure_future(cql.run_async(f"DROP TABLE {ks}.{cf}"))

        # Give the DROP a moment to be submitted and reach the server.
        # A log-based wait would be more robust, but there is no dedicated log
        # message for "DROP blocked on phaser"; the sleep is acceptable here.
        await asyncio.sleep(1)

        # Release the injection; streaming resumes.
        # With the fix, the phaser guard keeps the table object alive and
        # streaming completes (or fails gracefully). Without the fix,
        # the table is already destroyed and the node crashes
        # (use-after-free).
        await manager.api.message_injection(server.ip_addr, "load_and_stream_before_streaming_batch")
        logger.info("Released injection, waiting for load_and_stream to complete")

        # Wait for both operations with a timeout; if the node crashed, the
        # REST call / CQL query will never return.
        refresh_error = None
        try:
            await asyncio.wait_for(refresh_task, timeout=30)
            logger.info("load_and_stream completed")
        except asyncio.TimeoutError:
            refresh_error = "load_and_stream timed out; node likely crashed"
            logger.info(refresh_error)
        except Exception as e:
            refresh_error = str(e)
            logger.info(f"load_and_stream finished with error: {e}")

        drop_error = None
        try:
            await asyncio.wait_for(drop_task, timeout=30)
            logger.info("DROP TABLE completed")
        except asyncio.TimeoutError:
            drop_error = "DROP TABLE timed out"
            logger.info(drop_error)
        except Exception as e:
            drop_error = str(e)
            logger.info(f"DROP TABLE finished with error: {e}")

        # The critical assertion: the node must still be alive.
        # Without the stream_in_progress() guard, the table is destroyed
        # while streaming holds a dangling reference, causing a crash
        # (SEGV or ASAN heap-use-after-free).
        crash_matches = await server_log.grep(
            r"Segmentation fault|AddressSanitizer|heap-use-after-free|ABORTING",
            from_mark=log_mark)
        assert not crash_matches, \
            "Node crashed during load_and_stream; " \
            "stream_in_progress() guard is needed to keep the table alive"

        # DROP TABLE must complete.
        assert not drop_error, f"DROP TABLE failed unexpectedly: {drop_error}"

        # load_and_stream may fail with a "column family not found" error:
        # database::drop_table() removes the table from metadata (so
        # find_column_family() fails) before cleanup_drop_table_on_all_shards()
        # awaits the phaser. When streaming resumes and opens an RPC channel,
        # the receiver-side handler calls find_column_family(), which throws.
        # This is the expected graceful failure; the important thing is
        # no crash (checked above).
        if refresh_error:
            assert "Can't find a column family" in refresh_error, \
                f"load_and_stream failed with unexpected error: {refresh_error}"
    finally:
        # Clean up the keyspace if it still exists.
        try:
            await cql.run_async(f"DROP KEYSPACE IF EXISTS {ks}")
        except Exception:
            pass
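
The docstring above leans on a phaser-style guard to keep the table alive until streaming finishes. A minimal asyncio sketch of that idea (Gate and its methods are illustrative names loosely mirroring seastar::gate, not ScyllaDB code):

import asyncio

class Gate:
    def __init__(self):
        self._holders = 0
        self._drained = asyncio.Event()
        self._drained.set()

    def enter(self):
        # A streamer enters the gate before touching the table.
        self._holders += 1
        self._drained.clear()

    def leave(self):
        self._holders -= 1
        if self._holders == 0:
            self._drained.set()

    async def close(self):
        # table::stop() maps to close(): it cannot return while a
        # streamer still holds the gate, so the table stays alive.
        await self._drained.wait()
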
@@ -534,7 +534,9 @@ async def new_test_keyspace(manager: ManagerClient, opts, host=None):
        logger.info(f"Error happened while using keyspace '{keyspace}', the keyspace is left in place for investigation")
        raise
    else:
        await manager.get_cql().run_async("DROP KEYSPACE " + keyspace, host=host)
        # Use DROP KEYSPACE IF EXISTS as a workaround for
        # https://github.com/scylladb/python-driver/issues/317
        await manager.get_cql().run_async("DROP KEYSPACE IF EXISTS " + keyspace, host=host)

previously_used_table_names = []
@asynccontextmanager
@@ -579,6 +581,17 @@ async def new_materialized_view(manager: ManagerClient, table, select, pk, where
        await manager.get_cql().run_async(f"DROP MATERIALIZED VIEW {mv}")


async def keyspace_has_tablets(manager: ManagerClient, keyspace: str) -> bool:
    """
    Checks whether the given keyspace uses tablets.
    Adapted from its counterpart in the cqlpy test: cqlpy/util.py::keyspace_has_tablets.
    """
    cql = manager.get_cql()
    rows_iter = await cql.run_async(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name='{keyspace}'")
    rows = list(rows_iter)
    return len(rows) > 0 and getattr(rows[0], "initial_tablets", None) is not None
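
A hypothetical call site for this helper (the guard below is an illustrative assumption, not part of this change):

async def require_tablets(manager: ManagerClient, ks: str) -> None:
    # Skip tablet-specific assertions when the keyspace is vnode-based.
    if not await keyspace_has_tablets(manager, ks):
        pytest.skip("test requires a tablets-enabled keyspace")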


async def get_raft_log_size(cql, host) -> int:
    query = "select count(\"index\") from system.raft"
    return (await cql.run_async(query, host=host))[0][0]

@@ -1497,6 +1497,42 @@ def test_views_with_future_tombstones(cql, test_keyspace):
    assert [] == list(cql.execute(f'select * from {table}'))
    assert [] == list(cql.execute(f'select * from {mv}'))

# Test that a range delete in the same batch as an insert correctly covers
# rows within the deleted range in the materialized view, and that it doesn't
# cover rows outside the deleted range. The view update builder must track
# range tombstone changes from the update stream so that all range tombstones
# are applied to the clustering rows that they cover.
# Without this, an inserted row within the range incorrectly survives in the
# view, or is incorrectly deleted.
# Reproduces SCYLLADB-1555.
def test_mv_range_delete_and_insert_in_same_batch(cql, test_keyspace):
    # Case 1: Insert within the range-deleted interval. The range tombstone
    # should shadow the insert, leaving both base and view empty.
    with new_test_table(cql, test_keyspace,
                        'p int, c int, v int, w int, primary key (p, c)') as table:
        with new_materialized_view(cql, table, '*', 'v, p, c',
                                   'v is not null and p is not null and c is not null') as mv:
            cql.execute(f"BEGIN BATCH "
                        f"DELETE FROM {table} WHERE p = 1 AND c >= 1 AND c <= 3; "
                        f"INSERT INTO {table} (p, c, v) VALUES (1, 3, 3); "
                        f"APPLY BATCH")
            assert [] == list(cql.execute(f"SELECT * FROM {table}"))
            assert [] == list(cql.execute(f"SELECT * FROM {mv}"))
    # Case 2: A pre-existing row within the range, and an insert outside it.
    # The range delete should remove the existing row, but the new row at c=4
    # falls outside the range and should survive in both base and view.
    with new_test_table(cql, test_keyspace,
                        'p int, c int, v int, w int, primary key (p, c)') as table:
        with new_materialized_view(cql, table, '*', 'v, p, c',
                                   'v is not null and p is not null and c is not null') as mv:
            cql.execute(f"INSERT INTO {table} (p, c, v) VALUES (1, 2, 1)")
            cql.execute(f"BEGIN BATCH "
                        f"DELETE FROM {table} WHERE p = 1 AND c >= 1 AND c <= 3; "
                        f"INSERT INTO {table} (p, c, v) VALUES (1, 4, 3); "
                        f"APPLY BATCH")
            assert [] != list(cql.execute(f"SELECT * FROM {table}"))
            assert [] != list(cql.execute(f"SELECT * FROM {mv}"))

# Test view representation in system.* tables
def test_view_in_system_tables(cql, test_keyspace):
    with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int") as base:
@@ -235,7 +235,7 @@ SEASTAR_THREAD_TEST_CASE(multiple_outstanding_operations_on_failing_connection)
    mylog.trace("multiple_outstanding_operations_on_failing_connection");
    with_ldap_connection(local_fail_inject_address, [] (ldap_connection& c) {
        mylog.trace("multiple_outstanding_operations_on_failing_connection: invoking bind");
        bind(c).handle_exception(&ignore).get();;
        bind(c).handle_exception(&ignore).get();

        std::vector<future<ldap_msg_ptr>> results_base;
        for (size_t i = 0; i < 10; ++i) {
@@ -291,3 +291,31 @@ SEASTAR_THREAD_TEST_CASE(severed_connection_yields_exceptional_future) {
        }
    });
}

// Requires ASAN or valgrind to reliably detect the double-free.
SEASTAR_THREAD_TEST_CASE(unregistered_msgid_double_free) {
    set_defbase();
    with_ldap_connection(local_ldap_address, [] (ldap_connection& c) {
        const auto bind_res = bind(c).get();
        BOOST_REQUIRE_EQUAL(LDAP_RES_BIND, ldap_msgtype(bind_res.get()));

        // Bypass the public API to send a search without registering its
        // message ID, so poll_results() hits the unregistered-ID branch.
        int msgid = -1;
        const int rc = ldap_search_ext(c.get_ldap(), const_cast<char*>(base_dn), LDAP_SCOPE_SUBTREE,
                /*filter=*/nullptr,
                /*attrs=*/nullptr,
                /*attrsonly=*/0,
                /*serverctrls=*/nullptr,
                /*clientctrls=*/nullptr,
                /*timeout=*/nullptr,
                /*sizelimit=*/0, &msgid);
        BOOST_REQUIRE_EQUAL(LDAP_SUCCESS, rc);
        BOOST_REQUIRE_NE(-1, msgid);

        // A public-API search forces poll_results() to process the
        // unregistered response before returning.
        const auto dummy = search(c, base_dn).get();
        BOOST_REQUIRE(dummy.get());
    });
}

@@ -154,11 +154,6 @@ struct unreachable_socket {
        conn.shutdown_output();
        co_await conn.wait_input_shutdown();
    }
    // There is currently no effective way to abort an ongoing connect in Seastar.
    // Timing out the connect via with_timeout leaves the coroutine pending in the reactor.
    // To prevent resource leaks, we close the unreachable socket and sleep,
    // allowing the pending connect coroutines to fail and release their resources.
    co_await seastar::sleep(3s);
}
};

@@ -920,18 +920,20 @@ SEASTAR_TEST_CASE(vector_store_client_updates_backoff_max_time_from_read_request

            // Verify backoff timing between status check connections.
            // Skip the first connection (ANN request) and analyze status check intervals.
            // Allow a small tolerance for timer imprecision: measured intervals can be slightly shorter than the programmed sleep duration.
            constexpr auto TIMER_TOLERANCE = std::chrono::milliseconds(10);
            auto duration_between_1st_and_2nd_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
                unavail_s->connections().at(2).timestamp - unavail_s->connections().at(1).timestamp);
            BOOST_CHECK_GE(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(100));
            BOOST_CHECK_GE(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(100) - TIMER_TOLERANCE);
            BOOST_CHECK_LT(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(200));
            auto duration_between_2nd_and_3rd_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
                unavail_s->connections().at(3).timestamp - unavail_s->connections().at(2).timestamp);
            // Max backoff time reached at 200ms, so subsequent status checks use fixed 200ms intervals.
            BOOST_CHECK_GE(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(200)); // 200ms = 100ms * 2
            BOOST_CHECK_GE(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(200) - TIMER_TOLERANCE); // 200ms = 100ms * 2
            BOOST_CHECK_LT(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(400));
            auto duration_between_3rd_and_4th_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
                unavail_s->connections().at(4).timestamp - unavail_s->connections().at(3).timestamp);
            BOOST_CHECK_GE(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(200));
            BOOST_CHECK_GE(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(200) - TIMER_TOLERANCE);
            BOOST_CHECK_LT(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(400));
        },
        cfg)
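
The intervals asserted above encode a capped exponential backoff. A small Python sketch of that schedule (values mirror the test's 100 ms initial delay and 200 ms cap; the function name is an illustrative assumption):

def backoff_intervals_ms(initial=100, maximum=200):
    # Doubles the delay until the cap is reached, then stays fixed.
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, maximum)

it = backoff_intervals_ms()
assert [next(it) for _ in range(4)] == [100, 200, 200, 200]
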
@@ -67,14 +67,17 @@ void result_message::visitor_base::visit(const result_message::exception& ex) {
    ex.throw_me();
}

result_message::prepared::prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt)
    : _prepared(std::move(prepared))
result_message::prepared::prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt)
    : _prepared_entry(std::move(prepared_entry))
    , _metadata(
        _prepared->bound_names,
        _prepared->partition_key_bind_indices,
        support_lwt_opt ? _prepared->statement->is_conditional() : false)
    , _result_metadata{extract_result_metadata(_prepared->statement)}
        (*_prepared_entry)->bound_names,
        (*_prepared_entry)->partition_key_bind_indices,
        support_lwt_opt ? (*_prepared_entry)->statement->is_conditional() : false)
    , _result_metadata{extract_result_metadata((*_prepared_entry)->statement)}
{
    for (const auto& w : (*_prepared_entry)->warnings) {
        add_warning(w);
    }
}

::shared_ptr<const cql3::metadata> result_message::prepared::extract_result_metadata(::shared_ptr<cql3::cql_statement> statement) {

@@ -13,6 +13,7 @@
#include <concepts>

#include "cql3/result_set.hh"
#include "cql3/prepared_statements_cache.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/query_options.hh"

@@ -30,14 +31,14 @@ namespace messages {

class result_message::prepared : public result_message {
private:
    cql3::statements::prepared_statement::checked_weak_ptr _prepared;
    cql3::prepared_statements_cache::pinned_value_type _prepared_entry;
    cql3::prepared_metadata _metadata;
    ::shared_ptr<const cql3::metadata> _result_metadata;
protected:
    prepared(cql3::statements::prepared_statement::checked_weak_ptr prepared, bool support_lwt_opt);
    prepared(cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt);
public:
    cql3::statements::prepared_statement::checked_weak_ptr& get_prepared() {
        return _prepared;
    cql3::statements::prepared_statement::checked_weak_ptr get_prepared() {
        return (*_prepared_entry)->checked_weak_from_this();
    }

    const cql3::prepared_metadata& metadata() const {
@@ -49,7 +50,7 @@ public:
    }

    cql3::cql_metadata_id_type get_metadata_id() const {
        return _prepared->get_metadata_id();
        return (*_prepared_entry)->get_metadata_id();
    }

    class cql;
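
The switch from checked_weak_ptr to pinned_value_type changes ownership: the result message now pins the cache entry for as long as the message lives. A toy Python model of pin-aware eviction (PinnedCache and its methods are illustrative, not the Scylla cache API):

class PinnedCache:
    def __init__(self):
        self.values = {}
        self.pins = {}

    def pin(self, key):
        # A pinned entry stays resident even if the cache wants it gone.
        self.pins[key] = self.pins.get(key, 0) + 1

    def unpin(self, key):
        self.pins[key] -= 1

    def evict(self, key):
        # Eviction is a no-op while any holder still pins the entry.
        if self.pins.get(key, 0) == 0:
            self.values.pop(key, None)
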
@@ -166,8 +167,8 @@ std::ostream& operator<<(std::ostream& os, const result_message::set_keyspace& m
class result_message::prepared::cql : public result_message::prepared {
    bytes _id;
public:
    cql(const bytes& id, cql3::statements::prepared_statement::checked_weak_ptr p, bool support_lwt_opt)
        : result_message::prepared(std::move(p), support_lwt_opt)
    cql(const bytes& id, cql3::prepared_statements_cache::pinned_value_type prepared_entry, bool support_lwt_opt)
        : result_message::prepared(std::move(prepared_entry), support_lwt_opt)
        , _id{id}
    { }

@@ -243,6 +243,12 @@ void cql_sg_stats::register_metrics()
        );
    }

    transport_metrics.emplace_back(
        sm::make_gauge("cql_pending_response_memory", [this] { return _pending_response_memory; },
            sm::description("Holds the total memory in bytes consumed by responses waiting to be sent."),
            {{"scheduling_group_name", cur_sg_name}}).set_skip_when_empty()
    );

    new_metrics.add_group("transport", std::move(transport_metrics));
    _metrics = std::exchange(new_metrics, {});
}
@@ -831,6 +837,8 @@ future<> cql_server::connection::process_request() {

    future<> request_response_future = request_process_future.then_wrapped([this, buf = std::move(buf), mem_permit, leave = std::move(leave), stream] (future<foreign_ptr<std::unique_ptr<cql_server::response>>> response_f) mutable {
        try {
            auto& sg_stats = _server.get_cql_sg_stats();
            size_t pending_response_size = 0;
            if (response_f.failed()) {
                const auto message = format("request processing failed, error [{}]", response_f.get_exception());
                clogger.error("{}: {}", _client_state.get_remote_address(), message);
@@ -838,9 +846,22 @@ future<> cql_server::connection::process_request() {
                    message,
                    tracing::trace_state_ptr()));
            } else {
                write_response(response_f.get(), std::move(mem_permit), _compression);
                auto response = response_f.get();
                // Account for response body size exceeding the initial estimate.
                auto resp_size = response->size();
                auto permit_size = mem_permit.count();
                if (resp_size > permit_size) {
                    auto extra = resp_size - permit_size;
                    auto extra_units = consume_units(_server._memory_available, extra);
                    mem_permit.adopt(std::move(extra_units));
                }
                pending_response_size = resp_size;
                sg_stats._pending_response_memory += pending_response_size;
                write_response(std::move(response), _compression);
            }
            _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave)] {});
            _ready_to_respond = _ready_to_respond.finally([leave = std::move(leave), permit = std::move(mem_permit), &sg_stats, pending_response_size] {
                sg_stats._pending_response_memory -= pending_response_size;
            });
        } catch (...) {
            clogger.error("{}: request processing failed: {}",
                _client_state.get_remote_address(), std::current_exception());
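
The hunk above reserves memory before processing, grows the permit when the finished response exceeds the estimate, and tracks queued bytes until the write completes. A condensed Python sketch of that accounting (MemoryBudget and send_response are illustrative names, not Scylla's types):

class MemoryBudget:
    def __init__(self, total):
        self.available = total
        self.pending_response_bytes = 0  # mirrors _pending_response_memory

    def reserve(self, n):
        # Real code would wait for memory to free up; the sketch just debits.
        self.available -= n

    def release(self, n):
        self.available += n

def send_response(budget, estimated, response, write):
    budget.reserve(estimated)
    actual = len(response)
    if actual > estimated:
        budget.reserve(actual - estimated)  # adopt extra units into the permit
    budget.pending_response_bytes += actual
    try:
        write(response)
    finally:
        budget.pending_response_bytes -= actual
        budget.release(max(actual, estimated))
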
@@ -1754,9 +1775,9 @@ cql_server::connection::make_schema_change_event(const event::schema_change& eve
    return response;
}

void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit, cql_compression compression)
void cql_server::connection::write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, cql_compression compression)
{
    _ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response), permit = std::move(permit)] () mutable {
    _ready_to_respond = _ready_to_respond.then([this, compression, response = std::move(response)] () mutable {
        utils::result_with_exception_ptr<scattered_message<char>> message = response->make_message(_version, compression);
        if (!message) [[unlikely]] {
            return make_exception_future<>(std::move(message).assume_error());

@@ -138,6 +138,10 @@ struct cql_sg_stats {
    request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) { return _cql_requests_stats[static_cast<uint8_t>(op)]; }
    void register_metrics();
    void rename_metrics();

    // Track total memory consumed by responses waiting to be sent.
    // Incremented when a response is queued, decremented when the write completes.
    int64_t _pending_response_memory = 0;
private:
    bool _use_metrics = false;
    seastar::metrics::metric_groups _metrics;
@@ -229,8 +233,11 @@ public:
    service::endpoint_lifecycle_subscriber* get_lifecycle_listener() const noexcept;
    service::migration_listener* get_migration_listener() const noexcept;
    qos::qos_configuration_change_subscriber* get_qos_configuration_listener() const noexcept;
    cql_sg_stats& get_cql_sg_stats() {
        return scheduling_group_get_specific<cql_sg_stats>(_stats_key);
    }
    cql_sg_stats::request_kind_stats& get_cql_opcode_stats(cql_binary_opcode op) {
        return scheduling_group_get_specific<cql_sg_stats>(_stats_key).get_cql_opcode_stats(op);
        return get_cql_sg_stats().get_cql_opcode_stats(op);
    }

    future<utils::chunked_vector<client_data>> get_client_data();
@@ -358,7 +365,7 @@ private:
    process_on_shard(shard_id shard, uint16_t stream, fragmented_temporary_buffer::istream is, service::client_state& cs,
        tracing::trace_state_ptr trace_state, cql3::dialect dialect, cql3::computed_function_values&& cached_vals, Process process_fn);

    void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, service_permit permit = empty_service_permit(), cql_compression compression = cql_compression::none);
    void write_response(foreign_ptr<std::unique_ptr<cql_server::response>>&& response, cql_compression compression = cql_compression::none);

    friend event_notifier;
};

@@ -3778,6 +3778,10 @@ data_value::data_value(empty_type_representation e) : data_value(make_new(empty_
}

sstring data_value::to_parsable_string() const {
    if (is_null()) {
        return "null";
    }

    // For some reason trying to do it using fmt::format refuses to compile
    // auto to_parsable_str_transform = std::views::transform([](const data_value& dv) -> sstring {
    //     return dv.to_parsable_string();

@@ -145,8 +145,10 @@ shared_ptr<client> client::make(std::string endpoint, endpoint_config_ptr cfg, s

future<> client::update_credentials_and_rearm() {
    _credentials = co_await _creds_provider_chain.get_aws_credentials();
    _creds_invalidation_timer.rearm(_credentials.expires_at);
    _creds_update_timer.rearm(_credentials.expires_at - 1h);
    if (_credentials) {
        _creds_invalidation_timer.rearm(_credentials.expires_at);
        _creds_update_timer.rearm(_credentials.expires_at - 1h);
    }
}

future<> client::authorize(http::request& req) {

@@ -8,6 +8,7 @@

#include "client.hh"
#include "utils.hh"
#include "utils/composite_abort_source.hh"
#include "utils/exceptions.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/rjson.hh"
@@ -18,6 +19,7 @@
#include <seastar/coroutine/as_future.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/with_timeout.hh>
#include <seastar/core/abort_on_expiry.hh>
#include <chrono>
#include <fmt/format.h>
#include <netinet/tcp.h>
@@ -28,6 +30,39 @@ using namespace std::chrono_literals;
namespace vector_search {
namespace {

bool is_ip_address(const sstring& host) {
    return net::inet_address::parse_numerical(host).has_value();
}

future<connected_socket> connect_with_as(socket_address addr, shared_ptr<tls::certificate_credentials> creds, sstring host, abort_source& as) {
    as.check();
    auto sock = make_socket();
    auto sub = as.subscribe([&sock]() noexcept {
        sock.shutdown();
    });
    auto f = co_await coroutine::as_future(sock.connect(addr));
    if (as.abort_requested()) {
        f.ignore_ready_future();
        throw abort_requested_exception();
    }

    auto cs = co_await std::move(f);
    if (creds) {
        tls::tls_options opts;
        if (!is_ip_address(host)) {
            opts.server_name = host;
        }
        auto tls_cs = co_await tls::wrap_client(creds, std::move(cs), std::move(opts));
        co_return tls_cs;
    }
    co_return cs;
}


bool is_request_aborted(std::exception_ptr& err) {
    return try_catch<abort_requested_exception>(err) != nullptr;
}

class client_connection_factory : public http::experimental::connection_factory {
    client::endpoint_type _endpoint;
    shared_ptr<tls::certificate_credentials> _creds;
@@ -41,27 +76,35 @@ public:
    }

    future<connected_socket> make([[maybe_unused]] abort_source* as) override {
        auto deadline = std::chrono::steady_clock::now() + timeout();
        auto socket = co_await with_timeout(deadline, connect());
        auto t = timeout();
        auto socket = co_await connect(t, as);
        socket.set_nodelay(true);
        socket.set_keepalive_parameters(get_keepalive_parameters(timeout()));
        socket.set_keepalive_parameters(get_keepalive_parameters(t));
        socket.set_keepalive(true);
        unsigned int timeout_ms = timeout().count();
        unsigned int timeout_ms = t.count();
        socket.set_sockopt(IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_ms, sizeof(timeout_ms));
        co_return socket;
    }

private:
    future<connected_socket> connect() {
        auto addr = socket_address(_endpoint.ip, _endpoint.port);
        if (_creds) {
            auto socket = co_await tls::connect(_creds, addr, tls::tls_options{.server_name = _endpoint.host});
            // tls::connect() only performs the TCP handshake; the TLS handshake is deferred until the first I/O operation.
            // Force the TLS handshake to happen here so that the connection timeout applies to it.
            co_await tls::check_session_is_resumed(socket);
            co_return socket;
    future<connected_socket> connect(std::chrono::milliseconds timeout, abort_source* as) {
        abort_on_expiry timeout_as(seastar::lowres_clock::now() + timeout);
        utils::composite_abort_source composite_as;
        composite_as.add(timeout_as.abort_source());
        if (as) {
            composite_as.add(*as);
        }
        co_return co_await seastar::connect(addr, {}, transport::TCP);
        auto f = co_await coroutine::as_future(
            connect_with_as(socket_address(_endpoint.ip, _endpoint.port), _creds, _endpoint.host, composite_as.abort_source()));
        if (f.failed()) {
            auto err = f.get_exception();
            // When the connection abort was triggered by our own deadline, rethrow as timed_out_error.
            if (is_request_aborted(err) && timeout_as.abort_source().abort_requested()) {
                co_await coroutine::return_exception(timed_out_error{});
            }
            co_await coroutine::return_exception_ptr(std::move(err));
        }
        co_return co_await std::move(f);
    }

    std::chrono::milliseconds timeout() const {
@@ -84,10 +127,6 @@ bool is_server_problem(std::exception_ptr& err) {
    return is_server_unavailable(err) || try_catch<tls::verification_error>(err) != nullptr || try_catch<timed_out_error>(err) != nullptr;
}

bool is_request_aborted(std::exception_ptr& err) {
    return try_catch<abort_requested_exception>(err) != nullptr;
}

future<client::request_error> map_err(std::exception_ptr& err) {
    if (is_server_problem(err)) {
        co_return service_unavailable_error{};
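
The connect() rewrite above composes the caller's abort source with a deadline-driven one and maps deadline aborts to timed_out_error. An asyncio analog of the same shape (a sketch under assumed semantics, not the Seastar code):

import asyncio

async def connect_with_abort(host, port, abort_evt, timeout_s):
    connect = asyncio.create_task(asyncio.open_connection(host, port))
    abort = asyncio.create_task(abort_evt.wait())
    done, _ = await asyncio.wait({connect, abort}, timeout=timeout_s,
                                 return_when=asyncio.FIRST_COMPLETED)
    if connect in done:
        abort.cancel()
        return connect.result()  # (reader, writer); propagates connect errors
    connect.cancel()             # tear down the pending socket
    abort.cancel()
    if abort in done:
        raise RuntimeError("connect aborted by caller")
    # Neither task finished: our own deadline fired. Report it as a timeout,
    # mirroring the timed_out_error mapping in the C++ above.
    raise TimeoutError("connect timed out")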