diff --git a/configure.py b/configure.py index 5714c8a955..defc48f8e7 100755 --- a/configure.py +++ b/configure.py @@ -1265,6 +1265,8 @@ scylla_core = (['message/messaging_service.cc', 'utils/disk_space_monitor.cc', 'vector_search/vector_store_client.cc', 'vector_search/dns.cc', + 'vector_search/client.cc', + 'vector_search/clients.cc' ] + [Antlr3Grammar('cql3/Cql.g')] \ + scylla_raft_core ) diff --git a/test/vector_search/test_config.yaml b/test/vector_search/test_config.yaml index 0d4cfde39a..be386063dc 100644 --- a/test/vector_search/test_config.yaml +++ b/test/vector_search/test_config.yaml @@ -1,2 +1,3 @@ extra_scylla_cmdline_options: - '--reactor-backend=linux-aio' + - '--fail-on-abandoned-failed-futures=true' diff --git a/test/vector_search/vector_store_client_test.cc b/test/vector_search/vector_store_client_test.cc index c65cbcab66..e332359ce3 100644 --- a/test/vector_search/vector_store_client_test.cc +++ b/test/vector_search/vector_store_client_test.cc @@ -6,6 +6,8 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ +#include "seastar/core/future.hh" +#include "seastar/core/when_all.hh" #include "vector_search/vector_store_client.hh" #include "db/config.hh" #include "exceptions/exceptions.hh" @@ -67,13 +69,25 @@ auto listen_on_port(std::unique_ptr server, sstring host, uint16_t co_return std::make_tuple(std::move(server), listeners[0].local_address().port()); } -auto new_http_server(std::function set_routes, sstring host = LOCALHOST, uint16_t port = 0) - -> future, socket_address>> { +auto make_http_server(std::function set_routes) { static unsigned id = 0; auto server = std::make_unique(fmt::format("test_vector_store_client_{}", id++)); set_routes(server->_routes); server->set_content_streaming(true); - co_return co_await listen_on_port(std::move(server), std::move(host), port); + return server; +} + +auto new_http_server(std::function set_routes, sstring host = LOCALHOST, uint16_t port = 0) + -> future, socket_address>> { + co_return co_await listen_on_port(make_http_server(set_routes), std::move(host), port); +} + +auto new_http_server(std::function set_routes, server_socket socket) -> future, socket_address>> { + auto server = make_http_server(set_routes); + auto& listeners = http_server_tester::listeners(*server); + listeners.push_back(std::move(socket)); + co_await server->do_accepts(listeners.size() - 1); + co_return std::make_tuple(std::move(server), listeners.back().local_address().port()); } auto repeat_until(milliseconds timeout, std::function()> func) -> future { @@ -224,6 +238,11 @@ public: }; class unavailable_server { + struct Connection { + lowres_clock::time_point timestamp; + connected_socket socket; + }; + public: explicit unavailable_server(uint16_t port) : _port(port) { @@ -237,8 +256,10 @@ public: } future<> stop() { - _socket.abort_accept(); - co_await _gate.close(); + if (_socket) { + _socket.abort_accept(); + co_await _gate.close(); + } } sstring host() const { @@ -249,10 +270,30 @@ public: return _port; } - size_t connections() const { + const std::vector& connections() const { return _connections; } + future take_socket() { + _running = false; + // Make a connection to unblock accept() in run loop. + co_await seastar::connect(socket_address(net::inet_address(_host), _port)); + co_await _gate.close(); + co_return std::move(_socket); + } + + void auto_shutdown_off() { + _auto_shutdown = false; + } + + future<> shutdown_all_and_clear() { + std::vector tmp; + std::swap(tmp, _connections); + for (auto& conn : tmp) { + co_await shutdown(conn.socket); + } + } + private: future<> listen() { co_await try_on_loopback_address([this](auto host) -> future<> { @@ -266,24 +307,33 @@ private: } future<> run() { - while (true) { + while (_running) { try { - auto s = co_await _socket.accept(); - _connections++; - s.connection.shutdown_output(); - s.connection.shutdown_input(); - co_await s.connection.wait_input_shutdown(); + auto result = co_await _socket.accept(); + _connections.push_back(Connection{.timestamp = lowres_clock::now(), .socket = std::move(result.connection)}); + if (_auto_shutdown) { + co_await shutdown(_connections.back().socket); + } } catch (...) { break; } } } + future<> shutdown(connected_socket& cs) { + cs.shutdown_output(); + cs.shutdown_input(); + co_await cs.wait_input_shutdown(); + } + + seastar::server_socket _socket; seastar::gate _gate; uint16_t _port; sstring _host; - size_t _connections = 0; + std::vector _connections; + bool _running = true; + bool _auto_shutdown = true; }; auto make_unavailable_server(uint16_t port = 0) -> future> { @@ -318,6 +368,16 @@ public: co_await listen(); } + future<> start(server_socket socket) { + auto [server, addr] = co_await new_http_server( + [this](auto& r) { + set_routes(r); + }, + std::move(socket)); + _http_server = std::move(server); + _port = addr.port(); + } + future<> stop() { co_await _http_server->stop(); } @@ -342,11 +402,8 @@ private: future<> listen() { co_await try_on_loopback_address([this](auto host) -> future<> { auto [s, addr] = co_await new_http_server( - [this](routes& r) { - auto ann = [this](std::unique_ptr req, std::unique_ptr rep) -> future> { - return handle_request(std::move(req), std::move(rep)); - }; - r.add(operation_type::POST, url(INDEXES_PATH).remainder("path"), new function_handler(ann, "json")); + [this](auto& r) { + set_routes(r); }, host.c_str(), _port); _http_server = std::move(s); @@ -355,7 +412,7 @@ private: }); } - future> handle_request(std::unique_ptr req, std::unique_ptr rep) { + future> handle_ann_request(std::unique_ptr req, std::unique_ptr rep) { ann_req r{.path = INDEXES_PATH + "/" + req->get_path_param("path"), .body = co_await util::read_entire_stream_contiguous(*req->content_stream)}; _ann_requests.push_back(std::move(r)); rep->set_status(_next_ann_response.status); @@ -363,6 +420,27 @@ private: co_return rep; } + future> handle_status_request(std::unique_ptr req, std::unique_ptr rep) { + rep->set_status(status_type::ok); + rep->write_body("json", "SERVING"); + co_return rep; + } + + void set_routes(routes& r) { + r.add(operation_type::POST, url(INDEXES_PATH).remainder("path"), + new function_handler( + [this](std::unique_ptr req, std::unique_ptr rep) -> future> { + return handle_ann_request(std::move(req), std::move(rep)); + }, + "json")); + r.add(operation_type::GET, url("/api/v1/status").remainder("status"), + new function_handler( + [this](std::unique_ptr req, std::unique_ptr rep) -> future> { + return handle_status_request(std::move(req), std::move(rep)); + }, + "json")); + } + uint16_t _port = 0; sstring _host; std::unique_ptr _http_server; @@ -823,7 +901,7 @@ SEASTAR_TEST_CASE(vector_store_client_multiple_ips_high_availability) { // repeat the ANN query until the unavailable server is queried. BOOST_CHECK(co_await repeat_until([&]() -> future { keys = co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); - co_return unavail_s->connections() > 1; + co_return unavail_s->connections().size() > 1; })); // The query is successful because the client falls back to the available server @@ -887,7 +965,7 @@ SEASTAR_TEST_CASE(vector_store_client_multiple_uris_high_availability) { // repeat the ANN query until the unavailable server is queried. BOOST_CHECK(co_await repeat_until([&]() -> future { keys = co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); - co_return unavail_s->connections() > 1; + co_return unavail_s->connections().size() > 1; })); // The query is successful because the client falls back to the available server @@ -1028,3 +1106,131 @@ SEASTAR_TEST_CASE(vector_store_client_test_paging_warning_doesnt_show_when_limit return s1->stop(); }); } + +SEASTAR_TEST_CASE(vector_store_client_node_recovery_after_backoff) { + auto unavail_server = co_await make_unavailable_server(); + std::unique_ptr avail_server; + constexpr auto HOSTNAME = "server.node"; + + auto cfg = cql_test_config(); + cfg.db_config->vector_store_primary_uri.set(format("http://{}:{}", HOSTNAME, unavail_server->port())); + co_await do_with_cql_env( + [&](cql_test_env& env) -> future<> { + auto as = abort_source_timeout(); + auto schema = co_await create_test_table(env, "ks", "idx"); + auto& vs = env.local_qp().vector_store_client(); + configure(vs).with_dns({{HOSTNAME, std::vector{unavail_server->host()}}}); + vs.start_background_tasks(); + + // Send request to unavailable node - this will put the node to backoff. + auto result = co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); + + BOOST_CHECK(!result); + BOOST_CHECK(std::holds_alternative(result.error())); + + // Replace the unavailable server with an available one. + avail_server = std::make_unique(); + co_await avail_server->start(co_await unavail_server->take_socket()); + + // Wait until node is taken out of the backoff state and used for requests again. + BOOST_CHECK(co_await repeat_until([&]() -> future { + auto result = co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); + co_return result.has_value(); + })); + }, + cfg) + .finally(coroutine::lambda([&] -> future<> { + co_await unavail_server->stop(); + if (avail_server) { + co_await avail_server->stop(); + } + })); +} + +SEASTAR_TEST_CASE(vector_store_client_single_status_check_after_concurrent_failures) { + using keys = std::expected; + + auto unavail_s = co_await make_unavailable_server(); + auto cfg = cql_test_config(); + cfg.db_config->vector_store_primary_uri.set(format("http://unavail.node:{}", unavail_s->port())); + co_await do_with_cql_env( + [&](cql_test_env& env) -> future<> { + std::vector> requests; + unavail_s->auto_shutdown_off(); + constexpr auto NUM_OF_PARALLEL_REQUESTS = 50; + auto as = abort_source_timeout(); + auto schema = co_await create_test_table(env, "ks", "idx"); + auto& vs = env.local_qp().vector_store_client(); + configure(vs).with_dns({{"unavail.node", std::vector{unavail_s->host()}}}); + vs.start_background_tasks(); + + for (int i = 0; i < NUM_OF_PARALLEL_REQUESTS; ++i) { + requests.push_back(vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset())); + } + // Wait for all requests to establish a connection with the server. + co_await repeat_until([&unavail_s]() -> future { + co_return unavail_s->connections().size() == NUM_OF_PARALLEL_REQUESTS; + }); + // Shutdown all connections, causing all requests to fail. + // The number of connections will drop to zero. + co_await unavail_s->shutdown_all_and_clear(); + // Wait for all requests to complete. + co_await when_all(requests.begin(), requests.end()); + + // After the backoff period, a single status check is expected to verify node recovery. + // The test server keeps the subsequent status check connection open (auto_shutdown_off()). + // This prevents the client's backoff mechanism from sending another status request + // while the first one is pending, ensuring that exactly one new connection is made. + // This makes the test assertion deterministic. + BOOST_CHECK(co_await repeat_until([&]() -> future { + co_return unavail_s->connections().size() == 1; + })); + }, + cfg) + .finally(coroutine::lambda([&] -> future<> { + co_await unavail_s->stop(); + })); +} + +SEASTAR_TEST_CASE(vector_store_client_updates_backoff_max_time_from_read_request_timeout_cfg) { + auto unavail_s = co_await make_unavailable_server(); + auto cfg = cql_test_config(); + cfg.db_config->vector_store_primary_uri.set(format("http://unavail.node:{}", unavail_s->port())); + co_await do_with_cql_env( + [&](cql_test_env& env) -> future<> { + auto as = abort_source_timeout(); + auto schema = co_await create_test_table(env, "ks", "idx"); + auto& vs = env.local_qp().vector_store_client(); + configure(vs).with_dns({{"unavail.node", std::vector{unavail_s->host()}}}); + vs.start_background_tasks(); + + // Set request timeout to 100ms, hence max backoff time is 2x100ms = 200ms. + cfg.db_config->read_request_timeout_in_ms.set(100); + // Trigger status checking by making ANN request to unavailable server. + co_await vs.ann("ks", "idx", schema, std::vector{0.1, 0.2, 0.3}, 2, as.reset()); + co_await repeat_until([&unavail_s]() -> future { + // Wait for 1 ANN request + 4 status check connections (5 total) + co_return unavail_s->connections().size() > 4; + }); + + // Verify backoff timing between status check connections. + // Skip the first connection (ANN request) and analyze status check intervals. + auto duration_between_1st_and_2nd_status_check = std::chrono::duration_cast( + unavail_s->connections().at(2).timestamp - unavail_s->connections().at(1).timestamp); + BOOST_CHECK_GE(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(100)); + BOOST_CHECK_LT(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(200)); + auto duration_between_2nd_and_3rd_status_check = std::chrono::duration_cast( + unavail_s->connections().at(3).timestamp - unavail_s->connections().at(2).timestamp); + // Max backoff time reached at 200ms, so subsequent status checks use fixed 200ms intervals. + BOOST_CHECK_GE(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(200)); // 200ms = 100ms * 2 + BOOST_CHECK_LT(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(400)); + auto duration_between_3rd_and_4th_status_check = std::chrono::duration_cast( + unavail_s->connections().at(4).timestamp - unavail_s->connections().at(3).timestamp); + BOOST_CHECK_GE(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(200)); + BOOST_CHECK_LT(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(400)); + }, + cfg) + .finally(coroutine::lambda([&] -> future<> { + co_await unavail_s->stop(); + })); +} diff --git a/vector_search/CMakeLists.txt b/vector_search/CMakeLists.txt index 9ff112ed70..10b89c9cd7 100644 --- a/vector_search/CMakeLists.txt +++ b/vector_search/CMakeLists.txt @@ -2,7 +2,9 @@ add_library(vector_search STATIC) target_sources(vector_search PRIVATE vector_store_client.cc - dns.cc) + dns.cc + client.cc + clients.cc) target_link_libraries(vector_search PUBLIC Seastar::seastar diff --git a/vector_search/client.cc b/vector_search/client.cc new file mode 100644 index 0000000000..ce5b0a22c2 --- /dev/null +++ b/vector_search/client.cc @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "client.hh" +#include "utils/exceptions.hh" +#include "utils/exponential_backoff_retry.hh" +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace seastar; +using namespace std::chrono_literals; + +namespace vector_search { +namespace { + +class client_connection_factory : public http::experimental::connection_factory { + socket_address _addr; + +public: + explicit client_connection_factory(socket_address addr) + : _addr(addr) { + } + + future make([[maybe_unused]] abort_source* as) override { + auto socket = co_await seastar::connect(_addr, {}, transport::TCP); + socket.set_nodelay(true); + socket.set_keepalive_parameters(net::tcp_keepalive_params{ + .idle = 60s, + .interval = 60s, + .count = 10, + }); + socket.set_keepalive(true); + co_return socket; + } +}; + +bool is_server_unavailable(std::exception_ptr& err) { + return try_catch(err) != nullptr; +} + +bool is_request_aborted(std::exception_ptr& err) { + return try_catch(err) != nullptr; +} + +future map_err(std::exception_ptr& err) { + if (is_server_unavailable(err)) { + co_return service_unavailable_error{}; + } + if (is_request_aborted(err)) { + co_return aborted_error{}; + } + co_await coroutine::return_exception_ptr(err); // rethrow + co_return client::request_error{}; // unreachable +} + +auto constexpr BACKOFF_RETRY_MIN_TIME = 100ms; + +} // namespace + +client::client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value request_timeout_in_ms) + : _endpoint(std::move(endpoint_)) + , _http_client(std::make_unique(socket_address(endpoint_.ip, endpoint_.port))) + , _logger(logger) + , _request_timeout(std::move(request_timeout_in_ms)) { +} + +seastar::future client::request( + seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as) { + if (is_checking_status_in_progress()) { + co_return std::unexpected(service_unavailable_error{}); + } + + auto f = co_await seastar::coroutine::as_future(request_impl(method, std::move(path), std::move(content), std::nullopt, as)); + if (f.failed()) { + auto err = f.get_exception(); + if (is_server_unavailable(err)) { + handle_server_unavailable(); + } + co_return std::unexpected{co_await map_err(err)}; + } + co_return co_await std::move(f); +} + +seastar::future client::request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional content, + std::optional&& expected_status, seastar::abort_source& as) { + + auto req = http::request::make(method, _endpoint.host, std::move(path)); + if (content) { + req.write_body("json", std::move(*content)); + } + auto resp = response{seastar::http::reply::status_type::ok, std::vector>()}; + auto handler = [&resp](http::reply const& reply, input_stream body) -> future<> { + resp.status = reply._status; + resp.content = co_await util::read_entire_stream(body); + }; + + co_await _http_client.make_request(std::move(req), std::move(handler), std::move(expected_status), &as); + co_return resp; +} + +seastar::future client::check_status() { + auto f = co_await coroutine::as_future(request_impl(httpd::operation_type::GET, "/api/v1/status", std::nullopt, http::reply::status_type::ok, _as)); + auto ret = !f.failed(); + f.ignore_ready_future(); + co_return ret; +} + +seastar::future<> client::close() { + _as.request_abort(); + co_await std::exchange(_checking_status_future, make_ready_future()); + co_await _http_client.close(); +} + +void client::handle_server_unavailable() { + if (!is_checking_status_in_progress()) { + _checking_status_future = run_checking_status(); + } +} + +seastar::future<> client::run_checking_status() { + struct stop_retry {}; + auto f = co_await coroutine::as_future( + exponential_backoff_retry::do_until_value(BACKOFF_RETRY_MIN_TIME, backoff_retry_max(), _as, [this] -> future> { + auto success = co_await check_status(); + if (success) { + co_return stop_retry{}; + } + co_return std::nullopt; + })); + if (f.failed()) { + if (auto err = f.get_exception(); !is_request_aborted(err)) { + // Report internal error for exceptions other than abort + on_internal_error_noexcept(_logger, fmt::format("exception while checking status: {}", err)); + } + } + co_return; +} + +bool client::is_checking_status_in_progress() const { + return !_checking_status_future.available(); +} + +std::chrono::milliseconds client::backoff_retry_max() const { + std::chrono::milliseconds ret{_request_timeout.get()}; + return ret * 2; +} + +} // namespace vector_search diff --git a/vector_search/client.hh b/vector_search/client.hh new file mode 100644 index 0000000000..b7538db5f3 --- /dev/null +++ b/vector_search/client.hh @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "error.hh" +#include "utils/log.hh" +#include "utils/updateable_value.hh" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace vector_search { + +class client { +public: + struct response { + seastar::http::reply::status_type status; + std::vector> content; + }; + + struct endpoint_type { + seastar::sstring host; + std::uint16_t port; + seastar::net::inet_address ip; + }; + + using request_error = std::variant; + using request_result = std::expected; + + explicit client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value request_timeout_in_ms); + + seastar::future request( + seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); + + seastar::future<> close(); + + const endpoint_type& endpoint() const { + return _endpoint; + } + +private: + seastar::future request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional content, + std::optional&& expected, seastar::abort_source& as); + seastar::future check_status(); + void handle_server_unavailable(); + seastar::future<> run_checking_status(); + bool is_checking_status_in_progress() const; + std::chrono::milliseconds backoff_retry_max() const; + + endpoint_type _endpoint; + seastar::http::experimental::client _http_client; + seastar::future<> _checking_status_future = seastar::make_ready_future(); + seastar::abort_source _as; + logging::logger& _logger; + utils::updateable_value _request_timeout; +}; + +} // namespace vector_search diff --git a/vector_search/clients.cc b/vector_search/clients.cc new file mode 100644 index 0000000000..98898e13b0 --- /dev/null +++ b/vector_search/clients.cc @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "clients.hh" +#include "load_balancer.hh" +#include "utils/exceptions.hh" +#include +#include +#include +#include + +using namespace seastar; + +namespace vector_search { +namespace { + +/// Timeout for waiting for a new client to be available +constexpr auto WAIT_FOR_CLIENT_TIMEOUT = std::chrono::seconds(5); + +/// The number of times to retry a request if all nodes fail with an unavailable error. +constexpr auto REQUEST_RETRIES = 3; + +static thread_local auto random_engine = std::default_random_engine(std::random_device{}()); + +/// Wait for a condition variable to be signaled or timeout. +auto wait_for_signal(condition_variable& cv, lowres_clock::time_point timeout) -> future { + auto result = co_await coroutine::as_future(cv.wait(timeout)); + if (result.failed()) { + auto err = result.get_exception(); + if (try_catch(err) != nullptr) { + co_return; + } + co_await coroutine::return_exception_ptr(std::move(err)); + } + co_return; +} + +template +auto make_unexpected(const auto& err) { + return std::unexpected{std::visit( + [](auto&& err) { + return Variant{err}; + }, + err)}; +}; + +} // namespace + +clients::clients(logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value request_timeout_in_ms) + : _producer([&]() -> future { + return try_with_gate(_gate, [this] -> future { + _trigger_refresh(); + co_await wait_for_signal(_refresh_cv, lowres_clock::now() + _timeout); + co_return _clients; + }); + }) + , _trigger_refresh(std::move(trigger_refresh)) + , _timeout(WAIT_FOR_CLIENT_TIMEOUT) + , _logger(logger) + , _request_timeout_in_ms(std::move(request_timeout_in_ms)) { +} + +future clients::request( + seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as) { + + for (auto retries = 0; retries < REQUEST_RETRIES; ++retries) { + auto clients = co_await get_clients(as); + if (!clients) { + co_return make_unexpected(clients.error()); + } + + load_balancer lb(std::move(*clients), random_engine); + while (auto client = lb.next()) { + auto result = co_await client->request(method, path, content, as); + if (result) { + co_return std::move(result.value()); + } + if (!result && std::holds_alternative(result.error())) { + // try next client + continue; + } + co_return make_unexpected(result.error()); + } + _trigger_refresh(); + } + + co_return std::unexpected{service_unavailable_error{}}; +} + +/// Get the current http client or wait for a new one to be available. +future clients::get_clients(abort_source& as) { + if (!_clients.empty()) { + co_return _clients; + } + + auto current_clients = co_await coroutine::as_future(_producer(as)); + + if (current_clients.failed()) { + auto err = current_clients.get_exception(); + if (as.abort_requested()) { + co_return std::unexpected{aborted_error{}}; + } + co_await coroutine::return_exception_ptr(std::move(err)); + } + auto clients = co_await std::move(current_clients); + if (clients.empty()) { + co_return std::unexpected{addr_unavailable_error{}}; + } + co_return clients; +} + +future<> clients::handle_changed(const std::vector& uris, const dns::host_address_map& addrs) { + clear(); + for (const auto& uri : uris) { + auto it = addrs.find(uri.host); + if (it != addrs.end()) { + for (const auto& addr : it->second) { + _clients.push_back(make_lw_shared(_logger, client::endpoint_type{uri.host, uri.port, addr}, _request_timeout_in_ms)); + } + } + } + + _refresh_cv.broadcast(); + co_await close_old_clients(); +} + +future<> clients::stop() { + _refresh_cv.signal(); + co_await _gate.close(); + co_await close_clients(); + co_await close_old_clients(); +} + +void clients::clear() { + _old_clients.insert(_old_clients.end(), std::make_move_iterator(_clients.begin()), std::make_move_iterator(_clients.end())); + _clients.clear(); +} + +future<> clients::close_clients() { + for (auto& client : _clients) { + co_await client->close(); + } + _clients.clear(); +} + +future<> clients::close_old_clients() { + // iterate over old clients and close them. There is a co_await in the loop + // so we need to use [] accessor and copying clients to avoid dangling references of iterators. + // NOLINTNEXTLINE(modernize-loop-convert) + for (auto it = 0U; it < _old_clients.size(); ++it) { + auto& client = _old_clients[it]; + if (client && client.owned()) { + auto client_cloned = client; + client = nullptr; + co_await client_cloned->close(); + } + } + std::erase_if(_old_clients, [](auto const& client) { + return !client; + }); +} + +} // namespace vector_search diff --git a/vector_search/clients.hh b/vector_search/clients.hh new file mode 100644 index 0000000000..78cbcf5ed4 --- /dev/null +++ b/vector_search/clients.hh @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "client.hh" +#include "dns.hh" +#include "uri.hh" +#include "utils/sequential_producer.hh" +#include "vector_search/error.hh" +#include "utils/log.hh" +#include "utils/updateable_value.hh" +#include +#include +#include +#include +#include + +namespace vector_search { + +class clients { +public: + using refresh_trigger_callback = std::function; + + using request_error = std::variant; + using request_result = std::expected; + + using clients_vec = std::vector>; + using get_clients_error = std::variant; + using get_clients_result = std::expected; + + explicit clients(logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value request_timeout_in_ms); + + seastar::future request( + seastar::httpd::operation_type method, seastar::sstring path, std::optional content, seastar::abort_source& as); + + seastar::future<> handle_changed(const std::vector& uris, const dns::host_address_map& addrs); + + seastar::future<> stop(); + + void clear(); + + seastar::future get_clients(seastar::abort_source& as); + + void timeout(std::chrono::milliseconds timeout) { + _timeout = timeout; + } + +private: + seastar::future<> close_clients(); + seastar::future<> close_old_clients(); + + clients_vec _clients; + sequential_producer _producer; + refresh_trigger_callback _trigger_refresh; + seastar::gate _gate; + seastar::condition_variable _refresh_cv; + std::chrono::milliseconds _timeout; + clients_vec _old_clients; + logging::logger& _logger; + utils::updateable_value _request_timeout_in_ms; +}; + +} // namespace vector_search diff --git a/vector_search/error.hh b/vector_search/error.hh new file mode 100644 index 0000000000..ce337b12a8 --- /dev/null +++ b/vector_search/error.hh @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once +#include +#include +#include + +namespace vector_search { + +/// The service is disabled. +struct disabled_error {}; + +/// The operation was aborted. +struct aborted_error {}; + +/// The vector-store addr is unavailable (not possible to get an addr from the dns service). +struct addr_unavailable_error {}; + +/// The vector-store service is unavailable. +struct service_unavailable_error {}; + +/// The error from the vector-store service. +struct service_error { + seastar::http::reply::status_type status; ///< The HTTP status code from the vector-store service. +}; + +/// An unsupported reply format from the vector-store service. +struct service_reply_format_error {}; + +struct error_visitor { + seastar::sstring operator()(service_error e) const { + return fmt::format("Vector Store error: HTTP status {}", e.status); + } + seastar::sstring operator()(disabled_error) const { + return fmt::format("Vector Store is disabled"); + } + seastar::sstring operator()(aborted_error) const { + return fmt::format("Vector Store request was aborted"); + } + seastar::sstring operator()(addr_unavailable_error) const { + return fmt::format("Vector Store service address could not be fetched from DNS"); + } + seastar::sstring operator()(service_unavailable_error) const { + return fmt::format("Vector Store service is unavailable"); + } + seastar::sstring operator()(service_reply_format_error) const { + return fmt::format("Vector Store returned an invalid JSON"); + } +}; + +} // namespace vector_search diff --git a/vector_search/uri.hh b/vector_search/uri.hh new file mode 100644 index 0000000000..1884df63c8 --- /dev/null +++ b/vector_search/uri.hh @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once +#include +#include + +namespace vector_search { + +struct uri { + seastar::sstring host; + std::uint16_t port; +}; + +} // namespace vector_search diff --git a/vector_search/vector_store_client.cc b/vector_search/vector_store_client.cc index 1d4e270fa1..832ce9df5a 100644 --- a/vector_search/vector_store_client.cc +++ b/vector_search/vector_store_client.cc @@ -9,6 +9,8 @@ #include "vector_store_client.hh" #include "dns.hh" #include "load_balancer.hh" +#include "clients.hh" +#include "uri.hh" #include "db/config.hh" #include "exceptions/exceptions.hh" #include "utils/sequential_producer.hh" @@ -53,14 +55,7 @@ using port_number = vector_search::vector_store_client::port_number; using primary_key = vector_search::primary_key; using primary_keys = vector_search::vector_store_client::primary_keys; using service_reply_format_error = vector_search::vector_store_client::service_reply_format_error; -using tcp_keepalive_params = net::tcp_keepalive_params; -using time_point = lowres_clock::time_point; - -/// Timeout for waiting for a new client to be available -constexpr auto WAIT_FOR_CLIENT_TIMEOUT = std::chrono::seconds(5); - -/// The number of times to retry an /ann request if all nodes fail with a system error. -constexpr auto ANN_RETRIES = 3; +using uri = vector_search::uri; // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) logging::logger vslogger("vector_store_client"); @@ -76,11 +71,6 @@ auto parse_port(std::string const& port_txt) -> std::optional { return port; } -struct uri { - host_name host; - port_number port; -}; - auto parse_service_uri(std::string_view uri_) -> std::optional { constexpr auto URI_REGEX = R"(^http:\/\/([a-z0-9._-]+):([0-9]+)$)"; auto const uri_regex = std::regex(URI_REGEX); @@ -98,20 +88,6 @@ auto parse_service_uri(std::string_view uri_) -> std::optional { return {{host, *port}}; } - -/// Wait for a condition variable to be signaled or timeout. -auto wait_for_signal(condition_variable& cv, time_point timeout) -> future { - auto result = co_await coroutine::as_future(cv.wait(timeout)); - if (result.failed()) { - auto err = result.get_exception(); - if (try_catch(err) != nullptr) { - co_return; - } - co_await coroutine::return_exception_ptr(std::move(err)); - } - co_return; -} - auto get_key_column_value(const rjson::value& item, std::size_t idx, const column_definition& column) -> std::expected { auto const& column_name = column.name_as_text(); auto const* keys_obj = rjson::find(item, column_name); @@ -203,63 +179,6 @@ auto read_ann_json(rjson::value const& json, schema_ptr const& schema) -> std::e return std::move(keys); } -class client_connection_factory : public http::experimental::connection_factory { - socket_address _addr; - -public: - explicit client_connection_factory(socket_address addr) - : _addr(addr) { - } - - future make([[maybe_unused]] abort_source* as) override { - auto socket = co_await seastar::connect(_addr, {}, transport::TCP); - socket.set_nodelay(true); - socket.set_keepalive_parameters(tcp_keepalive_params{ - .idle = 60s, - .interval = 60s, - .count = 10, - }); - socket.set_keepalive(true); - co_return socket; - } -}; - -class http_client { - - uri _uri; - inet_address _addr; - - http::experimental::client impl; - -public: - http_client(uri host_port_, inet_address addr) - : _uri(std::move(host_port_)) - , _addr(std::move(addr)) - , impl(std::make_unique(socket_address(addr, _uri.port))) { - } - - bool connects_to(inet_address const& a, port_number p) const { - return _addr == a && _uri.port == p; - } - - seastar::future<> make_request(operation_type method, const http_path& path, const std::optional& content, - http::experimental::client::reply_handler&& handle, abort_source* as) { - auto req = http::request::make(method, _uri.host, path); - if (content) { - req.write_body("json", *content); - } - return impl.make_request(std::move(req), std::move(handle), std::nullopt, as); - } - - seastar::future<> close() { - return impl.close(); - } - - const inet_address& addr() const { - return _addr; - } -}; - bool should_vector_store_service_be_disabled(std::vector const& uris) { return uris.empty() || uris[0].empty(); } @@ -305,23 +224,15 @@ std::vector get_hosts(const std::vector& uris) { namespace vector_search { struct vector_store_client::impl { - - using clients_type = std::vector>; - utils::observer uri_observer; - clients_type current_clients; - clients_type old_clients; std::vector _uris; - gate client_producer_gate; - condition_variable refresh_client_cv; - milliseconds wait_for_client_timeout = WAIT_FOR_CLIENT_TIMEOUT; - sequential_producer clients_producer; dns dns; uint64_t dns_refreshes = 0; seastar::metrics::metric_groups _metrics; + clients _clients; - impl(utils::config_file::named_value cfg) + impl(utils::config_file::named_value cfg, utils::config_file::named_value read_request_timeout_in_ms) : uri_observer(cfg.observe([this](seastar::sstring uris_csv) { try { handle_uris_changed(parse_uris(uris_csv)); @@ -331,154 +242,38 @@ struct vector_store_client::impl { } })) , _uris(parse_uris(cfg())) - , clients_producer([&]() -> future { - return try_with_gate(client_producer_gate, [this] -> future { - dns.trigger_refresh(); - co_await wait_for_signal(refresh_client_cv, lowres_clock::now() + wait_for_client_timeout); - co_return current_clients; - }); - }) - , dns(vslogger, get_hosts(_uris), [this](auto const& addrs) -> future<> { - co_await handle_addresses_changed(addrs); - }, dns_refreshes) { + + , dns( + vslogger, get_hosts(_uris), + [this](auto const& addrs) -> future<> { + co_await handle_addresses_changed(addrs); + }, + dns_refreshes) + , _clients( + vslogger, + [this]() { + dns.trigger_refresh(); + }, + std::move(read_request_timeout_in_ms)) { _metrics.add_group("vector_store", {seastar::metrics::make_gauge("dns_refreshes", seastar::metrics::description("Number of DNS refreshes"), [this] { return dns_refreshes; }).aggregate({seastar::metrics::shard_label})}); } void handle_uris_changed(std::vector uris) { - clear_current_clients(); + _clients.clear(); _uris = std::move(uris); dns.hosts(get_hosts(_uris)); } - auto handle_addresses_changed(const dns::host_address_map& addrs) -> future<> { - clear_current_clients(); - for (const auto& uri : _uris) { - auto it = addrs.find(uri.host); - if (it != addrs.end()) { - for (const auto& addr : it->second) { - current_clients.push_back(make_lw_shared(uri, addr)); - } - } - } - - refresh_client_cv.broadcast(); - co_await cleanup_old_clients(); + future<> handle_addresses_changed(const dns::host_address_map& addrs) { + co_await _clients.handle_changed(_uris, addrs); } auto is_disabled() const -> bool { return _uris.empty(); } - void clear_current_clients() { - old_clients.insert(old_clients.end(), std::make_move_iterator(current_clients.begin()), std::make_move_iterator(current_clients.end())); - current_clients.clear(); - } - - /// Cleanup current clients - auto cleanup_current_clients() -> future<> { - for (auto& client : current_clients) { - co_await client->close(); - } - current_clients.clear(); - } - - /// Cleanup old clients that are no longer used. - auto cleanup_old_clients() -> future<> { - // iterate over old clients and close them. There is a co_await in the loop - // so we need to use [] accessor and copying clients to avoid dangling references of iterators. - // NOLINTNEXTLINE(modernize-loop-convert) - for (auto it = 0U; it < old_clients.size(); ++it) { - auto& client = old_clients[it]; - if (client && client.owned()) { - auto client_cloned = client; - co_await client_cloned->close(); - client_cloned = nullptr; - } - } - std::erase_if(old_clients, [](auto const& client) { - return !client; - }); - } - - using get_client_error = std::variant; - - /// Get the current http client or wait for a new one to be available. - auto get_clients(abort_source& as) -> future> { - if (is_disabled()) { - co_return std::unexpected{disabled{}}; - } - if (!current_clients.empty()) { - co_return current_clients; - } - - auto current_clients = co_await coroutine::as_future(clients_producer(as)); - - if (current_clients.failed()) { - auto err = current_clients.get_exception(); - if (as.abort_requested()) { - co_return std::unexpected{aborted{}}; - } - co_await coroutine::return_exception_ptr(std::move(err)); - } - auto clients = co_await std::move(current_clients); - if (clients.empty()) { - co_return std::unexpected{addr_unavailable{}}; - } - co_return clients; - } - - struct make_request_response { - http::reply::status_type status; ///< The HTTP status of the response. - std::vector> content; ///< The content of the response. - }; - - using make_request_error = std::variant; - - auto make_request(operation_type method, http_path path, std::optional content, abort_source& as) - -> future> { - auto resp = make_request_response{.status = http::reply::status_type::ok, .content = std::vector>()}; - - for (auto retries = 0; retries < ANN_RETRIES; ++retries) { - auto clients = co_await get_clients(as); - if (!clients) { - co_return std::unexpected{std::visit( - [](auto&& err) { - return make_request_error{err}; - }, - clients.error())}; - } - - load_balancer lb(std::move(*clients), random_engine); - while (auto client = lb.next()) { - auto result = co_await coroutine::as_future(client->make_request( - method, path, content, - [&resp](http::reply const& reply, input_stream body) -> future<> { - resp.status = reply._status; - resp.content = co_await util::read_entire_stream(body); - }, - &as)); - if (result.failed()) { - auto err = result.get_exception(); - if (as.abort_requested()) { - co_return std::unexpected{aborted{}}; - } - if (try_catch(err) == nullptr) { - co_await coroutine::return_exception_ptr(std::move(err)); - } - // std::system_error means that the server is unavailable, so we retry - } else { - co_return resp; - } - } - - dns.trigger_refresh(); - } - - co_return std::unexpected{service_unavailable{}}; - } - auto ann(keyspace_name keyspace, index_name name, schema_ptr schema, vs_vector vs_vector, limit limit, abort_source& as) -> future> { if (is_disabled()) { @@ -489,7 +284,7 @@ struct vector_store_client::impl { auto path = format("/api/v1/indexes/{}/{}/ann", keyspace, name); auto content = write_ann_json(std::move(vs_vector), limit); - auto resp = co_await make_request(operation_type::POST, std::move(path), std::move(content), as); + auto resp = co_await _clients.request(operation_type::POST, std::move(path), std::move(content), as); if (!resp) { co_return std::unexpected{std::visit( [](auto&& err) { @@ -515,7 +310,7 @@ struct vector_store_client::impl { }; vector_store_client::vector_store_client(config const& cfg) - : _impl(std::make_unique(cfg.vector_store_primary_uri)) { + : _impl(std::make_unique(cfg.vector_store_primary_uri, cfg.read_request_timeout_in_ms)) { } vector_store_client::~vector_store_client() = default; @@ -525,11 +320,8 @@ void vector_store_client::start_background_tasks() { } auto vector_store_client::stop() -> future<> { - _impl->refresh_client_cv.signal(); - co_await _impl->client_producer_gate.close(); + co_await _impl->_clients.stop(); co_await _impl->dns.stop(); - co_await _impl->cleanup_old_clients(); - co_await _impl->cleanup_current_clients(); } auto vector_store_client::is_disabled() const -> bool { @@ -546,7 +338,7 @@ void vector_store_client_tester::set_dns_refresh_interval(vector_store_client& v } void vector_store_client_tester::set_wait_for_client_timeout(vector_store_client& vsc, std::chrono::milliseconds timeout) { - vsc._impl->wait_for_client_timeout = timeout; + vsc._impl->_clients.timeout(timeout); } void vector_store_client_tester::set_dns_resolver(vector_store_client& vsc, std::function>(sstring const&)> resolver) { @@ -558,13 +350,13 @@ void vector_store_client_tester::trigger_dns_resolver(vector_store_client& vsc) } auto vector_store_client_tester::resolve_hostname(vector_store_client& vsc, abort_source& as) -> future> { - auto clients = co_await vsc._impl->get_clients(as); + auto clients = co_await vsc._impl->_clients.get_clients(as); std::vector ret; if (!clients) { co_return ret; } for (auto const& c : *clients) { - ret.push_back(c->addr()); + ret.push_back(c->endpoint().ip); } co_return ret; } diff --git a/vector_search/vector_store_client.hh b/vector_search/vector_store_client.hh index ae21bb241b..cd593507c3 100644 --- a/vector_search/vector_store_client.hh +++ b/vector_search/vector_store_client.hh @@ -11,6 +11,7 @@ #include "dht/decorated_key.hh" #include "keys/keys.hh" #include "seastarx.hh" +#include "error.hh" #include #include #include @@ -49,48 +50,15 @@ public: using schema_ptr = lw_shared_ptr; using status_type = http::reply::status_type; - /// The vector_store_client service is disabled. - struct disabled {}; - - /// The operation was aborted. - struct aborted {}; - - /// The vector-store addr is unavailable (not possible to get an addr from the dns service). - struct addr_unavailable {}; - - /// The vector-store service is unavailable. - struct service_unavailable {}; - - /// The error from the vector-store service. - struct service_error { - status_type status; ///< The HTTP status code from the vector-store service. - }; - - /// An unsupported reply format from the vector-store service. - struct service_reply_format_error {}; + using disabled = disabled_error; + using aborted = aborted_error; + using addr_unavailable = addr_unavailable_error; + using service_unavailable = service_unavailable_error; + using service_error = service_error; + using service_reply_format_error = service_reply_format_error; using ann_error = std::variant; - - struct ann_error_visitor { - sstring operator()(vector_store_client::service_error e) const { - return fmt::format("Vector Store error: HTTP status {}", e.status); - } - sstring operator()(vector_store_client::disabled) const { - return fmt::format("Vector Store is disabled"); - } - sstring operator()(vector_store_client::aborted) const { - return fmt::format("Vector Store request was aborted"); - } - sstring operator()(vector_store_client::addr_unavailable) const { - return fmt::format("Vector Store service address could not be fetched from DNS"); - } - sstring operator()(vector_store_client::service_unavailable) const { - return fmt::format("Vector Store service is unavailable"); - } - sstring operator()(vector_store_client::service_reply_format_error) const { - return fmt::format("Vector Store returned an invalid JSON"); - } - }; + using ann_error_visitor = error_visitor; explicit vector_store_client(config const& cfg); ~vector_store_client();