Compare commits

...

9 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
e69566a36a Address code review feedback
- Fix use-after-free bug in example code
- Update copyright years to 2026
- Add logging to test simulator for better debugging
- Improve documentation comments

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:26:17 +00:00
copilot-swe-agent[bot]
3fc31bc751 Add implementation summary document
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:23:39 +00:00
copilot-swe-agent[bot]
0e439ad03a Add documentation for tablet migration RPC decoupling
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:22:52 +00:00
copilot-swe-agent[bot]
d466213b78 Allow topology_coordinator to accept custom RPC handler for testing
- Add optional tablet_rpc_handler parameter to run_topology_coordinator
- Default to messaging_tablet_rpc_handler if not provided
- Create local_tablet_rpc_simulator for use in unit tests

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:22:05 +00:00
copilot-swe-agent[bot]
1a3e8fbdc7 Refactor topology_coordinator to use tablet_migration_rpc_handler interface
- Implement messaging_tablet_rpc_handler that wraps messaging_service RPCs
- Update handle_tablet_migration() to use the RPC handler interface
- This decouples the coordinator logic from actual RPC sending

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:20:23 +00:00
copilot-swe-agent[bot]
f9c811e88b Add tablet_migration_rpc_handler interface for decoupling RPCs
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:18:19 +00:00
copilot-swe-agent[bot]
601d73ea00 Initial plan
2026-01-30 12:12:55 +00:00
Pawel Pery
f49c9e896a vector_search: allow full secondary indexes syntax while creating the vector index
The Vector Search feature needs to support creating vector indexes with additional
filtering columns. There will be two types of indexes: global, which indexes
vectors per table, and local, which indexes vectors per partition key. The new
syntax is based on ScyllaDB's Global Secondary Index and Local Secondary
Index. Vector indexes don't use secondary index functionality in any way -
all indexing, filtering and data processing is done on the Vector Store side.

This patch allows creating vector indexes using this CQL syntax:

```
CREATE TABLE IF NOT EXISTS cycling.comments_vs (
  commenter text,
  comment text,
  comment_vector VECTOR <FLOAT, 5>,
  created_at timestamp,
  discussion_board_id int,
  country text,
  lang text,
  PRIMARY KEY ((commenter, discussion_board_id), created_at)
);

CREATE CUSTOM INDEX IF NOT EXISTS global_ann_index
  ON cycling.comments_vs(comment_vector, country, lang) USING 'vector_index'
  WITH OPTIONS = { 'similarity_function': 'DOT_PRODUCT' };

CREATE CUSTOM INDEX IF NOT EXISTS local_ann_index
  ON cycling.comments_vs((commenter, discussion_board_id), comment_vector, country, lang)
  USING 'vector_index'
  WITH OPTIONS = { 'similarity_function': 'DOT_PRODUCT' };
```

Currently, running these queries to create the indexes fails with errors such as:

```
InvalidRequest: Error from server: code=2200 [Invalid query] message="Vector index can only be created on a single column"
InvalidRequest: Error from server: code=2200 [Invalid query] message="Local index definition must contain full partition key only. Redundant column: XYZ"
```

This commit refactors `vector_index::check_target` to correctly validate the
columns that build the index. Vector Store currently supports filtering only by
native types, so the column types are checked. The first column in the list must
be a vector (the index is built on these vectors), so it is checked as well.

Allowed column types are the native types, excluding counter (a table cannot mix
counter and vector columns) and duration (durations cannot be compared reliably;
this type is not allowed even in secondary indexes).

This commit adds a cqlpy test that checks the errors raised while creating indexes.

Fixes: SCYLLADB-298

This needs to be backported to version 2026.1 as this is a fix for filtering support.

Closes scylladb/scylladb#28366
2026-01-30 01:14:31 +02:00
Avi Kivity
3d1558be7e test: remove xfail markers from SELECT JSON count(*) tests
These were marked xfail due to #8077 (the column name was wrong),
but it was fixed long ago for 5.4 (exact commit not known).

Remove the xfail markers to prevent regressions.

Closes scylladb/scylladb#28432
2026-01-29 21:56:00 +02:00
12 changed files with 608 additions and 36 deletions

IMPLEMENTATION_SUMMARY.md

@@ -0,0 +1,84 @@
# Tablet Migration RPC Decoupling - Implementation Summary
## What Was Changed
### Problem Statement
The topology_coordinator's `handle_tablet_migration()` function was tightly coupled with RPC sending, making it difficult to test in unit tests without setting up a full multi-node cluster.
### Solution
Introduced an abstraction layer (`tablet_migration_rpc_handler`) that decouples the coordinator logic from actual RPC operations.
## Files Added
### 1. `service/tablet_migration_rpc_handler.hh`
Interface defining the RPC operations needed for tablet migration:
- `tablet_repair()` - Repair a tablet on a replica
- `tablet_stream_data()` - Stream tablet data to a replica
- `tablet_cleanup()` - Clean up tablet data on a replica
- `repair_update_compaction_ctrl()` - Update compaction controller after repair
### 2. `test/lib/local_tablet_rpc_simulator.hh`
Test implementation that calls local RPC handlers instead of sending network RPCs.
### 3. `docs/dev/tablet-migration-rpc-decoupling.md`
Documentation explaining the architecture and how to use it.
## Files Modified
### 1. `service/topology_coordinator.cc`
- Added `messaging_tablet_rpc_handler` class - production implementation that wraps `messaging_service` RPCs
- Updated `handle_tablet_migration()` to use `_tablet_rpc_handler` interface instead of direct RPC calls
- Updated constructor to accept optional RPC handler and initialize it
- Modified 5 locations where tablet RPCs were called:
- rebuild_repair stage → `tablet_repair()`
- streaming stage → `tablet_stream_data()`
- cleanup stage → `tablet_cleanup()` (2 locations)
- repair stage → `tablet_repair()`
- end_repair stage → `repair_update_compaction_ctrl()`
### 2. `service/topology_coordinator.hh`
- Added forward declaration for `tablet_migration_rpc_handler`
- Updated `run_topology_coordinator()` signature to accept optional `tablet_rpc_handler` parameter
## How It Works
### Production Mode (Default)
```cpp
run_topology_coordinator(...);
// Automatically uses messaging_tablet_rpc_handler
```
The topology coordinator creates a `messaging_tablet_rpc_handler` that sends actual network RPCs.
### Test Mode
```cpp
run_topology_coordinator(..., std::make_unique<local_tablet_rpc_simulator>(storage_service));
```
Tests can provide a `local_tablet_rpc_simulator` that calls local RPC handlers directly, bypassing the network layer.
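The injection pattern is language-agnostic; here is a minimal Python sketch of it (all names are illustrative, not the actual ScyllaDB API):

```python
class RpcHandler:
    """Interface for tablet-migration RPC operations."""
    def tablet_stream_data(self, dst, tablet):
        raise NotImplementedError

class MessagingRpcHandler(RpcHandler):
    """Production implementation: would send a network RPC."""
    def tablet_stream_data(self, dst, tablet):
        pass  # network send omitted in this sketch

class RecordingRpcHandler(RpcHandler):
    """Test fake: records calls instead of sending RPCs."""
    def __init__(self):
        self.calls = []
    def tablet_stream_data(self, dst, tablet):
        self.calls.append(("stream_data", dst, tablet))

class Coordinator:
    def __init__(self, rpc_handler=None):
        # Default to the production handler, mirroring the C++ nullptr default.
        self.rpc = rpc_handler or MessagingRpcHandler()
    def run_streaming_stage(self, dst, tablet):
        self.rpc.tablet_stream_data(dst, tablet)

# Test mode: inject the fake and assert on the recorded calls.
fake = RecordingRpcHandler()
Coordinator(rpc_handler=fake).run_streaming_stage("node1", "tablet42")
assert fake.calls == [("stream_data", "node1", "tablet42")]
```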
## Benefits
1. **Testability**: Can now test `handle_tablet_migration()` logic in unit tests
2. **Separation of Concerns**: Coordinator logic (group0 transitions, barriers) is separated from replica logic (RPCs)
3. **Minimal Changes**: The refactoring is minimal and surgical - only abstracts the RPC layer
4. **Backward Compatible**: Existing code continues to work without changes
## What Can Now Be Tested
With this refactoring, tests can now:
1. Verify that tablet migrations progress through the correct stages
2. Test barrier coordination between stages
3. Test rollback logic when migrations fail
4. Test the interaction between migrations and topology changes
5. Simulate various failure scenarios at the RPC level
All without needing multiple nodes or network communication.
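For example, failure simulation (point 5) falls out naturally once the handler is injectable. A minimal Python sketch with illustrative names, not the actual ScyllaDB API:

```python
class FailingRpcHandler:
    """Test fake whose RPCs fail a configurable number of times."""
    def __init__(self, failures=1):
        self.failures = failures
        self.attempts = 0
    def tablet_repair(self, dst, tablet):
        self.attempts += 1
        if self.attempts <= self.failures:
            raise ConnectionError(f"simulated RPC failure to {dst}")
        return "repaired"

def repair_with_retry(handler, dst, tablet, max_attempts=3):
    """Retry loop that a coordinator test might exercise against the fake."""
    for _ in range(max_attempts):
        try:
            return handler.tablet_repair(dst, tablet)
        except ConnectionError:
            continue
    raise RuntimeError("repair did not succeed")

handler = FailingRpcHandler(failures=2)
assert repair_with_retry(handler, "node1", "tablet42") == "repaired"
assert handler.attempts == 3
```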
## Future Work
This refactoring enables future enhancements:
1. More comprehensive topology coordinator unit tests
2. Fault injection at the RPC level in tests
3. Better simulation of network conditions
4. Potential extraction of more coordinator logic into testable components


@@ -281,7 +281,8 @@ For example::
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
or on columns provided in the index definition.
For example::


@@ -140,17 +140,83 @@ Vector Index :label-note:`ScyllaDB Cloud`
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient
similarity search on vector data. A vector index can be global, indexing vectors per table, or local,
indexing vectors per partition.
The vector index is the only custom type index supported in ScyllaDB. It is created using
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. It is also possible to
add additional columns to the index for filtering the search results. The first column specified
in a global vector index definition must be the vector column, and any subsequent columns are
treated as filtering columns. A local vector index requires that the partition key of the base
table is also the partition key of the index, and the vector column must come first among the
remaining columns.
Example of a simple index:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed to enable similarity search using
a global vector index. Additional filtering can be performed on the primary key
columns of the base table.
Example of a global vector index with additional filtering:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding, category, info)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed to enable similarity search using
a global index. Additional columns are added for filtering the search results.
The filtering is possible on ``category``, ``info`` and all primary key columns
of the base table.
Example of a local vector index:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings ((id, created_at), embedding, category, info)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed for similarity search (a local
index) and additional columns are added for filtering the search results. The
filtering is possible on ``category``, ``info`` and all primary key columns of
the base table. The columns ``id`` and ``created_at`` must be the partition key
of the base table.
Vector indexes support additional filtering columns of native data types
(excluding counter and duration). The indexed column itself must be a vector
column, while the extra columns can be used to filter search results.
The supported types are:
* ``ascii``
* ``bigint``
* ``blob``
* ``boolean``
* ``date``
* ``decimal``
* ``double``
* ``float``
* ``inet``
* ``int``
* ``smallint``
* ``text``
* ``varchar``
* ``time``
* ``timestamp``
* ``timeuuid``
* ``tinyint``
* ``uuid``
* ``varint``
The following options are supported for vector indexes. All of them are optional.
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+


@@ -0,0 +1,78 @@
# Tablet Migration RPC Decoupling
## Overview
The topology coordinator's tablet migration logic has been refactored to decouple it from RPC operations. This allows the topology coordinator logic to be tested in unit tests without network communication.
## Architecture
### Components
1. **`tablet_migration_rpc_handler`** (interface): Defines the RPC operations needed for tablet migration
- `tablet_repair()`: Initiates tablet repair on a replica
- `tablet_stream_data()`: Streams tablet data to a replica
- `tablet_cleanup()`: Cleans up tablet data on a replica
- `repair_update_compaction_ctrl()`: Updates compaction controller after repair
2. **`messaging_tablet_rpc_handler`**: Production implementation that sends real network RPCs via `messaging_service`
3. **`local_tablet_rpc_simulator`**: Test implementation that calls RPC handlers locally without network communication
### How It Works
#### Production (Real RPCs)
```cpp
// In storage_service.cc
run_topology_coordinator(
sys_dist_ks, gossiper, messaging, shared_tm,
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
raft_topology_cmd_handler,
tablet_allocator, cdc_gens, ring_delay,
lifecycle_notifier, feature_service,
sl_controller, topology_cmd_rpc_tracker
// tablet_rpc_handler defaults to nullptr, so messaging_tablet_rpc_handler is used
);
```
#### Testing (Local Simulation)
```cpp
// In test code
run_topology_coordinator(
sys_dist_ks, gossiper, messaging, shared_tm,
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
raft_topology_cmd_handler,
tablet_allocator, cdc_gens, ring_delay,
lifecycle_notifier, feature_service,
sl_controller, topology_cmd_rpc_tracker,
std::make_unique<local_tablet_rpc_simulator>(storage_service) // Create unique_ptr directly
);
```
## Benefits
1. **Unit Testability**: The topology coordinator's `handle_tablet_migration()` logic can now be tested in unit tests without setting up a multi-node cluster
2. **Separation of Concerns**: Coordinator-side logic (group0 state transitions, barriers) is clearly separated from replica-side logic (RPC operations)
3. **Flexibility**: Different RPC implementations can be provided for different testing scenarios
## Migration Stages
The tablet migration process goes through multiple stages. Each stage has:
- **Coordinator-side logic**: What group0 changes to make, whether barriers are needed, whether rollback is needed
- **Replica-side logic**: RPC operations to perform on tablet replicas
Example stages:
- `allow_write_both_read_old` → coordinator initiates transition
- `write_both_read_old` → coordinator waits for barrier
- `streaming` → coordinator calls `tablet_stream_data()` RPC
- `write_both_read_new` → coordinator waits for barrier
- `use_new` → coordinator transitions to final state
- `cleanup` → coordinator calls `tablet_cleanup()` RPC
- `end_migration` → coordinator removes transition state
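The stage sequence above can be sketched as a small state machine (stage names are from the list above; the advancement logic is a deliberate simplification, not the actual coordinator code):

```python
# Ordered migration stages, as listed above.
STAGES = [
    "allow_write_both_read_old",
    "write_both_read_old",
    "streaming",
    "write_both_read_new",
    "use_new",
    "cleanup",
    "end_migration",
]

# Stages whose replica-side work goes through the RPC handler.
RPC_STAGES = {"streaming": "tablet_stream_data", "cleanup": "tablet_cleanup"}

def advance(stage, rpc_calls):
    """Record the stage's replica-side RPC (if any), return the next stage."""
    if stage in RPC_STAGES:
        rpc_calls.append(RPC_STAGES[stage])
    i = STAGES.index(stage)
    return STAGES[i + 1] if i + 1 < len(STAGES) else None

calls = []
stage = STAGES[0]
while stage is not None:
    stage = advance(stage, calls)
assert calls == ["tablet_stream_data", "tablet_cleanup"]
```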
## Future Enhancements
The current refactoring enables future enhancements such as:
1. More comprehensive testing of topology coordinator logic
2. Fault injection at the RPC level in tests
3. Better simulation of network conditions in tests
4. Cleaner separation of concerns in the codebase


@@ -17,11 +17,11 @@
#include "index/secondary_index.hh"
#include "index/secondary_index_manager.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "utils/managed_string.hh"
#include <seastar/core/sstring.hh>
#include <boost/algorithm/string.hpp>
namespace secondary_index {
static void validate_positive_option(int max, const sstring& value_name, const sstring& value) {
@@ -147,17 +147,88 @@ std::optional<cql3::description> vector_index::describe(const index_metadata& im
}
void vector_index::check_target(const schema& schema, const std::vector<::shared_ptr<cql3::statements::index_target>>& targets) const {
if (targets.size() != 1) {
throw exceptions::invalid_request_exception("Vector index can only be created on a single column");
}
auto target = targets[0];
auto c_def = schema.get_column_definition(to_bytes(target->column_name()));
if (!c_def) {
throw exceptions::invalid_request_exception(format("Column {} not found in schema", target->column_name()));
}
auto type = c_def->type;
if (!type->is_vector() || static_cast<const vector_type_impl*>(type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
throw exceptions::invalid_request_exception(format("Vector indexes are only supported on columns of vectors of floats", target->column_name()));
struct validate_visitor {
const class schema& schema;
bool& is_vector;
/// Vector indexes support filtering on native types that can be used as primary key columns.
/// There is no counter (it cannot be used with vector columns)
/// and no duration (it cannot be used as a primary key or in secondary indexes).
static bool is_supported_filtering_column(abstract_type const & kind_type) {
switch (kind_type.get_kind()) {
case abstract_type::kind::ascii:
case abstract_type::kind::boolean:
case abstract_type::kind::byte:
case abstract_type::kind::bytes:
case abstract_type::kind::date:
case abstract_type::kind::decimal:
case abstract_type::kind::double_kind:
case abstract_type::kind::float_kind:
case abstract_type::kind::inet:
case abstract_type::kind::int32:
case abstract_type::kind::long_kind:
case abstract_type::kind::short_kind:
case abstract_type::kind::simple_date:
case abstract_type::kind::time:
case abstract_type::kind::timestamp:
case abstract_type::kind::timeuuid:
case abstract_type::kind::utf8:
case abstract_type::kind::uuid:
case abstract_type::kind::varint:
return true;
default:
break;
}
return false;
}
void validate(cql3::column_identifier const& column, bool is_vector) const {
auto const& c_name = column.to_string();
auto const* c_def = schema.get_column_definition(column.name());
if (c_def == nullptr) {
throw exceptions::invalid_request_exception(format("Column {} not found in schema", c_name));
}
auto type = c_def->type;
if (is_vector) {
auto const* vector_type = dynamic_cast<const vector_type_impl*>(type.get());
if (vector_type == nullptr) {
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
}
auto elements_type = vector_type->get_elements_type();
if (elements_type->get_kind() != abstract_type::kind::float_kind) {
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
}
return;
}
if (!is_supported_filtering_column(*type)) {
throw exceptions::invalid_request_exception(format("Unsupported vector index filtering column {} type", c_name));
}
}
void operator()(const std::vector<::shared_ptr<cql3::column_identifier>>& columns) const {
for (const auto& column : columns) {
// CQL restricts the secondary local index to have multiple columns with partition key only.
// Vectors shouldn't be partition key columns and they aren't supported as a filtering column,
// so we can assume here that these are non-vectors filtering columns.
validate(*column, false);
}
}
void operator()(const ::shared_ptr<cql3::column_identifier>& column) {
validate(*column, is_vector);
// The first column is the vector column, the rest mustn't be vectors.
is_vector = false;
}
};
bool is_vector = true;
for (const auto& target : targets) {
std::visit(validate_visitor{.schema = schema, .is_vector = is_vector}, target->value);
}
}
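The rule enforced above — the first column must be a vector of floats and the remaining columns must be supported native types — can be sketched outside C++ like this (the type list comes from this patch; the function itself is only an illustration):

```python
# Native types accepted as vector-index filtering columns in this patch
# (counter and duration are excluded, as are collections and vectors).
SUPPORTED_FILTERING = {
    "ascii", "bigint", "blob", "boolean", "date", "decimal", "double",
    "float", "inet", "int", "smallint", "text", "varchar", "time",
    "timestamp", "timeuuid", "tinyint", "uuid", "varint",
}

def check_target(columns):
    """columns: list of (name, cql_type) pairs in index-definition order."""
    if not columns:
        raise ValueError("Vector index requires at least one column")
    _, first_type = columns[0]
    # The first column must be the indexed vector of floats.
    if not first_type.replace(" ", "").startswith("vector<float"):
        raise ValueError(
            "Vector indexes are only supported on columns of vectors of floats")
    # Any further columns are filtering columns of supported native types.
    for name, cql_type in columns[1:]:
        if cql_type not in SUPPORTED_FILTERING:
            raise ValueError(
                f"Unsupported vector index filtering column {name} type")

check_target([("v", "vector<float, 3>"), ("country", "text")])  # accepted
```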


@@ -0,0 +1,73 @@
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later
#pragma once
#include <seastar/core/future.hh>
#include "locator/tablets.hh"
#include "service/tablet_operation.hh"
#include "service/session.hh"
#include "raft/raft.hh"
namespace service {
/// Interface for handling tablet migration RPC operations.
/// This abstraction decouples the topology coordinator from actual RPC sending,
/// allowing unit tests to provide a local implementation that simulates RPC
/// behavior without network communication.
///
/// Each method corresponds to an RPC verb that the topology coordinator sends
/// to tablet replicas during migration stages.
class tablet_migration_rpc_handler {
public:
virtual ~tablet_migration_rpc_handler() = default;
/// Initiates tablet repair on a replica.
/// Called during the rebuild_repair or repair stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to repair
/// @param sid session ID for the repair operation
/// @return repair result containing the repair timestamp
virtual seastar::future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) = 0;
/// Streams tablet data to a replica.
/// Called during the streaming stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to stream
virtual seastar::future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) = 0;
/// Cleans up tablet data on a replica.
/// Called during the cleanup or cleanup_target stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to clean up
virtual seastar::future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) = 0;
/// Updates compaction controller after repair.
/// Called during the end_repair stage for each tablet replica.
///
/// @param dst destination host
/// @param tablet tablet to update
/// @param sid session ID from the repair operation
virtual seastar::future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) = 0;
};
}


@@ -73,6 +73,7 @@
#include "idl/repair.dist.hh"
#include "service/topology_coordinator.hh"
#include "service/tablet_migration_rpc_handler.hh"
#include <boost/range/join.hpp>
#include <seastar/core/metrics_registration.hh>
@@ -117,6 +118,51 @@ sstring get_application_state_gently(const gms::application_state_map& epmap, gm
// it's versioned_value::value(), not std::optional::value() - it does not throw
return it->second.value();
}
/// Real implementation of tablet_migration_rpc_handler that sends RPCs via messaging_service.
/// Used by the production topology coordinator to perform actual network communication.
class messaging_tablet_rpc_handler : public tablet_migration_rpc_handler {
netw::messaging_service& _messaging;
abort_source& _as;
public:
messaging_tablet_rpc_handler(netw::messaging_service& messaging, abort_source& as)
: _messaging(messaging), _as(as) {}
future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_repair(
&_messaging, dst, _as, dst_id, tablet, sid);
}
future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_stream_data(
&_messaging, dst, _as, dst_id, tablet);
}
future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_cleanup(
&_messaging, dst, _as, dst_id, tablet);
}
future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) override {
co_return co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(
&_messaging, dst, tablet, sid);
}
};
} // namespace
class topology_coordinator : public endpoint_lifecycle_subscriber
@@ -144,6 +190,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
tablet_allocator& _tablet_allocator;
std::unique_ptr<db::view::view_building_coordinator> _vb_coordinator;
std::unique_ptr<tablet_migration_rpc_handler> _tablet_rpc_handler;
cdc::generation_service& _cdc_gens;
@@ -1603,8 +1650,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid);
return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) {
return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid, session_id).discard_result();
return _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id).discard_result();
});
});
})) {
@@ -1676,8 +1722,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
auto dst = trinfo.pending_replica->host;
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid);
return _tablet_rpc_handler->tablet_stream_data(dst, raft::server_id(dst.uuid()), gid);
});
});
})) {
@@ -1746,8 +1791,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet cleanup of {} on {}", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
});
}).then([] {
return utils::get_local_injector().inject("wait_after_tablet_cleanup", [] (auto& handler) -> future<> {
@@ -1775,8 +1819,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
});
});
})) {
@@ -1868,8 +1911,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
service::session_id::create_random_id() : trinfo->session_id;
auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid, session_id);
auto res = co_await _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id);
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
@@ -1935,7 +1977,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
tablet_state.repair_update_compaction_ctrl_retried++;
}
bool feature = _feature_service.tablet_incremental_repair;
if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this, ms = &_messaging,
if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this,
gid = gid, sid = tablet_state.session_id, feature, &tmap] () -> future<> {
if (feature) {
if (utils::get_local_injector().enter("fail_rpc_repair_update_compaction_ctrl")) {
@@ -1944,9 +1986,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
throw std::runtime_error(msg);
}
auto& replicas = tmap.get_tablet_info(gid.tablet).replicas;
co_await coroutine::parallel_for_each(replicas, [this, ms, gid, sid] (locator::tablet_replica r) -> future<> {
co_await coroutine::parallel_for_each(replicas, [this, gid, sid] (locator::tablet_replica r) -> future<> {
if (!is_excluded(raft::server_id(r.host.uuid()))) {
co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(ms, r.host, gid, sid);
co_await _tablet_rpc_handler->repair_update_compaction_ctrl(r.host, gid, sid);
}
});
}
@@ -3683,7 +3725,8 @@ public:
gms::feature_service& feature_service,
endpoint_lifecycle_notifier& lifecycle_notifier,
qos::service_level_controller& sl_controller,
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler)
: _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
, _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
, _tablet_load_stats_refresh_interval_in_seconds(db.get_config().tablet_load_stats_refresh_interval_in_seconds)
@@ -3694,6 +3737,7 @@ public:
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
, _tablet_allocator(tablet_allocator)
, _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
, _tablet_rpc_handler(tablet_rpc_handler ? std::move(tablet_rpc_handler) : std::make_unique<messaging_tablet_rpc_handler>(_messaging, _as))
, _cdc_gens(cdc_gens)
, _tablet_load_stats_refresh([this] {
return with_scheduling_group(_db.get_gossip_scheduling_group(), [this] {
@@ -4366,7 +4410,8 @@ future<> run_topology_coordinator(
endpoint_lifecycle_notifier& lifecycle_notifier,
gms::feature_service& feature_service,
qos::service_level_controller& sl_controller,
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker) {
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler) {
topology_coordinator coordinator{
sys_dist_ks, gossiper, messaging, shared_tm,
@@ -4377,7 +4422,8 @@ future<> run_topology_coordinator(
ring_delay,
feature_service, lifecycle_notifier,
sl_controller,
topology_cmd_rpc_tracker};
topology_cmd_rpc_tracker,
std::move(tablet_rpc_handler)};
std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);


@@ -58,6 +58,7 @@ namespace service {
class raft_group0;
class tablet_allocator;
class tablet_migration_rpc_handler;
extern logging::logger rtlogger;
@@ -100,7 +101,8 @@ future<> run_topology_coordinator(
endpoint_lifecycle_notifier& lifecycle_notifier,
gms::feature_service& feature_service,
qos::service_level_controller& sl_controller,
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker);
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler = nullptr);
class tablet_ops_metrics {
private:


@@ -933,7 +933,6 @@ def testToJsonFct(cql, test_keyspace):
assert_rows(execute(cql, table, "SELECT k, toJson(durationval) FROM %s WHERE k = ?", 0), [0, "\"1y1mo2d10h5m\""])
# Reproduces issue #8077
@pytest.mark.xfail(reason="issues #8077")
def testJsonWithGroupBy(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int, c int, v int, PRIMARY KEY (k, c))") as table:
# tests SELECT JSON statements
@@ -954,7 +953,6 @@ def testJsonWithGroupBy(cql, test_keyspace):
["{\"count\": 1}"])
# Reproduces issues #8077, #8078
@pytest.mark.xfail(reason="issues #8077")
def testSelectJsonSyntax(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int primary key, v int)") as table:
# tests SELECT JSON statements


@@ -520,7 +520,6 @@ def test_tojson_decimal_high_mantissa2(cql, table1):
# Reproducers for issue #8077: SELECT JSON on a function call should result
# in the same JSON strings as it does on Cassandra.
@pytest.mark.xfail(reason="issue #8077")
def test_select_json_function_call(cql, table1):
p = unique_key_int()
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, 17) USING TIMESTAMP 1234")


@@ -10,6 +10,41 @@ import pytest
from .util import new_test_table, is_scylla, unique_name
from cassandra.protocol import InvalidRequest, ConfigurationException
supported_filtering_types = [
'ascii',
'bigint',
'blob',
'boolean',
'date',
'decimal',
'double',
'float',
'inet',
'int',
'smallint',
'text',
'varchar',
'time',
'timestamp',
'timeuuid',
'tinyint',
'uuid',
'varint',
]
unsupported_filtering_types = [
'duration',
'map<int, int>',
'list<int>',
'set<int>',
'tuple<int, int>',
'vector<float, 3>',
'frozen<map<int, int>>',
'frozen<list<int>>',
'frozen<set<int>>',
'frozen<tuple<int, int>>',
]
def test_create_vector_search_index(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p int primary key, v vector<float, 3>'
with new_test_table(cql, test_keyspace, schema) as table:
@@ -45,6 +80,57 @@ def test_create_vector_search_index_on_nonvector_column(cql, test_keyspace, scyl
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v) USING 'vector_index'")
def test_create_vector_search_global_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v, f1, f2) USING 'vector_index'")
def test_create_vector_search_local_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
def test_create_vector_search_local_index_with_filtering_columns_on_nonvector_column(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v int, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
def test_create_vector_search_index_with_supported_and_unsupported_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
supported_columns = ', '.join([f's{idx} {typ}' for idx, typ in enumerate(supported_filtering_types)])
unsupported_columns = ', '.join([f'u{idx} {typ}' for idx, typ in enumerate(unsupported_filtering_types)])
schema = f'p int, c int, v vector<float, 3>, {supported_columns}, {unsupported_columns}, primary key (p, c)'
with new_test_table(cql, test_keyspace, schema) as table:
for idx in range(len(supported_filtering_types)):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, s{idx}) USING 'vector_index'")
cql.execute(f"DROP INDEX {test_keyspace}.global_idx")
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, s{idx}) USING 'vector_index'")
cql.execute(f"DROP INDEX {test_keyspace}.local_idx")
for idx in range(len(unsupported_filtering_types)):
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, u{idx}) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, u{idx}) USING 'vector_index'")
def test_create_vector_search_local_index_with_unsupported_partition_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
for filter_type in unsupported_filtering_types:
schema = f'p {filter_type}, c int, v vector<float, 3>, f int, primary key (p, c)'
with pytest.raises(InvalidRequest, match="Unsupported|Invalid"):
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p), v, f) USING 'vector_index'")
def test_create_vector_search_index_with_duplicated_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = f'p int, c int, v vector<float, 3>, x int, primary key (p, c)'
with new_test_table(cql, test_keyspace, schema) as table:
with pytest.raises(InvalidRequest, match=f"Cannot create secondary index on partition key column p"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, p) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Duplicate column x in index target list"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, x, x) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Cannot create secondary index on partition key column p"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, p) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Duplicate column x in index target list"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, x, x) USING 'vector_index'")
def test_create_vector_search_index_with_bad_options(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p int primary key, v vector<float, 3>'
with new_test_table(cql, test_keyspace, schema) as table:


@@ -0,0 +1,68 @@
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later
#pragma once
#include "service/tablet_migration_rpc_handler.hh"
#include "service/storage_service.hh"
#include "utils/log.hh"
namespace service {
extern logging::logger rtlogger;
/// Test implementation of tablet_migration_rpc_handler that calls local RPC handlers
/// instead of sending network RPCs. This allows testing the topology coordinator logic
/// in unit tests without network communication.
///
/// Usage in tests:
/// auto handler = std::make_unique<local_tablet_rpc_simulator>(storage_service);
/// // Pass handler to run_topology_coordinator
class local_tablet_rpc_simulator : public tablet_migration_rpc_handler {
storage_service& _storage_service;
public:
explicit local_tablet_rpc_simulator(storage_service& ss)
: _storage_service(ss) {}
seastar::future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) override {
rtlogger.debug("local_tablet_rpc_simulator: repair tablet {} on {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.repair_tablet(tablet, sid);
}
seastar::future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
rtlogger.debug("local_tablet_rpc_simulator: stream tablet {} to {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.stream_tablet(tablet);
}
seastar::future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
rtlogger.debug("local_tablet_rpc_simulator: cleanup tablet {} on {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.cleanup_tablet(tablet);
}
seastar::future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) override {
rtlogger.debug("local_tablet_rpc_simulator: repair_update_compaction_ctrl for tablet {} on {}", tablet, dst);
// The test simulator does not need to update the compaction controller,
// since no real compaction happens in the tests.
// Just log and return success.
co_return;
}
};
}
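
The simulator above follows the general shape of an in-process test double: each override logs the call and dispatches locally instead of sending a network RPC. A self-contained sketch of the same idea, with a call log that tests can assert on (names and signatures are illustrative, not the real ScyllaDB interface):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical minimal handler interface.
struct rpc_handler {
    virtual ~rpc_handler() = default;
    virtual void tablet_stream_data(int tablet_id) = 0;
    virtual void tablet_cleanup(int tablet_id) = 0;
};

// Local simulator: records each call instead of performing network I/O,
// so unit tests can drive coordinator logic and inspect what was requested.
struct local_rpc_simulator : rpc_handler {
    std::vector<std::string> log;
    void tablet_stream_data(int tablet_id) override {
        log.push_back("stream:" + std::to_string(tablet_id));
    }
    void tablet_cleanup(int tablet_id) override {
        log.push_back("cleanup:" + std::to_string(tablet_id));
    }
};
```

Because the coordinator only sees the base interface, the same code path is exercised whether the calls go over the network or into the simulator's log.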