Files
scylladb/test/vector_search/utils.hh
Karol Nowacki b6afacfc1e vector_search: Reduce connection and keep-alive timeouts
The connection timeout was 2 minutes and the keep-alive
timeout was 11 minutes. If a vector store node became unreachable, these
long timeouts caused significant delays before the system could recover,
negatively impacting high availability.

This change aligns both timeouts with the `request_timeout`
configuration, which defaults to 10 seconds. This allows for much
faster failure detection and recovery, ensuring that unresponsive nodes
are failed over from more quickly.
2025-12-02 01:17:01 +01:00

179 lines
6.4 KiB
C++

/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "test/lib/cql_test_env.hh"
#include <seastar/core/future.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/http/httpd.hh>
#include <seastar/net/api.hh>
#include <seastar/util/tmp_file.hh>
#include <functional>
#include <chrono>
namespace test::vector_search {
// Default time budget (10 seconds) used by the helpers in this file whenever
// the caller does not pass an explicit timeout.
constexpr auto STANDARD_WAIT = std::chrono::seconds(10);
// Pairs an abort_source with a timer: once the timer expires, an abort is
// requested on the abort_source.
//
// NOTE(review): the timer callback captures `this`, so the callback keeps
// referring to the object it was created in. Presumably instances are never
// moved or copied after construction - confirm against callers.
class abort_source_timeout {
    abort_source as;
    timer<> t;

public:
    // Creates the abort source and immediately arms the timer with `timeout`.
    explicit abort_source_timeout(std::chrono::milliseconds timeout = STANDARD_WAIT)
        : t([this] { as.request_abort(); }) {
        t.arm(timeout);
    }

    // Returns the abort_source currently guarded by the timer.
    abort_source& get() {
        return as;
    }

    // Cancels the pending timer, replaces the abort_source with a fresh one,
    // re-arms the timer with `timeout`, and returns the new abort_source.
    abort_source& reset(std::chrono::milliseconds timeout = STANDARD_WAIT) {
        t.cancel();
        as = abort_source{};
        t.arm(timeout);
        return as;
    }
};
// Polls `func` until it resolves to true or `timeout` has elapsed.
//
// `func` is always invoked at least once. The elapsed time (measured with
// seastar::lowres_clock) is only checked after `func` has reported false, and
// the reactor is yielded to between attempts.
//
// Returns true as soon as `func` yields true; false once the time budget is
// exceeded.
inline auto repeat_until(std::chrono::milliseconds timeout, std::function<seastar::future<bool>()> func) -> seastar::future<bool> {
    const auto started_at = seastar::lowres_clock::now();
    for (;;) {
        if (co_await func()) {
            co_return true;
        }
        const auto elapsed = seastar::lowres_clock::now() - started_at;
        if (elapsed > timeout) {
            co_return false;
        }
        // Let other tasks run before polling again.
        co_await seastar::yield();
    }
}
// Convenience overload: polls `func` using STANDARD_WAIT as the time budget.
inline auto repeat_until(std::function<seastar::future<bool>()> func) -> seastar::future<bool> {
    constexpr std::chrono::milliseconds default_budget = STANDARD_WAIT;
    return repeat_until(default_budget, std::move(func));
}
// Runs `func` against successive loopback addresses 127.0.0.1, 127.0.0.2, ...
// and stops at the first address for which `func` completes without throwing.
//
// Any exception raised by `func` is swallowed and the next address is tried.
// The last octet runs from 1 up to (but not including)
// MAX_LOCALHOST_ADDR_TO_TRY. If every attempt fails, a std::runtime_error is
// raised.
inline seastar::future<> try_on_loopback_address(std::function<seastar::future<>(seastar::sstring)> func) {
    constexpr size_t MAX_LOCALHOST_ADDR_TO_TRY = 127;
    size_t last_octet = 1;
    while (last_octet < MAX_LOCALHOST_ADDR_TO_TRY) {
        auto candidate = fmt::format("127.0.0.{}", last_octet);
        try {
            co_await func(std::move(candidate));
            co_return;
        } catch (...) {
            // Best effort: this address did not work, fall through to the next one.
        }
        ++last_octet;
    }
    throw std::runtime_error("unable to perform action on any 127.0.0.x address");
}
// Loopback address used as the default host by new_http_server().
constexpr auto const* LOCALHOST = "127.0.0.1";
// Starts `server` listening on `host`:`port` (pinned to the current shard) and
// hands the server back together with the port it ended up bound to.
//
// `credentials` is forwarded to http_server::listen() unchanged.
//
// NOTE(review): the second tuple element is a socket_address constructed from
// the bound port only; the host part of the listener's local address is not
// carried over. Callers presumably only read `.port()` - confirm before
// relying on the address part.
inline auto listen_on_port(std::unique_ptr<seastar::httpd::http_server> server, seastar::sstring host, uint16_t port,
        seastar::httpd::http_server::server_credentials_ptr credentials)
        -> seastar::future<std::tuple<std::unique_ptr<seastar::httpd::http_server>, seastar::socket_address>> {
    const auto bind_addr = seastar::socket_address(seastar::net::inet_address(host), port);

    seastar::listen_options listen_opts;
    listen_opts.set_fixed_cpu(seastar::this_shard_id());

    co_await server->listen(bind_addr, listen_opts, credentials);

    // Read back the bound port from the server's first listener.
    auto const& bound = seastar::httpd::http_server_tester::listeners(*server);
    co_return std::make_tuple(std::move(server), seastar::socket_address(bound.at(0).local_address().port()));
}
// Builds an http_server with a unique name ("test_vector_store_client_<N>"),
// lets `set_routes` populate its routing table, and enables content streaming.
// The server is returned without being bound to any address.
inline auto make_http_server(std::function<void(seastar::httpd::routes& r)> set_routes) {
    // Monotonic counter that keeps server names distinct.
    static unsigned id = 0;
    const auto server_name = fmt::format("test_vector_store_client_{}", id++);

    auto http = std::make_unique<seastar::httpd::http_server>(server_name);
    set_routes(http->_routes);
    http->set_content_streaming(true);
    return http;
}
// Creates an HTTP server configured by `set_routes` and starts it listening on
// `host`:`port`, optionally with `credentials`.
//
// Returns the server together with the socket_address produced by
// listen_on_port().
inline auto new_http_server(std::function<void(seastar::httpd::routes& r)> set_routes, seastar::sstring host = LOCALHOST, uint16_t port = 0,
        seastar::httpd::http_server::server_credentials_ptr credentials = nullptr)
        -> seastar::future<std::tuple<std::unique_ptr<seastar::httpd::http_server>, seastar::socket_address>> {
    auto fresh_server = make_http_server(set_routes);
    co_return co_await listen_on_port(std::move(fresh_server), std::move(host), port, credentials);
}
// Creates an HTTP server configured by `set_routes` that accepts connections
// on an already-listening `socket` instead of binding a new one.
//
// The socket is appended to the server's listener list and accepting is
// started on that newly added entry. The returned socket_address is built
// from the listener's local port only.
inline auto new_http_server(std::function<void(seastar::httpd::routes& r)> set_routes, seastar::server_socket socket)
        -> seastar::future<std::tuple<std::unique_ptr<seastar::httpd::http_server>, seastar::socket_address>> {
    auto http = make_http_server(set_routes);

    auto& sockets = seastar::httpd::http_server_tester::listeners(*http);
    sockets.push_back(std::move(socket));

    // Index of the listener that was just appended.
    const auto newest = sockets.size() - 1;
    co_await http->do_accepts(newest);

    co_return std::make_tuple(std::move(http), seastar::socket_address(sockets.back().local_address().port()));
}
// A sample correct ANN response for the test table created in create_test_table().
// The entries under "primary_keys" are named after that table's primary-key
// columns (pk1, pk2, ck1, ck2); each carries two values, matching the two
// entries in "distances".
constexpr auto CORRECT_RESPONSE_FOR_TEST_TABLE = R"({
"primary_keys": {
"pk1": [5, 6],
"pk2": [7, 8],
"ck1": [9, 1],
"ck2": [2, 3]
},
"distances": [0.1, 0.2]
})";
// Creates table `ks`.`cf` with a composite partition key (pk1, pk2), clustering
// columns (ck1, ck2) and a 3-dimensional float vector column `embedding`, then
// returns the schema looked up from the local database.
inline auto create_test_table(cql_test_env& env, const seastar::sstring& ks, const seastar::sstring& cf) -> future<schema_ptr> {
    const auto create_stmt = fmt::format(R"(
create table {}.{} (
pk1 tinyint, pk2 tinyint,
ck1 tinyint, ck2 tinyint,
embedding vector<float, 3>,
primary key ((pk1, pk2), ck1, ck2))
)",
            ks, cf);
    co_await env.execute_cql(create_stmt);
    co_return env.local_db().find_schema(ks, cf);
}
// Closes the temporary file `f` and then removes it. The removal is chained
// with finally(), so it is attempted whether or not close() succeeded.
// `f` must stay alive until the returned future resolves.
inline seastar::future<> remove(seastar::tmp_file& f) {
    auto delete_file = [&f] {
        return f.remove();
    };
    co_await f.close().finally(std::move(delete_file));
}
// A listening socket whose accept backlog is kept occupied by the connections
// held in `connections` (see make_unreachable_socket()), plus the host/port it
// listens on.
struct unreachable_socket {
    seastar::server_socket socket;
    // Connections held open against `socket`.
    std::vector<connected_socket> connections;
    sstring host = "127.0.0.1";
    // Zero until the listening port is known; initialized so that a
    // default-constructed object never exposes an indeterminate value.
    std::uint16_t port = 0;

    // Stops accepting on the listening socket, shuts down every held
    // connection in both directions and waits for each input side to finish
    // shutting down, then sleeps for 3 seconds (see the note below).
    seastar::future<> close() {
        socket.abort_accept();
        for (auto& conn : connections) {
            conn.shutdown_input();
            conn.shutdown_output();
            co_await conn.wait_input_shutdown();
        }
        // There is currently no effective way to abort an ongoing connect in Seastar.
        // Timing out a connect with with_timeout leaves the pending coroutine in the reactor.
        // To prevent resource leaks, we close the unreachable socket and sleep,
        // allowing the pending connect coroutines to fail and release their resources.
        co_await seastar::sleep(3s);
    }
};
// Builds an unreachable_socket: listens on an ephemeral port of the socket's
// default host with a backlog of 1 (pinned to the current shard), records the
// assigned port, and then opens two connections that are never accepted.
inline seastar::future<unreachable_socket> make_unreachable_socket() {
    unreachable_socket result;

    seastar::listen_options listen_opts;
    listen_opts.listen_backlog = 1;
    listen_opts.set_fixed_cpu(seastar::this_shard_id());

    const auto loopback = seastar::net::inet_address(result.host);
    result.socket = seastar::listen(seastar::socket_address(loopback, 0), listen_opts);
    result.port = result.socket.local_address().port();

    // Make two (backlog + 1) connections to occupy the backlog.
    constexpr int connections_to_open = 2;
    const auto target = seastar::socket_address(loopback, result.port);
    for (int opened = 0; opened < connections_to_open; ++opened) {
        result.connections.push_back(co_await seastar::connect(target));
    }
    co_return std::move(result);
}
} // namespace test::vector_search