Merge '[Backport 2025.4] vector_search: Fix error handling and status parsing' from Scylladb[bot]

vector_search: Fix error handling and status parsing

This change addresses two issues in the vector search client that caused
validator test failures: incorrect handling of 5xx server errors and
faulty status response parsing.

1.  5xx Error Handling:
    Previously, a 5xx response (e.g., 503 Service Unavailable) from the
    underlying vector store for an `/ann` search request was incorrectly
    interpreted as a node failure. This would cause the node to be marked
    as down, even for transient issues like an index scan being in progress.

    This change ensures that 5xx errors are treated as transient search
    failures, not node failures, preventing nodes from being incorrectly
    marked as down.

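2.  Status Response Parsing:
    The logic for parsing status responses from the vector store was
    flawed: the client compared the raw response body against the text
    SERVING, while the status is expected to arrive as a JSON-encoded
    string (i.e. "SERVING", quotes included). The body is now parsed as
    JSON, and the node is considered serving only when the decoded value
    is the string SERVING.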

Fixes: SCYLLADB-50

Backport to 2025.4 as this problem is present on this branch.

- (cherry picked from commit 05b9cafb57)

- (cherry picked from commit 366ecef1b9)

- (cherry picked from commit 9563d87f74)

Parent PR: #27111

Closes scylladb/scylladb#27145

* github.com:scylladb/scylladb:
  vector_search: Don't mark nodes as down on 5xx server errors
  test: vector_search: Move unavailable_server to dedicated file
  vector_search: Fix status response parsing
This commit is contained in:
Piotr Dulikowski
2025-11-22 18:53:44 +01:00
5 changed files with 176 additions and 145 deletions

View File

@@ -9,7 +9,9 @@
#include "seastar/http/common.hh"
#include "vector_search/client.hh"
#include "vs_mock_server.hh"
#include "unavailable_server.hh"
#include "utils.hh"
#include "utils/rjson.hh"
#include <boost/test/tools/old/interface.hpp>
#include <seastar/testing/test_case.hh>
#include <seastar/coroutine/as_future.hh>
@@ -27,10 +29,18 @@ const auto REQUEST_TIMEOUT = utils::updateable_value<uint32_t>{100};
constexpr auto PATH = "/api/v1/indexes/ks/idx/ann";
constexpr auto CONTENT = R"({"vector": [0.1, 0.2, 0.3], "limit": 10})";
client::endpoint_type make_endpoint(const std::unique_ptr<vs_mock_server>& server) {
// Builds the client endpoint for a test server. Works with any server type
// that exposes host() and port(); the endpoint carries the host, the port and
// an inet_address constructed from the host.
template <typename Server>
client::endpoint_type make_endpoint(const std::unique_ptr<Server>& server) {
    // Accessors are queried in the same order as the endpoint fields are filled.
    auto host = server->host();
    auto port = server->port();
    auto address = seastar::net::inet_address(server->host());
    return client::endpoint_type{host, port, address};
}
// Replaces an unavailable server with a working mock server.
//
// The listening socket is taken away from `down_server` and passed to the
// start() of a freshly created vs_mock_server, which is then returned.
// `down_server` itself is left for the caller to stop.
future<std::unique_ptr<vs_mock_server>> make_available(std::unique_ptr<unavailable_server>& down_server) {
    auto replacement = std::make_unique<vs_mock_server>();
    auto listening_socket = co_await down_server->take_socket();
    co_await replacement->start(std::move(listening_socket));
    co_return replacement;
}
} // namespace
SEASTAR_TEST_CASE(is_up_after_construction) {
@@ -73,64 +83,72 @@ SEASTAR_TEST_CASE(is_up_when_server_returned_client_error_status) {
co_await server->stop();
}
SEASTAR_TEST_CASE(is_down_when_server_returned_server_error_status) {
SEASTAR_TEST_CASE(is_up_when_server_returned_server_error_status) {
abort_source_timeout as;
auto server = co_await make_vs_mock_server();
server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"});
// The node might attempt to recover in the background by making a status request.
// To prevent a race condition where the node recovers before we check its status,
// we ensure the next status request also fails.
server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"});
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
BOOST_CHECK(!client.is_up());
BOOST_CHECK(!res);
BOOST_CHECK(std::holds_alternative<service_unavailable_error>(res.error()));
BOOST_CHECK(client.is_up());
BOOST_CHECK(res);
BOOST_CHECK(res->status == seastar::http::reply::status_type::internal_server_error);
co_await client.close();
co_await server->stop();
}
SEASTAR_TEST_CASE(is_down_when_server_returned_service_unavailable_status) {
// A 503 reply to an ANN request must reach the caller as an ordinary response
// and must leave the client reporting the node as up.
SEASTAR_TEST_CASE(is_up_when_server_returned_service_unavailable_status) {
    using status_type = seastar::http::reply::status_type;

    abort_source_timeout as;
    auto server = co_await make_vs_mock_server();

    // The mock answers the ANN request with 503.
    server->next_ann_response(vs_mock_server::response{status_type::service_unavailable, "Service Unavailable"});
    // The status endpoint is made to fail as well, so that a background
    // recovery attempt (should the client start one) cannot change the node
    // state before the checks below run.
    server->next_status_response(vs_mock_server::response{status_type::internal_server_error, "Internal Server Error"});

    client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT};

    auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());

    BOOST_CHECK(client.is_up());
    BOOST_CHECK(res);
    BOOST_CHECK(res->status == seastar::http::reply::status_type::service_unavailable);

    co_await client.close();
    co_await server->stop();
}
SEASTAR_TEST_CASE(is_down_when_server_is_not_available) {
abort_source_timeout as;
auto down_server = co_await make_unavailable_server();
client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
BOOST_CHECK(!client.is_up());
BOOST_CHECK(!res);
BOOST_CHECK(std::holds_alternative<service_unavailable_error>(res.error()));
co_await client.close();
co_await server->stop();
co_await down_server->stop();
}
SEASTAR_TEST_CASE(becomes_up_when_server_status_is_serving) {
abort_source_timeout as;
auto server = co_await make_vs_mock_server();
server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"});
server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::ok, "SERVING"});
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT};
auto down_server = co_await make_unavailable_server();
client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT};
auto res = co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
auto server = co_await make_available(down_server);
server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::ok, rjson::quote_json_string("SERVING")});
co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
auto became_up = co_await repeat_until([&client]() -> future<bool> {
co_return client.is_up();
});
BOOST_CHECK(became_up);
co_await client.close();
co_await server->stop();
co_await down_server->stop();
}
SEASTAR_TEST_CASE(remains_down_when_server_status_is_not_serving) {
@@ -141,21 +159,22 @@ SEASTAR_TEST_CASE(remains_down_when_server_status_is_not_serving) {
"BOOTSTRAPPING",
};
for (auto const& status : non_serving_statuses) {
auto server = co_await make_vs_mock_server();
server->next_ann_response(vs_mock_server::response{seastar::http::reply::status_type::internal_server_error, "Internal Server Error"});
server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::ok, status});
client client{client_test_logger, make_endpoint(server), REQUEST_TIMEOUT};
auto down_server = co_await make_unavailable_server();
client client{client_test_logger, make_endpoint(down_server), REQUEST_TIMEOUT};
co_await client.request(operation_type::POST, PATH, CONTENT, as.reset());
auto server = co_await make_available(down_server);
server->next_status_response(vs_mock_server::response{seastar::http::reply::status_type::ok, rjson::quote_json_string(status)});
auto got_2_status_requests = co_await repeat_until([&]() -> future<bool> {
// waiting for 2 status requests to be sure that node had a chance to become up
co_return server->status_requests().size() >= 2;
});
BOOST_CHECK(got_2_status_requests);
BOOST_CHECK(!client.is_up());
co_await client.close();
co_await server->stop();
co_await down_server->stop();
}
}

View File

@@ -0,0 +1,124 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "utils.hh"
#include <seastar/core/seastar.hh>
#include <seastar/net/api.hh>
#include <cstdint>
#include <vector>
#include <memory>
namespace test::vector_search {
// Test helper: a TCP listener that accepts connections without ever serving
// them. Unless auto_shutdown_off() was called, every accepted connection is
// shut down right after it is accepted. The listening socket can be handed
// over to another server with take_socket().
class unavailable_server {
    // An accepted connection and the time at which it was accepted.
    struct Connection {
        seastar::lowres_clock::time_point timestamp;
        seastar::connected_socket socket;
    };

public:
    // `port` is the port to listen on; listen() later overwrites it with the
    // port reported by the bound socket.
    explicit unavailable_server(std::uint16_t port)
        : _port(port) {
    }

    // Binds the listening socket, then launches the accept loop in the
    // background under the gate.
    seastar::future<> start() {
        co_await listen();
        // Deliberately not awaited: stop() and take_socket() wait for the
        // loop by closing the gate.
        (void)seastar::try_with_gate(_gate, [this] { return run(); });
    }

    // Aborts the pending accept and waits for the accept loop to finish.
    // Does nothing when the listening socket is not valid.
    seastar::future<> stop() {
        if (!_socket) {
            co_return;
        }
        _socket.abort_accept();
        co_await _gate.close();
    }

    // Host the server is listening on (set by listen()).
    seastar::sstring host() const {
        return _host;
    }

    // Port the server is listening on.
    std::uint16_t port() const {
        return _port;
    }

    // Connections accepted so far and not yet removed by shutdown_all_and_clear().
    const std::vector<Connection>& connections() const {
        return _connections;
    }

    // Ends the accept loop and gives the listening socket to the caller.
    seastar::future<seastar::server_socket> take_socket() {
        _running = false;
        // The loop is waiting in accept(); connecting to ourselves makes that
        // accept() return so the loop can notice that _running is false.
        auto own_address = seastar::socket_address(seastar::net::inet_address(_host), _port);
        co_await seastar::connect(own_address);
        co_await _gate.close();
        co_return std::move(_socket);
    }

    // Stops shutting connections down automatically on accept.
    void auto_shutdown_off() {
        _auto_shutdown = false;
    }

    // Removes all recorded connections from the server and shuts each of them down.
    seastar::future<> shutdown_all_and_clear() {
        // Detach the list first, so _connections is already empty while the
        // shutdowns are awaited.
        auto detached = std::exchange(_connections, {});
        for (auto& entry : detached) {
            co_await shutdown(entry.socket);
        }
    }

private:
    // Opens the listening socket via try_on_loopback_address(), pinned to the
    // current shard, and records the host and port that were bound.
    seastar::future<> listen() {
        co_await try_on_loopback_address([this](auto host) -> seastar::future<> {
            seastar::listen_options opts;
            opts.set_fixed_cpu(seastar::this_shard_id());
            auto bind_address = seastar::socket_address(seastar::net::inet_address(host), _port);
            _socket = seastar::listen(bind_address, opts);
            _port = _socket.local_address().port();
            _host = std::move(host);
            return seastar::make_ready_future<>();
        });
    }

    // Accept loop: records every accepted connection and, when auto-shutdown
    // is enabled, shuts it down immediately.
    seastar::future<> run() {
        try {
            while (_running) {
                auto accepted = co_await _socket.accept();
                auto& entry = _connections.emplace_back(
                        Connection{.timestamp = seastar::lowres_clock::now(), .socket = std::move(accepted.connection)});
                if (_auto_shutdown) {
                    co_await shutdown(entry.socket);
                }
            }
        } catch (...) {
            // Any failure ends the loop (presumably including the accept
            // abort requested by stop()).
        }
    }

    // Shuts down both directions of `cs` and waits for the input shutdown.
    seastar::future<> shutdown(seastar::connected_socket& cs) {
        cs.shutdown_output();
        cs.shutdown_input();
        co_await cs.wait_input_shutdown();
    }

    seastar::server_socket _socket;
    seastar::gate _gate;
    std::uint16_t _port;
    seastar::sstring _host;
    std::vector<Connection> _connections;
    bool _running = true;
    bool _auto_shutdown = true;
};
// Creates an unavailable_server for `port` (0 by default), starts it, and
// returns it once start() has completed.
inline seastar::future<std::unique_ptr<unavailable_server>> make_unavailable_server(std::uint16_t port = 0) {
    auto server = std::make_unique<unavailable_server>(port);
    co_await server->start();
    co_return server;
}
} // namespace test::vector_search

View File

@@ -9,6 +9,7 @@
#include "vector_search/vector_store_client.hh"
#include "utils.hh"
#include "vs_mock_server.hh"
#include "unavailable_server.hh"
#include "seastar/core/future.hh"
#include "seastar/core/when_all.hh"
#include "db/config.hh"
@@ -120,111 +121,6 @@ public:
}
};
// Test helper: a TCP listener that accepts connections without serving them.
// Unless auto_shutdown_off() was called, each accepted connection is shut down
// right after being accepted.
class unavailable_server {
// An accepted connection and the time at which it was accepted.
struct Connection {
lowres_clock::time_point timestamp;
connected_socket socket;
};
public:
// `port` is the port to listen on; listen() overwrites it with the bound port.
explicit unavailable_server(uint16_t port)
: _port(port) {
}
// Binds the listening socket and launches the accept loop under the gate.
future<> start() {
co_await listen();
// Not awaited on purpose: stop()/take_socket() wait for the loop via the gate.
(void)try_with_gate(_gate, [this] {
return run();
});
}
// Aborts the pending accept and waits for the accept loop to finish.
// Does nothing when the listening socket is not valid.
future<> stop() {
if (_socket) {
_socket.abort_accept();
co_await _gate.close();
}
}
// Host the server is listening on (set by listen()).
sstring host() const {
return _host;
}
// Port the server is listening on.
uint16_t port() const {
return _port;
}
// Connections accepted so far and not yet removed by shutdown_all_and_clear().
const std::vector<Connection>& connections() const {
return _connections;
}
// Ends the accept loop and gives the listening socket to the caller.
future<seastar::server_socket> take_socket() {
_running = false;
// Make a connection to unblock accept() in run loop.
co_await seastar::connect(socket_address(net::inet_address(_host), _port));
co_await _gate.close();
co_return std::move(_socket);
}
// Stops shutting connections down automatically on accept.
void auto_shutdown_off() {
_auto_shutdown = false;
}
// Removes all recorded connections from the server and shuts each of them down.
future<> shutdown_all_and_clear() {
// Swap the list out first so _connections is empty while shutdowns are awaited.
std::vector<Connection> tmp;
std::swap(tmp, _connections);
for (auto& conn : tmp) {
co_await shutdown(conn.socket);
}
}
private:
// Opens the listening socket via try_on_loopback_address(), pinned to the
// current shard, and records the host and port that were bound.
future<> listen() {
co_await try_on_loopback_address([this](auto host) -> future<> {
::listen_options opts;
opts.set_fixed_cpu(this_shard_id());
_socket = seastar::listen(socket_address(net::inet_address(host), _port), opts);
_port = _socket.local_address().port();
_host = std::move(host);
return make_ready_future<>();
});
}
// Accept loop: records every accepted connection and, when auto-shutdown is
// enabled, shuts it down immediately. Any exception ends the loop.
future<> run() {
while (_running) {
try {
auto result = co_await _socket.accept();
_connections.push_back(Connection{.timestamp = lowres_clock::now(), .socket = std::move(result.connection)});
if (_auto_shutdown) {
co_await shutdown(_connections.back().socket);
}
} catch (...) {
break;
}
}
}
// Shuts down both directions of `cs` and waits for the input shutdown.
future<> shutdown(connected_socket& cs) {
cs.shutdown_output();
cs.shutdown_input();
co_await cs.wait_input_shutdown();
}
seastar::server_socket _socket;
seastar::gate _gate;
uint16_t _port;
sstring _host;
std::vector<Connection> _connections;
bool _running = true;
bool _auto_shutdown = true;
};
// Creates an unavailable_server for `port` (0 by default), starts it, and
// returns it once start() has completed.
auto make_unavailable_server(uint16_t port = 0) -> future<std::unique_ptr<unavailable_server>> {
auto ret = std::make_unique<unavailable_server>(port);
co_await ret->start();
co_return ret;
}
} // namespace
BOOST_AUTO_TEST_CASE(vector_store_client_test_ctor) {

View File

@@ -9,6 +9,7 @@
#pragma once
#include "utils.hh"
#include "utils/rjson.hh"
#include "seastar/http/request.hh"
#include <seastar/core/future.hh>
#include <seastar/core/seastar.hh>
@@ -140,7 +141,7 @@ private:
std::vector<request> _ann_requests;
std::vector<request> _status_requests;
response _next_ann_response{seastar::http::reply::status_type::ok, CORRECT_RESPONSE_FOR_TEST_TABLE};
response _next_status_response{seastar::http::reply::status_type::ok, "SERVING"};
response _next_status_response{seastar::http::reply::status_type::ok, rjson::quote_json_string("SERVING")};
const seastar::sstring INDEXES_PATH = "/api/v1/indexes";
};

View File

@@ -10,6 +10,7 @@
#include "utils.hh"
#include "utils/exceptions.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/rjson.hh"
#include <seastar/http/request.hh>
#include <seastar/http/short_streams.hh>
#include <seastar/net/socket_defs.hh>
@@ -54,10 +55,6 @@ bool is_request_aborted(std::exception_ptr& err) {
return try_catch<abort_requested_exception>(err) != nullptr;
}
// Returns true when `status` compares greater than or equal to
// internal_server_error (HTTP 500), i.e. for 5xx status codes.
bool is_server_error(http::reply::status_type status) {
return status >= http::reply::status_type::internal_server_error;
}
future<client::request_error> map_err(std::exception_ptr& err) {
if (is_server_unavailable(err)) {
co_return service_unavailable_error{};
@@ -94,14 +91,7 @@ seastar::future<client::request_result> client::request(
}
co_return std::unexpected{co_await map_err(err)};
}
auto resp = co_await std::move(f);
if (is_server_error(resp.status)) {
_logger.warn("client ({}:{}): received HTTP status {}: {}", _endpoint.host, _endpoint.port, static_cast<int>(resp.status),
response_content_to_sstring(resp.content));
handle_server_unavailable();
co_return std::unexpected{service_unavailable_error{}};
}
co_return resp;
co_return co_await std::move(f);
}
seastar::future<client::response> client::request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional<seastar::sstring> content,
@@ -128,7 +118,8 @@ seastar::future<bool> client::check_status() {
co_return false;
}
auto resp = co_await std::move(f);
co_return response_content_to_sstring(resp.content) == "SERVING";
auto json = rjson::parse(std::move(resp.content));
co_return json.IsString() && json.GetString() == std::string_view("SERVING");
}
seastar::future<> client::close() {