diff --git a/test/vector_search/client_test.cc b/test/vector_search/client_test.cc index eaa6d4d73c..a6bc838ac9 100644 --- a/test/vector_search/client_test.cc +++ b/test/vector_search/client_test.cc @@ -9,7 +9,9 @@ #include "seastar/http/common.hh" #include "vector_search/client.hh" #include "vs_mock_server.hh" +#include "unavailable_server.hh" #include "utils.hh" +#include "utils/rjson.hh" #include #include #include @@ -27,10 +29,18 @@ const auto REQUEST_TIMEOUT = utils::updateable_value{100}; constexpr auto PATH = "/api/v1/indexes/ks/idx/ann"; constexpr auto CONTENT = R"({"vector": [0.1, 0.2, 0.3], "limit": 10})"; -client::endpoint_type make_endpoint(const std::unique_ptr& server) { +template +client::endpoint_type make_endpoint(const std::unique_ptr& server) { return client::endpoint_type{server->host(), server->port(), seastar::net::inet_address(server->host())}; } +future> make_available(std::unique_ptr& down_server) { + // Replace the unavailable server with an available one. + auto server = std::make_unique(); + co_await server->start(co_await down_server->take_socket()); + co_return server; +} + } // namespace SEASTAR_TEST_CASE(is_up_after_construction) { @@ -73,64 +83,72 @@ SEASTAR_TEST_CASE(is_up_when_server_returned_client_error_status) { co_await server->stop(); } -SEASTAR_TEST_CASE(is_down_when_server_returned_server_error_status) { +SEASTAR_TEST_CASE(is_up_when_server_returned_server_error_status) { abort_source_timeout as; auto server = co_await make_vs_mock_server(); server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"}); - // The node might attempt to recover in the background by making a status request. - // To prevent a race condition where the node recovers before we check its status, - // we ensure the next status request also fails. - server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"}); client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT}; auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset()); - BOOST_CHECK(!client.is_up()); - BOOST_CHECK(!res); - BOOST_CHECK(std::holds_alternative(res.error())); + BOOST_CHECK(client.is_up()); + BOOST_CHECK(res); + BOOST_CHECK(res->status == seastar::http::reply::status_type::internal_server_error); co_await client.close(); co_await server->stop(); } -SEASTAR_TEST_CASE(is_down_when_server_returned_service_unavailable_status) { +SEASTAR_TEST_CASE(is_up_when_server_returned_service_unavailable_status) { abort_source_timeout as; auto server = co_await make_vs_mock_server(); server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::service_unavailable, "Service Unavailable"}); - // The node might attempt to recover in the background by making a status request. - // To prevent a race condition where the node recovers before we check its status, - // we ensure the next status request also fails. - server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"}); client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT}; auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset()); + BOOST_CHECK(client.is_up()); + BOOST_CHECK(res); + BOOST_CHECK(res->status == seastar::http::reply::status_type::service_unavailable); + + co_await client.close(); + co_await server->stop(); +} + +SEASTAR_TEST_CASE(is_down_when_server_is_not_available) { + abort_source_timeout as; + auto down_server = co_await make_unavailable_server(); + client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT}; + + auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset()); + BOOST_CHECK(!client.is_up()); BOOST_CHECK(!res); BOOST_CHECK(std::holds_alternative(res.error())); co_await client.close(); - co_await server->stop(); + co_await down_server->stop(); } SEASTAR_TEST_CASE(becomes_up_when_server_status_is_serving) { abort_source_timeout as; - auto server = co_await make_vs_mock_server(); - server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"}); - server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::ok, "SERVING"}); - client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT}; + auto down_server = co_await make_unavailable_server(); + client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT}; + + auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset()); + auto server = co_await make_available(down_server); + server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::ok, rjson::quote_json_string("SERVING")}); - co_await client.request(operation_type::POST, PATH, CONTENT, as.reset()); auto became_up = co_await repeat_until([&client]() -> future { co_return client.is_up(); }); - BOOST_CHECK(became_up); co_await client.close(); co_await server->stop(); + co_await down_server->stop(); } SEASTAR_TEST_CASE(remains_down_when_server_status_is_not_serving) { @@ -141,21 +159,22 @@ SEASTAR_TEST_CASE(remains_down_when_server_status_is_not_serving) { "BOOTSTRAPPING", }; for (auto const& status : non_serving_statuses) { - auto server = co_await make_vs_mock_server(); - server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"}); - server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::ok, status}); - client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT}; + auto down_server = co_await make_unavailable_server(); + client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT}; co_await client.request(operation_type::POST, PATH, CONTENT, as.reset()); + auto server = co_await make_available(down_server); + server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::ok, rjson::quote_json_string(status)}); + auto got_2_status_requests = co_await repeat_until([&]() -> future { // waiting for 2 status requests to be sure that node had a chance to become up co_return server->status_requests().size() >= 2; }); - BOOST_CHECK(got_2_status_requests); BOOST_CHECK(!client.is_up()); co_await client.close(); co_await server->stop(); + co_await down_server->stop(); } } diff --git a/test/vector_search/unavailable_server.hh b/test/vector_search/unavailable_server.hh new file mode 100644 index 0000000000..72c561d940 --- /dev/null +++ b/test/vector_search/unavailable_server.hh @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once +#include "utils.hh" +#include +#include +#include +#include +#include + +namespace test::vector_search { + +class unavailable_server { + struct Connection { + seastar::lowres_clock::time_point timestamp; + seastar::connected_socket socket; + }; + +public: + explicit unavailable_server(std::uint16_t port) + : _port(port) { + } + + seastar::future<> start() { + co_await listen(); + (void)seastar::try_with_gate(_gate, [this] { + return run(); + }); + } + + seastar::future<> stop() { + if (_socket) { + _socket.abort_accept(); + co_await _gate.close(); + } + } + + seastar::sstring host() const { + return _host; + } + + std::uint16_t port() const { + return _port; + } + + const std::vector& connections() const { + return _connections; + } + + seastar::future take_socket() { + _running = false; + // Make a connection to unblock accept() in run loop. + co_await seastar::connect(seastar::socket_address(seastar::net::inet_address(_host), _port)); + co_await _gate.close(); + co_return std::move(_socket); + } + + void auto_shutdown_off() { + _auto_shutdown = false; + } + + seastar::future<> shutdown_all_and_clear() { + std::vector tmp; + std::swap(tmp, _connections); + for (auto& conn : tmp) { + co_await shutdown(conn.socket); + } + } + +private: + seastar::future<> listen() { + co_await try_on_loopback_address([this](auto host) -> seastar::future<> { + seastar::listen_options opts; + opts.set_fixed_cpu(seastar::this_shard_id()); + _socket = seastar::listen(seastar::socket_address(seastar::net::inet_address(host), _port), opts); + _port = _socket.local_address().port(); + _host = std::move(host); + return seastar::make_ready_future<>(); + }); + } + + seastar::future<> run() { + while (_running) { + try { + auto result = co_await _socket.accept(); + _connections.push_back(Connection{.timestamp = seastar::lowres_clock::now(), .socket = std::move(result.connection)}); + if (_auto_shutdown) { + co_await shutdown(_connections.back().socket); + } + } catch (...) { + break; + } + } + } + + seastar::future<> shutdown(seastar::connected_socket& cs) { + cs.shutdown_output(); + cs.shutdown_input(); + co_await cs.wait_input_shutdown(); + } + + + seastar::server_socket _socket; + seastar::gate _gate; + std::uint16_t _port; + seastar::sstring _host; + std::vector _connections; + bool _running = true; + bool _auto_shutdown = true; +}; + +inline auto make_unavailable_server(std::uint16_t port = 0) -> seastar::future> { + auto ret = std::make_unique(port); + co_await ret->start(); + co_return ret; +} + +} // namespace test::vector_search diff --git a/test/vector_search/vector_store_client_test.cc b/test/vector_search/vector_store_client_test.cc index fba5c2a47d..715f49dfb0 100644 --- a/test/vector_search/vector_store_client_test.cc +++ b/test/vector_search/vector_store_client_test.cc @@ -9,6 +9,7 @@ #include "vector_search/vector_store_client.hh" #include "utils.hh" #include "vs_mock_server.hh" +#include "unavailable_server.hh" #include "seastar/core/future.hh" #include "seastar/core/when_all.hh" #include "db/config.hh" @@ -120,111 +121,6 @@ public: } }; -class unavailable_server { - struct Connection { - lowres_clock::time_point timestamp; - connected_socket socket; - }; - -public: - explicit unavailable_server(uint16_t port) - : _port(port) { - } - - future<> start() { - co_await listen(); - (void)try_with_gate(_gate, [this] { - return run(); - }); - } - - future<> stop() { - if (_socket) { - _socket.abort_accept(); - co_await _gate.close(); - } - } - - sstring host() const { - return _host; - } - - uint16_t port() const { - return _port; - } - - const std::vector& connections() const { - return _connections; - } - - future take_socket() { - _running = false; - // Make a connection to unblock accept() in run loop. - co_await seastar::connect(socket_address(net::inet_address(_host), _port)); - co_await _gate.close(); - co_return std::move(_socket); - } - - void auto_shutdown_off() { - _auto_shutdown = false; - } - - future<> shutdown_all_and_clear() { - std::vector tmp; - std::swap(tmp, _connections); - for (auto& conn : tmp) { - co_await shutdown(conn.socket); - } - } - -private: - future<> listen() { - co_await try_on_loopback_address([this](auto host) -> future<> { - ::listen_options opts; - opts.set_fixed_cpu(this_shard_id()); - _socket = seastar::listen(socket_address(net::inet_address(host), _port), opts); - _port = _socket.local_address().port(); - _host = std::move(host); - return make_ready_future<>(); - }); - } - - future<> run() { - while (_running) { - try { - auto result = co_await _socket.accept(); - _connections.push_back(Connection{.timestamp = lowres_clock::now(), .socket = std::move(result.connection)}); - if (_auto_shutdown) { - co_await shutdown(_connections.back().socket); - } - } catch (...) { - break; - } - } - } - - future<> shutdown(connected_socket& cs) { - cs.shutdown_output(); - cs.shutdown_input(); - co_await cs.wait_input_shutdown(); - } - - - seastar::server_socket _socket; - seastar::gate _gate; - uint16_t _port; - sstring _host; - std::vector _connections; - bool _running = true; - bool _auto_shutdown = true; -}; - -auto make_unavailable_server(uint16_t port = 0) -> future> { - auto ret = std::make_unique(port); - co_await ret->start(); - co_return ret; -} - } // namespace BOOST_AUTO_TEST_CASE(vector_store_client_test_ctor) { diff --git a/test/vector_search/vs_mock_server.hh b/test/vector_search/vs_mock_server.hh index 4632e4326b..ec1d91c54d 100644 --- a/test/vector_search/vs_mock_server.hh +++ b/test/vector_search/vs_mock_server.hh @@ -9,6 +9,7 @@ #pragma once #include "utils.hh" +#include "utils/rjson.hh" #include "seastar/http/request.hh" #include #include @@ -140,7 +141,7 @@ private: std::vector _ann_requests; std::vector _status_requests; response _next_ann_response{seastar::http::reply::status_type::ok, CORRECT_RESPONSE_FOR_TEST_TABLE}; - response _next_status_response{seastar::http::reply::status_type::ok, "SERVING"}; + response _next_status_response{seastar::http::reply::status_type::ok, rjson::quote_json_string("SERVING")}; const seastar::sstring INDEXES_PATH = "/api/v1/indexes"; }; diff --git a/vector_search/client.cc b/vector_search/client.cc index 90758c8bd6..522a620b0b 100644 --- a/vector_search/client.cc +++ b/vector_search/client.cc @@ -10,6 +10,7 @@ #include "utils.hh" #include "utils/exceptions.hh" #include "utils/exponential_backoff_retry.hh" +#include "utils/rjson.hh" #include #include #include @@ -54,10 +55,6 @@ bool is_request_aborted(std::exception_ptr& err) { return try_catch(err) != nullptr; } -bool is_server_error(http::reply::status_type status) { - return status >= http::reply::status_type::internal_server_error; -} - future map_err(std::exception_ptr& err) { if (is_server_unavailable(err)) { co_return service_unavailable_error{}; @@ -94,14 +91,7 @@ seastar::future client::request( } co_return std::unexpected{co_await map_err(err)}; } - auto resp = co_await std::move(f); - if (is_server_error(resp.status)) { - _logger.warn("client ({}:{}): received HTTP status {}: {}", _endpoint.host, _endpoint.port, static_cast(resp.status), - response_content_to_sstring(resp.content)); - handle_server_unavailable(); - co_return std::unexpected{service_unavailable_error{}}; - } - co_return resp; + co_return co_await std::move(f); } seastar::future client::request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional content, @@ -128,7 +118,8 @@ seastar::future client::check_status() { co_return false; } auto resp = co_await std::move(f); - co_return response_content_to_sstring(resp.content) == "SERVING"; + auto json = rjson::parse(std::move(resp.content)); + co_return json.IsString() && json.GetString() == std::string_view("SERVING"); } seastar::future<> client::close() {