From 62f8b26bd779390d20a6e5165345bcae028b961e Mon Sep 17 00:00:00 2001 From: Karol Nowacki Date: Wed, 22 Oct 2025 08:40:50 +0200 Subject: [PATCH 1/7] vector_search: Extract client class This refactoring extracts low-level client logic into a new, dedicated `client` class. The new class is responsible for connecting to the server and serializing requests. This change prepares for extending the `vector_store_client` to check node status via the `api/v1/status` endpoint. `/ann` Response deserialization remains in the `vector_store_client` as it is schema-dependent. --- configure.py | 1 + vector_search/CMakeLists.txt | 3 +- vector_search/client.cc | 70 +++++++++++++++++++++++ vector_search/client.hh | 51 +++++++++++++++++ vector_search/vector_store_client.cc | 83 +++------------------------- 5 files changed, 131 insertions(+), 77 deletions(-) create mode 100644 vector_search/client.cc create mode 100644 vector_search/client.hh diff --git a/configure.py b/configure.py index 5714c8a955..ba351de212 100755 --- a/configure.py +++ b/configure.py @@ -1265,6 +1265,7 @@ scylla_core = (['message/messaging_service.cc', 'utils/disk_space_monitor.cc', 'vector_search/vector_store_client.cc', 'vector_search/dns.cc', + 'vector_search/client.cc' ] + [Antlr3Grammar('cql3/Cql.g')] \ + scylla_raft_core ) diff --git a/vector_search/CMakeLists.txt b/vector_search/CMakeLists.txt index 9ff112ed70..620e2b23fa 100644 --- a/vector_search/CMakeLists.txt +++ b/vector_search/CMakeLists.txt @@ -2,7 +2,8 @@ add_library(vector_search STATIC) target_sources(vector_search PRIVATE vector_store_client.cc - dns.cc) + dns.cc + client.cc) target_link_libraries(vector_search PUBLIC Seastar::seastar diff --git a/vector_search/client.cc b/vector_search/client.cc new file mode 100644 index 0000000000..0f0aebde14 --- /dev/null +++ b/vector_search/client.cc @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "client.hh" +#include +#include +#include +#include + +using namespace seastar; +using namespace std::chrono_literals; + +namespace vector_search { +namespace { + +class client_connection_factory : public http::experimental::connection_factory { + socket_address _addr; + +public: + explicit client_connection_factory(socket_address addr) + : _addr(addr) { + } + + future make([[maybe_unused]] abort_source* as) override { + auto socket = co_await seastar::connect(_addr, {}, transport::TCP); + socket.set_nodelay(true); + socket.set_keepalive_parameters(net::tcp_keepalive_params{ + .idle = 60s, + .interval = 60s, + .count = 10, + }); + socket.set_keepalive(true); + co_return socket; + } +}; + +} // namespace + +client::client(endpoint_type endpoint_) + : _endpoint(std::move(endpoint_)) + , _http_client(std::make_unique(socket_address(endpoint_.ip, endpoint_.port))) { +} + +seastar::future client::request( + seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as) { + + auto req = http::request::make(method, _endpoint.host, std::move(path)); + if (content) { + req.write_body("json", std::move(*content)); + } + auto resp = response{seastar::http::reply::status_type::ok, std::vector>()}; + auto handler = [&resp](http::reply const& reply, input_stream body) -> future<> { + resp.status = reply._status; + resp.content = co_await util::read_entire_stream(body); + }; + + co_await _http_client.make_request(std::move(req), std::move(handler), std::nullopt, &as); + co_return resp; +} + +seastar::future<> client::close() { + return _http_client.close(); +} + +} // namespace vector_search diff --git a/vector_search/client.hh b/vector_search/client.hh new file mode 100644 index 0000000000..fcaa06a6b1 --- /dev/null +++ b/vector_search/client.hh @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace vector_search { + +class client { + +public: + struct response { + seastar::http::reply::status_type status; + std::vector> content; + }; + + struct endpoint_type { + seastar::sstring host; + std::uint16_t port; + seastar::net::inet_address ip; + }; + + explicit client(endpoint_type endpoint_); + + seastar::future request( + seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); + + seastar::future<> close(); + + const endpoint_type& endpoint() const { + return _endpoint; + } + +private: + endpoint_type _endpoint; + seastar::http::experimental::client _http_client; +}; + + +} // namespace vector_search diff --git a/vector_search/vector_store_client.cc b/vector_search/vector_store_client.cc index 1d4e270fa1..4d5ebcfbc1 100644 --- a/vector_search/vector_store_client.cc +++ b/vector_search/vector_store_client.cc @@ -9,6 +9,7 @@ #include "vector_store_client.hh" #include "dns.hh" #include "load_balancer.hh" +#include "client.hh" #include "db/config.hh" #include "exceptions/exceptions.hh" #include "utils/sequential_producer.hh" @@ -203,63 +204,6 @@ auto read_ann_json(rjson::value const& json, schema_ptr const& schema) -> std::e return std::move(keys); } -class client_connection_factory : public http::experimental::connection_factory { - socket_address _addr; - -public: - explicit client_connection_factory(socket_address addr) - : _addr(addr) { - } - - future make([[maybe_unused]] abort_source* as) override { - auto socket = co_await seastar::connect(_addr, {}, transport::TCP); - socket.set_nodelay(true); - socket.set_keepalive_parameters(tcp_keepalive_params{ - .idle = 60s, - .interval = 60s, - .count = 10, - }); - socket.set_keepalive(true); - co_return socket; - } -}; - -class http_client { - - uri _uri; - inet_address _addr; - - http::experimental::client impl; - -public: - http_client(uri host_port_, inet_address addr) - : _uri(std::move(host_port_)) - , _addr(std::move(addr)) - , impl(std::make_unique(socket_address(addr, _uri.port))) { - } - - bool connects_to(inet_address const& a, port_number p) const { - return _addr == a && _uri.port == p; - } - - seastar::future<> make_request(operation_type method, const http_path& path, const std::optional& content, - http::experimental::client::reply_handler&& handle, abort_source* as) { - auto req = http::request::make(method, _uri.host, path); - if (content) { - req.write_body("json", *content); - } - return impl.make_request(std::move(req), std::move(handle), std::nullopt, as); - } - - seastar::future<> close() { - return impl.close(); - } - - const inet_address& addr() const { - return _addr; - } -}; - bool should_vector_store_service_be_disabled(std::vector const& uris) { return uris.empty() || uris[0].empty(); } @@ -306,7 +250,7 @@ namespace vector_search { struct vector_store_client::impl { - using clients_type = std::vector>; + using clients_type = std::vector>; utils::observer uri_observer; clients_type current_clients; @@ -358,7 +302,7 @@ struct vector_store_client::impl { auto it = addrs.find(uri.host); if (it != addrs.end()) { for (const auto& addr : it->second) { - current_clients.push_back(make_lw_shared(uri, addr)); + current_clients.push_back(make_lw_shared(client::endpoint_type{uri.host, uri.port, addr})); } } } @@ -429,17 +373,10 @@ struct vector_store_client::impl { co_return clients; } - struct make_request_response { - http::reply::status_type status; ///< The HTTP status of the response. - std::vector> content; ///< The content of the response. - }; - using make_request_error = std::variant; auto make_request(operation_type method, http_path path, std::optional content, abort_source& as) - -> future> { - auto resp = make_request_response{.status = http::reply::status_type::ok, .content = std::vector>()}; - + -> future> { for (auto retries = 0; retries < ANN_RETRIES; ++retries) { auto clients = co_await get_clients(as); if (!clients) { @@ -452,13 +389,7 @@ struct vector_store_client::impl { load_balancer lb(std::move(*clients), random_engine); while (auto client = lb.next()) { - auto result = co_await coroutine::as_future(client->make_request( - method, path, content, - [&resp](http::reply const& reply, input_stream body) -> future<> { - resp.status = reply._status; - resp.content = co_await util::read_entire_stream(body); - }, - &as)); + auto result = co_await coroutine::as_future(client->request(method, path, content, as)); if (result.failed()) { auto err = result.get_exception(); if (as.abort_requested()) { @@ -469,7 +400,7 @@ struct vector_store_client::impl { } // std::system_error means that the server is unavailable, so we retry } else { - co_return resp; + co_return co_await std::move(result); } } @@ -564,7 +495,7 @@ auto vector_store_client_tester::resolve_hostname(vector_store_client& vsc, abor co_return ret; } for (auto const& c : *clients) { - ret.push_back(c->addr()); + ret.push_back(c->endpoint().ip); } co_return ret; } From 49a177b51e545a82f898174318e51d274a2e56b9 Mon Sep 17 00:00:00 2001 From: Karol Nowacki Date: Mon, 3 Nov 2025 11:21:58 +0100 Subject: [PATCH 2/7] vector_search: Use std::expected for low-level client errors To unify error handling, the low-level client methods now return `std::expected` instead of throwing exceptions. This allows for consistent and explicit error propagation from the client up to the caller. The relevant error types have been moved to a new `vector_search/error.hh` header to centralize their definitions. --- vector_search/client.cc | 29 +++++++++++++- vector_search/client.hh | 9 ++++- vector_search/error.hh | 57 ++++++++++++++++++++++++++++ vector_search/vector_store_client.cc | 35 ++++++++--------- vector_search/vector_store_client.hh | 48 ++++------------------- 5 files changed, 117 insertions(+), 61 deletions(-) create mode 100644 vector_search/error.hh diff --git a/vector_search/client.cc b/vector_search/client.cc index 0f0aebde14..6a202751d1 100644 --- a/vector_search/client.cc +++ b/vector_search/client.cc @@ -7,10 +7,12 @@ */ #include "client.hh" +#include "utils/exceptions.hh" #include #include #include #include +#include using namespace seastar; using namespace std::chrono_literals; @@ -39,6 +41,25 @@ public: } }; +bool is_server_unavailable(std::exception_ptr& err) { + return try_catch(err) != nullptr; +} + +bool is_request_aborted(std::exception_ptr& err) { + return try_catch(err) != nullptr; +} + +future map_err(std::exception_ptr& err) { + if (is_server_unavailable(err)) { + co_return service_unavailable_error{}; + } + if (is_request_aborted(err)) { + co_return aborted_error{}; + } + co_await coroutine::return_exception_ptr(err); // rethrow + co_return client::request_error{}; // unreachable +} + } // namespace client::client(endpoint_type endpoint_) @@ -46,7 +67,7 @@ client::client(endpoint_type endpoint_) , _http_client(std::make_unique(socket_address(endpoint_.ip, endpoint_.port))) { } -seastar::future client::request( +seastar::future client::request( seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as) { auto req = http::request::make(method, _endpoint.host, std::move(path)); @@ -59,7 +80,11 @@ seastar::future client::request( resp.content = co_await util::read_entire_stream(body); }; - co_await _http_client.make_request(std::move(req), std::move(handler), std::nullopt, &as); + auto f = co_await seastar::coroutine::as_future(_http_client.make_request(std::move(req), std::move(handler), std::nullopt, &as)); + if (f.failed()) { + auto err = f.get_exception(); + co_return std::unexpected(co_await map_err(err)); + } co_return resp; } diff --git a/vector_search/client.hh b/vector_search/client.hh index fcaa06a6b1..6305b86334 100644 --- a/vector_search/client.hh +++ b/vector_search/client.hh @@ -8,17 +8,19 @@ #pragma once +#include "error.hh" #include #include #include #include #include #include +#include +#include namespace vector_search { class client { - public: struct response { seastar::http::reply::status_type status; @@ -31,9 +33,12 @@ public: seastar::net::inet_address ip; }; + using request_error = std::variant; + using request_result = std::expected; + explicit client(endpoint_type endpoint_); - seastar::future request( + seastar::future request( seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); seastar::future<> close(); diff --git a/vector_search/error.hh b/vector_search/error.hh new file mode 100644 index 0000000000..ce337b12a8 --- /dev/null +++ b/vector_search/error.hh @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once +#include +#include +#include + +namespace vector_search { + +/// The service is disabled. +struct disabled_error {}; + +/// The operation was aborted. +struct aborted_error {}; + +/// The vector-store addr is unavailable (not possible to get an addr from the dns service). +struct addr_unavailable_error {}; + +/// The vector-store service is unavailable. +struct service_unavailable_error {}; + +/// The error from the vector-store service. +struct service_error { + seastar::http::reply::status_type status; ///< The HTTP status code from the vector-store service. +}; + +/// An unsupported reply format from the vector-store service. +struct service_reply_format_error {}; + +struct error_visitor { + seastar::sstring operator()(service_error e) const { + return fmt::format("Vector Store error: HTTP status {}", e.status); + } + seastar::sstring operator()(disabled_error) const { + return fmt::format("Vector Store is disabled"); + } + seastar::sstring operator()(aborted_error) const { + return fmt::format("Vector Store request was aborted"); + } + seastar::sstring operator()(addr_unavailable_error) const { + return fmt::format("Vector Store service address could not be fetched from DNS"); + } + seastar::sstring operator()(service_unavailable_error) const { + return fmt::format("Vector Store service is unavailable"); + } + seastar::sstring operator()(service_reply_format_error) const { + return fmt::format("Vector Store returned an invalid JSON"); + } +}; + +} // namespace vector_search diff --git a/vector_search/vector_store_client.cc b/vector_search/vector_store_client.cc index 4d5ebcfbc1..7c4a2fc450 100644 --- a/vector_search/vector_store_client.cc +++ b/vector_search/vector_store_client.cc @@ -244,6 +244,15 @@ std::vector get_hosts(const std::vector& uris) { return ret; } +template +auto make_unexpected(const auto& err) { + return std::unexpected{std::visit( + [](auto&& err) { + return Variant{err}; + }, + err)}; +}; + } // namespace namespace vector_search { @@ -380,28 +389,20 @@ struct vector_store_client::impl { for (auto retries = 0; retries < ANN_RETRIES; ++retries) { auto clients = co_await get_clients(as); if (!clients) { - co_return std::unexpected{std::visit( - [](auto&& err) { - return make_request_error{err}; - }, - clients.error())}; + co_return make_unexpected(clients.error()); } load_balancer lb(std::move(*clients), random_engine); while (auto client = lb.next()) { - auto result = co_await coroutine::as_future(client->request(method, path, content, as)); - if (result.failed()) { - auto err = result.get_exception(); - if (as.abort_requested()) { - co_return std::unexpected{aborted{}}; - } - if (try_catch(err) == nullptr) { - co_await coroutine::return_exception_ptr(std::move(err)); - } - // std::system_error means that the server is unavailable, so we retry - } else { - co_return co_await std::move(result); + auto result = co_await client->request(method, path, content, as); + if (result) { + co_return std::move(result.value()); } + if (std::holds_alternative(result.error())) { + // try next client + continue; + } + co_return make_unexpected(result.error()); } dns.trigger_refresh(); diff --git a/vector_search/vector_store_client.hh b/vector_search/vector_store_client.hh index ae21bb241b..cd593507c3 100644 --- a/vector_search/vector_store_client.hh +++ b/vector_search/vector_store_client.hh @@ -11,6 +11,7 @@ #include "dht/decorated_key.hh" #include "keys/keys.hh" #include "seastarx.hh" +#include "error.hh" #include #include #include @@ -49,48 +50,15 @@ public: using schema_ptr = lw_shared_ptr; using status_type = http::reply::status_type; - /// The vector_store_client service is disabled. - struct disabled {}; - - /// The operation was aborted. - struct aborted {}; - - /// The vector-store addr is unavailable (not possible to get an addr from the dns service). - struct addr_unavailable {}; - - /// The vector-store service is unavailable. - struct service_unavailable {}; - - /// The error from the vector-store service. - struct service_error { - status_type status; ///< The HTTP status code from the vector-store service. - }; - - /// An unsupported reply format from the vector-store service. - struct service_reply_format_error {}; + using disabled = disabled_error; + using aborted = aborted_error; + using addr_unavailable = addr_unavailable_error; + using service_unavailable = service_unavailable_error; + using service_error = service_error; + using service_reply_format_error = service_reply_format_error; using ann_error = std::variant; - - struct ann_error_visitor { - sstring operator()(vector_store_client::service_error e) const { - return fmt::format("Vector Store error: HTTP status {}", e.status); - } - sstring operator()(vector_store_client::disabled) const { - return fmt::format("Vector Store is disabled"); - } - sstring operator()(vector_store_client::aborted) const { - return fmt::format("Vector Store request was aborted"); - } - sstring operator()(vector_store_client::addr_unavailable) const { - return fmt::format("Vector Store service address could not be fetched from DNS"); - } - sstring operator()(vector_store_client::service_unavailable) const { - return fmt::format("Vector Store service is unavailable"); - } - sstring operator()(vector_store_client::service_reply_format_error) const { - return fmt::format("Vector Store returned an invalid JSON"); - } - }; + using ann_error_visitor = error_visitor; explicit vector_store_client(config const& cfg); ~vector_store_client(); From 190459aefada530f3bf4249cb64c9397b72e86dc Mon Sep 17 00:00:00 2001 From: Karol Nowacki Date: Fri, 3 Oct 2025 10:18:17 +0200 Subject: [PATCH 3/7] vector_search: Make endpoint available In preparation for a new feature, the tests need the ability to make an endpoint that was previously unavailable, available again. This is achieved by adding an `unavailable_server::take_socket` method. This method allows transferring the listening socket from the `unavailable_server` to the `mock_vs_server`, ensuring they both operate on the same endpoint. --- .../vector_search/vector_store_client_test.cc | 76 ++++++++++++++++--- 1 file changed, 64 insertions(+), 12 deletions(-) diff --git a/test/vector_search/vector_store_client_test.cc b/test/vector_search/vector_store_client_test.cc index c65cbcab66..7e07cc3146 100644 --- a/test/vector_search/vector_store_client_test.cc +++ b/test/vector_search/vector_store_client_test.cc @@ -6,6 +6,7 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ +#include "seastar/core/future.hh" #include "vector_search/vector_store_client.hh" #include "db/config.hh" #include "exceptions/exceptions.hh" @@ -67,13 +68,25 @@ auto listen_on_port(std::unique_ptr server, sstring host, uint16_t co_return std::make_tuple(std::move(server), listeners[0].local_address().port()); } -auto new_http_server(std::function set_routes, sstring host = LOCALHOST, uint16_t port = 0) - -> future, socket_address>> { +auto make_http_server(std::function set_routes) { static unsigned id = 0; auto server = std::make_unique(fmt::format("test_vector_store_client_{}", id++)); set_routes(server->_routes); server->set_content_streaming(true); - co_return co_await listen_on_port(std::move(server), std::move(host), port); + return server; +} + +auto new_http_server(std::function set_routes, sstring host = LOCALHOST, uint16_t port = 0) + -> future, socket_address>> { + co_return co_await listen_on_port(make_http_server(set_routes), std::move(host), port); +} + +auto new_http_server(std::function set_routes, server_socket socket) -> future, socket_address>> { + auto server = make_http_server(set_routes); + auto& listeners = http_server_tester::listeners(*server); + listeners.push_back(std::move(socket)); + co_await server->do_accepts(listeners.size() - 1); + co_return std::make_tuple(std::move(server), listeners.back().local_address().port()); } auto repeat_until(milliseconds timeout, std::function()> func) -> future { @@ -237,8 +250,10 @@ public: } future<> stop() { - _socket.abort_accept(); - co_await _gate.close(); + if (_socket) { + _socket.abort_accept(); + co_await _gate.close(); + } } sstring host() const { @@ -253,6 +268,14 @@ public: return _connections; } + future take_socket() { + _running = false; + // Make a connection to unblock accept() in run loop. + co_await seastar::connect(socket_address(net::inet_address(_host), _port)); + co_await _gate.close(); + co_return std::move(_socket); + } + private: future<> listen() { co_await try_on_loopback_address([this](auto host) -> future<> { @@ -266,7 +289,7 @@ private: } future<> run() { - while (true) { + while (_running) { try { auto s = co_await _socket.accept(); _connections++; @@ -284,6 +307,7 @@ private: uint16_t _port; sstring _host; size_t _connections = 0; + bool _running = true; }; auto make_unavailable_server(uint16_t port = 0) -> future> { @@ -318,6 +342,16 @@ public: co_await listen(); } + future<> start(server_socket socket) { + auto [server, addr] = co_await new_http_server( + [this](auto& r) { + set_routes(r); + }, + std::move(socket)); + _http_server = std::move(server); + _port = addr.port(); + } + future<> stop() { co_await _http_server->stop(); } @@ -342,11 +376,8 @@ private: future<> listen() { co_await try_on_loopback_address([this](auto host) -> future<> { auto [s, addr] = co_await new_http_server( - [this](routes& r) { - auto ann = [this](std::unique_ptr req, std::unique_ptr rep) -> future> { - return handle_request(std::move(req), std::move(rep)); - }; - r.add(operation_type::POST, url(INDEXES_PATH).remainder("path"), new function_handler(ann, "json")); + [this](auto& r) { + set_routes(r); }, host.c_str(), _port); _http_server = std::move(s); @@ -355,7 +386,7 @@ private: }); } - future> handle_request(std::unique_ptr req, std::unique_ptr rep) { + future> handle_ann_request(std::unique_ptr req, std::unique_ptr rep) { ann_req r{.path = INDEXES_PATH + "/" + req->get_path_param("path"), .body = co_await util::read_entire_stream_contiguous(*req->content_stream)}; _ann_requests.push_back(std::move(r)); rep->set_status(_next_ann_response.status); @@ -363,6 +394,27 @@ private: co_return rep; } + future> handle_status_request(std::unique_ptr req, std::unique_ptr rep) { + rep->set_status(status_type::ok); + rep->write_body("json", "SERVING"); + co_return rep; + } + + void set_routes(routes& r) { + r.add(operation_type::POST, url(INDEXES_PATH).remainder("path"), + new function_handler( + [this](std::unique_ptr req, std::unique_ptr rep) -> future> { + return handle_ann_request(std::move(req), std::move(rep)); + }, + "json")); + r.add(operation_type::GET, url("/api/v1/status").remainder("status"), + new function_handler( + [this](std::unique_ptr req, std::unique_ptr rep) -> future> { + return handle_status_request(std::move(req), std::move(rep)); + }, + "json")); + } + uint16_t _port = 0; sstring _host; std::unique_ptr _http_server; From 009d3ea2784a1196f9ac8f30a14e1c0b4115b326 Mon Sep 17 00:00:00 2001 From: Karol Nowacki Date: Fri, 14 Nov 2025 07:38:01 +0100 Subject: [PATCH 4/7] vector_search: Add backoff for failed clients Introduces logic to mark clients that fail to answer an ANN request as "down". Down clients are omitted from further requests until they successfully respond to a health check. Health checks for down clients are performed in the background using the `status` endpoint, with an exponential backoff retry policy ranging from 100ms to 20s. --- test/vector_search/test_config.yaml | 1 + .../vector_search/vector_store_client_test.cc | 119 ++++++++++++++++-- vector_search/client.cc | 60 ++++++++- vector_search/client.hh | 11 +- vector_search/vector_store_client.cc | 2 +- 5 files changed, 178 insertions(+), 15 deletions(-) diff --git a/test/vector_search/test_config.yaml b/test/vector_search/test_config.yaml index 0d4cfde39a..be386063dc 100644 --- a/test/vector_search/test_config.yaml +++ b/test/vector_search/test_config.yaml @@ -1,2 +1,3 @@ extra_scylla_cmdline_options: - '--reactor-backend=linux-aio' + - '--fail-on-abandoned-failed-futures=true' diff --git a/test/vector_search/vector_store_client_test.cc b/test/vector_search/vector_store_client_test.cc index 7e07cc3146..248a8e8bcf 100644 --- a/test/vector_search/vector_store_client_test.cc +++ b/test/vector_search/vector_store_client_test.cc @@ -7,6 +7,7 @@ */ #include "seastar/core/future.hh" +#include "seastar/core/when_all.hh" #include "vector_search/vector_store_client.hh" #include "db/config.hh" #include "exceptions/exceptions.hh" @@ -265,7 +266,7 @@ public: } size_t connections() const { - return _connections; + return _connections.size(); } future take_socket() { @@ -276,6 +277,18 @@ public: co_return std::move(_socket); } + void auto_shutdown_off() { + _auto_shutdown = false; + } + + future<> shutdown_all_and_clear() { + std::vector tmp; + std::swap(tmp, _connections); + for (auto& conn : tmp) { + co_await shutdown(conn.connection); + } + } + private: future<> listen() { co_await try_on_loopback_address([this](auto host) -> future<> { @@ -291,23 +304,30 @@ private: future<> run() { while (_running) { try { - auto s = co_await _socket.accept(); - _connections++; - s.connection.shutdown_output(); - s.connection.shutdown_input(); - co_await s.connection.wait_input_shutdown(); + _connections.push_back(co_await _socket.accept()); + if (_auto_shutdown) { + co_await shutdown(_connections.back().connection); + } } catch (...) { break; } } } + future<> shutdown(connected_socket& cs) { + cs.shutdown_output(); + cs.shutdown_input(); + co_await cs.wait_input_shutdown(); + } + + seastar::server_socket _socket; seastar::gate _gate; uint16_t _port; sstring _host; - size_t _connections = 0; + std::vector _connections; bool _running = true; + bool _auto_shutdown = true; }; auto make_unavailable_server(uint16_t port = 0) -> future> { @@ -1080,3 +1100,88 @@ SEASTAR_TEST_CASE(vector_store_client_test_paging_warning_doesnt_show_when_limit return s1->stop(); }); } + +SEASTAR_TEST_CASE(vector_store_client_node_recovery_after_backoff) { + auto unavail_server = co_await make_unavailable_server(); + std::unique_ptr avail_server; + constexpr auto HOSTNAME = "server.node"; + + auto cfg = cql_test_config(); + cfg.db_config->vector_store_primary_uri.set(format("http://{}:{}", HOSTNAME, unavail_server->port())); + co_await do_with_cql_env( + [&](cql_test_env& env) -> future<> { + auto as = abort_source_timeout(); + auto schema = co_await create_test_table(env, "ks", "idx"); + auto& vs = env.local_qp().vector_store_client(); + configure(vs).with_dns({{HOSTNAME, std::vector{unavail_server->host()}}}); + vs.start_background_tasks(); + + // Send request to unavailable node - this will put the node to backoff. + auto result = co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); + + BOOST_CHECK(!result); + BOOST_CHECK(std::holds_alternative(result.error())); + + // Replace the unavailable server with an available one. + avail_server = std::make_unique(); + co_await avail_server->start(co_await unavail_server->take_socket()); + + // Wait until node is taken out of the backoff state and used for requests again. + BOOST_CHECK(co_await repeat_until([&]() -> future { + auto result = co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); + co_return result.has_value(); + })); + }, + cfg) + .finally(coroutine::lambda([&] -> future<> { + co_await unavail_server->stop(); + if (avail_server) { + co_await avail_server->stop(); + } + })); +} + +SEASTAR_TEST_CASE(vector_store_client_single_status_check_after_concurrent_failures) { + using keys = std::expected; + + auto unavail_s = co_await make_unavailable_server(); + auto cfg = cql_test_config(); + cfg.db_config->vector_store_primary_uri.set(format("http://unavail.node:{}", unavail_s->port())); + co_await do_with_cql_env( + [&](cql_test_env& env) -> future<> { + std::vector> requests; + unavail_s->auto_shutdown_off(); + constexpr auto NUM_OF_PARALLEL_REQUESTS = 50; + auto as = abort_source_timeout(); + auto schema = co_await create_test_table(env, "ks", "idx"); + auto& vs = env.local_qp().vector_store_client(); + configure(vs).with_dns({{"unavail.node", std::vector{unavail_s->host()}}}); + vs.start_background_tasks(); + + for (int i = 0; i < NUM_OF_PARALLEL_REQUESTS; ++i) { + requests.push_back(vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset())); + } + // Wait for all requests to establish a connection with the server. + co_await repeat_until([&unavail_s]() -> future { + co_return unavail_s->connections() == NUM_OF_PARALLEL_REQUESTS; + }); + // Shutdown all connections, causing all requests to fail. + // The number of connections will drop to zero. + co_await unavail_s->shutdown_all_and_clear(); + // Wait for all requests to complete. + co_await when_all(requests.begin(), requests.end()); + + // After the backoff period, a single status check is expected to verify node recovery. + // The test server keeps the subsequent status check connection open (auto_shutdown_off()). + // This prevents the client's backoff mechanism from sending another status request + // while the first one is pending, ensuring that exactly one new connection is made. + // This makes the test assertion deterministic. + BOOST_CHECK(co_await repeat_until([&]() -> future { + co_return unavail_s->connections() == 1; + })); + }, + cfg) + .finally(coroutine::lambda([&] -> future<> { + co_await unavail_s->stop(); + })); +} diff --git a/vector_search/client.cc b/vector_search/client.cc index 6a202751d1..984eb87da5 100644 --- a/vector_search/client.cc +++ b/vector_search/client.cc @@ -8,11 +8,13 @@ #include "client.hh" #include "utils/exceptions.hh" +#include "utils/exponential_backoff_retry.hh" #include #include #include #include #include +#include using namespace seastar; using namespace std::chrono_literals; @@ -60,6 +62,9 @@ future map_err(std::exception_ptr& err) { co_return client::request_error{}; // unreachable } +auto constexpr BACKOFF_RETRY_MIN_TIME = 100ms; +auto constexpr BACKOFF_RETRY_MAX_TIME = 20s; + } // namespace client::client(endpoint_type endpoint_) @@ -69,6 +74,23 @@ client::client(endpoint_type endpoint_) seastar::future client::request( seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as) { + if (is_checking_status_in_progress()) { + co_return std::unexpected(service_unavailable_error{}); + } + + auto f = co_await seastar::coroutine::as_future(request_impl(method, std::move(path), std::move(content), std::nullopt, as)); + if (f.failed()) { + auto err = f.get_exception(); + if (is_server_unavailable(err)) { + handle_server_unavailable(); + } + co_return std::unexpected{co_await map_err(err)}; + } + co_return co_await std::move(f); +} + +seastar::future client::request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional content, + std::optional&& expected_status, seastar::abort_source& as) { auto req = http::request::make(method, _endpoint.host, std::move(path)); if (content) { @@ -80,16 +102,42 @@ seastar::future client::request( resp.content = co_await util::read_entire_stream(body); }; - auto f = co_await seastar::coroutine::as_future(_http_client.make_request(std::move(req), std::move(handler), std::nullopt, &as)); - if (f.failed()) { - auto err = f.get_exception(); - co_return std::unexpected(co_await map_err(err)); - } + co_await _http_client.make_request(std::move(req), std::move(handler), std::move(expected_status), &as); co_return resp; } +seastar::future client::check_status() { + auto f = co_await coroutine::as_future(request_impl(httpd::operation_type::GET, "/api/v1/status", std::nullopt, http::reply::status_type::ok, _as)); + auto ret = !f.failed(); + f.ignore_ready_future(); + co_return ret; +} + seastar::future<> client::close() { - return _http_client.close(); + _as.request_abort(); + co_await std::exchange(_checking_status_future, make_ready_future()); + co_await _http_client.close(); +} + +void client::handle_server_unavailable() { + if (!is_checking_status_in_progress()) { + _checking_status_future = run_checking_status(); + } +} + +seastar::future<> client::run_checking_status() { + struct stop_retry {}; + co_await exponential_backoff_retry::do_until_value(BACKOFF_RETRY_MIN_TIME, BACKOFF_RETRY_MAX_TIME, _as, [this] -> future> { + auto success = co_await check_status(); + if (success) { + co_return stop_retry{}; + } + co_return std::nullopt; + }); +} + +bool client::is_checking_status_in_progress() const { + return !_checking_status_future.available(); } } // namespace vector_search diff --git a/vector_search/client.hh b/vector_search/client.hh index 6305b86334..bbee999419 100644 --- a/vector_search/client.hh +++ b/vector_search/client.hh @@ -48,9 +48,18 @@ public: } private: + seastar::future request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional content, + std::optional&& expected, seastar::abort_source& as); + + seastar::future check_status(); + void handle_server_unavailable(); + seastar::future<> run_checking_status(); + bool is_checking_status_in_progress() const; + endpoint_type _endpoint; seastar::http::experimental::client _http_client; + seastar::future<> _checking_status_future = seastar::make_ready_future(); + seastar::abort_source _as; }; - } // namespace vector_search diff --git a/vector_search/vector_store_client.cc b/vector_search/vector_store_client.cc index 7c4a2fc450..4c98cecb6b 100644 --- a/vector_search/vector_store_client.cc +++ b/vector_search/vector_store_client.cc @@ -346,8 +346,8 @@ struct vector_store_client::impl { auto& client = old_clients[it]; if (client && client.owned()) { auto client_cloned = client; + client = nullptr; co_await client_cloned->close(); - client_cloned = nullptr; } } std::erase_if(old_clients, [](auto const& client) { From 940ed239b28497231f7ffe76a15ccbb65fe54880 Mon Sep 17 00:00:00 2001 From: Karol Nowacki Date: Wed, 22 Oct 2025 09:12:19 +0200 Subject: [PATCH 5/7] vector_search: Extract client management into dedicated class Refactor client list management by moving it to separate files (clients.cc/clients.hh) to improve code organization and modularity. --- configure.py | 3 +- vector_search/CMakeLists.txt | 3 +- vector_search/clients.cc | 166 ++++++++++++++++++++++++ vector_search/clients.hh | 65 ++++++++++ vector_search/uri.hh | 20 +++ vector_search/vector_store_client.cc | 185 +++------------------------ 6 files changed, 276 insertions(+), 166 deletions(-) create mode 100644 vector_search/clients.cc create mode 100644 vector_search/clients.hh create mode 100644 vector_search/uri.hh diff --git a/configure.py b/configure.py index ba351de212..defc48f8e7 100755 --- a/configure.py +++ b/configure.py @@ -1265,7 +1265,8 @@ scylla_core = (['message/messaging_service.cc', 'utils/disk_space_monitor.cc', 'vector_search/vector_store_client.cc', 'vector_search/dns.cc', - 'vector_search/client.cc' + 'vector_search/client.cc', + 'vector_search/clients.cc' ] + [Antlr3Grammar('cql3/Cql.g')] \ + scylla_raft_core ) diff --git a/vector_search/CMakeLists.txt b/vector_search/CMakeLists.txt index 620e2b23fa..10b89c9cd7 100644 --- a/vector_search/CMakeLists.txt +++ b/vector_search/CMakeLists.txt @@ -3,7 +3,8 @@ target_sources(vector_search PRIVATE vector_store_client.cc dns.cc - client.cc) + client.cc + clients.cc) target_link_libraries(vector_search PUBLIC Seastar::seastar diff --git a/vector_search/clients.cc b/vector_search/clients.cc new file mode 100644 index 0000000000..5d421f31f9 --- /dev/null +++ b/vector_search/clients.cc @@ -0,0 +1,166 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "clients.hh" +#include "load_balancer.hh" +#include "utils/exceptions.hh" +#include +#include +#include +#include + +using namespace seastar; + +namespace vector_search { +namespace { + +/// Timeout for waiting for a new client to be available +constexpr auto WAIT_FOR_CLIENT_TIMEOUT = std::chrono::seconds(5); + +/// The number of times to retry a request if all nodes fail with an unavailable error. +constexpr auto REQUEST_RETRIES = 3; + +static thread_local auto random_engine = std::default_random_engine(std::random_device{}()); + +/// Wait for a condition variable to be signaled or timeout. +auto wait_for_signal(condition_variable& cv, lowres_clock::time_point timeout) -> future { + auto result = co_await coroutine::as_future(cv.wait(timeout)); + if (result.failed()) { + auto err = result.get_exception(); + if (try_catch(err) != nullptr) { + co_return; + } + co_await coroutine::return_exception_ptr(std::move(err)); + } + co_return; +} + +template +auto make_unexpected(const auto& err) { + return std::unexpected{std::visit( + [](auto&& err) { + return Variant{err}; + }, + err)}; +}; + +} // namespace + +clients::clients(refresh_trigger_callback trigger_refresh) + : _producer([&]() -> future { + return try_with_gate(_gate, [this] -> future { + _trigger_refresh(); + co_await wait_for_signal(_refresh_cv, lowres_clock::now() + _timeout); + co_return _clients; + }); + }) + , _trigger_refresh(std::move(trigger_refresh)) + , _timeout(WAIT_FOR_CLIENT_TIMEOUT) { +} + +future clients::request( + seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as) { + + for (auto retries = 0; retries < REQUEST_RETRIES; ++retries) { + auto clients = co_await get_clients(as); + if (!clients) { + co_return make_unexpected(clients.error()); + } + + load_balancer lb(std::move(*clients), random_engine); + while (auto client = lb.next()) { + auto result = co_await client->request(method, path, content, as); + if (result) { + co_return std::move(result.value()); + } + if (!result && std::holds_alternative(result.error())) { + // try next client + continue; + } + co_return make_unexpected(result.error()); + } + _trigger_refresh(); + } + + co_return std::unexpected{service_unavailable_error{}}; +} + +/// Get the current http client or wait for a new one to be available. +future clients::get_clients(abort_source& as) { + if (!_clients.empty()) { + co_return _clients; + } + + auto current_clients = co_await coroutine::as_future(_producer(as)); + + if (current_clients.failed()) { + auto err = current_clients.get_exception(); + if (as.abort_requested()) { + co_return std::unexpected{aborted_error{}}; + } + co_await coroutine::return_exception_ptr(std::move(err)); + } + auto clients = co_await std::move(current_clients); + if (clients.empty()) { + co_return std::unexpected{addr_unavailable_error{}}; + } + co_return clients; +} + +future<> clients::handle_changed(const std::vector& uris, const dns::host_address_map& addrs) { + clear(); + for (const auto& uri : uris) { + auto it = addrs.find(uri.host); + if (it != addrs.end()) { + for (const auto& addr : it->second) { + _clients.push_back(make_lw_shared(client::endpoint_type{uri.host, uri.port, addr})); + } + } + } + + _refresh_cv.broadcast(); + co_await close_old_clients(); +} + +future<> clients::stop() { + _refresh_cv.signal(); + co_await _gate.close(); + co_await close_clients(); + co_await close_old_clients(); +} + +void clients::clear() { + _old_clients.insert(_old_clients.end(), std::make_move_iterator(_clients.begin()), std::make_move_iterator(_clients.end())); + _clients.clear(); +} + +future<> clients::close_clients() { + for (auto& client : _clients) { + co_await client->close(); + } + _clients.clear(); +} + +future<> clients::close_old_clients() { + // iterate over old clients and close them. There is a co_await in the loop + // so we need to use [] accessor and copying clients to avoid dangling references of iterators. + // NOLINTNEXTLINE(modernize-loop-convert) + for (auto it = 0U; it < _old_clients.size(); ++it) { + auto& client = _old_clients[it]; + if (client && client.owned()) { + auto client_cloned = client; + client = nullptr; + co_await client_cloned->close(); + } + } + std::erase_if(_old_clients, [](auto const& client) { + return !client; + }); +} + +} // namespace vector_search diff --git a/vector_search/clients.hh b/vector_search/clients.hh new file mode 100644 index 0000000000..731ca64406 --- /dev/null +++ b/vector_search/clients.hh @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "client.hh" +#include "dns.hh" +#include "uri.hh" +#include "utils/sequential_producer.hh" +#include "vector_search/error.hh" +#include +#include +#include +#include +#include + +namespace vector_search { + +class clients { +public: + using refresh_trigger_callback = std::function; + + using request_error = std::variant; + using request_result = std::expected; + + using clients_vec = std::vector>; + using get_clients_error = std::variant; + using get_clients_result = std::expected; + + explicit clients(refresh_trigger_callback trigger_refresh); + + seastar::future request( + seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); + + seastar::future<> handle_changed(const std::vector& uris, const dns::host_address_map& addrs); + + seastar::future<> stop(); + + void clear(); + + seastar::future get_clients(seastar::abort_source& as); + + void timeout(std::chrono::milliseconds timeout) { + _timeout = timeout; + } + +private: + seastar::future<> close_clients(); + seastar::future<> close_old_clients(); + + clients_vec _clients; + sequential_producer _producer; + refresh_trigger_callback _trigger_refresh; + seastar::gate _gate; + seastar::condition_variable _refresh_cv; + std::chrono::milliseconds _timeout; + clients_vec _old_clients; +}; + +} // namespace vector_search diff --git a/vector_search/uri.hh b/vector_search/uri.hh new file mode 100644 index 0000000000..1884df63c8 --- /dev/null +++ b/vector_search/uri.hh @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once +#include +#include + +namespace vector_search { + +struct uri { + seastar::sstring host; + std::uint16_t port; +}; + +} // namespace vector_search diff --git a/vector_search/vector_store_client.cc b/vector_search/vector_store_client.cc index 4c98cecb6b..85b1f83fef 100644 --- a/vector_search/vector_store_client.cc +++ b/vector_search/vector_store_client.cc @@ -9,7 +9,8 @@ #include "vector_store_client.hh" #include "dns.hh" #include "load_balancer.hh" -#include "client.hh" +#include "clients.hh" +#include "uri.hh" #include "db/config.hh" #include "exceptions/exceptions.hh" #include "utils/sequential_producer.hh" @@ -54,14 +55,7 @@ using port_number = vector_search::vector_store_client::port_number; using primary_key = vector_search::primary_key; using primary_keys = vector_search::vector_store_client::primary_keys; using service_reply_format_error = vector_search::vector_store_client::service_reply_format_error; -using tcp_keepalive_params = net::tcp_keepalive_params; -using time_point = lowres_clock::time_point; - -/// Timeout for waiting for a new client to be available -constexpr auto WAIT_FOR_CLIENT_TIMEOUT = std::chrono::seconds(5); - -/// The number of times to retry an /ann request if all nodes fail with a system error. -constexpr auto ANN_RETRIES = 3; +using uri = vector_search::uri; // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) logging::logger vslogger("vector_store_client"); @@ -77,11 +71,6 @@ auto parse_port(std::string const& port_txt) -> std::optional { return port; } -struct uri { - host_name host; - port_number port; -}; - auto parse_service_uri(std::string_view uri_) -> std::optional { constexpr auto URI_REGEX = R"(^http:\/\/([a-z0-9._-]+):([0-9]+)$)"; auto const uri_regex = std::regex(URI_REGEX); @@ -99,20 +88,6 @@ auto parse_service_uri(std::string_view uri_) -> std::optional { return {{host, *port}}; } - -/// Wait for a condition variable to be signaled or timeout. -auto wait_for_signal(condition_variable& cv, time_point timeout) -> future { - auto result = co_await coroutine::as_future(cv.wait(timeout)); - if (result.failed()) { - auto err = result.get_exception(); - if (try_catch(err) != nullptr) { - co_return; - } - co_await coroutine::return_exception_ptr(std::move(err)); - } - co_return; -} - auto get_key_column_value(const rjson::value& item, std::size_t idx, const column_definition& column) -> std::expected { auto const& column_name = column.name_as_text(); auto const* keys_obj = rjson::find(item, column_name); @@ -244,34 +219,17 @@ std::vector get_hosts(const std::vector& uris) { return ret; } -template -auto make_unexpected(const auto& err) { - return std::unexpected{std::visit( - [](auto&& err) { - return Variant{err}; - }, - err)}; -}; - } // namespace namespace vector_search { struct vector_store_client::impl { - - using clients_type = std::vector>; - utils::observer uri_observer; - clients_type current_clients; - clients_type old_clients; std::vector _uris; - gate client_producer_gate; - condition_variable refresh_client_cv; - milliseconds wait_for_client_timeout = WAIT_FOR_CLIENT_TIMEOUT; - sequential_producer clients_producer; dns dns; uint64_t dns_refreshes = 0; seastar::metrics::metric_groups _metrics; + clients _clients; impl(utils::config_file::named_value cfg) @@ -284,133 +242,35 @@ struct vector_store_client::impl { } })) , _uris(parse_uris(cfg())) - , clients_producer([&]() -> future { - return try_with_gate(client_producer_gate, [this] -> future { - dns.trigger_refresh(); - co_await wait_for_signal(refresh_client_cv, lowres_clock::now() + wait_for_client_timeout); - co_return current_clients; - }); - }) - , dns(vslogger, get_hosts(_uris), [this](auto const& addrs) -> future<> { - co_await handle_addresses_changed(addrs); - }, dns_refreshes) { + + , dns( + vslogger, get_hosts(_uris), + [this](auto const& addrs) -> future<> { + co_await handle_addresses_changed(addrs); + }, + dns_refreshes) + , _clients([this]() { + dns.trigger_refresh(); + }) { _metrics.add_group("vector_store", {seastar::metrics::make_gauge("dns_refreshes", seastar::metrics::description("Number of DNS refreshes"), [this] { return dns_refreshes; }).aggregate({seastar::metrics::shard_label})}); } void handle_uris_changed(std::vector uris) { - clear_current_clients(); + _clients.clear(); _uris = std::move(uris); dns.hosts(get_hosts(_uris)); } - auto handle_addresses_changed(const dns::host_address_map& addrs) -> future<> { - clear_current_clients(); - for (const auto& uri : _uris) { - auto it = addrs.find(uri.host); - if (it != addrs.end()) { - for (const auto& addr : it->second) { - current_clients.push_back(make_lw_shared(client::endpoint_type{uri.host, uri.port, addr})); - } - } - } - - refresh_client_cv.broadcast(); - co_await cleanup_old_clients(); + future<> handle_addresses_changed(const dns::host_address_map& addrs) { + co_await _clients.handle_changed(_uris, addrs); } auto is_disabled() const -> bool { return _uris.empty(); } - void clear_current_clients() { - old_clients.insert(old_clients.end(), std::make_move_iterator(current_clients.begin()), std::make_move_iterator(current_clients.end())); - current_clients.clear(); - } - - /// Cleanup current clients - auto cleanup_current_clients() -> future<> { - for (auto& client : current_clients) { - co_await client->close(); - } - current_clients.clear(); - } - - /// Cleanup old clients that are no longer used. - auto cleanup_old_clients() -> future<> { - // iterate over old clients and close them. There is a co_await in the loop - // so we need to use [] accessor and copying clients to avoid dangling references of iterators. - // NOLINTNEXTLINE(modernize-loop-convert) - for (auto it = 0U; it < old_clients.size(); ++it) { - auto& client = old_clients[it]; - if (client && client.owned()) { - auto client_cloned = client; - client = nullptr; - co_await client_cloned->close(); - } - } - std::erase_if(old_clients, [](auto const& client) { - return !client; - }); - } - - using get_client_error = std::variant; - - /// Get the current http client or wait for a new one to be available. - auto get_clients(abort_source& as) -> future> { - if (is_disabled()) { - co_return std::unexpected{disabled{}}; - } - if (!current_clients.empty()) { - co_return current_clients; - } - - auto current_clients = co_await coroutine::as_future(clients_producer(as)); - - if (current_clients.failed()) { - auto err = current_clients.get_exception(); - if (as.abort_requested()) { - co_return std::unexpected{aborted{}}; - } - co_await coroutine::return_exception_ptr(std::move(err)); - } - auto clients = co_await std::move(current_clients); - if (clients.empty()) { - co_return std::unexpected{addr_unavailable{}}; - } - co_return clients; - } - - using make_request_error = std::variant; - - auto make_request(operation_type method, http_path path, std::optional content, abort_source& as) - -> future> { - for (auto retries = 0; retries < ANN_RETRIES; ++retries) { - auto clients = co_await get_clients(as); - if (!clients) { - co_return make_unexpected(clients.error()); - } - - load_balancer lb(std::move(*clients), random_engine); - while (auto client = lb.next()) { - auto result = co_await client->request(method, path, content, as); - if (result) { - co_return std::move(result.value()); - } - if (std::holds_alternative(result.error())) { - // try next client - continue; - } - co_return make_unexpected(result.error()); - } - - dns.trigger_refresh(); - } - - co_return std::unexpected{service_unavailable{}}; - } - auto ann(keyspace_name keyspace, index_name name, schema_ptr schema, vs_vector vs_vector, limit limit, abort_source& as) -> future> { if (is_disabled()) { @@ -421,7 +281,7 @@ struct vector_store_client::impl { auto path = format("/api/v1/indexes/{}/{}/ann", keyspace, name); auto content = write_ann_json(std::move(vs_vector), limit); - auto resp = co_await make_request(operation_type::POST, std::move(path), std::move(content), as); + auto resp = co_await _clients.request(operation_type::POST, std::move(path), std::move(content), as); if (!resp) { co_return std::unexpected{std::visit( [](auto&& err) { @@ -457,11 +317,8 @@ void vector_store_client::start_background_tasks() { } auto vector_store_client::stop() -> future<> { - _impl->refresh_client_cv.signal(); - co_await _impl->client_producer_gate.close(); + co_await _impl->_clients.stop(); co_await _impl->dns.stop(); - co_await _impl->cleanup_old_clients(); - co_await _impl->cleanup_current_clients(); } auto vector_store_client::is_disabled() const -> bool { @@ -478,7 +335,7 @@ void vector_store_client_tester::set_dns_refresh_interval(vector_store_client& v } void vector_store_client_tester::set_wait_for_client_timeout(vector_store_client& vsc, std::chrono::milliseconds timeout) { - vsc._impl->wait_for_client_timeout = timeout; + vsc._impl->_clients.timeout(timeout); } void vector_store_client_tester::set_dns_resolver(vector_store_client& vsc, std::function>(sstring const&)> resolver) { @@ -490,7 +347,7 @@ void vector_store_client_tester::trigger_dns_resolver(vector_store_client& vsc) } auto vector_store_client_tester::resolve_hostname(vector_store_client& vsc, abort_source& as) -> future> { - auto clients = co_await vsc._impl->get_clients(as); + auto clients = co_await vsc._impl->_clients.get_clients(as); std::vector ret; if (!clients) { co_return ret; From 097c0f959228e7019f3894a5bbe40bfce72c510d Mon Sep 17 00:00:00 2001 From: Karol Nowacki Date: Mon, 3 Nov 2025 13:17:19 +0100 Subject: [PATCH 6/7] vector_search: Report status check exception via on_internal_error_noexcept This exception should only occur due to internal errors, not client or external issues. If triggered, it indicates an internal problem. Therefore, we notify about this exception using on_internal_error_noexcept. --- vector_search/client.cc | 27 +++++++++++++++++++-------- vector_search/client.hh | 5 +++-- vector_search/clients.cc | 7 ++++--- vector_search/clients.hh | 4 +++- vector_search/vector_store_client.cc | 2 +- 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/vector_search/client.cc b/vector_search/client.cc index 984eb87da5..318a942895 100644 --- a/vector_search/client.cc +++ b/vector_search/client.cc @@ -14,7 +14,9 @@ #include #include #include +#include #include +#include using namespace seastar; using namespace std::chrono_literals; @@ -67,9 +69,10 @@ auto constexpr BACKOFF_RETRY_MAX_TIME = 20s; } // namespace -client::client(endpoint_type endpoint_) +client::client(logging::logger& logger, endpoint_type endpoint_) : _endpoint(std::move(endpoint_)) - , _http_client(std::make_unique(socket_address(endpoint_.ip, endpoint_.port))) { + , _http_client(std::make_unique(socket_address(endpoint_.ip, endpoint_.port))) + , _logger(logger) { } seastar::future client::request( @@ -127,13 +130,21 @@ void client::handle_server_unavailable() { seastar::future<> client::run_checking_status() { struct stop_retry {}; - co_await exponential_backoff_retry::do_until_value(BACKOFF_RETRY_MIN_TIME, BACKOFF_RETRY_MAX_TIME, _as, [this] -> future> { - auto success = co_await check_status(); - if (success) { - co_return stop_retry{}; + auto f = co_await coroutine::as_future( + exponential_backoff_retry::do_until_value(BACKOFF_RETRY_MIN_TIME, BACKOFF_RETRY_MAX_TIME, _as, [this] -> future> { + auto success = co_await check_status(); + if (success) { + co_return stop_retry{}; + } + co_return std::nullopt; + })); + if (f.failed()) { + if (auto err = f.get_exception(); !is_request_aborted(err)) { + // Report internal error for exceptions other than abort + on_internal_error_noexcept(_logger, fmt::format("exception while checking status: {}", err)); } - co_return std::nullopt; - }); + } + co_return; } bool client::is_checking_status_in_progress() const { diff --git a/vector_search/client.hh b/vector_search/client.hh index bbee999419..7ace43d8a2 100644 --- a/vector_search/client.hh +++ b/vector_search/client.hh @@ -9,6 +9,7 @@ #pragma once #include "error.hh" +#include "utils/log.hh" #include #include #include @@ -36,7 +37,7 @@ public: using request_error = std::variant; using request_result = std::expected; - explicit client(endpoint_type endpoint_); + explicit client(logging::logger& logger, endpoint_type endpoint_); seastar::future request( seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); @@ -50,7 +51,6 @@ public: private: seastar::future request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional content, std::optional&& expected, seastar::abort_source& as); - seastar::future check_status(); void handle_server_unavailable(); seastar::future<> run_checking_status(); @@ -60,6 +60,7 @@ private: seastar::http::experimental::client _http_client; seastar::future<> _checking_status_future = seastar::make_ready_future(); seastar::abort_source _as; + logging::logger& _logger; }; } // namespace vector_search diff --git a/vector_search/clients.cc b/vector_search/clients.cc index 5d421f31f9..82cd0785ea 100644 --- a/vector_search/clients.cc +++ b/vector_search/clients.cc @@ -51,7 +51,7 @@ auto make_unexpected(const auto& err) { } // namespace -clients::clients(refresh_trigger_callback trigger_refresh) +clients::clients(logging::logger& logger, refresh_trigger_callback trigger_refresh) : _producer([&]() -> future { return try_with_gate(_gate, [this] -> future { _trigger_refresh(); @@ -60,7 +60,8 @@ clients::clients(refresh_trigger_callback trigger_refresh) }); }) , _trigger_refresh(std::move(trigger_refresh)) - , _timeout(WAIT_FOR_CLIENT_TIMEOUT) { + , _timeout(WAIT_FOR_CLIENT_TIMEOUT) + , _logger(logger) { } future clients::request( @@ -118,7 +119,7 @@ future<> clients::handle_changed(const std::vector& uris, const dns::host_a auto it = addrs.find(uri.host); if (it != addrs.end()) { for (const auto& addr : it->second) { - _clients.push_back(make_lw_shared(client::endpoint_type{uri.host, uri.port, addr})); + _clients.push_back(make_lw_shared(_logger, client::endpoint_type{uri.host, uri.port, addr})); } } } diff --git a/vector_search/clients.hh b/vector_search/clients.hh index 731ca64406..2fc2b6bf16 100644 --- a/vector_search/clients.hh +++ b/vector_search/clients.hh @@ -13,6 +13,7 @@ #include "uri.hh" #include "utils/sequential_producer.hh" #include "vector_search/error.hh" +#include "utils/log.hh" #include #include #include @@ -32,7 +33,7 @@ public: using get_clients_error = std::variant; using get_clients_result = std::expected; - explicit clients(refresh_trigger_callback trigger_refresh); + explicit clients(logging::logger& logger, refresh_trigger_callback trigger_refresh); seastar::future request( seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); @@ -60,6 +61,7 @@ private: seastar::condition_variable _refresh_cv; std::chrono::milliseconds _timeout; clients_vec _old_clients; + logging::logger& _logger; }; } // namespace vector_search diff --git a/vector_search/vector_store_client.cc b/vector_search/vector_store_client.cc index 85b1f83fef..9096ed7ebf 100644 --- a/vector_search/vector_store_client.cc +++ b/vector_search/vector_store_client.cc @@ -249,7 +249,7 @@ struct vector_store_client::impl { co_await handle_addresses_changed(addrs); }, dns_refreshes) - , _clients([this]() { + , _clients(vslogger, [this]() { dns.trigger_refresh(); }) { _metrics.add_group("vector_store", {seastar::metrics::make_gauge("dns_refreshes", seastar::metrics::description("Number of DNS refreshes"), [this] { From 1972fb315b9dd841c01bf31568a019fbc5508d90 Mon Sep 17 00:00:00 2001 From: Karol Nowacki Date: Thu, 13 Nov 2025 09:26:37 +0100 Subject: [PATCH 7/7] vector_search: Set max backoff delay to 2x read request timeout The maximum backoff delay for status checking now depends on the `read_request_timeout_in_ms` configuration option. The delay is set to twice the value of this parameter. --- .../vector_search/vector_store_client_test.cc | 71 ++++++++++++++++--- vector_search/client.cc | 13 ++-- vector_search/client.hh | 6 +- vector_search/clients.cc | 7 +- vector_search/clients.hh | 4 +- vector_search/vector_store_client.cc | 13 ++-- 6 files changed, 89 insertions(+), 25 deletions(-) diff --git a/test/vector_search/vector_store_client_test.cc b/test/vector_search/vector_store_client_test.cc index 248a8e8bcf..e332359ce3 100644 --- a/test/vector_search/vector_store_client_test.cc +++ b/test/vector_search/vector_store_client_test.cc @@ -238,6 +238,11 @@ public: }; class unavailable_server { + struct Connection { + lowres_clock::time_point timestamp; + connected_socket socket; + }; + public: explicit unavailable_server(uint16_t port) : _port(port) { @@ -265,8 +270,8 @@ public: return _port; } - size_t connections() const { - return _connections.size(); + const std::vector& connections() const { + return _connections; } future take_socket() { @@ -282,10 +287,10 @@ public: } future<> shutdown_all_and_clear() { - std::vector tmp; + std::vector tmp; std::swap(tmp, _connections); for (auto& conn : tmp) { - co_await shutdown(conn.connection); + co_await shutdown(conn.socket); } } @@ -304,9 +309,10 @@ private: future<> run() { while (_running) { try { - _connections.push_back(co_await _socket.accept()); + auto result = co_await _socket.accept(); + _connections.push_back(Connection{.timestamp = lowres_clock::now(), .socket = std::move(result.connection)}); if (_auto_shutdown) { - co_await shutdown(_connections.back().connection); + co_await shutdown(_connections.back().socket); } } catch (...) { break; @@ -325,7 +331,7 @@ private: seastar::gate _gate; uint16_t _port; sstring _host; - std::vector _connections; + std::vector _connections; bool _running = true; bool _auto_shutdown = true; }; @@ -895,7 +901,7 @@ SEASTAR_TEST_CASE(vector_store_client_multiple_ips_high_availability) { // repeat the ANN query until the unavailable server is queried. BOOST_CHECK(co_await repeat_until([&]() -> future { keys = co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); - co_return unavail_s->connections() > 1; + co_return unavail_s->connections().size() > 1; })); // The query is successful because the client falls back to the available server @@ -959,7 +965,7 @@ SEASTAR_TEST_CASE(vector_store_client_multiple_uris_high_availability) { // repeat the ANN query until the unavailable server is queried. BOOST_CHECK(co_await repeat_until([&]() -> future { keys = co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); - co_return unavail_s->connections() > 1; + co_return unavail_s->connections().size() > 1; })); // The query is successful because the client falls back to the available server @@ -1163,7 +1169,7 @@ SEASTAR_TEST_CASE(vector_store_client_single_status_check_after_concurrent_failu } // Wait for all requests to establish a connection with the server. co_await repeat_until([&unavail_s]() -> future { - co_return unavail_s->connections() == NUM_OF_PARALLEL_REQUESTS; + co_return unavail_s->connections().size() == NUM_OF_PARALLEL_REQUESTS; }); // Shutdown all connections, causing all requests to fail. // The number of connections will drop to zero. @@ -1177,7 +1183,7 @@ SEASTAR_TEST_CASE(vector_store_client_single_status_check_after_concurrent_failu // while the first one is pending, ensuring that exactly one new connection is made. // This makes the test assertion deterministic. BOOST_CHECK(co_await repeat_until([&]() -> future { - co_return unavail_s->connections() == 1; + co_return unavail_s->connections().size() == 1; })); }, cfg) @@ -1185,3 +1191,46 @@ SEASTAR_TEST_CASE(vector_store_client_single_status_check_after_concurrent_failu co_await unavail_s->stop(); })); } + +SEASTAR_TEST_CASE(vector_store_client_updates_backoff_max_time_from_read_request_timeout_cfg) { + auto unavail_s = co_await make_unavailable_server(); + auto cfg = cql_test_config(); + cfg.db_config->vector_store_primary_uri.set(format("http://unavail.node:{}", unavail_s->port())); + co_await do_with_cql_env( + [&](cql_test_env& env) -> future<> { + auto as = abort_source_timeout(); + auto schema = co_await create_test_table(env, "ks", "idx"); + auto& vs = env.local_qp().vector_store_client(); + configure(vs).with_dns({{"unavail.node", std::vector{unavail_s->host()}}}); + vs.start_background_tasks(); + + // Set request timeout to 100ms, hence max backoff time is 2x100ms = 200ms. + cfg.db_config->read_request_timeout_in_ms.set(100); + // Trigger status checking by making ANN request to unavailable server. + co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); + co_await repeat_until([&unavail_s]() -> future { + // Wait for 1 ANN request + 4 status check connections (5 total) + co_return unavail_s->connections().size() > 4; + }); + + // Verify backoff timing between status check connections. + // Skip the first connection (ANN request) and analyze status check intervals. + auto duration_between_1st_and_2nd_status_check = std::chrono::duration_cast( + unavail_s->connections().at(2).timestamp - unavail_s->connections().at(1).timestamp); + BOOST_CHECK_GE(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(100)); + BOOST_CHECK_LT(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(200)); + auto duration_between_2nd_and_3rd_status_check = std::chrono::duration_cast( + unavail_s->connections().at(3).timestamp - unavail_s->connections().at(2).timestamp); + // Max backoff time reached at 200ms, so subsequent status checks use fixed 200ms intervals. + BOOST_CHECK_GE(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(200)); // 200ms = 100ms * 2 + BOOST_CHECK_LT(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(400)); + auto duration_between_3rd_and_4th_status_check = std::chrono::duration_cast( + unavail_s->connections().at(4).timestamp - unavail_s->connections().at(3).timestamp); + BOOST_CHECK_GE(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(200)); + BOOST_CHECK_LT(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(400)); + }, + cfg) + .finally(coroutine::lambda([&] -> future<> { + co_await unavail_s->stop(); + })); +} diff --git a/vector_search/client.cc b/vector_search/client.cc index 318a942895..ce5b0a22c2 100644 --- a/vector_search/client.cc +++ b/vector_search/client.cc @@ -65,14 +65,14 @@ future map_err(std::exception_ptr& err) { } auto constexpr BACKOFF_RETRY_MIN_TIME = 100ms; -auto constexpr BACKOFF_RETRY_MAX_TIME = 20s; } // namespace -client::client(logging::logger& logger, endpoint_type endpoint_) +client::client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value request_timeout_in_ms) : _endpoint(std::move(endpoint_)) , _http_client(std::make_unique(socket_address(endpoint_.ip, endpoint_.port))) - , _logger(logger) { + , _logger(logger) + , _request_timeout(std::move(request_timeout_in_ms)) { } seastar::future client::request( @@ -131,7 +131,7 @@ void client::handle_server_unavailable() { seastar::future<> client::run_checking_status() { struct stop_retry {}; auto f = co_await coroutine::as_future( - exponential_backoff_retry::do_until_value(BACKOFF_RETRY_MIN_TIME, BACKOFF_RETRY_MAX_TIME, _as, [this] -> future> { + exponential_backoff_retry::do_until_value(BACKOFF_RETRY_MIN_TIME, backoff_retry_max(), _as, [this] -> future> { auto success = co_await check_status(); if (success) { co_return stop_retry{}; @@ -151,4 +151,9 @@ bool client::is_checking_status_in_progress() const { return !_checking_status_future.available(); } +std::chrono::milliseconds client::backoff_retry_max() const { + std::chrono::milliseconds ret{_request_timeout.get()}; + return ret * 2; +} + } // namespace vector_search diff --git a/vector_search/client.hh b/vector_search/client.hh index 7ace43d8a2..b7538db5f3 100644 --- a/vector_search/client.hh +++ b/vector_search/client.hh @@ -10,6 +10,8 @@ #include "error.hh" #include "utils/log.hh" +#include "utils/updateable_value.hh" +#include #include #include #include @@ -37,7 +39,7 @@ public: using request_error = std::variant; using request_result = std::expected; - explicit client(logging::logger& logger, endpoint_type endpoint_); + explicit client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value request_timeout_in_ms); seastar::future request( seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); @@ -55,12 +57,14 @@ private: void handle_server_unavailable(); seastar::future<> run_checking_status(); bool is_checking_status_in_progress() const; + std::chrono::milliseconds backoff_retry_max() const; endpoint_type _endpoint; seastar::http::experimental::client _http_client; seastar::future<> _checking_status_future = seastar::make_ready_future(); seastar::abort_source _as; logging::logger& _logger; + utils::updateable_value _request_timeout; }; } // namespace vector_search diff --git a/vector_search/clients.cc b/vector_search/clients.cc index 82cd0785ea..98898e13b0 100644 --- a/vector_search/clients.cc +++ b/vector_search/clients.cc @@ -51,7 +51,7 @@ auto make_unexpected(const auto& err) { } // namespace -clients::clients(logging::logger& logger, refresh_trigger_callback trigger_refresh) +clients::clients(logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value request_timeout_in_ms) : _producer([&]() -> future { return try_with_gate(_gate, [this] -> future { _trigger_refresh(); @@ -61,7 +61,8 @@ clients::clients(logging::logger& logger, refresh_trigger_callback trigger_refre }) , _trigger_refresh(std::move(trigger_refresh)) , _timeout(WAIT_FOR_CLIENT_TIMEOUT) - , _logger(logger) { + , _logger(logger) + , _request_timeout_in_ms(std::move(request_timeout_in_ms)) { } future clients::request( @@ -119,7 +120,7 @@ future<> clients::handle_changed(const std::vector& uris, const dns::host_a auto it = addrs.find(uri.host); if (it != addrs.end()) { for (const auto& addr : it->second) { - _clients.push_back(make_lw_shared(_logger, client::endpoint_type{uri.host, uri.port, addr})); + _clients.push_back(make_lw_shared(_logger, client::endpoint_type{uri.host, uri.port, addr}, _request_timeout_in_ms)); } } } diff --git a/vector_search/clients.hh b/vector_search/clients.hh index 2fc2b6bf16..78cbcf5ed4 100644 --- a/vector_search/clients.hh +++ b/vector_search/clients.hh @@ -14,6 +14,7 @@ #include "utils/sequential_producer.hh" #include "vector_search/error.hh" #include "utils/log.hh" +#include "utils/updateable_value.hh" #include #include #include @@ -33,7 +34,7 @@ public: using get_clients_error = std::variant; using get_clients_result = std::expected; - explicit clients(logging::logger& logger, refresh_trigger_callback trigger_refresh); + explicit clients(logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value request_timeout_in_ms); seastar::future request( seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); @@ -62,6 +63,7 @@ private: std::chrono::milliseconds _timeout; clients_vec _old_clients; logging::logger& _logger; + utils::updateable_value _request_timeout_in_ms; }; } // namespace vector_search diff --git a/vector_search/vector_store_client.cc b/vector_search/vector_store_client.cc index 9096ed7ebf..832ce9df5a 100644 --- a/vector_search/vector_store_client.cc +++ b/vector_search/vector_store_client.cc @@ -232,7 +232,7 @@ struct vector_store_client::impl { clients _clients; - impl(utils::config_file::named_value cfg) + impl(utils::config_file::named_value cfg, utils::config_file::named_value read_request_timeout_in_ms) : uri_observer(cfg.observe([this](seastar::sstring uris_csv) { try { handle_uris_changed(parse_uris(uris_csv)); @@ -249,9 +249,12 @@ struct vector_store_client::impl { co_await handle_addresses_changed(addrs); }, dns_refreshes) - , _clients(vslogger, [this]() { - dns.trigger_refresh(); - }) { + , _clients( + vslogger, + [this]() { + dns.trigger_refresh(); + }, + std::move(read_request_timeout_in_ms)) { _metrics.add_group("vector_store", {seastar::metrics::make_gauge("dns_refreshes", seastar::metrics::description("Number of DNS refreshes"), [this] { return dns_refreshes; }).aggregate({seastar::metrics::shard_label})}); @@ -307,7 +310,7 @@ struct vector_store_client::impl { }; vector_store_client::vector_store_client(config const& cfg) - : _impl(std::make_unique(cfg.vector_store_primary_uri)) { + : _impl(std::make_unique(cfg.vector_store_primary_uri, cfg.read_request_timeout_in_ms)) { } vector_store_client::~vector_store_client() = default;