vector_search: fix race condition on connection timeout

When a `with_connect` operation timed out, the underlying connection
attempt continued to run in the reactor. This could lead to a crash
if the connection was established/rejected after the client object had
already been destroyed. This issue was observed during the teardown
phase of an upcoming high-availability test case.

This commit fixes the race condition by ensuring the connection attempt
is properly canceled on timeout.

Additionally, the explicit TLS handshake previously forced during the
connection is now deferred to the first I/O operation, which is the
default and preferred behavior.

Fixes: SCYLLADB-832
This commit is contained in:
Karol Nowacki
2026-03-13 11:50:53 +01:00
parent fc8cebd671
commit 5474cc6cc2
2 changed files with 53 additions and 26 deletions

View File

@@ -155,11 +155,6 @@ struct unreachable_socket {
conn.shutdown_output();
co_await conn.wait_input_shutdown();
}
// There is currently no effective way to abort an ongoing connect in Seastar.
// Timing out connect by with_timeout, remains pending coroutine in the reactor.
// To prevent resource leaks, we close the unreachable socket and sleep,
// allowing the pending connect coroutines to fail and release their resources.
co_await seastar::sleep(3s);
}
};

View File

@@ -8,6 +8,7 @@
#include "client.hh"
#include "utils.hh"
#include "utils/composite_abort_source.hh"
#include "utils/exceptions.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/rjson.hh"
@@ -18,6 +19,8 @@
#include <seastar/coroutine/as_future.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/with_timeout.hh>
#include <seastar/core/abort_on_expiry.hh>
#include <seastar/coroutine/try_future.hh>
#include <chrono>
#include <fmt/format.h>
#include <netinet/tcp.h>
@@ -33,6 +36,35 @@ bool is_ip_address(const sstring& host) {
return net::inet_address::parse_numerical(host).has_value();
}
/// Opens a connection to `addr` that can be interrupted through `as`.
///
/// If `creds` is set, the established socket is wrapped with tls::wrap_client();
/// `host` is passed as the TLS server name unless it is a numeric IP address.
///
/// Throws whatever `as.check()` throws if abort was already requested on entry,
/// abort_requested_exception if abort was requested by the time the connect
/// attempt resolved, and otherwise propagates the connect / TLS wrapping failure.
future<connected_socket> connect_with_as(socket_address addr, shared_ptr<tls::certificate_credentials> creds, sstring host, abort_source& as) {
    // Do not even create a socket when the caller has already given up.
    as.check();

    auto pending = make_socket();
    // On abort the socket is shut down; presumably this is what makes the
    // in-flight connect() below resolve instead of lingering in the reactor.
    auto abort_hook = as.subscribe([&pending]() noexcept {
        pending.shutdown();
    });

    auto connect_result = co_await coroutine::as_future(pending.connect(addr));
    if (as.abort_requested()) {
        // The outcome of the connect no longer matters; report the abort instead.
        connect_result.ignore_ready_future();
        throw abort_requested_exception();
    }
    auto plain = co_await coroutine::try_future(std::move(connect_result));

    if (!creds) {
        co_return plain;
    }

    tls::tls_options tls_opts;
    if (!is_ip_address(host)) {
        tls_opts.server_name = host;
    }
    auto secured = co_await tls::wrap_client(creds, std::move(plain), std::move(tls_opts));
    co_return secured;
}
/// Returns true when try_catch finds an abort_requested_exception in `err`.
bool is_request_aborted(std::exception_ptr& err) {
    auto* aborted = try_catch<abort_requested_exception>(err);
    return aborted != nullptr;
}
class client_connection_factory : public http::experimental::connection_factory {
client::endpoint_type _endpoint;
shared_ptr<tls::certificate_credentials> _creds;
@@ -46,31 +78,35 @@ public:
}
future<connected_socket> make([[maybe_unused]] abort_source* as) override {
auto deadline = std::chrono::steady_clock::now() + timeout();
auto socket = co_await with_timeout(deadline, connect());
auto t = timeout();
auto socket = co_await connect(t, as);
socket.set_nodelay(true);
socket.set_keepalive_parameters(get_keepalive_parameters(timeout()));
socket.set_keepalive_parameters(get_keepalive_parameters(t));
socket.set_keepalive(true);
unsigned int timeout_ms = timeout().count();
unsigned int timeout_ms = t.count();
socket.set_sockopt(IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_ms, sizeof(timeout_ms));
co_return socket;
}
private:
future<connected_socket> connect() {
auto addr = socket_address(_endpoint.ip, _endpoint.port);
if (_creds) {
tls::tls_options opts;
if (!is_ip_address(_endpoint.host)) {
opts.server_name = _endpoint.host;
}
auto socket = co_await tls::connect(_creds, addr, std::move(opts));
// tls::connect() only performs the TCP handshake — the TLS handshake is deferred until the first I/O operation.
// Force the TLS handshake to happen here so that the connection timeout applies to it.
co_await tls::check_session_is_resumed(socket);
co_return socket;
future<connected_socket> connect(std::chrono::milliseconds timeout, abort_source* as) {
abort_on_expiry timeout_as(seastar::lowres_clock::now() + timeout);
utils::composite_abort_source composite_as;
composite_as.add(timeout_as.abort_source());
if (as) {
composite_as.add(*as);
}
co_return co_await seastar::connect(addr, {}, transport::TCP);
auto f = co_await coroutine::as_future(
connect_with_as(socket_address(_endpoint.ip, _endpoint.port), _creds, _endpoint.host, composite_as.abort_source()));
if (f.failed()) {
auto err = f.get_exception();
// When the connection abort was triggered by our own deadline, rethrow it as timed_out_error.
if (is_request_aborted(err) && timeout_as.abort_source().abort_requested()) {
co_await coroutine::return_exception(timed_out_error{});
}
co_await coroutine::return_exception_ptr(std::move(err));
}
co_return co_await std::move(f);
}
std::chrono::milliseconds timeout() const {
@@ -93,10 +129,6 @@ bool is_server_problem(std::exception_ptr& err) {
return is_server_unavailable(err) || try_catch<tls::verification_error>(err) != nullptr || try_catch<timed_out_error>(err) != nullptr;
}
/// Returns true when try_catch finds an abort_requested_exception in `err`.
bool is_request_aborted(std::exception_ptr& err) {
    auto* aborted = try_catch<abort_requested_exception>(err);
    return aborted != nullptr;
}
future<client::request_error> map_err(std::exception_ptr& err) {
if (is_server_problem(err)) {
co_return service_unavailable_error{};