Merge 'vector_store_client: Add support for failed-node backoff' from Karol Nowacki

vector_search: Add backoff for failed nodes

Introduces logic to mark nodes that fail to answer an ANN request as
"down". Down nodes are omitted from further requests until they
successfully respond to a health check.

Health checks for down nodes are performed in the background using the
`status` endpoint, with an exponential backoff retry policy that starts
at 100ms and is capped at twice the configured read request timeout.

Client list management is moved to separate files (clients.cc/clients.hh)
to improve code organization and modularity.

References: VECTOR-187.

Backport to 2025.4 as this feature is expected to be available in 2025.4.

Closes scylladb/scylladb#26308

* github.com:scylladb/scylladb:
  vector_search: Set max backoff delay to 2x read request timeout
  vector_search: Report status check exception via on_internal_error_noexcept
  vector_search: Extract client management into dedicated class
  vector_search: Add backoff for failed clients
  vector_search: Make endpoint available
  vector_search: Use std::expected for low-level client errors
  vector_search: Extract client class
This commit is contained in:
Piotr Dulikowski
2025-11-14 11:49:18 +01:00
12 changed files with 811 additions and 297 deletions

View File

@@ -1265,6 +1265,8 @@ scylla_core = (['message/messaging_service.cc',
'utils/disk_space_monitor.cc',
'vector_search/vector_store_client.cc',
'vector_search/dns.cc',
'vector_search/client.cc',
'vector_search/clients.cc'
] + [Antlr3Grammar('cql3/Cql.g')] \
+ scylla_raft_core
)

View File

@@ -1,2 +1,3 @@
extra_scylla_cmdline_options:
- '--reactor-backend=linux-aio'
- '--fail-on-abandoned-failed-futures=true'

View File

@@ -6,6 +6,8 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "seastar/core/future.hh"
#include "seastar/core/when_all.hh"
#include "vector_search/vector_store_client.hh"
#include "db/config.hh"
#include "exceptions/exceptions.hh"
@@ -67,13 +69,25 @@ auto listen_on_port(std::unique_ptr<http_server> server, sstring host, uint16_t
co_return std::make_tuple(std::move(server), listeners[0].local_address().port());
}
auto new_http_server(std::function<void(routes& r)> set_routes, sstring host = LOCALHOST, uint16_t port = 0)
-> future<std::tuple<std::unique_ptr<http_server>, socket_address>> {
auto make_http_server(std::function<void(routes& r)> set_routes) {
static unsigned id = 0;
auto server = std::make_unique<http_server>(fmt::format("test_vector_store_client_{}", id++));
set_routes(server->_routes);
server->set_content_streaming(true);
co_return co_await listen_on_port(std::move(server), std::move(host), port);
return server;
}
auto new_http_server(std::function<void(routes& r)> set_routes, sstring host = LOCALHOST, uint16_t port = 0)
-> future<std::tuple<std::unique_ptr<http_server>, socket_address>> {
co_return co_await listen_on_port(make_http_server(set_routes), std::move(host), port);
}
/// Builds a test HTTP server on top of an already-listening socket.
///
/// The socket is appended to the server's listener list and accepting is
/// started on it; the port the socket is bound to is returned with the server.
auto new_http_server(std::function<void(routes& r)> set_routes, server_socket socket) -> future<std::tuple<std::unique_ptr<http_server>, socket_address>> {
    auto srv = make_http_server(set_routes);
    auto& srv_listeners = http_server_tester::listeners(*srv);
    srv_listeners.push_back(std::move(socket));
    // The socket that was just appended is the last listener.
    const auto adopted_idx = srv_listeners.size() - 1;
    co_await srv->do_accepts(adopted_idx);
    const auto bound_port = srv_listeners.back().local_address().port();
    co_return std::make_tuple(std::move(srv), bound_port);
}
auto repeat_until(milliseconds timeout, std::function<future<bool>()> func) -> future<bool> {
@@ -224,6 +238,11 @@ public:
};
class unavailable_server {
struct Connection {
lowres_clock::time_point timestamp;
connected_socket socket;
};
public:
explicit unavailable_server(uint16_t port)
: _port(port) {
@@ -237,8 +256,10 @@ public:
}
future<> stop() {
_socket.abort_accept();
co_await _gate.close();
if (_socket) {
_socket.abort_accept();
co_await _gate.close();
}
}
sstring host() const {
@@ -249,10 +270,30 @@ public:
return _port;
}
size_t connections() const {
const std::vector<Connection>& connections() const {
return _connections;
}
// Stops the accept loop and hands the listening socket over to the caller,
// so that another server can keep serving on the same host and port.
// The member _socket is moved out of by the final co_return.
future<seastar::server_socket> take_socket() {
// Tell run() to leave its loop once the pending accept() returns.
_running = false;
// Make a connection to unblock accept() in run loop.
co_await seastar::connect(socket_address(net::inet_address(_host), _port));
// presumably run() executes under _gate (not visible here), so this waits for the loop to exit
co_await _gate.close();
co_return std::move(_socket);
}
void auto_shutdown_off() {
_auto_shutdown = false;
}
// Shuts down every recorded connection and forgets them all.
// The list is emptied up front, so connections() reports zero entries
// while the individual shutdowns are still in progress.
future<> shutdown_all_and_clear() {
    std::vector<Connection> drained;
    drained.swap(_connections);
    for (size_t i = 0; i < drained.size(); ++i) {
        co_await shutdown(drained[i].socket);
    }
}
private:
future<> listen() {
co_await try_on_loopback_address([this](auto host) -> future<> {
@@ -266,24 +307,33 @@ private:
}
future<> run() {
while (true) {
while (_running) {
try {
auto s = co_await _socket.accept();
_connections++;
s.connection.shutdown_output();
s.connection.shutdown_input();
co_await s.connection.wait_input_shutdown();
auto result = co_await _socket.accept();
_connections.push_back(Connection{.timestamp = lowres_clock::now(), .socket = std::move(result.connection)});
if (_auto_shutdown) {
co_await shutdown(_connections.back().socket);
}
} catch (...) {
break;
}
}
}
future<> shutdown(connected_socket& cs) {
cs.shutdown_output();
cs.shutdown_input();
co_await cs.wait_input_shutdown();
}
seastar::server_socket _socket;
seastar::gate _gate;
uint16_t _port;
sstring _host;
size_t _connections = 0;
std::vector<Connection> _connections;
bool _running = true;
bool _auto_shutdown = true;
};
auto make_unavailable_server(uint16_t port = 0) -> future<std::unique_ptr<unavailable_server>> {
@@ -318,6 +368,16 @@ public:
co_await listen();
}
future<> start(server_socket socket) {
auto [server, addr] = co_await new_http_server(
[this](auto& r) {
set_routes(r);
},
std::move(socket));
_http_server = std::move(server);
_port = addr.port();
}
future<> stop() {
co_await _http_server->stop();
}
@@ -342,11 +402,8 @@ private:
future<> listen() {
co_await try_on_loopback_address([this](auto host) -> future<> {
auto [s, addr] = co_await new_http_server(
[this](routes& r) {
auto ann = [this](std::unique_ptr<request> req, std::unique_ptr<reply> rep) -> future<std::unique_ptr<reply>> {
return handle_request(std::move(req), std::move(rep));
};
r.add(operation_type::POST, url(INDEXES_PATH).remainder("path"), new function_handler(ann, "json"));
[this](auto& r) {
set_routes(r);
},
host.c_str(), _port);
_http_server = std::move(s);
@@ -355,7 +412,7 @@ private:
});
}
future<std::unique_ptr<reply>> handle_request(std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
future<std::unique_ptr<reply>> handle_ann_request(std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
ann_req r{.path = INDEXES_PATH + "/" + req->get_path_param("path"), .body = co_await util::read_entire_stream_contiguous(*req->content_stream)};
_ann_requests.push_back(std::move(r));
rep->set_status(_next_ann_response.status);
@@ -363,6 +420,27 @@ private:
co_return rep;
}
future<std::unique_ptr<reply>> handle_status_request(std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
rep->set_status(status_type::ok);
rep->write_body("json", "SERVING");
co_return rep;
}
// Registers the mock server's HTTP routes:
//  - POST <INDEXES_PATH>/<path> is served by handle_ann_request
//  - GET  /api/v1/status/<status> is served by handle_status_request
void set_routes(routes& r) {
    auto on_ann = [this](std::unique_ptr<request> req, std::unique_ptr<reply> rep) -> future<std::unique_ptr<reply>> {
        return handle_ann_request(std::move(req), std::move(rep));
    };
    auto on_status = [this](std::unique_ptr<request> req, std::unique_ptr<reply> rep) -> future<std::unique_ptr<reply>> {
        return handle_status_request(std::move(req), std::move(rep));
    };
    r.add(operation_type::POST, url(INDEXES_PATH).remainder("path"), new function_handler(on_ann, "json"));
    r.add(operation_type::GET, url("/api/v1/status").remainder("status"), new function_handler(on_status, "json"));
}
uint16_t _port = 0;
sstring _host;
std::unique_ptr<http_server> _http_server;
@@ -823,7 +901,7 @@ SEASTAR_TEST_CASE(vector_store_client_multiple_ips_high_availability) {
// repeat the ANN query until the unavailable server is queried.
BOOST_CHECK(co_await repeat_until([&]() -> future<bool> {
keys = co_await vs.ann("ks", "idx", schema, std::vector<float>{0.1, 0.2, 0.3}, 2, as.reset());
co_return unavail_s->connections() > 1;
co_return unavail_s->connections().size() > 1;
}));
// The query is successful because the client falls back to the available server
@@ -887,7 +965,7 @@ SEASTAR_TEST_CASE(vector_store_client_multiple_uris_high_availability) {
// repeat the ANN query until the unavailable server is queried.
BOOST_CHECK(co_await repeat_until([&]() -> future<bool> {
keys = co_await vs.ann("ks", "idx", schema, std::vector<float>{0.1, 0.2, 0.3}, 2, as.reset());
co_return unavail_s->connections() > 1;
co_return unavail_s->connections().size() > 1;
}));
// The query is successful because the client falls back to the available server
@@ -1028,3 +1106,131 @@ SEASTAR_TEST_CASE(vector_store_client_test_paging_warning_doesnt_show_when_limit
return s1->stop();
});
}
// Verifies that a node which failed an ANN request is used for requests again
// once a working server answers on the same address.
// Flow: fail one request against an "unavailable" server, replace that server with a
// mock vector-store server that reuses the same listening socket, then poll ANN
// requests until one succeeds.
SEASTAR_TEST_CASE(vector_store_client_node_recovery_after_backoff) {
auto unavail_server = co_await make_unavailable_server();
// Created later, inside the test body; kept here so the finally-block can stop it.
std::unique_ptr<vs_mock_server> avail_server;
constexpr auto HOSTNAME = "server.node";
auto cfg = cql_test_config();
cfg.db_config->vector_store_primary_uri.set(format("http://{}:{}", HOSTNAME, unavail_server->port()));
co_await do_with_cql_env(
[&](cql_test_env& env) -> future<> {
auto as = abort_source_timeout();
auto schema = co_await create_test_table(env, "ks", "idx");
auto& vs = env.local_qp().vector_store_client();
// Resolve the test host name to the address the unavailable server listens on.
configure(vs).with_dns({{HOSTNAME, std::vector<std::string>{unavail_server->host()}}});
vs.start_background_tasks();
// Send request to unavailable node - this will put the node to backoff.
auto result = co_await vs.ann("ks", "idx", schema, std::vector<float>{0.1, 0.2, 0.3}, 2, as.reset());
BOOST_CHECK(!result);
BOOST_CHECK(std::holds_alternative<vector_store_client::service_unavailable>(result.error()));
// Replace the unavailable server with an available one.
// take_socket() hands over the listening socket, so host and port stay the same.
avail_server = std::make_unique<vs_mock_server>();
co_await avail_server->start(co_await unavail_server->take_socket());
// Wait until node is taken out of the backoff state and used for requests again.
BOOST_CHECK(co_await repeat_until([&]() -> future<bool> {
auto result = co_await vs.ann("ks", "idx", schema, std::vector<float>{0.1, 0.2, 0.3}, 2, as.reset());
co_return result.has_value();
}));
},
cfg)
.finally(coroutine::lambda([&] -> future<> {
co_await unavail_server->stop();
// avail_server stays null if the test body failed before creating it.
if (avail_server) {
co_await avail_server->stop();
}
}));
}
// Verifies that many ANN requests failing concurrently against the same node result in
// a single status-check connection, not one per failed request.
SEASTAR_TEST_CASE(vector_store_client_single_status_check_after_concurrent_failures) {
    using keys = std::expected<vector_store_client::primary_keys, vector_store_client::ann_error>;
    auto unavail_s = co_await make_unavailable_server();
    auto cfg = cql_test_config();
    cfg.db_config->vector_store_primary_uri.set(format("http://unavail.node:{}", unavail_s->port()));
    co_await do_with_cql_env(
            [&](cql_test_env& env) -> future<> {
                std::vector<future<keys>> requests;
                // Keep accepted connections open so all requests are in flight at the same time.
                unavail_s->auto_shutdown_off();
                constexpr auto NUM_OF_PARALLEL_REQUESTS = 50;
                auto as = abort_source_timeout();
                auto schema = co_await create_test_table(env, "ks", "idx");
                auto& vs = env.local_qp().vector_store_client();
                configure(vs).with_dns({{"unavail.node", std::vector<std::string>{unavail_s->host()}}});
                vs.start_background_tasks();
                for (int i = 0; i < NUM_OF_PARALLEL_REQUESTS; ++i) {
                    requests.push_back(vs.ann("ks", "idx", schema, std::vector<float>{0.1, 0.2, 0.3}, 2, as.reset()));
                }
                // Wait for all requests to establish a connection with the server.
                // Report a timeout here explicitly: without the check a failed wait went unnoticed
                // and the final assertion below would be evaluated on a wrong precondition.
                // BOOST_CHECK (not BOOST_REQUIRE) is used so the pending requests are still awaited.
                BOOST_CHECK(co_await repeat_until([&unavail_s]() -> future<bool> {
                    co_return unavail_s->connections().size() == NUM_OF_PARALLEL_REQUESTS;
                }));
                // Shutdown all connections, causing all requests to fail.
                // The number of connections will drop to zero.
                co_await unavail_s->shutdown_all_and_clear();
                // Wait for all requests to complete.
                co_await when_all(requests.begin(), requests.end());
                // After the backoff period, a single status check is expected to verify node recovery.
                // The test server keeps the subsequent status check connection open (auto_shutdown_off()).
                // This prevents the client's backoff mechanism from sending another status request
                // while the first one is pending, ensuring that exactly one new connection is made.
                // This makes the test assertion deterministic.
                BOOST_CHECK(co_await repeat_until([&]() -> future<bool> {
                    co_return unavail_s->connections().size() == 1;
                }));
            },
            cfg)
            .finally(coroutine::lambda([&] -> future<> {
                co_await unavail_s->stop();
            }));
}
// Verifies that the maximum status-check backoff follows the read_request_timeout_in_ms
// setting (max backoff = 2x the timeout) by measuring the time between consecutive
// status-check connections made to an unavailable server.
SEASTAR_TEST_CASE(vector_store_client_updates_backoff_max_time_from_read_request_timeout_cfg) {
    auto unavail_s = co_await make_unavailable_server();
    auto cfg = cql_test_config();
    cfg.db_config->vector_store_primary_uri.set(format("http://unavail.node:{}", unavail_s->port()));
    co_await do_with_cql_env(
            [&](cql_test_env& env) -> future<> {
                auto as = abort_source_timeout();
                auto schema = co_await create_test_table(env, "ks", "idx");
                auto& vs = env.local_qp().vector_store_client();
                configure(vs).with_dns({{"unavail.node", std::vector<std::string>{unavail_s->host()}}});
                vs.start_background_tasks();
                // Set request timeout to 100ms, hence max backoff time is 2x100ms = 200ms.
                cfg.db_config->read_request_timeout_in_ms.set(100);
                // Trigger status checking by making ANN request to unavailable server.
                co_await vs.ann("ks", "idx", schema, std::vector<float>{0.1, 0.2, 0.3}, 2, as.reset());
                // Wait for 1 ANN request + 4 status check connections (5 total).
                // Abort the test with a clear message if they never arrive; otherwise the
                // .at() calls below would fail with an unrelated std::out_of_range.
                BOOST_REQUIRE(co_await repeat_until([&unavail_s]() -> future<bool> {
                    co_return unavail_s->connections().size() > 4;
                }));
                // Verify backoff timing between status check connections.
                // Skip the first connection (ANN request) and analyze status check intervals.
                auto duration_between_1st_and_2nd_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
                        unavail_s->connections().at(2).timestamp - unavail_s->connections().at(1).timestamp);
                BOOST_CHECK_GE(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(100));
                BOOST_CHECK_LT(duration_between_1st_and_2nd_status_check, std::chrono::milliseconds(200));
                auto duration_between_2nd_and_3rd_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
                        unavail_s->connections().at(3).timestamp - unavail_s->connections().at(2).timestamp);
                // Max backoff time reached at 200ms, so subsequent status checks use fixed 200ms intervals.
                BOOST_CHECK_GE(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(200)); // 200ms = 100ms * 2
                BOOST_CHECK_LT(duration_between_2nd_and_3rd_status_check, std::chrono::milliseconds(400));
                auto duration_between_3rd_and_4th_status_check = std::chrono::duration_cast<std::chrono::milliseconds>(
                        unavail_s->connections().at(4).timestamp - unavail_s->connections().at(3).timestamp);
                BOOST_CHECK_GE(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(200));
                BOOST_CHECK_LT(duration_between_3rd_and_4th_status_check, std::chrono::milliseconds(400));
            },
            cfg)
            .finally(coroutine::lambda([&] -> future<> {
                co_await unavail_s->stop();
            }));
}

View File

@@ -2,7 +2,9 @@ add_library(vector_search STATIC)
target_sources(vector_search
PRIVATE
vector_store_client.cc
dns.cc)
dns.cc
client.cc
clients.cc)
target_link_libraries(vector_search
PUBLIC
Seastar::seastar

159
vector_search/client.cc Normal file
View File

@@ -0,0 +1,159 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "client.hh"
#include "utils/exceptions.hh"
#include "utils/exponential_backoff_retry.hh"
#include <seastar/http/request.hh>
#include <seastar/http/short_streams.hh>
#include <seastar/net/socket_defs.hh>
#include <seastar/net/api.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/core/on_internal_error.hh>
#include <chrono>
#include <fmt/format.h>
using namespace seastar;
using namespace std::chrono_literals;
namespace vector_search {
namespace {
/// Connection factory for the HTTP client: opens TCP connections to one fixed
/// address, with TCP_NODELAY and keepalive enabled on every new socket.
class client_connection_factory : public http::experimental::connection_factory {
    socket_address _addr;

public:
    explicit client_connection_factory(socket_address addr)
        : _addr(addr) {
    }

    /// Connects to the configured address. The abort source is not used.
    future<connected_socket> make([[maybe_unused]] abort_source* as) override {
        const net::tcp_keepalive_params keepalive{
                .idle = 60s,
                .interval = 60s,
                .count = 10,
        };
        auto conn = co_await seastar::connect(_addr, {}, transport::TCP);
        conn.set_nodelay(true);
        conn.set_keepalive_parameters(keepalive);
        conn.set_keepalive(true);
        co_return conn;
    }
};
/// True when the exception is a std::system_error; the client treats that as
/// "the server could not be reached".
bool is_server_unavailable(std::exception_ptr& err) {
    const auto* sys_err = try_catch<std::system_error>(err);
    return sys_err != nullptr;
}
/// True when the exception is an abort_requested_exception.
bool is_request_aborted(std::exception_ptr& err) {
    const auto* aborted = try_catch<abort_requested_exception>(err);
    return aborted != nullptr;
}
/// Translates a low-level exception into a client::request_error value.
///
/// std::system_error maps to service_unavailable_error and an abort maps to
/// aborted_error. Any other exception is rethrown to the caller through the
/// returned future.
future<client::request_error> map_err(std::exception_ptr& err) {
    const bool unavailable = is_server_unavailable(err);
    if (unavailable) {
        co_return client::request_error{service_unavailable_error{}};
    }
    const bool aborted = is_request_aborted(err);
    if (aborted) {
        co_return client::request_error{aborted_error{}};
    }
    // Not an error this layer knows how to represent: propagate it as an exception.
    co_await coroutine::return_exception_ptr(err);
    // Never reached; present so that every path has a co_return.
    co_return client::request_error{};
}
auto constexpr BACKOFF_RETRY_MIN_TIME = 100ms;
} // namespace
/// Creates a client bound to a single vector-store endpoint.
///
/// \param logger logger used when reporting unexpected status-check failures
/// \param endpoint_ host name, port and resolved IP address of the node
/// \param request_timeout_in_ms updateable timeout value; twice this value is
///        used as the upper bound of the status-check backoff
client::client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value<uint32_t> request_timeout_in_ms)
    : _endpoint(std::move(endpoint_))
    // Take the address from the member rather than from the moved-from parameter.
    // _endpoint is declared before _http_client, so it is fully initialized here.
    , _http_client(std::make_unique<client_connection_factory>(socket_address(_endpoint.ip, _endpoint.port)))
    , _logger(logger)
    , _request_timeout(std::move(request_timeout_in_ms)) {
}
/// Sends one HTTP request to this node.
///
/// While a background status check is running the node is considered down and
/// service_unavailable_error is returned without contacting it. A failure
/// classified as "server unavailable" starts the background status check.
/// Exceptions that are neither "unavailable" nor "aborted" are rethrown.
seastar::future<client::request_result> client::request(
        seastar::httpd::operation_type method, seastar::sstring path, std::optional<seastar::sstring> content, seastar::abort_source& as) {
    if (is_checking_status_in_progress()) {
        co_return std::unexpected(service_unavailable_error{});
    }

    auto attempt = co_await seastar::coroutine::as_future(request_impl(method, std::move(path), std::move(content), std::nullopt, as));
    if (!attempt.failed()) {
        co_return co_await std::move(attempt);
    }

    auto ex = attempt.get_exception();
    if (is_server_unavailable(ex)) {
        handle_server_unavailable();
    }
    co_return std::unexpected{co_await map_err(ex)};
}
/// Performs the HTTP exchange and collects the reply status and body.
///
/// \param expected_status forwarded to the HTTP client's make_request(); the
///        public request() passes std::nullopt, check_status() passes `ok`
/// \param as abort source forwarded to the HTTP client
seastar::future<client::response> client::request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional<seastar::sstring> content,
        std::optional<seastar::http::reply::status_type>&& expected_status, seastar::abort_source& as) {
    auto http_req = http::request::make(method, _endpoint.host, std::move(path));
    if (content.has_value()) {
        http_req.write_body("json", std::move(*content));
    }

    response collected{seastar::http::reply::status_type::ok, std::vector<seastar::temporary_buffer<char>>()};
    // The handler fills `collected`, which lives in this coroutine's frame until co_return.
    auto on_reply = [&collected](http::reply const& reply, input_stream<char> body) -> future<> {
        collected.status = reply._status;
        collected.content = co_await util::read_entire_stream(body);
    };

    co_await _http_client.make_request(std::move(http_req), std::move(on_reply), std::move(expected_status), &as);
    co_return collected;
}
/// Issues a GET to the status endpoint using the client's own abort source.
/// Returns true when the request completed without an exception (the request
/// is made with `ok` as the expected status); any failure yields false.
seastar::future<bool> client::check_status() {
    auto probe = co_await coroutine::as_future(request_impl(httpd::operation_type::GET, "/api/v1/status", std::nullopt, http::reply::status_type::ok, _as));
    const bool succeeded = !probe.failed();
    // Only the outcome matters here; discard the reply or the exception.
    probe.ignore_ready_future();
    co_return succeeded;
}
// Shuts the client down: aborts the background status check (if any), waits for it
// to finish and then closes the underlying HTTP client.
seastar::future<> client::close() {
// _as is the abort source handed to the status-check request and to the backoff retry loop.
_as.request_abort();
// Take the status-check future out, leaving a ready one behind, and wait for it.
co_await std::exchange(_checking_status_future, make_ready_future());
co_await _http_client.close();
}
/// Starts the background status check unless one is already running, so that
/// several concurrent failures share a single check.
void client::handle_server_unavailable() {
    if (is_checking_status_in_progress()) {
        return;
    }
    _checking_status_future = run_checking_status();
}
/// Background loop that polls the status endpoint until it answers successfully.
///
/// Retries are driven by exponential_backoff_retry::do_until_value with
/// BACKOFF_RETRY_MIN_TIME and backoff_retry_max() as its time bounds and _as as
/// its abort source. An abort is swallowed silently; any other exception is
/// reported through on_internal_error_noexcept. This future never fails.
seastar::future<> client::run_checking_status() {
    struct stop_retry {};
    auto outcome = co_await coroutine::as_future(
            exponential_backoff_retry::do_until_value(BACKOFF_RETRY_MIN_TIME, backoff_retry_max(), _as, [this] -> future<std::optional<stop_retry>> {
                // An engaged optional ends the retry loop; nullopt asks for another attempt.
                if (co_await check_status()) {
                    co_return stop_retry{};
                }
                co_return std::nullopt;
            }));
    if (!outcome.failed()) {
        co_return;
    }
    auto ex = outcome.get_exception();
    if (is_request_aborted(ex)) {
        co_return;
    }
    // Report internal error for exceptions other than abort
    on_internal_error_noexcept(_logger, fmt::format("exception while checking status: {}", ex));
}
/// True while the background status check has not completed yet.
/// The stored future starts out ready, so this is false for a fresh client.
bool client::is_checking_status_in_progress() const {
    const bool finished = _checking_status_future.available();
    return !finished;
}
/// Upper bound for the status-check backoff: twice the current request timeout.
/// The timeout is read on every call.
std::chrono::milliseconds client::backoff_retry_max() const {
    return std::chrono::milliseconds{_request_timeout.get()} * 2;
}
} // namespace vector_search

70
vector_search/client.hh Normal file
View File

@@ -0,0 +1,70 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "error.hh"
#include "utils/log.hh"
#include "utils/updateable_value.hh"
#include <chrono>
#include <seastar/core/future.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/http/client.hh>
#include <seastar/http/common.hh>
#include <optional>
#include <expected>
#include <variant>
namespace vector_search {
/// HTTP client for a single vector-store node (one host/port/IP endpoint).
///
/// A request that fails with a "server unavailable" error starts a background
/// status check; while that check is running, request() returns
/// service_unavailable_error without contacting the node.
class client {
public:
/// HTTP reply: status code plus the body as a list of buffers.
struct response {
seastar::http::reply::status_type status;
std::vector<seastar::temporary_buffer<char>> content;
};
/// Address of the node: `host` is used when building the HTTP request,
/// `ip` and `port` are used to open the connection.
struct endpoint_type {
seastar::sstring host;
std::uint16_t port;
seastar::net::inet_address ip;
};
using request_error = std::variant<aborted_error, service_unavailable_error>;
using request_result = std::expected<response, request_error>;
explicit client(logging::logger& logger, endpoint_type endpoint_, utils::updateable_value<uint32_t> request_timeout_in_ms);
/// Sends a request to the node. `content`, when set, is written as the JSON body.
/// Returns the reply, or aborted_error / service_unavailable_error.
seastar::future<request_result> request(
seastar::httpd::operation_type method, seastar::sstring path, std::optional<seastar::sstring> content, seastar::abort_source& as);
/// Aborts and waits for the background status check, then closes the HTTP client.
seastar::future<> close();
const endpoint_type& endpoint() const {
return _endpoint;
}
private:
/// Performs the HTTP exchange; failures are reported as exceptions.
seastar::future<response> request_impl(seastar::httpd::operation_type method, seastar::sstring path, std::optional<seastar::sstring> content,
std::optional<seastar::http::reply::status_type>&& expected, seastar::abort_source& as);
/// Queries the status endpoint once; true when the request succeeded.
seastar::future<bool> check_status();
/// Starts the background status check if it is not already running.
void handle_server_unavailable();
/// Background loop: retries check_status() with backoff until it succeeds or is aborted.
seastar::future<> run_checking_status();
bool is_checking_status_in_progress() const;
/// Twice the current request timeout.
std::chrono::milliseconds backoff_retry_max() const;
// NOTE: members are initialized in declaration order; _endpoint precedes _http_client.
endpoint_type _endpoint;
seastar::http::experimental::client _http_client;
// Ready when no status check is running.
seastar::future<> _checking_status_future = seastar::make_ready_future();
// Abort source for the status check; triggered by close().
seastar::abort_source _as;
logging::logger& _logger;
utils::updateable_value<uint32_t> _request_timeout;
};
} // namespace vector_search

168
vector_search/clients.cc Normal file
View File

@@ -0,0 +1,168 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "clients.hh"
#include "load_balancer.hh"
#include "utils/exceptions.hh"
#include <random>
#include <expected>
#include <seastar/coroutine/as_future.hh>
#include <seastar/core/lowres_clock.hh>
using namespace seastar;
namespace vector_search {
namespace {
/// Timeout for waiting for a new client to be available
constexpr auto WAIT_FOR_CLIENT_TIMEOUT = std::chrono::seconds(5);
/// The number of times to retry a request if all nodes fail with an unavailable error.
constexpr auto REQUEST_RETRIES = 3;
static thread_local auto random_engine = std::default_random_engine(std::random_device{}());
/// Waits until the condition variable is signaled or the deadline passes.
/// Reaching the deadline is not an error: the future resolves normally in both
/// cases. Any other failure of the wait is propagated.
auto wait_for_signal(condition_variable& cv, lowres_clock::time_point timeout) -> future<void> {
    auto waited = co_await coroutine::as_future(cv.wait(timeout));
    if (!waited.failed()) {
        co_return;
    }
    auto ex = waited.get_exception();
    const bool timed_out = try_catch<condition_variable_timed_out>(ex) != nullptr;
    if (!timed_out) {
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
    co_return;
}
/// Converts an error variant into std::unexpected holding the wider variant type
/// `Variant`, by re-wrapping whichever alternative `err` currently holds.
template <typename Variant>
auto make_unexpected(const auto& err) {
    auto widen = [](auto&& alternative) {
        return Variant{alternative};
    };
    return std::unexpected{std::visit(widen, err)};
}
} // namespace
/// Creates an empty client set.
///
/// \param logger logger handed to every client created later
/// \param trigger_refresh callback invoked to request a refresh of the client list
/// \param request_timeout_in_ms updateable timeout value forwarded to every client
///
/// The producer is what get_clients() falls back to when no clients are known:
/// it triggers a refresh, waits until _refresh_cv is signaled or _timeout
/// elapses, and returns a copy of the current list. It runs under _gate.
clients::clients(logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value<uint32_t> request_timeout_in_ms)
    : _producer([this]() -> future<clients_vec> {
        // The callable passed to try_with_gate is a temporary that is destroyed as soon as
        // this return statement completes. It must therefore not be a coroutine lambda:
        // a coroutine reads its captures through the closure object, which would already
        // be gone when it resumes after the wait. Use a plain lambda plus a continuation
        // that owns its own copy of `this`.
        return try_with_gate(_gate, [this] {
            _trigger_refresh();
            return wait_for_signal(_refresh_cv, lowres_clock::now() + _timeout).then([this] {
                return _clients;
            });
        });
    })
    , _trigger_refresh(std::move(trigger_refresh))
    , _timeout(WAIT_FOR_CLIENT_TIMEOUT)
    , _logger(logger)
    , _request_timeout_in_ms(std::move(request_timeout_in_ms)) {
}
/// Sends a request to one of the known nodes.
///
/// Nodes are tried in the order chosen by the load balancer. A node answering
/// with service_unavailable_error is skipped and the next one is tried; any
/// other error ends the request. When every node was unavailable a refresh is
/// triggered and the whole round is repeated, REQUEST_RETRIES rounds in total.
future<clients::request_result> clients::request(
        seastar::httpd::operation_type method, seastar::sstring path, std::optional<seastar::sstring> content, seastar::abort_source& as) {
    for (auto attempt = 0; attempt < REQUEST_RETRIES; ++attempt) {
        auto available = co_await get_clients(as);
        if (!available) {
            co_return make_unexpected<clients::request_error>(available.error());
        }

        load_balancer lb(std::move(*available), random_engine);
        while (auto candidate = lb.next()) {
            auto reply = co_await candidate->request(method, path, content, as);
            if (reply) {
                co_return std::move(reply.value());
            }
            const bool node_unavailable = std::holds_alternative<service_unavailable_error>(reply.error());
            if (!node_unavailable) {
                co_return make_unexpected<clients::request_error>(reply.error());
            }
            // This node is unavailable: fall through and try the next one.
        }

        _trigger_refresh();
    }

    co_return std::unexpected{service_unavailable_error{}};
}
/// Returns the current clients, or waits for the producer to deliver a new list.
///
/// Errors: aborted_error when the producer failed and `as` was aborted,
/// addr_unavailable_error when the producer delivered an empty list. Any other
/// producer failure is rethrown.
future<clients::get_clients_result> clients::get_clients(abort_source& as) {
    if (!_clients.empty()) {
        co_return _clients;
    }

    auto produced = co_await coroutine::as_future(_producer(as));
    if (produced.failed()) {
        auto ex = produced.get_exception();
        if (as.abort_requested()) {
            co_return std::unexpected{aborted_error{}};
        }
        co_await coroutine::return_exception_ptr(std::move(ex));
    }

    auto fresh = co_await std::move(produced);
    if (fresh.empty()) {
        co_return std::unexpected{addr_unavailable_error{}};
    }
    co_return fresh;
}
/// Rebuilds the client list from the configured URIs and their resolved addresses.
///
/// The previous clients are retired first, then one client is created for every
/// address of every URI whose host has an entry in `addrs`. Waiters on
/// _refresh_cv are woken up and the retired clients are closed.
future<> clients::handle_changed(const std::vector<uri>& uris, const dns::host_address_map& addrs) {
    clear();
    for (const auto& target : uris) {
        const auto resolved = addrs.find(target.host);
        if (resolved == addrs.end()) {
            // No address known for this host: nothing to create.
            continue;
        }
        for (const auto& ip : resolved->second) {
            _clients.push_back(make_lw_shared<client>(_logger, client::endpoint_type{target.host, target.port, ip}, _request_timeout_in_ms));
        }
    }
    _refresh_cv.broadcast();
    co_await close_old_clients();
}
// Stops the client set: wakes a waiting producer, closes the gate, then closes
// the current clients followed by the retired ones.
future<> clients::stop() {
// The producer waits on _refresh_cv while inside _gate; signal it before closing the gate.
_refresh_cv.signal();
co_await _gate.close();
co_await close_clients();
co_await close_old_clients();
}
/// Retires all current clients: they are moved to the end of _old_clients (to be
/// closed later by close_old_clients()) and the current list becomes empty.
void clients::clear() {
    _old_clients.reserve(_old_clients.size() + _clients.size());
    for (auto& retired : _clients) {
        _old_clients.push_back(std::move(retired));
    }
    _clients.clear();
}
// Closes every current client, one after another, and then empties the list.
// NOTE(review): the range-for keeps iterators into _clients across co_await; this looks
// safe only if nothing modifies _clients meanwhile (close_old_clients() avoids the same
// hazard by indexing) - confirm that handle_changed()/clear() cannot run concurrently.
future<> clients::close_clients() {
for (auto& client : _clients) {
co_await client->close();
}
_clients.clear();
}
// Closes retired clients for which owned() is true and removes them from _old_clients.
// Entries that are not owned() are left in the list for a later call.
future<> clients::close_old_clients() {
// iterate over old clients and close them. There is a co_await in the loop
// so we need to use [] accessor and copying clients to avoid dangling references of iterators.
// NOLINTNEXTLINE(modernize-loop-convert)
for (auto it = 0U; it < _old_clients.size(); ++it) {
auto& client = _old_clients[it];
// presumably owned() means this is the only remaining reference - TODO confirm
if (client && client.owned()) {
// Keep the client alive through a local copy and null the slot before suspending.
auto client_cloned = client;
client = nullptr;
co_await client_cloned->close();
}
}
// Drop the slots that were nulled above.
std::erase_if(_old_clients, [](auto const& client) {
return !client;
});
}
} // namespace vector_search

69
vector_search/clients.hh Normal file
View File

@@ -0,0 +1,69 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "client.hh"
#include "dns.hh"
#include "uri.hh"
#include "utils/sequential_producer.hh"
#include "vector_search/error.hh"
#include "utils/log.hh"
#include "utils/updateable_value.hh"
#include <expected>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/condition-variable.hh>
#include <vector>
namespace vector_search {
/// Manages the set of clients for the configured vector-store nodes and
/// dispatches requests to them.
class clients {
public:
/// Callback used to ask for a refresh of the client list.
using refresh_trigger_callback = std::function<void()>;
using request_error = std::variant<aborted_error, addr_unavailable_error, service_unavailable_error>;
using request_result = std::expected<client::response, request_error>;
using clients_vec = std::vector<seastar::lw_shared_ptr<client>>;
using get_clients_error = std::variant<aborted_error, addr_unavailable_error>;
using get_clients_result = std::expected<clients_vec, get_clients_error>;
explicit clients(logging::logger& logger, refresh_trigger_callback trigger_refresh, utils::updateable_value<uint32_t> request_timeout_in_ms);
/// Sends a request to one of the nodes, moving on to the next node when one is unavailable.
seastar::future<request_result> request(
seastar::httpd::operation_type method, seastar::sstring path, std::optional<seastar::sstring> content, seastar::abort_source& as);
/// Rebuilds the client list from the URIs and their resolved addresses.
seastar::future<> handle_changed(const std::vector<uri>& uris, const dns::host_address_map& addrs);
/// Closes the gate and all current and retired clients.
seastar::future<> stop();
/// Moves the current clients to the retired list.
void clear();
/// Returns the current clients, or waits for a refreshed list when there are none.
seastar::future<get_clients_result> get_clients(seastar::abort_source& as);
/// Sets how long the producer waits for a refreshed client list.
void timeout(std::chrono::milliseconds timeout) {
_timeout = timeout;
}
private:
seastar::future<> close_clients();
seastar::future<> close_old_clients();
// Clients currently used for requests.
clients_vec _clients;
// Used by get_clients() when _clients is empty.
sequential_producer<clients_vec> _producer;
refresh_trigger_callback _trigger_refresh;
// Entered by the producer; closed in stop().
seastar::gate _gate;
// Signaled when the client list has been rebuilt and on stop().
seastar::condition_variable _refresh_cv;
std::chrono::milliseconds _timeout;
// Retired clients waiting to be closed.
clients_vec _old_clients;
logging::logger& _logger;
utils::updateable_value<uint32_t> _request_timeout_in_ms;
};
} // namespace vector_search

57
vector_search/error.hh Normal file
View File

@@ -0,0 +1,57 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/http/reply.hh>
#include <seastar/core/sstring.hh>
#include <fmt/format.h>
namespace vector_search {
// Error types of the vector-store client. All but service_error are empty tag types;
// each has a matching message in error_visitor.

/// The service is disabled.
struct disabled_error {};
/// The operation was aborted.
struct aborted_error {};
/// The vector-store addr is unavailable (not possible to get an addr from the dns service).
struct addr_unavailable_error {};
/// The vector-store service is unavailable.
struct service_unavailable_error {};
/// The error from the vector-store service.
struct service_error {
seastar::http::reply::status_type status; ///< The HTTP status code from the vector-store service.
};
/// An unsupported reply format from the vector-store service.
struct service_reply_format_error {};
/// Visitor producing a human-readable message for each vector-store error type.
struct error_visitor {
    /// Includes the HTTP status returned by the service.
    seastar::sstring operator()(service_error e) const {
        return fmt::format("Vector Store error: HTTP status {}", e.status);
    }

    // The remaining messages have no arguments, so they are returned as plain literals.
    seastar::sstring operator()(disabled_error) const {
        return "Vector Store is disabled";
    }

    seastar::sstring operator()(aborted_error) const {
        return "Vector Store request was aborted";
    }

    seastar::sstring operator()(addr_unavailable_error) const {
        return "Vector Store service address could not be fetched from DNS";
    }

    seastar::sstring operator()(service_unavailable_error) const {
        return "Vector Store service is unavailable";
    }

    seastar::sstring operator()(service_reply_format_error) const {
        return "Vector Store returned an invalid JSON";
    }
};
} // namespace vector_search

20
vector_search/uri.hh Normal file
View File

@@ -0,0 +1,20 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/core/sstring.hh>
#include <cstdint>
namespace vector_search {
/// Host and port of a vector-store service URI.
struct uri {
seastar::sstring host; ///< Host part of the URI.
std::uint16_t port; ///< Port part of the URI.
};
} // namespace vector_search

View File

@@ -9,6 +9,8 @@
#include "vector_store_client.hh"
#include "dns.hh"
#include "load_balancer.hh"
#include "clients.hh"
#include "uri.hh"
#include "db/config.hh"
#include "exceptions/exceptions.hh"
#include "utils/sequential_producer.hh"
@@ -53,14 +55,7 @@ using port_number = vector_search::vector_store_client::port_number;
using primary_key = vector_search::primary_key;
using primary_keys = vector_search::vector_store_client::primary_keys;
using service_reply_format_error = vector_search::vector_store_client::service_reply_format_error;
using tcp_keepalive_params = net::tcp_keepalive_params;
using time_point = lowres_clock::time_point;
/// Timeout for waiting for a new client to be available
constexpr auto WAIT_FOR_CLIENT_TIMEOUT = std::chrono::seconds(5);
/// The number of times to retry an /ann request if all nodes fail with a system error.
constexpr auto ANN_RETRIES = 3;
using uri = vector_search::uri;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
logging::logger vslogger("vector_store_client");
@@ -76,11 +71,6 @@ auto parse_port(std::string const& port_txt) -> std::optional<port_number> {
return port;
}
struct uri {
host_name host;
port_number port;
};
auto parse_service_uri(std::string_view uri_) -> std::optional<uri> {
constexpr auto URI_REGEX = R"(^http:\/\/([a-z0-9._-]+):([0-9]+)$)";
auto const uri_regex = std::regex(URI_REGEX);
@@ -98,20 +88,6 @@ auto parse_service_uri(std::string_view uri_) -> std::optional<uri> {
return {{host, *port}};
}
/// Wait until the condition variable is signaled or the timeout expires.
///
/// A timeout is not treated as an error: the returned future resolves normally in that case.
/// Any other failure of the wait is propagated to the caller.
auto wait_for_signal(condition_variable& cv, time_point timeout) -> future<void> {
    auto waited = co_await coroutine::as_future(cv.wait(timeout));
    if (!waited.failed()) {
        co_return;
    }
    auto failure = waited.get_exception();
    // Only condition_variable_timed_out is swallowed; everything else is rethrown.
    if (try_catch<condition_variable_timed_out>(failure) == nullptr) {
        co_await coroutine::return_exception_ptr(std::move(failure));
    }
}
auto get_key_column_value(const rjson::value& item, std::size_t idx, const column_definition& column) -> std::expected<bytes, ann_error> {
auto const& column_name = column.name_as_text();
auto const* keys_obj = rjson::find(item, column_name);
@@ -203,63 +179,6 @@ auto read_ann_json(rjson::value const& json, schema_ptr const& schema) -> std::e
return std::move(keys);
}
/// Connection factory that opens TCP connections to one fixed socket address.
///
/// Every connection is created with TCP_NODELAY and keepalive enabled.
class client_connection_factory : public http::experimental::connection_factory {
    socket_address _addr;

public:
    explicit client_connection_factory(socket_address addr)
        : _addr(addr) {
    }

    /// Connect to the configured address and apply the socket options.
    /// The abort source is accepted to satisfy the interface but is not used.
    future<connected_socket> make([[maybe_unused]] abort_source* as) override {
        auto const keepalive = tcp_keepalive_params{
                .idle = 60s,
                .interval = 60s,
                .count = 10,
        };
        auto conn = co_await seastar::connect(_addr, {}, transport::TCP);
        conn.set_nodelay(true);
        conn.set_keepalive_parameters(keepalive);
        conn.set_keepalive(true);
        co_return conn;
    }
};
/// HTTP client bound to one resolved address of a vector-store URI.
///
/// The URI supplies the host used when building requests and the port used for connections;
/// the address is where the connections are actually made.
class http_client {
    uri _uri;
    inet_address _addr;
    http::experimental::client impl;

public:
    /// \param host_port_ host and port of the service
    /// \param addr address the connections are made to
    http_client(uri host_port_, inet_address addr)
        : _uri(std::move(host_port_))
        , _addr(std::move(addr))
        // Members are initialized in declaration order, so `_uri` and `_addr` are ready here.
        // Read them instead of the `addr` parameter, which has been moved from just above.
        , impl(std::make_unique<client_connection_factory>(socket_address(_addr, _uri.port))) {
    }

    /// Tell whether this client targets the given address and port.
    bool connects_to(inet_address const& a, port_number p) const {
        return _addr == a && _uri.port == p;
    }

    /// Send a request to `path`, optionally with a JSON body, and pass the reply to `handle`.
    ///
    /// \param method HTTP method of the request
    /// \param path request path
    /// \param content body written with the "json" content type when present
    /// \param handle callback receiving the reply
    /// \param as abort source forwarded to the underlying client
    seastar::future<> make_request(operation_type method, const http_path& path, const std::optional<json_content>& content,
            http::experimental::client::reply_handler&& handle, abort_source* as) {
        auto req = http::request::make(method, _uri.host, path);
        if (content) {
            req.write_body("json", *content);
        }
        return impl.make_request(std::move(req), std::move(handle), std::nullopt, as);
    }

    /// Close the underlying HTTP client.
    seastar::future<> close() {
        return impl.close();
    }

    /// Address this client connects to.
    const inet_address& addr() const {
        return _addr;
    }
};
bool should_vector_store_service_be_disabled(std::vector<sstring> const& uris) {
return uris.empty() || uris[0].empty();
}
@@ -305,23 +224,15 @@ std::vector<sstring> get_hosts(const std::vector<uri>& uris) {
namespace vector_search {
struct vector_store_client::impl {
using clients_type = std::vector<lw_shared_ptr<http_client>>;
utils::observer<sstring> uri_observer;
clients_type current_clients;
clients_type old_clients;
std::vector<uri> _uris;
gate client_producer_gate;
condition_variable refresh_client_cv;
milliseconds wait_for_client_timeout = WAIT_FOR_CLIENT_TIMEOUT;
sequential_producer<clients_type> clients_producer;
dns dns;
uint64_t dns_refreshes = 0;
seastar::metrics::metric_groups _metrics;
clients _clients;
impl(utils::config_file::named_value<sstring> cfg)
impl(utils::config_file::named_value<sstring> cfg, utils::config_file::named_value<uint32_t> read_request_timeout_in_ms)
: uri_observer(cfg.observe([this](seastar::sstring uris_csv) {
try {
handle_uris_changed(parse_uris(uris_csv));
@@ -331,154 +242,38 @@ struct vector_store_client::impl {
}
}))
, _uris(parse_uris(cfg()))
, clients_producer([&]() -> future<clients_type> {
return try_with_gate(client_producer_gate, [this] -> future<clients_type> {
dns.trigger_refresh();
co_await wait_for_signal(refresh_client_cv, lowres_clock::now() + wait_for_client_timeout);
co_return current_clients;
});
})
, dns(vslogger, get_hosts(_uris), [this](auto const& addrs) -> future<> {
co_await handle_addresses_changed(addrs);
}, dns_refreshes) {
, dns(
vslogger, get_hosts(_uris),
[this](auto const& addrs) -> future<> {
co_await handle_addresses_changed(addrs);
},
dns_refreshes)
, _clients(
vslogger,
[this]() {
dns.trigger_refresh();
},
std::move(read_request_timeout_in_ms)) {
_metrics.add_group("vector_store", {seastar::metrics::make_gauge("dns_refreshes", seastar::metrics::description("Number of DNS refreshes"), [this] {
return dns_refreshes;
}).aggregate({seastar::metrics::shard_label})});
}
void handle_uris_changed(std::vector<uri> uris) {
clear_current_clients();
_clients.clear();
_uris = std::move(uris);
dns.hosts(get_hosts(_uris));
}
auto handle_addresses_changed(const dns::host_address_map& addrs) -> future<> {
clear_current_clients();
for (const auto& uri : _uris) {
auto it = addrs.find(uri.host);
if (it != addrs.end()) {
for (const auto& addr : it->second) {
current_clients.push_back(make_lw_shared<http_client>(uri, addr));
}
}
}
refresh_client_cv.broadcast();
co_await cleanup_old_clients();
future<> handle_addresses_changed(const dns::host_address_map& addrs) {
co_await _clients.handle_changed(_uris, addrs);
}
/// The client is disabled when no URI is configured.
auto is_disabled() const -> bool {
    return _uris.size() == 0;
}
/// Retire all current clients: move them to the end of old_clients and leave current_clients empty.
void clear_current_clients() {
    for (auto& retired : current_clients) {
        old_clients.push_back(std::move(retired));
    }
    current_clients.clear();
}
/// Close every current client one after another, then empty the list.
auto cleanup_current_clients() -> future<> {
    for (auto pos = current_clients.begin(); pos != current_clients.end(); ++pos) {
        co_await (*pos)->close();
    }
    current_clients.clear();
}
/// Close and drop old clients that are no longer used.
///
/// A client is closed only when the entry in old_clients is its sole owner. Once closed, its
/// slot is reset so that the final erase removes it from the list and it is never closed twice.
auto cleanup_old_clients() -> future<> {
    // close() suspends this coroutine and old_clients may change in the meantime, so the list is
    // walked by index rather than with iterators or references that could dangle.
    // NOLINTNEXTLINE(modernize-loop-convert)
    for (auto it = 0U; it < old_clients.size(); ++it) {
        auto& client = old_clients[it];
        if (!client || !client.owned()) {
            continue;
        }
        // Hold an extra reference so the client stays alive for the whole close().
        auto closing = client;
        co_await closing->close();
        // Reset the slot that holds the closed client. It is located by identity because the
        // position may have changed while suspended. No suspension happens in this loop, so
        // iterating by reference is safe here.
        for (auto& slot : old_clients) {
            if (slot.get() == closing.get()) {
                slot = nullptr;
                break;
            }
        }
    }
    std::erase_if(old_clients, [](auto const& client) {
        return !client;
    });
}
using get_client_error = std::variant<aborted, addr_unavailable, disabled>;
/// Return the current clients, or wait for the clients producer to deliver a new list.
///
/// Yields `disabled` when the service is disabled, `aborted` when the producer failed after an
/// abort was requested, and `addr_unavailable` when the producer delivered an empty list.
/// A producer failure without an abort request is propagated as an exception.
auto get_clients(abort_source& as) -> future<std::expected<clients_type, get_client_error>> {
    if (is_disabled()) {
        co_return std::unexpected{disabled{}};
    }
    if (!current_clients.empty()) {
        co_return current_clients;
    }

    auto produced = co_await coroutine::as_future(clients_producer(as));
    if (produced.failed()) {
        auto failure = produced.get_exception();
        if (as.abort_requested()) {
            co_return std::unexpected{aborted{}};
        }
        co_await coroutine::return_exception_ptr(std::move(failure));
    }

    auto fresh = co_await std::move(produced);
    if (fresh.empty()) {
        co_return std::unexpected{addr_unavailable{}};
    }
    co_return fresh;
}
/// Reply captured from a single HTTP exchange: the status and the body buffers.
struct make_request_response {
http::reply::status_type status; ///< The HTTP status of the response.
std::vector<temporary_buffer<char>> content; ///< The content of the response.
};
using make_request_error = std::variant<aborted, addr_unavailable, service_unavailable, disabled>;
/// Send a request to the vector store, trying the available clients until one completes.
///
/// Runs up to ANN_RETRIES rounds. Each round fetches the client list with get_clients() and
/// walks it through a load_balancer. The first client whose request completes ends the call:
/// its reply is returned whatever the HTTP status is (the status is not inspected here).
/// Returns `aborted` when a request fails after an abort was requested, forwards any error
/// from get_clients(), and returns `service_unavailable` when every round failed.
/// A request failure that is not a std::system_error is propagated as an exception.
auto make_request(operation_type method, http_path path, std::optional<json_content> content, abort_source& as)
-> future<std::expected<make_request_response, make_request_error>> {
// Filled in by the reply handler below; it is captured by reference, so it must outlive each request.
auto resp = make_request_response{.status = http::reply::status_type::ok, .content = std::vector<temporary_buffer<char>>()};
for (auto retries = 0; retries < ANN_RETRIES; ++retries) {
auto clients = co_await get_clients(as);
if (!clients) {
// Re-wrap the get_client_error alternative into make_request_error.
co_return std::unexpected{std::visit(
[](auto&& err) {
return make_request_error{err};
},
clients.error())};
}
load_balancer lb(std::move(*clients), random_engine);
// Try the clients in the order chosen by the load balancer until it has none left.
while (auto client = lb.next()) {
auto result = co_await coroutine::as_future(client->make_request(
method, path, content,
[&resp](http::reply const& reply, input_stream<char> body) -> future<> {
resp.status = reply._status;
resp.content = co_await util::read_entire_stream(body);
},
&as));
if (result.failed()) {
auto err = result.get_exception();
// An abort request takes precedence over whatever exception was raised.
if (as.abort_requested()) {
co_return std::unexpected{aborted{}};
}
if (try_catch<std::system_error>(err) == nullptr) {
co_await coroutine::return_exception_ptr(std::move(err));
}
// std::system_error means that the server is unavailable, so we retry
} else {
co_return resp;
}
}
// Every client of this round failed with std::system_error: request a DNS refresh before the next round.
dns.trigger_refresh();
}
co_return std::unexpected{service_unavailable{}};
}
auto ann(keyspace_name keyspace, index_name name, schema_ptr schema, vs_vector vs_vector, limit limit, abort_source& as)
-> future<std::expected<primary_keys, ann_error>> {
if (is_disabled()) {
@@ -489,7 +284,7 @@ struct vector_store_client::impl {
auto path = format("/api/v1/indexes/{}/{}/ann", keyspace, name);
auto content = write_ann_json(std::move(vs_vector), limit);
auto resp = co_await make_request(operation_type::POST, std::move(path), std::move(content), as);
auto resp = co_await _clients.request(operation_type::POST, std::move(path), std::move(content), as);
if (!resp) {
co_return std::unexpected{std::visit(
[](auto&& err) {
@@ -515,7 +310,7 @@ struct vector_store_client::impl {
};
vector_store_client::vector_store_client(config const& cfg)
: _impl(std::make_unique<impl>(cfg.vector_store_primary_uri)) {
: _impl(std::make_unique<impl>(cfg.vector_store_primary_uri, cfg.read_request_timeout_in_ms)) {
}
vector_store_client::~vector_store_client() = default;
@@ -525,11 +320,8 @@ void vector_store_client::start_background_tasks() {
}
auto vector_store_client::stop() -> future<> {
_impl->refresh_client_cv.signal();
co_await _impl->client_producer_gate.close();
co_await _impl->_clients.stop();
co_await _impl->dns.stop();
co_await _impl->cleanup_old_clients();
co_await _impl->cleanup_current_clients();
}
auto vector_store_client::is_disabled() const -> bool {
@@ -546,7 +338,7 @@ void vector_store_client_tester::set_dns_refresh_interval(vector_store_client& v
}
void vector_store_client_tester::set_wait_for_client_timeout(vector_store_client& vsc, std::chrono::milliseconds timeout) {
vsc._impl->wait_for_client_timeout = timeout;
vsc._impl->_clients.timeout(timeout);
}
void vector_store_client_tester::set_dns_resolver(vector_store_client& vsc, std::function<future<std::vector<inet_address>>(sstring const&)> resolver) {
@@ -558,13 +350,13 @@ void vector_store_client_tester::trigger_dns_resolver(vector_store_client& vsc)
}
auto vector_store_client_tester::resolve_hostname(vector_store_client& vsc, abort_source& as) -> future<std::vector<inet_address>> {
auto clients = co_await vsc._impl->get_clients(as);
auto clients = co_await vsc._impl->_clients.get_clients(as);
std::vector<inet_address> ret;
if (!clients) {
co_return ret;
}
for (auto const& c : *clients) {
ret.push_back(c->addr());
ret.push_back(c->endpoint().ip);
}
co_return ret;
}

View File

@@ -11,6 +11,7 @@
#include "dht/decorated_key.hh"
#include "keys/keys.hh"
#include "seastarx.hh"
#include "error.hh"
#include <seastar/core/shared_future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/http/reply.hh>
@@ -49,48 +50,15 @@ public:
using schema_ptr = lw_shared_ptr<schema const>;
using status_type = http::reply::status_type;
/// The vector_store_client service is disabled.
struct disabled {};
/// The operation was aborted.
struct aborted {};
/// The vector-store addr is unavailable (not possible to get an addr from the dns service).
struct addr_unavailable {};
/// The vector-store service is unavailable.
struct service_unavailable {};
/// The error from the vector-store service.
struct service_error {
status_type status; ///< The HTTP status code from the vector-store service.
};
/// An unsupported reply format from the vector-store service.
struct service_reply_format_error {};
using disabled = disabled_error;
using aborted = aborted_error;
using addr_unavailable = addr_unavailable_error;
using service_unavailable = service_unavailable_error;
using service_error = service_error;
using service_reply_format_error = service_reply_format_error;
using ann_error = std::variant<disabled, aborted, addr_unavailable, service_unavailable, service_error, service_reply_format_error>;
struct ann_error_visitor {
sstring operator()(vector_store_client::service_error e) const {
return fmt::format("Vector Store error: HTTP status {}", e.status);
}
sstring operator()(vector_store_client::disabled) const {
return fmt::format("Vector Store is disabled");
}
sstring operator()(vector_store_client::aborted) const {
return fmt::format("Vector Store request was aborted");
}
sstring operator()(vector_store_client::addr_unavailable) const {
return fmt::format("Vector Store service address could not be fetched from DNS");
}
sstring operator()(vector_store_client::service_unavailable) const {
return fmt::format("Vector Store service is unavailable");
}
sstring operator()(vector_store_client::service_reply_format_error) const {
return fmt::format("Vector Store returned an invalid JSON");
}
};
using ann_error_visitor = error_visitor;
explicit vector_store_client(config const& cfg);
~vector_store_client();