Compare commits
2 Commits
copilot/re
...
copilot/co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e806cb3f7 | ||
|
|
f267af38bd |
@@ -1,84 +0,0 @@
|
||||
# Tablet Migration RPC Decoupling - Implementation Summary
|
||||
|
||||
## What Was Changed
|
||||
|
||||
### Problem Statement
|
||||
The topology_coordinator's `handle_tablet_migration()` function was tightly coupled with RPC sending, making it difficult to test in unit tests without setting up a full multi-node cluster.
|
||||
|
||||
### Solution
|
||||
Introduced an abstraction layer (`tablet_migration_rpc_handler`) that decouples the coordinator logic from actual RPC operations.
|
||||
|
||||
## Files Added
|
||||
|
||||
### 1. `service/tablet_migration_rpc_handler.hh`
|
||||
Interface defining the RPC operations needed for tablet migration:
|
||||
- `tablet_repair()` - Repair a tablet on a replica
|
||||
- `tablet_stream_data()` - Stream tablet data to a replica
|
||||
- `tablet_cleanup()` - Clean up tablet data on a replica
|
||||
- `repair_update_compaction_ctrl()` - Update compaction controller after repair
|
||||
|
||||
### 2. `test/lib/local_tablet_rpc_simulator.hh`
|
||||
Test implementation that calls local RPC handlers instead of sending network RPCs.
|
||||
|
||||
### 3. `docs/dev/tablet-migration-rpc-decoupling.md`
|
||||
Documentation explaining the architecture and how to use it.
|
||||
|
||||
## Files Modified
|
||||
|
||||
### 1. `service/topology_coordinator.cc`
|
||||
- Added `messaging_tablet_rpc_handler` class - production implementation that wraps `messaging_service` RPCs
|
||||
- Updated `handle_tablet_migration()` to use `_tablet_rpc_handler` interface instead of direct RPC calls
|
||||
- Updated constructor to accept optional RPC handler and initialize it
|
||||
- Modified 5 locations where tablet RPCs were called:
|
||||
- rebuild_repair stage → `tablet_repair()`
|
||||
- streaming stage → `tablet_stream_data()`
|
||||
- cleanup stage → `tablet_cleanup()` (2 locations)
|
||||
- repair stage → `tablet_repair()`
|
||||
- end_repair stage → `repair_update_compaction_ctrl()`
|
||||
|
||||
### 2. `service/topology_coordinator.hh`
|
||||
- Added forward declaration for `tablet_migration_rpc_handler`
|
||||
- Updated `run_topology_coordinator()` signature to accept optional `tablet_rpc_handler` parameter
|
||||
|
||||
## How It Works
|
||||
|
||||
### Production Mode (Default)
|
||||
```cpp
|
||||
run_topology_coordinator(...);
|
||||
// Automatically uses messaging_tablet_rpc_handler
|
||||
```
|
||||
|
||||
The topology coordinator creates a `messaging_tablet_rpc_handler` that sends actual network RPCs.
|
||||
|
||||
### Test Mode
|
||||
```cpp
|
||||
run_topology_coordinator(..., std::make_unique<local_tablet_rpc_simulator>(storage_service));
|
||||
```
|
||||
|
||||
Tests can provide a `local_tablet_rpc_simulator` that calls local RPC handlers directly, bypassing the network layer.
|
||||
|
||||
## Benefits
|
||||
|
||||
1. **Testability**: Can now test `handle_tablet_migration()` logic in unit tests
|
||||
2. **Separation of Concerns**: Coordinator logic (group0 transitions, barriers) is separated from replica logic (RPCs)
|
||||
3. **Minimal Changes**: The refactoring is minimal and surgical - only abstracts the RPC layer
|
||||
4. **Backward Compatible**: Existing code continues to work without changes
|
||||
|
||||
## What Can Now Be Tested
|
||||
|
||||
With this refactoring, tests can now:
|
||||
1. Verify that tablet migrations progress through the correct stages
|
||||
2. Test barrier coordination between stages
|
||||
3. Test rollback logic when migrations fail
|
||||
4. Test the interaction between migrations and topology changes
|
||||
5. Simulate various failure scenarios at the RPC level
|
||||
|
||||
All without needing multiple nodes or network communication.
|
||||
|
||||
## Future Work
|
||||
|
||||
This refactoring enables future enhancements:
|
||||
1. More comprehensive topology coordinator unit tests
|
||||
2. Fault injection at the RPC level in tests
|
||||
3. Better simulation of network conditions
|
||||
4. Potential extraction of more coordinator logic into testable components
|
||||
@@ -244,7 +244,10 @@ static bool is_set_of(const rjson::value& type1, const rjson::value& type2) {
|
||||
|
||||
// Check if two JSON-encoded values match with the CONTAINS relation
|
||||
bool check_CONTAINS(const rjson::value* v1, const rjson::value& v2, bool v1_from_query, bool v2_from_query) {
|
||||
if (!v1) {
|
||||
if (!v1 || !v1->IsObject() || v1->MemberCount() == 0) {
|
||||
return false;
|
||||
}
|
||||
if (!v2.IsObject() || v2.MemberCount() == 0) {
|
||||
return false;
|
||||
}
|
||||
const auto& kv1 = *v1->MemberBegin();
|
||||
|
||||
@@ -45,7 +45,7 @@ bool consumed_capacity_counter::should_add_capacity(const rjson::value& request)
|
||||
}
|
||||
|
||||
void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjson::value& response) const noexcept {
|
||||
if (_should_add_to_reponse) {
|
||||
if (_should_add_to_response) {
|
||||
auto consumption = rjson::empty_object();
|
||||
rjson::add(consumption, "CapacityUnits", get_consumed_capacity_units());
|
||||
rjson::add(response, "ConsumedCapacity", std::move(consumption));
|
||||
@@ -53,7 +53,9 @@ void consumed_capacity_counter::add_consumed_capacity_to_response_if_needed(rjso
|
||||
}
|
||||
|
||||
static uint64_t calculate_half_units(uint64_t unit_block_size, uint64_t total_bytes, bool is_quorum) {
|
||||
uint64_t half_units = (total_bytes + unit_block_size -1) / unit_block_size; //divide by unit_block_size and round up
|
||||
// Avoid potential integer overflow when total_bytes is close to UINT64_MAX
|
||||
// by using division with modulo instead of addition before division
|
||||
uint64_t half_units = total_bytes / unit_block_size + (total_bytes % unit_block_size != 0 ? 1 : 0);
|
||||
|
||||
if (is_quorum) {
|
||||
half_units *= 2;
|
||||
|
||||
@@ -28,9 +28,9 @@ namespace alternator {
|
||||
class consumed_capacity_counter {
|
||||
public:
|
||||
consumed_capacity_counter() = default;
|
||||
consumed_capacity_counter(bool should_add_to_reponse) : _should_add_to_reponse(should_add_to_reponse){}
|
||||
consumed_capacity_counter(bool should_add_to_response) : _should_add_to_response(should_add_to_response){}
|
||||
bool operator()() const noexcept {
|
||||
return _should_add_to_reponse;
|
||||
return _should_add_to_response;
|
||||
}
|
||||
|
||||
consumed_capacity_counter& operator +=(uint64_t bytes);
|
||||
@@ -44,7 +44,7 @@ public:
|
||||
uint64_t _total_bytes = 0;
|
||||
static bool should_add_capacity(const rjson::value& request);
|
||||
protected:
|
||||
bool _should_add_to_reponse = false;
|
||||
bool _should_add_to_response = false;
|
||||
};
|
||||
|
||||
class rcu_consumed_capacity_counter : public consumed_capacity_counter {
|
||||
|
||||
@@ -834,11 +834,13 @@ future<> executor::fill_table_size(rjson::value &table_description, schema_ptr s
|
||||
total_size = co_await _ss.estimate_total_sstable_volume(schema->id(), service::storage_service::ignore_errors::yes);
|
||||
const auto expiry = std::chrono::seconds{ _proxy.data_dictionary().get_config().alternator_describe_table_info_cache_validity_in_seconds() };
|
||||
// Note: we don't care when the notification of other shards will finish, as long as it will be done
|
||||
// it's possible to get into race condition (next DescribeTable comes to other shard, that new shard doesn't have
|
||||
// the size yet, so it will calculate it again) - this is not a problem, because it will call cache_newly_calculated_size_on_all_shards
|
||||
// with expiry, which is extremely unlikely to be exactly the same as the previous one, all shards will keep the size coming with expiry that is further into the future.
|
||||
// In case of the same expiry, some shards will have different size, which means DescribeTable will return different values depending on the shard
|
||||
// which is also fine, as the specification doesn't give precision guarantees of any kind.
|
||||
// A race condition is possible: if a DescribeTable request arrives on a different shard before
|
||||
// that shard receives the cached size, it will recalculate independently. This is acceptable because:
|
||||
// 1. Both calculations will cache their results with an expiry time
|
||||
// 2. Expiry times are unlikely to be identical, so eventually all shards converge to the most recent value
|
||||
// 3. Even if expiry times match, different shards may briefly return different table sizes
|
||||
// 4. This temporary inconsistency is acceptable per DynamoDB specification, which doesn't guarantee
|
||||
// exact precision for DescribeTable size information
|
||||
co_await cache_newly_calculated_size_on_all_shards(schema, total_size, expiry);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -281,8 +281,7 @@ For example::
|
||||
ORDER BY embedding ANN OF [0.1, 0.2, 0.3, 0.4] LIMIT 5;
|
||||
|
||||
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key
|
||||
or columns provided in a definition of the index.
|
||||
Vector queries also support filtering with ``WHERE`` clauses on columns that are part of the primary key.
|
||||
|
||||
For example::
|
||||
|
||||
|
||||
@@ -140,83 +140,17 @@ Vector Index :label-note:`ScyllaDB Cloud`
|
||||
`ScyllaDB Cloud documentation <https://cloud.docs.scylladb.com/stable/vector-search/>`_.
|
||||
|
||||
ScyllaDB supports creating vector indexes on tables, allowing queries on the table to use those indexes for efficient
|
||||
similarity search on vector data. Vector indexes can be a global index for indexing vectors per table or a local
|
||||
index for indexing vectors per partition.
|
||||
similarity search on vector data.
|
||||
|
||||
The vector index is the only custom type index supported in ScyllaDB. It is created using
|
||||
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. It is also possible to
|
||||
add additional columns to the index for filtering the search results. The partition column
|
||||
specified in the global vector index definition must be the vector column, and any subsequent
|
||||
columns are treated as filtering columns. The local vector index requires that the partition key
|
||||
of the base table is also the partition key of the index and the vector column is the first one
|
||||
from the following columns.
|
||||
|
||||
Example of a simple index:
|
||||
the ``CUSTOM`` keyword and specifying the index type as ``vector_index``. Example:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed to enable similarity search using
|
||||
a global vector index. Additional filtering can be performed on the primary key
|
||||
columns of the base table.
|
||||
|
||||
Example of a global vector index with additional filtering:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings (embedding, category, info)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed to enable similarity search using
|
||||
a global index. Additional columns are added for filtering the search results.
|
||||
The filtering is possible on ``category``, ``info`` and all primary key columns
|
||||
of the base table.
|
||||
|
||||
Example of a local vector index:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
CREATE CUSTOM INDEX vectorIndex ON ImageEmbeddings ((id, created_at), embedding, category, info)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE', 'maximum_node_connections': '16'};
|
||||
|
||||
The vector column (``embedding``) is indexed for similarity search (a local
|
||||
index) and additional columns are added for filtering the search results. The
|
||||
filtering is possible on ``category``, ``info`` and all primary key columns of
|
||||
the base table. The columns ``id`` and ``created_at`` must be the partition key
|
||||
of the base table.
|
||||
|
||||
Vector indexes support additional filtering columns of native data types
|
||||
(excluding counter and duration). The indexed column itself must be a vector
|
||||
column, while the extra columns can be used to filter search results.
|
||||
|
||||
The supported types are:
|
||||
|
||||
* ``ascii``
|
||||
* ``bigint``
|
||||
* ``blob``
|
||||
* ``boolean``
|
||||
* ``date``
|
||||
* ``decimal``
|
||||
* ``double``
|
||||
* ``float``
|
||||
* ``inet``
|
||||
* ``int``
|
||||
* ``smallint``
|
||||
* ``text``
|
||||
* ``varchar``
|
||||
* ``time``
|
||||
* ``timestamp``
|
||||
* ``timeuuid``
|
||||
* ``tinyint``
|
||||
* ``uuid``
|
||||
* ``varint``
|
||||
|
||||
|
||||
The following options are supported for vector indexes. All of them are optional.
|
||||
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
# Tablet Migration RPC Decoupling
|
||||
|
||||
## Overview
|
||||
|
||||
The topology coordinator's tablet migration logic has been refactored to decouple it from RPC operations. This allows the topology coordinator logic to be tested in unit tests without network communication.
|
||||
|
||||
## Architecture
|
||||
|
||||
### Components
|
||||
|
||||
1. **`tablet_migration_rpc_handler`** (interface): Defines the RPC operations needed for tablet migration
|
||||
- `tablet_repair()`: Initiates tablet repair on a replica
|
||||
- `tablet_stream_data()`: Streams tablet data to a replica
|
||||
- `tablet_cleanup()`: Cleans up tablet data on a replica
|
||||
- `repair_update_compaction_ctrl()`: Updates compaction controller after repair
|
||||
|
||||
2. **`messaging_tablet_rpc_handler`**: Production implementation that sends real network RPCs via `messaging_service`
|
||||
|
||||
3. **`local_tablet_rpc_simulator`**: Test implementation that calls RPC handlers locally without network communication
|
||||
|
||||
### How It Works
|
||||
|
||||
#### Production (Real RPCs)
|
||||
```cpp
|
||||
// In storage_service.cc
|
||||
run_topology_coordinator(
|
||||
sys_dist_ks, gossiper, messaging, shared_tm,
|
||||
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
|
||||
raft_topology_cmd_handler,
|
||||
tablet_allocator, cdc_gens, ring_delay,
|
||||
lifecycle_notifier, feature_service,
|
||||
sl_controller, topology_cmd_rpc_tracker
|
||||
// tablet_rpc_handler defaults to nullptr, so messaging_tablet_rpc_handler is used
|
||||
);
|
||||
```
|
||||
|
||||
#### Testing (Local Simulation)
|
||||
```cpp
|
||||
// In test code
|
||||
run_topology_coordinator(
|
||||
sys_dist_ks, gossiper, messaging, shared_tm,
|
||||
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
|
||||
raft_topology_cmd_handler,
|
||||
tablet_allocator, cdc_gens, ring_delay,
|
||||
lifecycle_notifier, feature_service,
|
||||
sl_controller, topology_cmd_rpc_tracker,
|
||||
std::make_unique<local_tablet_rpc_simulator>(storage_service) // Create unique_ptr directly
|
||||
);
|
||||
```
|
||||
|
||||
## Benefits
|
||||
|
||||
1. **Unit Testability**: The topology coordinator's `handle_tablet_migration()` logic can now be tested in unit tests without setting up a multi-node cluster
|
||||
2. **Separation of Concerns**: Coordinator-side logic (group0 state transitions, barriers) is clearly separated from replica-side logic (RPC operations)
|
||||
3. **Flexibility**: Different RPC implementations can be provided for different testing scenarios
|
||||
|
||||
## Migration Stages
|
||||
|
||||
The tablet migration process goes through multiple stages. Each stage has:
|
||||
- **Coordinator-side logic**: What group0 changes to make, whether barriers are needed, whether rollback is needed
|
||||
- **Replica-side logic**: RPC operations to perform on tablet replicas
|
||||
|
||||
Example stages:
|
||||
- `allow_write_both_read_old` → coordinator initiates transition
|
||||
- `write_both_read_old` → coordinator waits for barrier
|
||||
- `streaming` → coordinator calls `tablet_stream_data()` RPC
|
||||
- `write_both_read_new` → coordinator waits for barrier
|
||||
- `use_new` → coordinator transitions to final state
|
||||
- `cleanup` → coordinator calls `tablet_cleanup()` RPC
|
||||
- `end_migration` → coordinator removes transition state
|
||||
|
||||
## Future Enhancements
|
||||
|
||||
The current refactoring enables future enhancements such as:
|
||||
1. More comprehensive testing of topology coordinator logic
|
||||
2. Fault injection at the RPC level in tests
|
||||
3. Better simulation of network conditions in tests
|
||||
4. Cleaner separation of concerns in the codebase
|
||||
@@ -17,11 +17,11 @@
|
||||
#include "index/secondary_index.hh"
|
||||
#include "index/secondary_index_manager.hh"
|
||||
#include "types/concrete_types.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/managed_string.hh"
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
|
||||
namespace secondary_index {
|
||||
|
||||
static void validate_positive_option(int max, const sstring& value_name, const sstring& value) {
|
||||
@@ -147,88 +147,17 @@ std::optional<cql3::description> vector_index::describe(const index_metadata& im
|
||||
}
|
||||
|
||||
void vector_index::check_target(const schema& schema, const std::vector<::shared_ptr<cql3::statements::index_target>>& targets) const {
|
||||
|
||||
struct validate_visitor {
|
||||
const class schema& schema;
|
||||
bool& is_vector;
|
||||
|
||||
/// Vector indexes support filtering on native types that can be used as primary key columns.
|
||||
/// There is no counter (it cannot be used with vector columns)
|
||||
/// and no duration (it cannot be used as a primary key or in secondary indexes).
|
||||
static bool is_supported_filtering_column(abstract_type const & kind_type) {
|
||||
switch (kind_type.get_kind()) {
|
||||
case abstract_type::kind::ascii:
|
||||
case abstract_type::kind::boolean:
|
||||
case abstract_type::kind::byte:
|
||||
case abstract_type::kind::bytes:
|
||||
case abstract_type::kind::date:
|
||||
case abstract_type::kind::decimal:
|
||||
case abstract_type::kind::double_kind:
|
||||
case abstract_type::kind::float_kind:
|
||||
case abstract_type::kind::inet:
|
||||
case abstract_type::kind::int32:
|
||||
case abstract_type::kind::long_kind:
|
||||
case abstract_type::kind::short_kind:
|
||||
case abstract_type::kind::simple_date:
|
||||
case abstract_type::kind::time:
|
||||
case abstract_type::kind::timestamp:
|
||||
case abstract_type::kind::timeuuid:
|
||||
case abstract_type::kind::utf8:
|
||||
case abstract_type::kind::uuid:
|
||||
case abstract_type::kind::varint:
|
||||
return true;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void validate(cql3::column_identifier const& column, bool is_vector) const {
|
||||
auto const& c_name = column.to_string();
|
||||
auto const* c_def = schema.get_column_definition(column.name());
|
||||
if (c_def == nullptr) {
|
||||
throw exceptions::invalid_request_exception(format("Column {} not found in schema", c_name));
|
||||
}
|
||||
|
||||
auto type = c_def->type;
|
||||
|
||||
if (is_vector) {
|
||||
auto const* vector_type = dynamic_cast<const vector_type_impl*>(type.get());
|
||||
if (vector_type == nullptr) {
|
||||
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
|
||||
}
|
||||
|
||||
auto elements_type = vector_type->get_elements_type();
|
||||
if (elements_type->get_kind() != abstract_type::kind::float_kind) {
|
||||
throw exceptions::invalid_request_exception("Vector indexes are only supported on columns of vectors of floats");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!is_supported_filtering_column(*type)) {
|
||||
throw exceptions::invalid_request_exception(format("Unsupported vector index filtering column {} type", c_name));
|
||||
}
|
||||
}
|
||||
|
||||
void operator()(const std::vector<::shared_ptr<cql3::column_identifier>>& columns) const {
|
||||
for (const auto& column : columns) {
|
||||
// CQL restricts the secondary local index to have multiple columns with partition key only.
|
||||
// Vectors shouldn't be partition key columns and they aren't supported as a filtering column,
|
||||
// so we can assume here that these are non-vectors filtering columns.
|
||||
validate(*column, false);
|
||||
}
|
||||
}
|
||||
|
||||
void operator()(const ::shared_ptr<cql3::column_identifier>& column) {
|
||||
validate(*column, is_vector);
|
||||
// The first column is the vector column, the rest mustn't be vectors.
|
||||
is_vector = false;
|
||||
}
|
||||
};
|
||||
|
||||
bool is_vector = true;
|
||||
for (const auto& target : targets) {
|
||||
std::visit(validate_visitor{.schema = schema, .is_vector = is_vector}, target->value);
|
||||
if (targets.size() != 1) {
|
||||
throw exceptions::invalid_request_exception("Vector index can only be created on a single column");
|
||||
}
|
||||
auto target = targets[0];
|
||||
auto c_def = schema.get_column_definition(to_bytes(target->column_name()));
|
||||
if (!c_def) {
|
||||
throw exceptions::invalid_request_exception(format("Column {} not found in schema", target->column_name()));
|
||||
}
|
||||
auto type = c_def->type;
|
||||
if (!type->is_vector() || static_cast<const vector_type_impl*>(type.get())->get_elements_type()->get_kind() != abstract_type::kind::float_kind) {
|
||||
throw exceptions::invalid_request_exception(format("Vector indexes are only supported on columns of vectors of floats", target->column_name()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
// Copyright (C) 2026-present ScyllaDB
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/future.hh>
|
||||
#include "locator/tablets.hh"
|
||||
#include "service/tablet_operation.hh"
|
||||
#include "service/session.hh"
|
||||
#include "raft/raft.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
/// Interface for handling tablet migration RPC operations.
|
||||
/// This abstraction decouples the topology coordinator from actual RPC sending,
|
||||
/// allowing unit tests to provide a local implementation that simulates RPC
|
||||
/// behavior without network communication.
|
||||
///
|
||||
/// Each method corresponds to an RPC verb that the topology coordinator sends
|
||||
/// to tablet replicas during migration stages.
|
||||
class tablet_migration_rpc_handler {
|
||||
public:
|
||||
virtual ~tablet_migration_rpc_handler() = default;
|
||||
|
||||
/// Initiates tablet repair on a replica.
|
||||
/// Called during the rebuild_repair or repair stage.
|
||||
///
|
||||
/// @param dst destination host
|
||||
/// @param dst_id destination raft server ID
|
||||
/// @param tablet tablet to repair
|
||||
/// @param sid session ID for the repair operation
|
||||
/// @return repair result containing the repair timestamp
|
||||
virtual seastar::future<tablet_operation_repair_result> tablet_repair(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet,
|
||||
session_id sid) = 0;
|
||||
|
||||
/// Streams tablet data to a replica.
|
||||
/// Called during the streaming stage.
|
||||
///
|
||||
/// @param dst destination host
|
||||
/// @param dst_id destination raft server ID
|
||||
/// @param tablet tablet to stream
|
||||
virtual seastar::future<> tablet_stream_data(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet) = 0;
|
||||
|
||||
/// Cleans up tablet data on a replica.
|
||||
/// Called during the cleanup or cleanup_target stage.
|
||||
///
|
||||
/// @param dst destination host
|
||||
/// @param dst_id destination raft server ID
|
||||
/// @param tablet tablet to clean up
|
||||
virtual seastar::future<> tablet_cleanup(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet) = 0;
|
||||
|
||||
/// Updates compaction controller after repair.
|
||||
/// Called during the end_repair stage for each tablet replica.
|
||||
///
|
||||
/// @param dst destination host
|
||||
/// @param tablet tablet to update
|
||||
/// @param sid session ID from the repair operation
|
||||
virtual seastar::future<> repair_update_compaction_ctrl(
|
||||
locator::host_id dst,
|
||||
locator::global_tablet_id tablet,
|
||||
session_id sid) = 0;
|
||||
};
|
||||
|
||||
}
|
||||
@@ -73,7 +73,6 @@
|
||||
#include "idl/repair.dist.hh"
|
||||
|
||||
#include "service/topology_coordinator.hh"
|
||||
#include "service/tablet_migration_rpc_handler.hh"
|
||||
|
||||
#include <boost/range/join.hpp>
|
||||
#include <seastar/core/metrics_registration.hh>
|
||||
@@ -118,51 +117,6 @@ sstring get_application_state_gently(const gms::application_state_map& epmap, gm
|
||||
// it's versioned_value::value(), not std::optional::value() - it does not throw
|
||||
return it->second.value();
|
||||
}
|
||||
|
||||
/// Real implementation of tablet_migration_rpc_handler that sends RPCs via messaging_service.
|
||||
/// Used by the production topology coordinator to perform actual network communication.
|
||||
class messaging_tablet_rpc_handler : public tablet_migration_rpc_handler {
|
||||
netw::messaging_service& _messaging;
|
||||
abort_source& _as;
|
||||
|
||||
public:
|
||||
messaging_tablet_rpc_handler(netw::messaging_service& messaging, abort_source& as)
|
||||
: _messaging(messaging), _as(as) {}
|
||||
|
||||
future<tablet_operation_repair_result> tablet_repair(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet,
|
||||
session_id sid) override {
|
||||
co_return co_await ser::storage_service_rpc_verbs::send_tablet_repair(
|
||||
&_messaging, dst, _as, dst_id, tablet, sid);
|
||||
}
|
||||
|
||||
future<> tablet_stream_data(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet) override {
|
||||
co_return co_await ser::storage_service_rpc_verbs::send_tablet_stream_data(
|
||||
&_messaging, dst, _as, dst_id, tablet);
|
||||
}
|
||||
|
||||
future<> tablet_cleanup(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet) override {
|
||||
co_return co_await ser::storage_service_rpc_verbs::send_tablet_cleanup(
|
||||
&_messaging, dst, _as, dst_id, tablet);
|
||||
}
|
||||
|
||||
future<> repair_update_compaction_ctrl(
|
||||
locator::host_id dst,
|
||||
locator::global_tablet_id tablet,
|
||||
session_id sid) override {
|
||||
co_return co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(
|
||||
&_messaging, dst, tablet, sid);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
@@ -190,7 +144,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
|
||||
tablet_allocator& _tablet_allocator;
|
||||
std::unique_ptr<db::view::view_building_coordinator> _vb_coordinator;
|
||||
std::unique_ptr<tablet_migration_rpc_handler> _tablet_rpc_handler;
|
||||
|
||||
cdc::generation_service& _cdc_gens;
|
||||
|
||||
@@ -1650,7 +1603,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid);
|
||||
return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) {
|
||||
return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) {
|
||||
return _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id).discard_result();
|
||||
return ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
|
||||
dst, _as, raft::server_id(dst.uuid()), gid, session_id).discard_result();
|
||||
});
|
||||
});
|
||||
})) {
|
||||
@@ -1722,7 +1676,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
auto dst = trinfo.pending_replica->host;
|
||||
return do_with(gids, [this, dst] (const auto& gids) {
|
||||
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
|
||||
return _tablet_rpc_handler->tablet_stream_data(dst, raft::server_id(dst.uuid()), gid);
|
||||
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
|
||||
dst, _as, raft::server_id(dst.uuid()), gid);
|
||||
});
|
||||
});
|
||||
})) {
|
||||
@@ -1791,7 +1746,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
rtlogger.info("Initiating tablet cleanup of {} on {}", gid, dst);
|
||||
return do_with(gids, [this, dst] (const auto& gids) {
|
||||
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
|
||||
return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
|
||||
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
|
||||
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
|
||||
});
|
||||
}).then([] {
|
||||
return utils::get_local_injector().inject("wait_after_tablet_cleanup", [] (auto& handler) -> future<> {
|
||||
@@ -1819,7 +1775,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
|
||||
return do_with(gids, [this, dst] (const auto& gids) {
|
||||
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
|
||||
return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
|
||||
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
|
||||
dst.host, _as, raft::server_id(dst.host.uuid()), gid);
|
||||
});
|
||||
});
|
||||
})) {
|
||||
@@ -1911,7 +1868,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
|
||||
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
|
||||
service::session_id::create_random_id() : trinfo->session_id;
|
||||
auto res = co_await _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id);
|
||||
auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
|
||||
dst, _as, raft::server_id(dst.uuid()), gid, session_id);
|
||||
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
|
||||
auto& tablet_state = _tablets[tablet];
|
||||
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
|
||||
@@ -1977,7 +1935,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
tablet_state.repair_update_compaction_ctrl_retried++;
|
||||
}
|
||||
bool feature = _feature_service.tablet_incremental_repair;
|
||||
if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this,
|
||||
if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this, ms = &_messaging,
|
||||
gid = gid, sid = tablet_state.session_id, feature, &tmap] () -> future<> {
|
||||
if (feature) {
|
||||
if (utils::get_local_injector().enter("fail_rpc_repair_update_compaction_ctrl")) {
|
||||
@@ -1986,9 +1944,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
auto& replicas = tmap.get_tablet_info(gid.tablet).replicas;
|
||||
co_await coroutine::parallel_for_each(replicas, [this, gid, sid] (locator::tablet_replica r) -> future<> {
|
||||
co_await coroutine::parallel_for_each(replicas, [this, ms, gid, sid] (locator::tablet_replica r) -> future<> {
|
||||
if (!is_excluded(raft::server_id(r.host.uuid()))) {
|
||||
co_await _tablet_rpc_handler->repair_update_compaction_ctrl(r.host, gid, sid);
|
||||
co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(ms, r.host, gid, sid);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -3725,8 +3683,7 @@ public:
|
||||
gms::feature_service& feature_service,
|
||||
endpoint_lifecycle_notifier& lifecycle_notifier,
|
||||
qos::service_level_controller& sl_controller,
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
|
||||
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler)
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
|
||||
: _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
|
||||
, _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
|
||||
, _tablet_load_stats_refresh_interval_in_seconds(db.get_config().tablet_load_stats_refresh_interval_in_seconds)
|
||||
@@ -3737,7 +3694,6 @@ public:
|
||||
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
|
||||
, _tablet_allocator(tablet_allocator)
|
||||
, _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
|
||||
, _tablet_rpc_handler(tablet_rpc_handler ? std::move(tablet_rpc_handler) : std::make_unique<messaging_tablet_rpc_handler>(_messaging, _as))
|
||||
, _cdc_gens(cdc_gens)
|
||||
, _tablet_load_stats_refresh([this] {
|
||||
return with_scheduling_group(_db.get_gossip_scheduling_group(), [this] {
|
||||
@@ -4410,8 +4366,7 @@ future<> run_topology_coordinator(
|
||||
endpoint_lifecycle_notifier& lifecycle_notifier,
|
||||
gms::feature_service& feature_service,
|
||||
qos::service_level_controller& sl_controller,
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
|
||||
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler) {
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker) {
|
||||
|
||||
topology_coordinator coordinator{
|
||||
sys_dist_ks, gossiper, messaging, shared_tm,
|
||||
@@ -4422,8 +4377,7 @@ future<> run_topology_coordinator(
|
||||
ring_delay,
|
||||
feature_service, lifecycle_notifier,
|
||||
sl_controller,
|
||||
topology_cmd_rpc_tracker,
|
||||
std::move(tablet_rpc_handler)};
|
||||
topology_cmd_rpc_tracker};
|
||||
|
||||
std::exception_ptr ex;
|
||||
lifecycle_notifier.register_subscriber(&coordinator);
|
||||
|
||||
@@ -58,7 +58,6 @@ namespace service {
|
||||
|
||||
class raft_group0;
|
||||
class tablet_allocator;
|
||||
class tablet_migration_rpc_handler;
|
||||
|
||||
extern logging::logger rtlogger;
|
||||
|
||||
@@ -101,8 +100,7 @@ future<> run_topology_coordinator(
|
||||
endpoint_lifecycle_notifier& lifecycle_notifier,
|
||||
gms::feature_service& feature_service,
|
||||
qos::service_level_controller& sl_controller,
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
|
||||
std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler = nullptr);
|
||||
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker);
|
||||
|
||||
class tablet_ops_metrics {
|
||||
private:
|
||||
|
||||
@@ -933,6 +933,7 @@ def testToJsonFct(cql, test_keyspace):
|
||||
assert_rows(execute(cql, table, "SELECT k, toJson(durationval) FROM %s WHERE k = ?", 0), [0, "\"1y1mo2d10h5m\""])
|
||||
|
||||
# Reproduces issue #8077
|
||||
@pytest.mark.xfail(reason="issues #8077")
|
||||
def testJsonWithGroupBy(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int, c int, v int, PRIMARY KEY (k, c))") as table:
|
||||
# tests SELECT JSON statements
|
||||
@@ -953,6 +954,7 @@ def testJsonWithGroupBy(cql, test_keyspace):
|
||||
["{\"count\": 1}"])
|
||||
|
||||
# Reproduces issues #8077, #8078
|
||||
@pytest.mark.xfail(reason="issues #8077")
|
||||
def testSelectJsonSyntax(cql, test_keyspace):
|
||||
with create_table(cql, test_keyspace, "(k int primary key, v int)") as table:
|
||||
# tests SELECT JSON statements
|
||||
|
||||
@@ -520,6 +520,7 @@ def test_tojson_decimal_high_mantissa2(cql, table1):
|
||||
|
||||
# Reproducers for issue #8077: SELECT JSON on a function call should result
|
||||
# in the same JSON strings as it does on Cassandra.
|
||||
@pytest.mark.xfail(reason="issue #8077")
|
||||
def test_select_json_function_call(cql, table1):
|
||||
p = unique_key_int()
|
||||
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, 17) USING TIMESTAMP 1234")
|
||||
|
||||
@@ -10,41 +10,6 @@ import pytest
|
||||
from .util import new_test_table, is_scylla, unique_name
|
||||
from cassandra.protocol import InvalidRequest, ConfigurationException
|
||||
|
||||
supported_filtering_types = [
|
||||
'ascii',
|
||||
'bigint',
|
||||
'blob',
|
||||
'boolean',
|
||||
'date',
|
||||
'decimal',
|
||||
'double',
|
||||
'float',
|
||||
'inet',
|
||||
'int',
|
||||
'smallint',
|
||||
'text',
|
||||
'varchar',
|
||||
'time',
|
||||
'timestamp',
|
||||
'timeuuid',
|
||||
'tinyint',
|
||||
'uuid',
|
||||
'varint',
|
||||
]
|
||||
|
||||
unsupported_filtering_types = [
|
||||
'duration',
|
||||
'map<int, int>',
|
||||
'list<int>',
|
||||
'set<int>',
|
||||
'tuple<int, int>',
|
||||
'vector<float, 3>',
|
||||
'frozen<map<int, int>>',
|
||||
'frozen<list<int>>',
|
||||
'frozen<set<int>>',
|
||||
'frozen<tuple<int, int>>',
|
||||
]
|
||||
|
||||
def test_create_vector_search_index(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p int primary key, v vector<float, 3>'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
@@ -80,57 +45,6 @@ def test_create_vector_search_index_on_nonvector_column(cql, test_keyspace, scyl
|
||||
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_global_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}(v, f1, f2) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_local_index_with_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p1 int, p2 int, c1 int, c2 int, v vector<float, 3>, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_local_index_with_filtering_columns_on_nonvector_column(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p1 int, p2 int, c1 int, c2 int, v int, f1 int, f2 int, primary key ((p1, p2), c1, c2)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
with pytest.raises(InvalidRequest, match="Vector indexes are only supported on columns of vectors of floats"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p1, p2), v, f1, f2) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_index_with_supported_and_unsupported_filtering_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
supported_columns = ', '.join([f's{idx} {typ}' for idx, typ in enumerate(supported_filtering_types)])
|
||||
unsupported_columns = ', '.join([f'u{idx} {typ}' for idx, typ in enumerate(unsupported_filtering_types)])
|
||||
schema = f'p int, c int, v vector<float, 3>, {supported_columns}, {unsupported_columns}, primary key (p, c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
for idx in range(len(supported_filtering_types)):
|
||||
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, s{idx}) USING 'vector_index'")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.global_idx")
|
||||
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, s{idx}) USING 'vector_index'")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.local_idx")
|
||||
for idx in range(len(unsupported_filtering_types)):
|
||||
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, u{idx}) USING 'vector_index'")
|
||||
with pytest.raises(InvalidRequest, match=f"Unsupported vector index filtering column u{idx} type|Secondary indexes are not supported"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, u{idx}) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_local_index_with_unsupported_partition_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
for filter_type in unsupported_filtering_types:
|
||||
schema = f'p {filter_type}, c int, v vector<float, 3>, f int, primary key (p, c)'
|
||||
with pytest.raises(InvalidRequest, match="Unsupported|Invalid"):
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"CREATE CUSTOM INDEX ON {table}((p), v, f) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_index_with_duplicated_columns(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = f'p int, c int, v vector<float, 3>, x int, primary key (p, c)'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
with pytest.raises(InvalidRequest, match=f"Cannot create secondary index on partition key column p"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, p) USING 'vector_index'")
|
||||
with pytest.raises(InvalidRequest, match=f"Duplicate column x in index target list"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX global_idx ON {table}(v, x, x) USING 'vector_index'")
|
||||
with pytest.raises(InvalidRequest, match=f"Cannot create secondary index on partition key column p"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, p) USING 'vector_index'")
|
||||
with pytest.raises(InvalidRequest, match=f"Duplicate column x in index target list"):
|
||||
cql.execute(f"CREATE CUSTOM INDEX local_idx ON {table}((p), v, x, x) USING 'vector_index'")
|
||||
|
||||
def test_create_vector_search_index_with_bad_options(cql, test_keyspace, scylla_only, skip_without_tablets):
|
||||
schema = 'p int primary key, v vector<float, 3>'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
|
||||
@@ -1,68 +0,0 @@
|
||||
// Copyright (C) 2026-present ScyllaDB
|
||||
// SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "service/tablet_migration_rpc_handler.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "utils/log.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
extern logging::logger rtlogger;
|
||||
|
||||
/// Test implementation of tablet_migration_rpc_handler that calls local RPC handlers
|
||||
/// instead of sending network RPCs. This allows testing the topology coordinator logic
|
||||
/// in unit tests without network communication.
|
||||
///
|
||||
/// Usage in tests:
|
||||
/// auto handler = std::make_unique<local_tablet_rpc_simulator>(storage_service);
|
||||
/// // Pass handler to run_topology_coordinator
|
||||
class local_tablet_rpc_simulator : public tablet_migration_rpc_handler {
|
||||
storage_service& _storage_service;
|
||||
|
||||
public:
|
||||
explicit local_tablet_rpc_simulator(storage_service& ss)
|
||||
: _storage_service(ss) {}
|
||||
|
||||
seastar::future<tablet_operation_repair_result> tablet_repair(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet,
|
||||
session_id sid) override {
|
||||
rtlogger.debug("local_tablet_rpc_simulator: repair tablet {} on {} ({})", tablet, dst, dst_id);
|
||||
// Call the local RPC handler directly
|
||||
co_return co_await _storage_service.repair_tablet(tablet, sid);
|
||||
}
|
||||
|
||||
seastar::future<> tablet_stream_data(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet) override {
|
||||
rtlogger.debug("local_tablet_rpc_simulator: stream tablet {} to {} ({})", tablet, dst, dst_id);
|
||||
// Call the local RPC handler directly
|
||||
co_return co_await _storage_service.stream_tablet(tablet);
|
||||
}
|
||||
|
||||
seastar::future<> tablet_cleanup(
|
||||
locator::host_id dst,
|
||||
raft::server_id dst_id,
|
||||
locator::global_tablet_id tablet) override {
|
||||
rtlogger.debug("local_tablet_rpc_simulator: cleanup tablet {} on {} ({})", tablet, dst, dst_id);
|
||||
// Call the local RPC handler directly
|
||||
co_return co_await _storage_service.cleanup_tablet(tablet);
|
||||
}
|
||||
|
||||
seastar::future<> repair_update_compaction_ctrl(
|
||||
locator::host_id dst,
|
||||
locator::global_tablet_id tablet,
|
||||
session_id sid) override {
|
||||
rtlogger.debug("local_tablet_rpc_simulator: repair_update_compaction_ctrl for tablet {} on {}", tablet, dst);
|
||||
// For the test simulator, we don't need to actually update compaction controller
|
||||
// as there's no real compaction happening in the tests.
|
||||
// Just log and return success.
|
||||
co_return;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user