Merge 'vector_search: test: migrate CQL tests for vector search from C++/Boost to pytest' from Karol Nowacki

Migrate vector search (ANN ordered select query) CQL tests from C++/Boost suite to pytest.

This migration includes:
- New pytest tests in `test/cqlpy/test_vector_search_with_vector_store_mock.py`
- VectorStoreMock server as pytest fixture to simulate vector store responses

The benefits of this migration are:
- Extended test coverage: the tests now also exercise CQL protocol serialization and the driver
- Reduced overall test time (no compilation required for pytest)

Fixes SCYLLADB-695

No backport needed as this is a refactoring.

Closes scylladb/scylladb#29593

* github.com:scylladb/scylladb:
  vector_search: test: migrate paging warnings tests to Python
  vector_search: test: migrate local_vector_index to Python
  vector_search: test: migrate vector_index_with_additional_filtering_column to Python
  vector_search: test: migrate cql_error_contains_http_error_description to Python
  vector_search: test: migrate pk in restriction test to Python
This commit is contained in:
Nadav Har'El
2026-05-10 22:09:17 +03:00
2 changed files with 247 additions and 176 deletions

View File

@@ -0,0 +1,247 @@
# Copyright 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
###############################################################################
# Tests for vector search (SELECT with ANN ordering).
#
# These tests use a mock vector store HTTP server to verify that Scylla
# correctly translates CQL queries with ANN ordering into HTTP requests
# for the vector store service and returns the expected results.
###############################################################################
from collections.abc import Callable
from dataclasses import dataclass
from http import HTTPStatus
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import threading
import pytest
from cassandra.protocol import InvalidRequest
from cassandra.query import SimpleStatement
from test.pylib.skip_types import skip_env
from .util import config_value_context, local_process_id, new_test_table, unique_name, is_scylla
@dataclass
class Request:
    """A single HTTP request recorded by the vector store mock."""

    # URL path the request was sent to.
    path: str
    # Request payload as text.
    body: str
@dataclass
class Response:
    """An HTTP response the vector store mock should send back.

    With no arguments this is a 200 reply whose JSON body lists no primary
    keys and no distances.
    """

    # HTTP status code of the reply.
    status: int = 200
    # Reply payload as text.
    body: str = '{"primary_keys":{"pk1":[],"pk2":[],"ck1":[],"ck2":[]},"distances":[]}'
class VectorStoreMock:
    """In-process HTTP stand-in for the vector store service.

    Every POST request is recorded and answered with the response most
    recently configured through set_next_ann_response() (a default
    Response() until then). The HTTP server runs on a background daemon
    thread between start() and stop().
    """

    def __init__(self):
        # Guards _ann_requests and _next_ann_response, which are shared
        # between the test thread and the HTTP server thread.
        self._lock = threading.Lock()
        self._ann_requests: list[Request] = []
        self._next_ann_response = Response()
        self._server: HTTPServer | None = None
        self._thread: threading.Thread | None = None

    @property
    def port(self) -> int:
        """Port the HTTP server is bound to, or 0 if start() was never called."""
        if self._server is None:
            return 0
        return self._server.server_address[1]

    @property
    def ann_requests(self) -> list[Request]:
        """A copy of the requests recorded so far, in the order they were recorded."""
        with self._lock:
            return list(self._ann_requests)

    def set_next_ann_response(self, status: int, body: str) -> None:
        """Configure the status and body used to answer subsequent requests."""
        canned = Response(status=status, body=body)
        with self._lock:
            self._next_ann_response = canned

    def _handle_ann(self, request: Request, send_response: Callable[[Response], None]) -> None:
        """Record `request` and reply through `send_response` with the configured response."""
        with self._lock:
            self._ann_requests.append(request)
            reply = self._next_ann_response
        # The reply is written after the lock has been released.
        send_response(reply)

    def start(self, host: str):
        """Bind an HTTP server to `host` on an OS-chosen port and serve in a daemon thread."""
        store = self

        class Handler(BaseHTTPRequestHandler):
            def log_message(self, format, *args):
                # Overridden to do nothing, so requests are not logged.
                return None

            def do_POST(self):
                size = int(self.headers.get("Content-Length", 0))
                text = self.rfile.read(size).decode()
                captured = Request(path=self.path, body=text)
                store._handle_ann(captured, self._send_response)

            def _send_response(self, response: Response):
                payload = response.body.encode()
                self.send_response(response.status)
                headers = (
                    ("Content-Type", "application/json"),
                    ("Content-Length", str(len(payload))),
                )
                for header, value in headers:
                    self.send_header(header, value)
                self.end_headers()
                self.wfile.write(payload)

        # Port 0 asks the OS for any free port; read it back through `port`.
        self._server = HTTPServer((host, 0), Handler)
        self._thread = threading.Thread(target=self._server.serve_forever, daemon=True)
        self._thread.start()

    def stop(self):
        """Shut the HTTP server down and wait for its thread to finish."""
        server, thread = self._server, self._thread
        if server:
            server.shutdown()
            server.server_close()
        if thread:
            thread.join()
@pytest.fixture
def vector_store_mock(cql):
    """Provide a VectorStoreMock to a test.

    On Scylla the mock's HTTP server is started on the address of the first
    CQL host, and `vector_store_primary_uri` is pointed at it for the
    duration of the test; the server is stopped afterwards. When no local
    Scylla process is found, skip_env() is called. On anything other than
    Scylla the mock is yielded without starting the HTTP server.
    """
    store = VectorStoreMock()
    if is_scylla(cql):
        if not local_process_id(cql):
            skip_env("Vector store mock requires a local Scylla process")
        address = cql.hosts[0].endpoint.address
        store.start(address)
        try:
            uri = f"http://{address}:{store.port}"
            with config_value_context(cql, "vector_store_primary_uri", uri):
                yield store
        finally:
            store.stop()
    else:
        # Yield a mock without starting the HTTP server so tests can run
        # on Cassandra (where the vector store service is not needed).
        yield store
# Verify that a partition key IN restriction in the CQL query is forwarded
# to the vector store as part of the ANN request's filter.
def test_vector_search_ann_with_partition_key_in_restriction(cql, test_keyspace, vector_store_mock, skip_without_tablets):
    columns = "pk1 tinyint, pk2 tinyint, ck1 tinyint, ck2 tinyint, embedding vector<float, 3>, PRIMARY KEY ((pk1, pk2), ck1, ck2)"
    with new_test_table(cql, test_keyspace, columns) as table:
        index_name = unique_name()
        create_index = f"CREATE CUSTOM INDEX {index_name} ON {table}(embedding) USING 'vector_index'"
        cql.execute(create_index)
        insert_row = f"INSERT INTO {table} (pk1, pk2, ck1, ck2, embedding) VALUES (5, 7, 9, 2, [0.1, 0.2, 0.3])"
        cql.execute(insert_row)

        # The mock answers with the primary key of the single inserted row.
        canned = {
            "primary_keys": {"pk1": [5], "pk2": [7], "ck1": [9], "ck2": [2]},
            "distances": [0.1],
        }
        vector_store_mock.set_next_ann_response(200, json.dumps(canned))

        select = f"SELECT pk1, pk2, ck1, ck2 FROM {table} WHERE pk1 IN (5, 6) ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 2"
        rows = list(cql.execute(select))

        # The CQL result must mirror the vector store mock response.
        assert rows == [(5, 7, 9, 2)]

        # Exactly one ANN request must have reached the mock, carrying the
        # query vector, the limit and the IN restriction.
        recorded = vector_store_mock.ann_requests
        assert len(recorded) == 1
        ann = recorded[0]
        assert ann.path == f"/api/v1/indexes/{test_keyspace}/{index_name}/ann"
        expected_body = {
            "vector": [0.1, 0.2, 0.3],
            "limit": 2,
            "filter": {
                "restrictions": [{"type": "IN", "lhs": "pk1", "rhs": [5, 6]}],
                "allow_filtering": False,
            },
        }
        assert json.loads(ann.body) == expected_body
# Verify that an HTTP error returned by the vector store surfaces as a CQL
# InvalidRequest whose message carries both the status code and the error text.
def test_vector_search_cql_error_contains_http_error_description(cql, test_keyspace, vector_store_mock, skip_without_tablets):
    columns = "pk1 tinyint, pk2 tinyint, ck1 tinyint, ck2 tinyint, embedding vector<float, 3>, PRIMARY KEY ((pk1, pk2), ck1, ck2)"
    with new_test_table(cql, test_keyspace, columns) as table:
        create_index = f"CREATE CUSTOM INDEX ON {table}(embedding) USING 'vector_index'"
        cql.execute(create_index)
        # Make the mock reject the next ANN request with a 404.
        vector_store_mock.set_next_ann_response(HTTPStatus.NOT_FOUND, "index does not exist")
        ann_select = f"SELECT * FROM {table} ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 5"
        with pytest.raises(InvalidRequest, match="404.*index does not exist"):
            cql.execute(ann_select)
# Create a vector index with an additional filtering column.
# Because the local secondary index logic was used to determine the index
# target column, the implementation wrongly selected the last column as the
# target (vector) column, leading to an exception on the SELECT query:
#   ANN ordering by vector requires the column to be indexed using 'vector_index'.
# The test passes when neither statement raises.
# Reproduces SCYLLADB-635.
def test_vector_search_vector_index_with_additional_filtering_column(cql, test_keyspace, vector_store_mock, skip_without_tablets):
    columns = "pk1 tinyint, pk2 tinyint, ck1 tinyint, ck2 tinyint, embedding vector<float, 3>, PRIMARY KEY ((pk1, pk2), ck1, ck2)"
    with new_test_table(cql, test_keyspace, columns) as table:
        create_index = f"CREATE CUSTOM INDEX ON {table}(embedding, ck1) USING 'vector_index'"
        ann_select = f"SELECT * FROM {table} ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 5"
        for statement in (create_index, ann_select):
            cql.execute(statement)
# Create a local vector index (partition key columns followed by the vector
# column) and run a partition-restricted ANN query against it.
# The test passes when neither statement raises.
def test_vector_search_local_vector_index_create_and_query_do_not_fail(cql, test_keyspace, vector_store_mock, skip_without_tablets):
    columns = "pk1 tinyint, pk2 tinyint, ck1 tinyint, ck2 tinyint, embedding vector<float, 3>, PRIMARY KEY ((pk1, pk2), ck1, ck2)"
    with new_test_table(cql, test_keyspace, columns) as table:
        create_index = f"CREATE CUSTOM INDEX ON {table}((pk1, pk2), embedding) USING 'vector_index'"
        ann_select = f"SELECT * FROM {table} WHERE pk1 = 1 AND pk2 = 2 ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 5"
        for statement in (create_index, ann_select):
            cql.execute(statement)
# Verify that exactly one paging warning is emitted when the page size
# (fetch_size=5) is smaller than the query's LIMIT (100).
def test_vector_search_paging_warning_when_page_size_smaller_than_limit(cql, test_keyspace, vector_store_mock, skip_without_tablets):
    columns = "pk1 tinyint, pk2 tinyint, ck1 tinyint, ck2 tinyint, embedding vector<float, 3>, PRIMARY KEY ((pk1, pk2), ck1, ck2)"
    with new_test_table(cql, test_keyspace, columns) as table:
        create_index = f"CREATE CUSTOM INDEX ON {table}(embedding) USING 'vector_index'"
        cql.execute(create_index)
        paged_select = SimpleStatement(
            f"SELECT * FROM {table} ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 100",
            fetch_size=5,
        )
        warnings = cql.execute(paged_select).response_future.warnings
        # Check for presence first so a missing warning list fails with a
        # plain assertion rather than an error from len().
        assert warnings
        assert len(warnings) == 1
        assert warnings[0] == "Paging is not supported for Vector Search queries. The entire result set has been returned."
# Verify that no paging warning is emitted when paging is disabled
# (fetch_size=0), even though LIMIT is 100.
def test_vector_search_no_paging_warning_when_paging_disabled(cql, test_keyspace, vector_store_mock, skip_without_tablets):
    columns = "pk1 tinyint, pk2 tinyint, ck1 tinyint, ck2 tinyint, embedding vector<float, 3>, PRIMARY KEY ((pk1, pk2), ck1, ck2)"
    with new_test_table(cql, test_keyspace, columns) as table:
        create_index = f"CREATE CUSTOM INDEX ON {table}(embedding) USING 'vector_index'"
        cql.execute(create_index)
        unpaged_select = SimpleStatement(
            f"SELECT * FROM {table} ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 100",
            fetch_size=0,
        )
        outcome = cql.execute(unpaged_select)
        assert not outcome.response_future.warnings
# Verify that no paging warning is emitted when LIMIT (5) is smaller than
# the page size (fetch_size=100).
def test_vector_search_no_paging_warning_when_limit_less_than_page_size(cql, test_keyspace, vector_store_mock, skip_without_tablets):
    columns = "pk1 tinyint, pk2 tinyint, ck1 tinyint, ck2 tinyint, embedding vector<float, 3>, PRIMARY KEY ((pk1, pk2), ck1, ck2)"
    with new_test_table(cql, test_keyspace, columns) as table:
        create_index = f"CREATE CUSTOM INDEX ON {table}(embedding) USING 'vector_index'"
        cql.execute(create_index)
        small_limit_select = SimpleStatement(
            f"SELECT * FROM {table} ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 5",
            fetch_size=100,
        )
        outcome = cql.execute(small_limit_select)
        assert not outcome.response_future.warnings

View File

@@ -83,25 +83,6 @@ timeout_config make_query_timeout(std::chrono::seconds timeout) {
return cfg;
}
/// Runs `func` inside a CQL test environment that is wired to a freshly
/// created vector store mock server.
///
/// `vector_store_primary_uri` is set to `http://server.node:<mock port>`, the
/// table `ks.test` is created, and the name "server.node" is mapped to the
/// mock's host through the client's DNS configuration before `func` is called
/// with the environment and the mock server.
/// The mock server is stopped in the `finally` continuation.
future<> do_with_vector_store_mock(std::function<future<>(cql_test_env&, vs_mock_server&)> func) {
auto server = co_await make_vs_mock_server();
auto cfg = make_config();
cfg.db_config->vector_store_primary_uri.set(format("http://server.node:{}", server->port()));
co_await do_with_cql_env(
[&](cql_test_env& env) -> future<> {
co_await create_test_table(env, "ks", "test");
auto& vs = env.local_qp().vector_store_client();
// Map the host name used in the URI above to the mock server's host.
configure(vs).with_dns({{"server.node", std::vector<std::string>{server->host()}}});
vs.start_background_tasks();
co_await func(env, *server);
},
cfg)
.finally(seastar::coroutine::lambda([&] -> future<> {
co_await server->stop();
}));
}
} // namespace
BOOST_AUTO_TEST_CASE(vector_store_client_test_ctor) {
@@ -474,44 +455,6 @@ SEASTAR_TEST_CASE(vector_store_client_test_filtering_ann_request) {
});
}
SEASTAR_TEST_CASE(vector_store_client_test_filtering_ann_cql) {
// Similar to `vector_store_client_test_filtering_ann_request`,
// but uses CQL query to verify that the WHERE clause expression (this time with IN operator) is handled correctly.
using namespace test::vector_search;
auto server = co_await make_vs_mock_server();
auto cfg = make_config();
// Point the vector store URI at the mock server's port under a made-up host name.
cfg.db_config->vector_store_primary_uri.set(format("http://good.authority.here:{}", server->port()));
co_await do_with_cql_env(
[&server](cql_test_env& env) -> future<> {
auto schema = co_await create_test_table(env, "ks", "idx");
// Create the vector index and insert test data
co_await env.execute_cql("CREATE CUSTOM INDEX embedding_idx ON ks.idx (embedding) USING 'vector_index'");
co_await env.execute_cql("INSERT INTO ks.idx (pk1, pk2, ck1, ck2, embedding) VALUES (5, 7, 9, 2, [0.1, 0.2, 0.3])");
auto& vs = env.local_qp().vector_store_client();
// Map the made-up host name from the URI to 127.0.0.1.
configure(vs).with_dns({{"good.authority.here", "127.0.0.1"}});
vs.start_background_tasks();
// Mock response - service should return keys matching the WHERE filter
server->next_ann_response({http::reply::status_type::ok, R"({"primary_keys":{"pk1":[5],"pk2":[7],"ck1":[9],"ck2":[2]},"distances":[0.1]})"});
// Execute CQL query with WHERE clause filter
auto msg = co_await env.execute_cql("SELECT pk1, pk2, ck1, ck2 FROM ks.idx WHERE pk1 IN (5, 6) ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 2");
// Process results - expect 1 row with values [5, 7, 9, 2]
assert_that(msg).is_rows().with_rows({{
{byte_type->decompose(int8_t(5))},
{byte_type->decompose(int8_t(7))},
{byte_type->decompose(int8_t(9))},
{byte_type->decompose(int8_t(2))},
}});
},
cfg)
// Stop the mock server in the `finally` continuation.
.finally([&server] {
return server->stop();
});
}
SEASTAR_TEST_CASE(vector_store_client_uri_update_to_empty) {
auto cfg = config();
auto count = 0;
@@ -765,82 +708,6 @@ SEASTAR_TEST_CASE(vector_search_metrics_test) {
cfg);
}
// Verify that an ANN SELECT with LIMIT 100, executed with the query options
// built below, returns exactly one warning saying that paging is not
// supported for Vector Search queries.
SEASTAR_TEST_CASE(vector_store_client_test_paging_warning) {
auto s1 = co_await make_vs_mock_server();
auto cfg = make_config();
cfg.db_config->vector_store_primary_uri.set(format("http://s1.node:{}", s1->port()));
co_await do_with_cql_env(
[&s1](cql_test_env& env) -> future<> {
auto schema = co_await create_test_table(env, "ks", "test");
auto& vs = env.local_qp().vector_store_client();
// Map the host name used in the URI above to the mock server's host.
configure(vs).with_dns({{"s1.node", std::vector<std::string>{s1->host()}}});
vs.start_background_tasks();
auto result = co_await env.execute_cql("CREATE CUSTOM INDEX idx ON ks.test (embedding) USING 'vector_index'");
// NOTE(review): the leading 5 in specific_options is presumably the page size
// (smaller than LIMIT 100) — confirm against cql3::query_options.
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()});
auto msg = co_await env.execute_cql("SELECT * FROM ks.test ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 100;", std::move(qo));
auto warns = msg->warnings();
BOOST_REQUIRE_EQUAL(warns.size(), 1);
BOOST_CHECK(warns[0] == "Paging is not supported for Vector Search queries. The entire result set has been returned.");
},
cfg)
.finally([&s1] {
return s1->stop();
});
}
// Verify that an ANN SELECT with LIMIT 100, executed with the query options
// built below, returns no warnings.
SEASTAR_TEST_CASE(vector_store_client_test_paging_warning_doesnt_show_when_paging_disabled) {
auto s1 = co_await make_vs_mock_server();
auto cfg = make_config();
cfg.db_config->vector_store_primary_uri.set(format("http://s1.node:{}", s1->port()));
co_await do_with_cql_env(
[&s1](cql_test_env& env) -> future<> {
auto schema = co_await create_test_table(env, "ks", "test");
auto& vs = env.local_qp().vector_store_client();
// Map the host name used in the URI above to the mock server's host.
configure(vs).with_dns({{"s1.node", std::vector<std::string>{s1->host()}}});
vs.start_background_tasks();
auto result = co_await env.execute_cql("CREATE CUSTOM INDEX idx ON ks.test (embedding) USING 'vector_index'");
// NOTE(review): the leading 0 in specific_options is presumably the page size,
// with 0 meaning paging is disabled (per the test name) — confirm.
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{0, nullptr, {}, api::new_timestamp()});
auto msg = co_await env.execute_cql("SELECT * FROM ks.test ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 100;", std::move(qo));
auto warns = msg->warnings();
BOOST_REQUIRE_EQUAL(warns.size(), 0);
},
cfg)
.finally([&s1] {
return s1->stop();
});
}
// Verify that an ANN SELECT with LIMIT 5, executed with the query options
// built below, returns no warnings.
SEASTAR_TEST_CASE(vector_store_client_test_paging_warning_doesnt_show_when_limit_less_than_page_size) {
auto s1 = co_await make_vs_mock_server();
auto cfg = make_config();
cfg.db_config->vector_store_primary_uri.set(format("http://s1.node:{}", s1->port()));
co_await do_with_cql_env(
[&s1](cql_test_env& env) -> future<> {
auto schema = co_await create_test_table(env, "ks", "test");
auto& vs = env.local_qp().vector_store_client();
// Map the host name used in the URI above to the mock server's host.
configure(vs).with_dns({{"s1.node", std::vector<std::string>{s1->host()}}});
vs.start_background_tasks();
auto result = co_await env.execute_cql("CREATE CUSTOM INDEX idx ON ks.test (embedding) USING 'vector_index'");
// NOTE(review): the leading 100 in specific_options is presumably the page size
// (larger than LIMIT 5) — confirm against cql3::query_options.
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()});
auto msg = co_await env.execute_cql("SELECT * FROM ks.test ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 5;", std::move(qo));
auto warns = msg->warnings();
BOOST_REQUIRE_EQUAL(warns.size(), 0);
},
cfg)
.finally([&s1] {
return s1->stop();
});
}
SEASTAR_TEST_CASE(vector_store_client_node_recovery_after_backoff) {
auto unavail_server = co_await make_unavailable_server();
std::unique_ptr<vs_mock_server> avail_server;
@@ -1242,46 +1109,3 @@ SEASTAR_TEST_CASE(vector_store_client_abort_due_to_query_timeout) {
co_await server->stop();
}));
}
/// Verify that the HTTP error description from the vector store is propagated
/// through the CQL interface as part of the invalid_request_exception message.
SEASTAR_TEST_CASE(vector_store_client_cql_error_contains_http_error_description) {
co_await do_with_vector_store_mock([](cql_test_env& env, vs_mock_server& server) -> future<> {
// Create the vector index on the `embedding` column before querying.
co_await env.execute_cql("CREATE CUSTOM INDEX idx ON ks.test (embedding) USING 'vector_index'");
// Configure mock to return 404 with a specific error message
server.next_ann_response({status_type::not_found, "index does not exist"});
// The SELECT must throw invalid_request_exception, and the predicate below must accept its message.
BOOST_CHECK_EXCEPTION(co_await env.execute_cql("SELECT * FROM ks.test ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 5;"),
exceptions::invalid_request_exception, [](const exceptions::invalid_request_exception& ex) {
auto msg = std::string(ex.what());
// Verify the error message contains both the HTTP status and the error description
return msg.find("404") != std::string::npos && msg.find("index does not exist") != std::string::npos;
});
});
}
// Create a vector index with an additional filtering column.
// Because the local secondary index logic was used to determine the index target column,
// the implementation wrongly selected the last column as the target (vector) column, leading to an exception
// on the SELECT query:
// ANN ordering by vector requires the column to be indexed using 'vector_index'.
// Reproduces SCYLLADB-635.
SEASTAR_TEST_CASE(vector_store_client_vector_index_with_additional_filtering_column) {
co_await do_with_vector_store_mock([](cql_test_env& env, vs_mock_server&) -> future<> {
// Create a vector index on the embedding column, including ck1 for filtered ANN search support.
co_await env.execute_cql("CREATE CUSTOM INDEX idx ON ks.test (embedding, ck1) USING 'vector_index'");
// The ANN query itself must not throw.
BOOST_CHECK_NO_THROW(co_await env.execute_cql("SELECT * FROM ks.test ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 5;"));
});
}
// Verify that an index declared with the partition key columns followed by
// the vector column can be created, and that an ANN query restricted to one
// partition does not throw.
SEASTAR_TEST_CASE(vector_store_client_local_vector_index) {
co_await do_with_vector_store_mock([](cql_test_env& env, vs_mock_server&) -> future<> {
// Create a local vector index on the 'embedding' column.
co_await env.execute_cql("CREATE CUSTOM INDEX idx ON ks.test ((pk1, pk2), embedding) USING 'vector_index'");
// The partition-restricted ANN query must not throw.
BOOST_CHECK_NO_THROW(
co_await env.execute_cql("SELECT * FROM ks.test WHERE pk1 = 1 AND pk2 = 2 ORDER BY embedding ANN OF [0.1, 0.2, 0.3] LIMIT 5;"));
});
}