Compare commits

..

2 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
9e806cb3f7 Fix critical bugs and issues found in alternator code review
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-01-29 22:54:57 +00:00
copilot-swe-agent[bot]
f267af38bd Initial plan 2026-01-29 22:49:31 +00:00
16 changed files with 54 additions and 619 deletions

View File

@@ -1,84 +0,0 @@
# Tablet Migration RPC Decoupling - Implementation Summary
## What Was Changed
### Problem Statement
The topology_coordinator's `handle_tablet_migration()` function was tightly coupled with RPC sending, making it difficult to test in unit tests without setting up a full multi-node cluster.
### Solution
Introduced an abstraction layer (`tablet_migration_rpc_handler`) that decouples the coordinator logic from actual RPC operations.
## Files Added
### 1. `service/tablet_migration_rpc_handler.hh`
Interface defining the RPC operations needed for tablet migration:
- `tablet_repair()` - Repair a tablet on a replica
- `tablet_stream_data()` - Stream tablet data to a replica
- `tablet_cleanup()` - Clean up tablet data on a replica
- `repair_update_compaction_ctrl()` - Update compaction controller after repair
### 2. `test/lib/local_tablet_rpc_simulator.hh`
Test implementation that calls local RPC handlers instead of sending network RPCs.
### 3. `docs/dev/tablet-migration-rpc-decoupling.md`
Documentation explaining the architecture and how to use it.
## Files Modified
### 1. `service/topology_coordinator.cc`
- Added `messaging_tablet_rpc_handler` class - production implementation that wraps `messaging_service` RPCs
- Updated `handle_tablet_migration()` to use `_tablet_rpc_handler` interface instead of direct RPC calls
- Updated constructor to accept optional RPC handler and initialize it
- Modified 5 locations where tablet RPCs were called:
- rebuild_repair stage → `tablet_repair()`
- streaming stage → `tablet_stream_data()`
- cleanup stage → `tablet_cleanup()` (2 locations)
- repair stage → `tablet_repair()`
- end_repair stage → `repair_update_compaction_ctrl()`
### 2. `service/topology_coordinator.hh`
- Added forward declaration for `tablet_migration_rpc_handler`
- Updated `run_topology_coordinator()` signature to accept optional `tablet_rpc_handler` parameter
## How It Works
### Production Mode (Default)
```cpp
run_topology_coordinator(...);
// Automatically uses messaging_tablet_rpc_handler
```
The topology coordinator creates a `messaging_tablet_rpc_handler` that sends actual network RPCs.
### Test Mode
```cpp
run_topology_coordinator(..., std::make_unique<local_tablet_rpc_simulator>(storage_service));
```
Tests can provide a `local_tablet_rpc_simulator` that calls local RPC handlers directly, bypassing the network layer.
## Benefits
1. **Testability**: Can now test `handle_tablet_migration()` logic in unit tests
2. **Separation of Concerns**: Coordinator logic (group0 transitions, barriers) is separated from replica logic (RPCs)
3. **Minimal Changes**: The refactoring is minimal and surgical - only abstracts the RPC layer
4. **Backward Compatible**: Existing code continues to work without changes
## What Can Now Be Tested
With this refactoring, tests can now:
1. Verify that tablet migrations progress through the correct stages
2. Test barrier coordination between stages
3. Test rollback logic when migrations fail
4. Test the interaction between migrations and topology changes
5. Simulate various failure scenarios at the RPC level
All without needing multiple nodes or network communication.
## Future Work
This refactoring enables future enhancements:
1. More comprehensive topology coordinator unit tests
2. Fault injection at the RPC level in tests
3. Better simulation of network conditions
4. Potential extraction of more coordinator logic into testable components

View File

@@ -244,7 +244,10 @@ static bool is_set_of(const rjson::value& type1, const rjson::value& type2) {
// Check if two JSON-encoded values match with the CONTAINS relation
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query) {
if (!v1) {
if (!v1 || !v1->IsObject() || v1->MemberCount() == 0) {
return false;
}
if (!v2.IsObject() || v2.MemberCount() == 0) {
return false;
}
const auto& kv1 = *v1->MemberBegin();

View File

@@ -45,7 +45,7 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
}
void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept {
if (_should_add_to_reponse) {
if (_should_add_to_response) {
auto consumption = rjson::empty_object();
rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units());
rjson::add(response, "ConsumedCapacity", std::move(consumption));
@@ -53,7 +53,9 @@ void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjso
}
static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_bytes, bool is_quorum) {
uint64_t half_units = (total_bytes + unit_block_size -1) / unit_block_size; //divide by unit_block_size and round up
// Avoid potential integer overflow when total_bytes is close to UINT64_MAX
// by using division with modulo instead of addition before division
uint64_t half_units = total_bytes / unit_block_size + (total_bytes % unit_block_size != 0 ? 1 : 0);
if (is_quorum) {
half_units *= 2;

View File

@@ -28,9 +28,9 @@ namespace alternator {
class consumed_capacity_counter {
public:
consumed_capacity_counter() = default;
consumed_capacity_counter(bool should_add_to_reponse) : _should_add_to_reponse(should_add_to_reponse){}
consumed_capacity_counter(bool should_add_to_response) : _should_add_to_response(should_add_to_response){}
bool operator()() const noexcept {
return _should_add_to_reponse;
return _should_add_to_response;
}
consumed_capacity_counter& operator +=(uint64_t bytes);
@@ -44,7 +44,7 @@ public:
uint64_t _total_bytes = 0;
static bool should_add_capacity(const rjson::value& request);
protected:
bool _should_add_to_reponse = false;
bool _should_add_to_response = false;
};
class rcu_consumed_capacity_counter : public consumed_capacity_counter {

View File

@@ -834,11 +834,13 @@ future<> executor::fill_table_size(rjson::value &table_description, schema_ptr s
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
// Note: we don't care when the notification of other shards will finish, as long as it will be done
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
// which is also fine, as the specification doesn't give precision guarantees of any kind.
// A race condition is possible: if a DescribeTable request arrives on a different shard before
// that shard receives the cached size, it will recalculate independently. This is acceptable because:
// 1. Both calculations will cache their results with an expiry time
// 2. Expiry times are unlikely to be identical, so eventually all shards converge to the most recent value
// 3. Even if expiry times match, different shards may briefly return different table sizes
// 4. This temporary inconsistency is acceptable per DynamoDB specification, which doesn't guarantee
// exact precision for DescribeTable size information
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
}
}

View File

@@ -281,8 +281,7 @@ For example::
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
or columns provided in a definition of the index.
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
For example::

View File

@@ -140,83 +140,17 @@ Vector Index :label-note:`ScyllaDB Cloud`
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient
similarity search on vector data. Vector indexes can be a global index for indexing vectors per table or a local
index for indexing vectors per partition.
similarity search on vector data.
The vector index is the only custom type index supported in ScyllaDB. It is created using
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. It is also possible to
add additional columns to the index for filtering the search results. The partition column
specified in the global vector index definition must be the vector column, and any subsequent
columns are treated as filtering columns. The local vector index requires that the partition key
of the base table is also the partition key of the index and the vector column is the first one
from the following columns.
Example of a simple index:
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. Example:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed to enable similarity search using
a global vector index. Additional filtering can be performed on the primary key
columns of the base table.
Example of a global vector index with additional filtering:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding, category, info)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed to enable similarity search using
a global index. Additional columns are added for filtering the search results.
The filtering is possible on ``category``, ``info`` and all primary key columns
of the base table.
Example of a local vector index:
.. code-block:: cql
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings ((id, created_at), embedding, category, info)
USING 'vector_index'
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
The vector column (``embedding``) is indexed for similarity search (a local
index) and additional columns are added for filtering the search results. The
filtering is possible on ``category``, ``info`` and all primary key columns of
the base table. The columns ``id`` and ``created_at`` must be the partition key
of the base table.
Vector indexes support additional filtering columns of native data types
(excluding counter and duration). The indexed column itself must be a vector
column, while the extra columns can be used to filter search results.
The supported types are:
* ``ascii``
* ``bigint``
* ``blob``
* ``boolean``
* ``date``
* ``decimal``
* ``double``
* ``float``
* ``inet``
* ``int``
* ``smallint``
* ``text``
* ``varchar``
* ``time``
* ``timestamp``
* ``timeuuid``
* ``tinyint``
* ``uuid``
* ``varint``
The following options are supported for vector indexes. All of them are optional.
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+

View File

@@ -1,78 +0,0 @@
# Tablet Migration RPC Decoupling
## Overview
The topology coordinator's tablet migration logic has been refactored to decouple it from RPC operations. This allows the topology coordinator logic to be tested in unit tests without network communication.
## Architecture
### Components
1. **`tablet_migration_rpc_handler`** (interface): Defines the RPC operations needed for tablet migration
- `tablet_repair()`: Initiates tablet repair on a replica
- `tablet_stream_data()`: Streams tablet data to a replica
- `tablet_cleanup()`: Cleans up tablet data on a replica
- `repair_update_compaction_ctrl()`: Updates compaction controller after repair
2. **`messaging_tablet_rpc_handler`**: Production implementation that sends real network RPCs via `messaging_service`
3. **`local_tablet_rpc_simulator`**: Test implementation that calls RPC handlers locally without network communication
### How It Works
#### Production (Real RPCs)
```cpp
// In storage_service.cc
run_topology_coordinator(
sys_dist_ks, gossiper, messaging, shared_tm,
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
raft_topology_cmd_handler,
tablet_allocator, cdc_gens, ring_delay,
lifecycle_notifier, feature_service,
sl_controller, topology_cmd_rpc_tracker
// tablet_rpc_handler defaults to nullptr, so messaging_tablet_rpc_handler is used
);
```
#### Testing (Local Simulation)
```cpp
// In test code
run_topology_coordinator(
sys_dist_ks, gossiper, messaging, shared_tm,
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
raft_topology_cmd_handler,
tablet_allocator, cdc_gens, ring_delay,
lifecycle_notifier, feature_service,
sl_controller, topology_cmd_rpc_tracker,
std::make_unique<local_tablet_rpc_simulator>(storage_service) // Create unique_ptr directly
);
```
## Benefits
1. **Unit Testability**: The topology coordinator's `handle_tablet_migration()` logic can now be tested in unit tests without setting up a multi-node cluster
2. **Separation of Concerns**: Coordinator-side logic (group0 state transitions, barriers) is clearly separated from replica-side logic (RPC operations)
3. **Flexibility**: Different RPC implementations can be provided for different testing scenarios
## Migration Stages
The tablet migration process goes through multiple stages. Each stage has:
- **Coordinator-side logic**: What group0 changes to make, whether barriers are needed, whether rollback is needed
- **Replica-side logic**: RPC operations to perform on tablet replicas
Example stages:
- `allow_write_both_read_old` → coordinator initiates transition
- `write_both_read_old` → coordinator waits for barrier
- `streaming` → coordinator calls `tablet_stream_data()` RPC
- `write_both_read_new` → coordinator waits for barrier
- `use_new` → coordinator transitions to final state
- `cleanup` → coordinator calls `tablet_cleanup()` RPC
- `end_migration` → coordinator removes transition state
## Future Enhancements
The current refactoring enables future enhancements such as:
1. More comprehensive testing of topology coordinator logic
2. Fault injection at the RPC level in tests
3. Better simulation of network conditions in tests
4. Cleaner separation of concerns in the codebase

View File

@@ -17,11 +17,11 @@
#include "index/secondary_index.hh"
#include "index/secondary_index_manager.hh"
#include "types/concrete_types.hh"
#include "types/types.hh"
#include "utils/managed_string.hh"
#include <seastar/core/sstring.hh>
#include <boost/algorithm/string.hpp>
namespace secondary_index {
static void validate_positive_option(int max, const sstring& value_name, const sstring& value) {
@@ -147,88 +147,17 @@ std::optional<cql3::description> vector_index::describe(const index_metadata& im
}
void vector_index::check_target(const schema& schema, const std::vector<::shared_ptr<cql3::statements::index_target>>& targets) const {
struct validate_visitor {
const class schema& schema;
bool& is_vector;
/// Vector indexes support filtering on native types that can be used as primary key columns.
/// There is no counter (it cannot be used with vector columns)
/// and no duration (it cannot be used as a primary key or in secondary indexes).
static bool is_supported_filtering_column(abstract_type const & kind_type) {
switch (kind_type.get_kind()) {
case abstract_type::kind::ascii:
case abstract_type::kind::boolean:
case abstract_type::kind::byte:
case abstract_type::kind::bytes:
case abstract_type::kind::date:
case abstract_type::kind::decimal:
case abstract_type::kind::double_kind:
case abstract_type::kind::float_kind:
case abstract_type::kind::inet:
case abstract_type::kind::int32:
case abstract_type::kind::long_kind:
case abstract_type::kind::short_kind:
case abstract_type::kind::simple_date:
case abstract_type::kind::time:
case abstract_type::kind::timestamp:
case abstract_type::kind::timeuuid:
case abstract_type::kind::utf8:
case abstract_type::kind::uuid:
case abstract_type::kind::varint:
return true;
default:
break;
}
return false;
}
void validate(cql3::column_identifier const& column, bool is_vector) const {
auto const& c_name = column.to_string();
auto const* c_def = schema.get_column_definition(column.name());
if (c_def == nullptr) {
throw exceptions::invalid_request_exception(format("Column {} not found in schema", c_name));
}
auto type = c_def->type;
if (is_vector) {
auto const* vector_type = dynamic_cast<const vector_type_impl*>(type.get());
if (vector_type == nullptr) {
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
}
auto elements_type = vector_type->get_elements_type();
if (elements_type->get_kind() != abstract_type::kind::float_kind) {
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
}
return;
}
if (!is_supported_filtering_column(*type)) {
throw exceptions::invalid_request_exception(format("Unsupported vector index filtering column {} type", c_name));
}
}
void operator()(const std::vector<::shared_ptr<cql3::column_identifier>>& columns) const {
for (const auto& column : columns) {
// CQL restricts the secondary local index to have multiple columns with partition key only.
// Vectors shouldn't be partition key columns and they aren't supported as a filtering column,
// so we can assume here that these are non-vectors filtering columns.
validate(*column, false);
}
}
void operator()(const ::shared_ptr<cql3::column_identifier>& column) {
validate(*column, is_vector);
// The first column is the vector column, the rest mustn't be vectors.
is_vector = false;
}
};
bool is_vector = true;
for (const auto& target : targets) {
std::visit(validate_visitor{.schema = schema, .is_vector = is_vector}, target->value);
if (targets.size() != 1) {
throw exceptions::invalid_request_exception("Vector index can only be created on a single column");
}
auto target = targets[0];
auto c_def = schema.get_column_definition(to_bytes(target->column_name()));
if (!c_def) {
throw exceptions::invalid_request_exception(format("Column {} not found in schema", target->column_name()));
}
auto type = c_def->type;
if (!type->is_vector() || static_cast<const vector_type_impl*>(type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
}
}

View File

@@ -1,73 +0,0 @@
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later
#pragma once
#include <seastar/core/future.hh>
#include "locator/tablets.hh"
#include "service/tablet_operation.hh"
#include "service/session.hh"
#include "raft/raft.hh"
namespace service {
/// Interface for handling tablet migration RPC operations.
/// This abstraction decouples the topology coordinator from actual RPC sending,
/// allowing unit tests to provide a local implementation that simulates RPC
/// behavior without network communication.
///
/// Each method corresponds to an RPC verb that the topology coordinator sends
/// to tablet replicas during migration stages.
class tablet_migration_rpc_handler {
public:
virtual ~tablet_migration_rpc_handler() = default;
/// Initiates tablet repair on a replica.
/// Called during the rebuild_repair or repair stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to repair
/// @param sid session ID for the repair operation
/// @return repair result containing the repair timestamp
virtual seastar::future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) = 0;
/// Streams tablet data to a replica.
/// Called during the streaming stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to stream
virtual seastar::future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) = 0;
/// Cleans up tablet data on a replica.
/// Called during the cleanup or cleanup_target stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to clean up
virtual seastar::future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) = 0;
/// Updates compaction controller after repair.
/// Called during the end_repair stage for each tablet replica.
///
/// @param dst destination host
/// @param tablet tablet to update
/// @param sid session ID from the repair operation
virtual seastar::future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) = 0;
};
}

View File

@@ -73,7 +73,6 @@
#include "idl/repair.dist.hh"
#include "service/topology_coordinator.hh"
#include "service/tablet_migration_rpc_handler.hh"
#include <boost/range/join.hpp>
#include <seastar/core/metrics_registration.hh>
@@ -118,51 +117,6 @@ sstring get_application_state_gently(const gms::application_state_map& epmap, gm
// it's versioned_value::value(), not std::optional::value() - it does not throw
return it->second.value();
}
/// Real implementation of tablet_migration_rpc_handler that sends RPCs via messaging_service.
/// Used by the production topology coordinator to perform actual network communication.
class messaging_tablet_rpc_handler : public tablet_migration_rpc_handler {
netw::messaging_service& _messaging;
abort_source& _as;
public:
messaging_tablet_rpc_handler(netw::messaging_service& messaging, abort_source& as)
: _messaging(messaging), _as(as) {}
future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_repair(
&_messaging, dst, _as, dst_id, tablet, sid);
}
future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_stream_data(
&_messaging, dst, _as, dst_id, tablet);
}
future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_cleanup(
&_messaging, dst, _as, dst_id, tablet);
}
future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) override {
co_return co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(
&_messaging, dst, tablet, sid);
}
};
} // namespace
class topology_coordinator : public endpoint_lifecycle_subscriber
@@ -190,7 +144,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
tablet_allocator& _tablet_allocator;
std::unique_ptr<db::view::view_building_coordinator> _vb_coordinator;
std::unique_ptr<tablet_migration_rpc_handler> _tablet_rpc_handler;
cdc::generation_service& _cdc_gens;
@@ -1650,7 +1603,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid);
return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) {
return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) {
return _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id).discard_result();
return ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid, session_id).discard_result();
});
});
})) {
@@ -1722,7 +1676,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
auto dst = trinfo.pending_replica->host;
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return _tablet_rpc_handler->tablet_stream_data(dst, raft::server_id(dst.uuid()), gid);
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid);
});
});
})) {
@@ -1791,7 +1746,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet cleanup of {} on {}", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
});
}).then([] {
return utils::get_local_injector().inject("wait_after_tablet_cleanup", [] (auto& handler) -> future<> {
@@ -1819,7 +1775,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
});
});
})) {
@@ -1911,7 +1868,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
service::session_id::create_random_id() : trinfo->session_id;
auto res = co_await _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id);
auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid, session_id);
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
@@ -1977,7 +1935,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
tablet_state.repair_update_compaction_ctrl_retried++;
}
bool feature = _feature_service.tablet_incremental_repair;
if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this,
if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this, ms = &_messaging,
gid = gid, sid = tablet_state.session_id, feature, &tmap] () -> future<> {
if (feature) {
if (utils::get_local_injector().enter("fail_rpc_repair_update_compaction_ctrl")) {
@@ -1986,9 +1944,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
throw std::runtime_error(msg);
}
auto& replicas = tmap.get_tablet_info(gid.tablet).replicas;
co_await coroutine::parallel_for_each(replicas, [this, gid, sid] (locator::tablet_replica r) -> future<> {
co_await coroutine::parallel_for_each(replicas, [this, ms, gid, sid] (locator::tablet_replica r) -> future<> {
if (!is_excluded(raft::server_id(r.host.uuid()))) {
co_await _tablet_rpc_handler->repair_update_compaction_ctrl(r.host, gid, sid);
co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(ms, r.host, gid, sid);
}
});
}
@@ -3725,8 +3683,7 @@ public:
gms::feature_service& feature_service,
endpoint_lifecycle_notifier& lifecycle_notifier,
qos::service_level_controller& sl_controller,
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler)
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
: _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
, _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
, _tablet_load_stats_refresh_interval_in_seconds(db.get_config().tablet_load_stats_refresh_interval_in_seconds)
@@ -3737,7 +3694,6 @@ public:
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
, _tablet_allocator(tablet_allocator)
, _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
, _tablet_rpc_handler(tablet_rpc_handler ? std::move(tablet_rpc_handler) : std::make_unique<messaging_tablet_rpc_handler>(_messaging, _as))
, _cdc_gens(cdc_gens)
, _tablet_load_stats_refresh([this] {
return with_scheduling_group(_db.get_gossip_scheduling_group(), [this] {
@@ -4410,8 +4366,7 @@ future<> run_topology_coordinator(
endpoint_lifecycle_notifier& lifecycle_notifier,
gms::feature_service& feature_service,
qos::service_level_controller& sl_controller,
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler) {
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker) {
topology_coordinator coordinator{
sys_dist_ks, gossiper, messaging, shared_tm,
@@ -4422,8 +4377,7 @@ future<> run_topology_coordinator(
ring_delay,
feature_service, lifecycle_notifier,
sl_controller,
topology_cmd_rpc_tracker,
std::move(tablet_rpc_handler)};
topology_cmd_rpc_tracker};
std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);

View File

@@ -58,7 +58,6 @@ namespace service {
class raft_group0;
class tablet_allocator;
class tablet_migration_rpc_handler;
extern logging::logger rtlogger;
@@ -101,8 +100,7 @@ future<> run_topology_coordinator(
endpoint_lifecycle_notifier& lifecycle_notifier,
gms::feature_service& feature_service,
qos::service_level_controller& sl_controller,
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler = nullptr);
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker);
class tablet_ops_metrics {
private:

View File

@@ -933,6 +933,7 @@ def testToJsonFct(cql, test_keyspace):
assert_rows(execute(cql, table, "SELECT k, toJson(durationval) FROM %s WHERE k = ?", 0), [0, "\"1y1mo2d10h5m\""])
# Reproduces issue #8077
@pytest.mark.xfail(reason="issue #8077")
def testJsonWithGroupBy(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int, c int, v int, PRIMARY KEY (k, c))") as table:
# tests SELECT JSON statements
@@ -953,6 +954,7 @@ def testJsonWithGroupBy(cql, test_keyspace):
["{\"count\": 1}"])
# Reproduces issues #8077, #8078
@pytest.mark.xfail(reason="issues #8077")
def testSelectJsonSyntax(cql, test_keyspace):
with create_table(cql, test_keyspace, "(k int primary key, v int)") as table:
# tests SELECT JSON statements

View File

@@ -520,6 +520,7 @@ def test_tojson_decimal_high_mantissa2(cql, table1):
# Reproducers for issue #8077: SELECT JSON on a function call should result
# in the same JSON strings as it does on Cassandra.
@pytest.mark.xfail(reason="issue #8077")
def test_select_json_function_call(cql, table1):
p = unique_key_int()
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, 17) USING TIMESTAMP 1234")

View File

@@ -10,41 +10,6 @@ import pytest
from .util import new_test_table, is_scylla, unique_name
from cassandra.protocol import InvalidRequest, ConfigurationException
supported_filtering_types = [
'ascii',
'bigint',
'blob',
'boolean',
'date',
'decimal',
'double',
'float',
'inet',
'int',
'smallint',
'text',
'varchar',
'time',
'timestamp',
'timeuuid',
'tinyint',
'uuid',
'varint',
]
unsupported_filtering_types = [
'duration',
'map<int, int>',
'list<int>',
'set<int>',
'tuple<int, int>',
'vector<float, 3>',
'frozen<map<int, int>>',
'frozen<list<int>>',
'frozen<set<int>>',
'frozen<tuple<int, int>>',
]
def test_create_vector_search_index(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p int primary key, v vector<float, 3>'
with new_test_table(cql, test_keyspace, schema) as table:
@@ -80,57 +45,6 @@ def test_create_vector_search_index_on_nonvector_column(cql, test_keyspace, scyl
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v) USING 'vector_index'")
def test_create_vector_search_global_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v, f1, f2) USING 'vector_index'")
def test_create_vector_search_local_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
def test_create_vector_search_local_index_with_filtering_columns_on_nonvector_column(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p1 int, p2 int, c1 int, c2 int, v int, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
with new_test_table(cql, test_keyspace, schema) as table:
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
def test_create_vector_search_index_with_supported_and_unsupported_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
supported_columns = ', '.join([f's{idx} {typ}' for idx, typ in enumerate(supported_filtering_types)])
unsupported_columns = ', '.join([f'u{idx} {typ}' for idx, typ in enumerate(unsupported_filtering_types)])
schema = f'p int, c int, v vector<float, 3>, {supported_columns}, {unsupported_columns}, primary key (p, c)'
with new_test_table(cql, test_keyspace, schema) as table:
for idx in range(len(supported_filtering_types)):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, s{idx}) USING 'vector_index'")
cql.execute(f"DROP INDEX {test_keyspace}.global_idx")
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, s{idx}) USING 'vector_index'")
cql.execute(f"DROP INDEX {test_keyspace}.local_idx")
for idx in range(len(unsupported_filtering_types)):
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, u{idx}) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, u{idx}) USING 'vector_index'")
def test_create_vector_search_local_index_with_unsupported_partition_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
for filter_type in unsupported_filtering_types:
schema = f'p {filter_type}, c int, v vector<float, 3>, f int, primary key (p, c)'
with pytest.raises(InvalidRequest, match="Unsupported|Invalid"):
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p), v, f) USING 'vector_index'")
def test_create_vector_search_index_with_duplicated_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = f'p int, c int, v vector<float, 3>, x int, primary key (p, c)'
with new_test_table(cql, test_keyspace, schema) as table:
with pytest.raises(InvalidRequest, match=f"Cannot create secondary index on partition key column p"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, p) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Duplicate column x in index target list"):
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, x, x) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Cannot create secondary index on partition key column p"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, p) USING 'vector_index'")
with pytest.raises(InvalidRequest, match=f"Duplicate column x in index target list"):
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, x, x) USING 'vector_index'")
def test_create_vector_search_index_with_bad_options(cql, test_keyspace, scylla_only, skip_without_tablets):
schema = 'p int primary key, v vector<float, 3>'
with new_test_table(cql, test_keyspace, schema) as table:


@@ -1,68 +0,0 @@
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later
#pragma once
#include "service/tablet_migration_rpc_handler.hh"
#include "service/storage_service.hh"
#include "utils/log.hh"
namespace service {
extern logging::logger rtlogger;
/// Test implementation of tablet_migration_rpc_handler that calls local RPC handlers
/// instead of sending network RPCs. This allows testing the topology coordinator logic
/// in unit tests without network communication.
///
/// Usage in tests:
/// auto handler = std::make_unique<local_tablet_rpc_simulator>(storage_service);
/// // Pass handler to run_topology_coordinator
class local_tablet_rpc_simulator : public tablet_migration_rpc_handler {
storage_service& _storage_service;
public:
explicit local_tablet_rpc_simulator(storage_service& ss)
: _storage_service(ss) {}
seastar::future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) override {
rtlogger.debug("local_tablet_rpc_simulator: repair tablet {} on {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.repair_tablet(tablet, sid);
}
seastar::future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
rtlogger.debug("local_tablet_rpc_simulator: stream tablet {} to {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.stream_tablet(tablet);
}
seastar::future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
rtlogger.debug("local_tablet_rpc_simulator: cleanup tablet {} on {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.cleanup_tablet(tablet);
}
seastar::future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) override {
rtlogger.debug("local_tablet_rpc_simulator: repair_update_compaction_ctrl for tablet {} on {}", tablet, dst);
        // The test simulator doesn't need to actually update the compaction
        // controller, since no real compaction happens in tests.
        // Just log and return success.
        co_return;
}
};
}
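
The decoupling pattern shown in the deleted simulator above can be illustrated with a self-contained sketch. This is not ScyllaDB code: the names (`tablet_rpc_handler`, `recording_rpc_handler`, `run_migration`) are hypothetical, the real interface returns `seastar::future`s and takes `locator::global_tablet_id` arguments, and this sketch replaces them with synchronous calls and plain `int` ids purely to show the shape of the abstraction.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified analogue of tablet_migration_rpc_handler:
// the coordinator depends only on this interface, never on messaging_service.
struct tablet_rpc_handler {
    virtual ~tablet_rpc_handler() = default;
    virtual void tablet_stream_data(int tablet_id) = 0;
    virtual void tablet_repair(int tablet_id) = 0;
    virtual void tablet_cleanup(int tablet_id) = 0;
};

// Test double in the spirit of local_tablet_rpc_simulator: instead of
// sending network RPCs, it records the calls it receives locally.
struct recording_rpc_handler : tablet_rpc_handler {
    std::vector<std::string> calls;
    void tablet_stream_data(int t) override { calls.push_back("stream:" + std::to_string(t)); }
    void tablet_repair(int t) override { calls.push_back("repair:" + std::to_string(t)); }
    void tablet_cleanup(int t) override { calls.push_back("cleanup:" + std::to_string(t)); }
};

// Stand-in for the coordinator's migration logic. Because it takes the
// interface by reference, a unit test can drive it with the recorder
// and assert on the sequence of "RPCs" without a multi-node cluster.
void run_migration(tablet_rpc_handler& rpc, int tablet_id) {
    rpc.tablet_stream_data(tablet_id);
    rpc.tablet_repair(tablet_id);
    rpc.tablet_cleanup(tablet_id);
}
```

In production the same slot would be filled by an implementation wrapping the real RPC client (the role `messaging_tablet_rpc_handler` plays in the diff above), selected at construction time.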