Compare commits

7 Commits: debug_form ... copilot/re

| Author | SHA1 | Date |
|---|---|---|
|  | e69566a36a |  |
|  | 3fc31bc751 |  |
|  | 0e439ad03a |  |
|  | d466213b78 |  |
|  | 1a3e8fbdc7 |  |
|  | f9c811e88b |  |
|  | 601d73ea00 |  |
84  IMPLEMENTATION_SUMMARY.md  Normal file
@@ -0,0 +1,84 @@
# Tablet Migration RPC Decoupling - Implementation Summary

## What Was Changed

### Problem Statement

The topology_coordinator's `handle_tablet_migration()` function was tightly coupled with RPC sending, making it difficult to test in unit tests without setting up a full multi-node cluster.

### Solution

Introduced an abstraction layer (`tablet_migration_rpc_handler`) that decouples the coordinator logic from actual RPC operations.
## Files Added

### 1. `service/tablet_migration_rpc_handler.hh`

Interface defining the RPC operations needed for tablet migration:

- `tablet_repair()` - Repair a tablet on a replica
- `tablet_stream_data()` - Stream tablet data to a replica
- `tablet_cleanup()` - Clean up tablet data on a replica
- `repair_update_compaction_ctrl()` - Update the compaction controller after repair

### 2. `test/lib/local_tablet_rpc_simulator.hh`

Test implementation that calls local RPC handlers instead of sending network RPCs.

### 3. `docs/dev/tablet-migration-rpc-decoupling.md`

Documentation explaining the architecture and how to use it.

## Files Modified

### 1. `service/topology_coordinator.cc`

- Added the `messaging_tablet_rpc_handler` class - the production implementation that wraps `messaging_service` RPCs
- Updated `handle_tablet_migration()` to use the `_tablet_rpc_handler` interface instead of direct RPC calls
- Updated the constructor to accept an optional RPC handler and initialize it
- Modified 5 locations where tablet RPCs were called:
  - rebuild_repair stage → `tablet_repair()`
  - streaming stage → `tablet_stream_data()`
  - cleanup stage → `tablet_cleanup()` (2 locations)
  - repair stage → `tablet_repair()`
  - end_repair stage → `repair_update_compaction_ctrl()`

### 2. `service/topology_coordinator.hh`

- Added a forward declaration for `tablet_migration_rpc_handler`
- Updated the `run_topology_coordinator()` signature to accept an optional `tablet_rpc_handler` parameter

## How It Works

### Production Mode (Default)

```cpp
run_topology_coordinator(...);
// Automatically uses messaging_tablet_rpc_handler
```

The topology coordinator creates a `messaging_tablet_rpc_handler` that sends actual network RPCs.

### Test Mode

```cpp
run_topology_coordinator(..., std::make_unique<local_tablet_rpc_simulator>(storage_service));
```

Tests can provide a `local_tablet_rpc_simulator` that calls local RPC handlers directly, bypassing the network layer.

## Benefits

1. **Testability**: `handle_tablet_migration()` logic can now be covered by unit tests
2. **Separation of Concerns**: Coordinator logic (group0 transitions, barriers) is separated from replica logic (RPCs)
3. **Minimal Changes**: The refactoring is minimal and surgical: only the RPC layer is abstracted
4. **Backward Compatible**: Existing code continues to work without changes

## What Can Now Be Tested

With this refactoring, tests can now:

1. Verify that tablet migrations progress through the correct stages
2. Test barrier coordination between stages
3. Test rollback logic when migrations fail
4. Test the interaction between migrations and topology changes
5. Simulate various failure scenarios at the RPC level

All without needing multiple nodes or network communication.
## Future Work

This refactoring enables future enhancements:

1. More comprehensive topology coordinator unit tests
2. Fault injection at the RPC level in tests
3. Better simulation of network conditions
4. Potential extraction of more coordinator logic into testable components

78  docs/dev/tablet-migration-rpc-decoupling.md  Normal file
@@ -0,0 +1,78 @@
# Tablet Migration RPC Decoupling

## Overview

The topology coordinator's tablet migration logic has been refactored to decouple it from RPC operations. This allows the topology coordinator logic to be tested in unit tests without network communication.

## Architecture

### Components

1. **`tablet_migration_rpc_handler`** (interface): Defines the RPC operations needed for tablet migration
   - `tablet_repair()`: Initiates tablet repair on a replica
   - `tablet_stream_data()`: Streams tablet data to a replica
   - `tablet_cleanup()`: Cleans up tablet data on a replica
   - `repair_update_compaction_ctrl()`: Updates the compaction controller after repair

2. **`messaging_tablet_rpc_handler`**: Production implementation that sends real network RPCs via `messaging_service`

3. **`local_tablet_rpc_simulator`**: Test implementation that calls RPC handlers locally without network communication

### How It Works

#### Production (Real RPCs)

```cpp
// In storage_service.cc
run_topology_coordinator(
    sys_dist_ks, gossiper, messaging, shared_tm,
    sys_ks, db, group0, topo_sm, vb_sm, as, raft,
    raft_topology_cmd_handler,
    tablet_allocator, cdc_gens, ring_delay,
    lifecycle_notifier, feature_service,
    sl_controller, topology_cmd_rpc_tracker
    // tablet_rpc_handler defaults to nullptr, so messaging_tablet_rpc_handler is used
);
```

#### Testing (Local Simulation)

```cpp
// In test code
run_topology_coordinator(
    sys_dist_ks, gossiper, messaging, shared_tm,
    sys_ks, db, group0, topo_sm, vb_sm, as, raft,
    raft_topology_cmd_handler,
    tablet_allocator, cdc_gens, ring_delay,
    lifecycle_notifier, feature_service,
    sl_controller, topology_cmd_rpc_tracker,
    std::make_unique<local_tablet_rpc_simulator>(storage_service) // Create unique_ptr directly
);
```

## Benefits

1. **Unit Testability**: The topology coordinator's `handle_tablet_migration()` logic can now be tested in unit tests without setting up a multi-node cluster
2. **Separation of Concerns**: Coordinator-side logic (group0 state transitions, barriers) is clearly separated from replica-side logic (RPC operations)
3. **Flexibility**: Different RPC implementations can be provided for different testing scenarios

## Migration Stages

The tablet migration process goes through multiple stages. Each stage has:

- **Coordinator-side logic**: What group0 changes to make, whether barriers are needed, whether rollback is needed
- **Replica-side logic**: RPC operations to perform on tablet replicas

Example stages:

- `allow_write_both_read_old` → coordinator initiates transition
- `write_both_read_old` → coordinator waits for barrier
- `streaming` → coordinator calls `tablet_stream_data()` RPC
- `write_both_read_new` → coordinator waits for barrier
- `use_new` → coordinator transitions to final state
- `cleanup` → coordinator calls `tablet_cleanup()` RPC
- `end_migration` → coordinator removes transition state
## Future Enhancements

The current refactoring enables future enhancements such as:

1. More comprehensive testing of topology coordinator logic
2. Fault injection at the RPC level in tests
3. Better simulation of network conditions in tests
4. Cleaner separation of concerns in the codebase

73  service/tablet_migration_rpc_handler.hh  Normal file
@@ -0,0 +1,73 @@
```cpp
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later

#pragma once

#include <seastar/core/future.hh>
#include "locator/tablets.hh"
#include "service/tablet_operation.hh"
#include "service/session.hh"
#include "raft/raft.hh"

namespace service {

/// Interface for handling tablet migration RPC operations.
/// This abstraction decouples the topology coordinator from actual RPC sending,
/// allowing unit tests to provide a local implementation that simulates RPC
/// behavior without network communication.
///
/// Each method corresponds to an RPC verb that the topology coordinator sends
/// to tablet replicas during migration stages.
class tablet_migration_rpc_handler {
public:
    virtual ~tablet_migration_rpc_handler() = default;

    /// Initiates tablet repair on a replica.
    /// Called during the rebuild_repair or repair stage.
    ///
    /// @param dst destination host
    /// @param dst_id destination raft server ID
    /// @param tablet tablet to repair
    /// @param sid session ID for the repair operation
    /// @return repair result containing the repair timestamp
    virtual seastar::future<tablet_operation_repair_result> tablet_repair(
        locator::host_id dst,
        raft::server_id dst_id,
        locator::global_tablet_id tablet,
        session_id sid) = 0;

    /// Streams tablet data to a replica.
    /// Called during the streaming stage.
    ///
    /// @param dst destination host
    /// @param dst_id destination raft server ID
    /// @param tablet tablet to stream
    virtual seastar::future<> tablet_stream_data(
        locator::host_id dst,
        raft::server_id dst_id,
        locator::global_tablet_id tablet) = 0;

    /// Cleans up tablet data on a replica.
    /// Called during the cleanup or cleanup_target stage.
    ///
    /// @param dst destination host
    /// @param dst_id destination raft server ID
    /// @param tablet tablet to clean up
    virtual seastar::future<> tablet_cleanup(
        locator::host_id dst,
        raft::server_id dst_id,
        locator::global_tablet_id tablet) = 0;

    /// Updates compaction controller after repair.
    /// Called during the end_repair stage for each tablet replica.
    ///
    /// @param dst destination host
    /// @param tablet tablet to update
    /// @param sid session ID from the repair operation
    virtual seastar::future<> repair_update_compaction_ctrl(
        locator::host_id dst,
        locator::global_tablet_id tablet,
        session_id sid) = 0;
};

}
```
Changes to `service/topology_coordinator.cc`:

```diff
@@ -73,6 +73,7 @@
 #include "idl/repair.dist.hh"
 
 #include "service/topology_coordinator.hh"
+#include "service/tablet_migration_rpc_handler.hh"
 
 #include <boost/range/join.hpp>
 #include <seastar/core/metrics_registration.hh>
@@ -117,6 +118,51 @@ sstring get_application_state_gently(const gms::application_state_map& epmap, gm
     // it's versioned_value::value(), not std::optional::value() - it does not throw
     return it->second.value();
 }
 
+/// Real implementation of tablet_migration_rpc_handler that sends RPCs via messaging_service.
+/// Used by the production topology coordinator to perform actual network communication.
+class messaging_tablet_rpc_handler : public tablet_migration_rpc_handler {
+    netw::messaging_service& _messaging;
+    abort_source& _as;
+
+public:
+    messaging_tablet_rpc_handler(netw::messaging_service& messaging, abort_source& as)
+        : _messaging(messaging), _as(as) {}
+
+    future<tablet_operation_repair_result> tablet_repair(
+            locator::host_id dst,
+            raft::server_id dst_id,
+            locator::global_tablet_id tablet,
+            session_id sid) override {
+        co_return co_await ser::storage_service_rpc_verbs::send_tablet_repair(
+            &_messaging, dst, _as, dst_id, tablet, sid);
+    }
+
+    future<> tablet_stream_data(
+            locator::host_id dst,
+            raft::server_id dst_id,
+            locator::global_tablet_id tablet) override {
+        co_return co_await ser::storage_service_rpc_verbs::send_tablet_stream_data(
+            &_messaging, dst, _as, dst_id, tablet);
+    }
+
+    future<> tablet_cleanup(
+            locator::host_id dst,
+            raft::server_id dst_id,
+            locator::global_tablet_id tablet) override {
+        co_return co_await ser::storage_service_rpc_verbs::send_tablet_cleanup(
+            &_messaging, dst, _as, dst_id, tablet);
+    }
+
+    future<> repair_update_compaction_ctrl(
+            locator::host_id dst,
+            locator::global_tablet_id tablet,
+            session_id sid) override {
+        co_return co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(
+            &_messaging, dst, tablet, sid);
+    }
+};
+
 } // namespace
 
 class topology_coordinator : public endpoint_lifecycle_subscriber
@@ -144,6 +190,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
 
     tablet_allocator& _tablet_allocator;
     std::unique_ptr<db::view::view_building_coordinator> _vb_coordinator;
+    std::unique_ptr<tablet_migration_rpc_handler> _tablet_rpc_handler;
 
     cdc::generation_service& _cdc_gens;
 
```
```diff
@@ -1603,8 +1650,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
         rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid);
         return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) {
             return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) {
-                return ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
-                    dst, _as, raft::server_id(dst.uuid()), gid, session_id).discard_result();
+                return _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id).discard_result();
             });
         });
     })) {
@@ -1676,8 +1722,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
         auto dst = trinfo.pending_replica->host;
         return do_with(gids, [this, dst] (const auto& gids) {
             return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
-                return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
-                    dst, _as, raft::server_id(dst.uuid()), gid);
+                return _tablet_rpc_handler->tablet_stream_data(dst, raft::server_id(dst.uuid()), gid);
             });
         });
     })) {
@@ -1746,8 +1791,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
         rtlogger.info("Initiating tablet cleanup of {} on {}", gid, dst);
         return do_with(gids, [this, dst] (const auto& gids) {
             return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
-                return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
-                    dst.host, _as, raft::server_id(dst.host.uuid()), gid);
+                return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
             });
         }).then([] {
             return utils::get_local_injector().inject("wait_after_tablet_cleanup", [] (auto& handler) -> future<> {
@@ -1775,8 +1819,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
         rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
         return do_with(gids, [this, dst] (const auto& gids) {
             return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
-                return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
-                    dst.host, _as, raft::server_id(dst.host.uuid()), gid);
+                return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
             });
         });
     })) {
@@ -1868,8 +1911,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
         rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
         auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
             service::session_id::create_random_id() : trinfo->session_id;
-        auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
-            dst, _as, raft::server_id(dst.uuid()), gid, session_id);
+        auto res = co_await _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id);
         auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
         auto& tablet_state = _tablets[tablet];
         tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
@@ -1935,7 +1977,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
             tablet_state.repair_update_compaction_ctrl_retried++;
         }
         bool feature = _feature_service.tablet_incremental_repair;
-        if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this, ms = &_messaging,
+        if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this,
             gid = gid, sid = tablet_state.session_id, feature, &tmap] () -> future<> {
             if (feature) {
                 if (utils::get_local_injector().enter("fail_rpc_repair_update_compaction_ctrl")) {
@@ -1944,9 +1986,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
                     throw std::runtime_error(msg);
                 }
                 auto& replicas = tmap.get_tablet_info(gid.tablet).replicas;
-                co_await coroutine::parallel_for_each(replicas, [this, ms, gid, sid] (locator::tablet_replica r) -> future<> {
+                co_await coroutine::parallel_for_each(replicas, [this, gid, sid] (locator::tablet_replica r) -> future<> {
                     if (!is_excluded(raft::server_id(r.host.uuid()))) {
-                        co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(ms, r.host, gid, sid);
+                        co_await _tablet_rpc_handler->repair_update_compaction_ctrl(r.host, gid, sid);
                     }
                 });
             }
```
```diff
@@ -3683,7 +3725,8 @@ public:
         gms::feature_service& feature_service,
         endpoint_lifecycle_notifier& lifecycle_notifier,
         qos::service_level_controller& sl_controller,
-        topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
+        topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
+        std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler)
         : _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
         , _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
         , _tablet_load_stats_refresh_interval_in_seconds(db.get_config().tablet_load_stats_refresh_interval_in_seconds)
@@ -3694,6 +3737,7 @@ public:
         , _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
         , _tablet_allocator(tablet_allocator)
         , _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
+        , _tablet_rpc_handler(tablet_rpc_handler ? std::move(tablet_rpc_handler) : std::make_unique<messaging_tablet_rpc_handler>(_messaging, _as))
         , _cdc_gens(cdc_gens)
         , _tablet_load_stats_refresh([this] {
             return with_scheduling_group(_db.get_gossip_scheduling_group(), [this] {
@@ -4366,7 +4410,8 @@ future<> run_topology_coordinator(
         endpoint_lifecycle_notifier& lifecycle_notifier,
         gms::feature_service& feature_service,
         qos::service_level_controller& sl_controller,
-        topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker) {
+        topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
+        std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler) {
 
     topology_coordinator coordinator{
         sys_dist_ks, gossiper, messaging, shared_tm,
@@ -4377,7 +4422,8 @@ future<> run_topology_coordinator(
         ring_delay,
         feature_service, lifecycle_notifier,
         sl_controller,
-        topology_cmd_rpc_tracker};
+        topology_cmd_rpc_tracker,
+        std::move(tablet_rpc_handler)};
 
     std::exception_ptr ex;
     lifecycle_notifier.register_subscriber(&coordinator);
```
Changes to `service/topology_coordinator.hh`:

```diff
@@ -58,6 +58,7 @@ namespace service {
 
 class raft_group0;
 class tablet_allocator;
+class tablet_migration_rpc_handler;
 
 extern logging::logger rtlogger;
 
@@ -100,7 +101,8 @@ future<> run_topology_coordinator(
         endpoint_lifecycle_notifier& lifecycle_notifier,
         gms::feature_service& feature_service,
         qos::service_level_controller& sl_controller,
-        topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker);
+        topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
+        std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler = nullptr);
 
 class tablet_ops_metrics {
 private:
```
68  test/lib/local_tablet_rpc_simulator.hh  Normal file
@@ -0,0 +1,68 @@
```cpp
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later

#pragma once

#include "service/tablet_migration_rpc_handler.hh"
#include "service/storage_service.hh"
#include "utils/log.hh"

namespace service {

extern logging::logger rtlogger;

/// Test implementation of tablet_migration_rpc_handler that calls local RPC handlers
/// instead of sending network RPCs. This allows testing the topology coordinator logic
/// in unit tests without network communication.
///
/// Usage in tests:
///     auto handler = std::make_unique<local_tablet_rpc_simulator>(storage_service);
///     // Pass handler to run_topology_coordinator
class local_tablet_rpc_simulator : public tablet_migration_rpc_handler {
    storage_service& _storage_service;

public:
    explicit local_tablet_rpc_simulator(storage_service& ss)
        : _storage_service(ss) {}

    seastar::future<tablet_operation_repair_result> tablet_repair(
            locator::host_id dst,
            raft::server_id dst_id,
            locator::global_tablet_id tablet,
            session_id sid) override {
        rtlogger.debug("local_tablet_rpc_simulator: repair tablet {} on {} ({})", tablet, dst, dst_id);
        // Call the local RPC handler directly
        co_return co_await _storage_service.repair_tablet(tablet, sid);
    }

    seastar::future<> tablet_stream_data(
            locator::host_id dst,
            raft::server_id dst_id,
            locator::global_tablet_id tablet) override {
        rtlogger.debug("local_tablet_rpc_simulator: stream tablet {} to {} ({})", tablet, dst, dst_id);
        // Call the local RPC handler directly
        co_return co_await _storage_service.stream_tablet(tablet);
    }

    seastar::future<> tablet_cleanup(
            locator::host_id dst,
            raft::server_id dst_id,
            locator::global_tablet_id tablet) override {
        rtlogger.debug("local_tablet_rpc_simulator: cleanup tablet {} on {} ({})", tablet, dst, dst_id);
        // Call the local RPC handler directly
        co_return co_await _storage_service.cleanup_tablet(tablet);
    }

    seastar::future<> repair_update_compaction_ctrl(
            locator::host_id dst,
            locator::global_tablet_id tablet,
            session_id sid) override {
        rtlogger.debug("local_tablet_rpc_simulator: repair_update_compaction_ctrl for tablet {} on {}", tablet, dst);
        // For the test simulator, we don't need to actually update the compaction
        // controller, as there's no real compaction happening in the tests.
        // Just log and return success.
        co_return;
    }
};

}
```