Compare commits

...

7 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
e69566a36a Address code review feedback
- Fix use-after-free bug in example code
- Update copyright years to 2026
- Add logging to test simulator for better debugging
- Improve documentation comments

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:26:17 +00:00
copilot-swe-agent[bot]
3fc31bc751 Add implementation summary document
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:23:39 +00:00
copilot-swe-agent[bot]
0e439ad03a Add documentation for tablet migration RPC decoupling
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:22:52 +00:00
copilot-swe-agent[bot]
d466213b78 Allow topology_coordinator to accept custom RPC handler for testing
- Add optional tablet_rpc_handler parameter to run_topology_coordinator
- Default to messaging_tablet_rpc_handler if not provided
- Create local_tablet_rpc_simulator for use in unit tests

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:22:05 +00:00
copilot-swe-agent[bot]
1a3e8fbdc7 Refactor topology_coordinator to use tablet_migration_rpc_handler interface
- Implement messaging_tablet_rpc_handler that wraps messaging_service RPCs
- Update handle_tablet_migration() to use the RPC handler interface
- This decouples the coordinator logic from actual RPC sending

Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:20:23 +00:00
copilot-swe-agent[bot]
f9c811e88b Add tablet_migration_rpc_handler interface for decoupling RPCs
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2026-01-30 12:18:19 +00:00
copilot-swe-agent[bot]
601d73ea00 Initial plan
2026-01-30 12:12:55 +00:00
6 changed files with 368 additions and 17 deletions

IMPLEMENTATION_SUMMARY.md (new file)

@@ -0,0 +1,84 @@
# Tablet Migration RPC Decoupling - Implementation Summary
## What Was Changed
### Problem Statement
The topology_coordinator's `handle_tablet_migration()` function was tightly coupled with RPC sending, making it difficult to test in unit tests without setting up a full multi-node cluster.
### Solution
Introduced an abstraction layer (`tablet_migration_rpc_handler`) that decouples the coordinator logic from actual RPC operations.
## Files Added
### 1. `service/tablet_migration_rpc_handler.hh`
Interface defining the RPC operations needed for tablet migration:
- `tablet_repair()` - Repair a tablet on a replica
- `tablet_stream_data()` - Stream tablet data to a replica
- `tablet_cleanup()` - Clean up tablet data on a replica
- `repair_update_compaction_ctrl()` - Update compaction controller after repair
### 2. `test/lib/local_tablet_rpc_simulator.hh`
Test implementation that calls local RPC handlers instead of sending network RPCs.
### 3. `docs/dev/tablet-migration-rpc-decoupling.md`
Documentation explaining the architecture and how to use it.
## Files Modified
### 1. `service/topology_coordinator.cc`
- Added `messaging_tablet_rpc_handler` class - production implementation that wraps `messaging_service` RPCs
- Updated `handle_tablet_migration()` to use `_tablet_rpc_handler` interface instead of direct RPC calls
- Updated constructor to accept optional RPC handler and initialize it
- Modified 5 locations where tablet RPCs were called:
- rebuild_repair stage → `tablet_repair()`
- streaming stage → `tablet_stream_data()`
- cleanup stage → `tablet_cleanup()` (2 locations)
- repair stage → `tablet_repair()`
- end_repair stage → `repair_update_compaction_ctrl()`
### 2. `service/topology_coordinator.hh`
- Added forward declaration for `tablet_migration_rpc_handler`
- Updated `run_topology_coordinator()` signature to accept optional `tablet_rpc_handler` parameter
## How It Works
### Production Mode (Default)
```cpp
run_topology_coordinator(...);
// Automatically uses messaging_tablet_rpc_handler
```
The topology coordinator creates a `messaging_tablet_rpc_handler` that sends actual network RPCs.
### Test Mode
```cpp
run_topology_coordinator(..., std::make_unique<local_tablet_rpc_simulator>(storage_service));
```
Tests can provide a `local_tablet_rpc_simulator` that calls local RPC handlers directly, bypassing the network layer.
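The injection pattern itself can be exercised in isolation. Below is a minimal, self-contained sketch (plain synchronous C++ rather than the real Seastar futures; `rpc_handler`, `fake_rpc_handler`, `migration_driver`, and the integer tablet IDs are illustrative names, not ScyllaDB APIs) showing how a recording fake lets a unit test observe which RPCs the coordinator logic would have issued:

```cpp
#include <memory>
#include <string>
#include <vector>

// Simplified stand-in for tablet_migration_rpc_handler (no Seastar, no network).
struct rpc_handler {
    virtual ~rpc_handler() = default;
    virtual void stream_data(int tablet) = 0;
    virtual void cleanup(int tablet) = 0;
};

// Recording fake for unit tests: remembers every call instead of doing I/O.
struct fake_rpc_handler : rpc_handler {
    std::vector<std::string> calls;
    void stream_data(int tablet) override { calls.push_back("stream:" + std::to_string(tablet)); }
    void cleanup(int tablet) override { calls.push_back("cleanup:" + std::to_string(tablet)); }
};

// Coordinator-style logic that depends only on the interface, not on messaging.
struct migration_driver {
    std::unique_ptr<rpc_handler> handler;
    explicit migration_driver(std::unique_ptr<rpc_handler> h) : handler(std::move(h)) {}
    void migrate(int tablet) {
        handler->stream_data(tablet);  // streaming stage
        handler->cleanup(tablet);      // cleanup stage
    }
};
```

A test injects the fake, runs `migrate()`, and asserts on `calls` — no network layer involved, mirroring how `local_tablet_rpc_simulator` is passed to `run_topology_coordinator`.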
## Benefits
1. **Testability**: Can now test `handle_tablet_migration()` logic in unit tests
2. **Separation of Concerns**: Coordinator logic (group0 transitions, barriers) is separated from replica logic (RPCs)
3. **Minimal Changes**: The refactoring is minimal and surgical: it abstracts only the RPC layer
4. **Backward Compatible**: Existing code continues to work without changes
## What Can Now Be Tested
With this refactoring, tests can now:
1. Verify that tablet migrations progress through the correct stages
2. Test barrier coordination between stages
3. Test rollback logic when migrations fail
4. Test the interaction between migrations and topology changes
5. Simulate various failure scenarios at the RPC level
All without needing multiple nodes or network communication.
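Failure simulation (item 5) falls out of the same interface: a test handler can throw to model an RPC error. A minimal sketch under assumed, simplified names (`cleanup_handler`, `try_cleanup` — not the actual ScyllaDB types, which are asynchronous):

```cpp
#include <stdexcept>

// Simplified handler interface; a failing variant models an RPC-level error.
struct cleanup_handler {
    virtual ~cleanup_handler() = default;
    virtual void tablet_cleanup(int tablet) = 0;
};

struct failing_cleanup_handler : cleanup_handler {
    void tablet_cleanup(int) override {
        throw std::runtime_error("simulated RPC failure");
    }
};

// Coordinator-style call site: converts an RPC failure into a retry decision.
inline bool try_cleanup(cleanup_handler& h, int tablet) {
    try {
        h.tablet_cleanup(tablet);
        return true;           // stage complete
    } catch (const std::exception&) {
        return false;          // stage must be retried or rolled back
    }
}
```

A test can assert that the failure path is taken, which is exactly the kind of rollback-logic coverage the refactoring enables.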
## Future Work
This refactoring enables future enhancements:
1. More comprehensive topology coordinator unit tests
2. Fault injection at the RPC level in tests
3. Better simulation of network conditions
4. Potential extraction of more coordinator logic into testable components

docs/dev/tablet-migration-rpc-decoupling.md (new file)

@@ -0,0 +1,78 @@
# Tablet Migration RPC Decoupling
## Overview
The topology coordinator's tablet migration logic has been refactored to decouple it from RPC operations. This allows the topology coordinator logic to be tested in unit tests without network communication.
## Architecture
### Components
1. **`tablet_migration_rpc_handler`** (interface): Defines the RPC operations needed for tablet migration
- `tablet_repair()`: Initiates tablet repair on a replica
- `tablet_stream_data()`: Streams tablet data to a replica
- `tablet_cleanup()`: Cleans up tablet data on a replica
- `repair_update_compaction_ctrl()`: Updates compaction controller after repair
2. **`messaging_tablet_rpc_handler`**: Production implementation that sends real network RPCs via `messaging_service`
3. **`local_tablet_rpc_simulator`**: Test implementation that calls RPC handlers locally without network communication
### How It Works
#### Production (Real RPCs)
```cpp
// In storage_service.cc
run_topology_coordinator(
sys_dist_ks, gossiper, messaging, shared_tm,
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
raft_topology_cmd_handler,
tablet_allocator, cdc_gens, ring_delay,
lifecycle_notifier, feature_service,
sl_controller, topology_cmd_rpc_tracker
// tablet_rpc_handler defaults to nullptr, so messaging_tablet_rpc_handler is used
);
```
#### Testing (Local Simulation)
```cpp
// In test code
run_topology_coordinator(
sys_dist_ks, gossiper, messaging, shared_tm,
sys_ks, db, group0, topo_sm, vb_sm, as, raft,
raft_topology_cmd_handler,
tablet_allocator, cdc_gens, ring_delay,
lifecycle_notifier, feature_service,
sl_controller, topology_cmd_rpc_tracker,
std::make_unique<local_tablet_rpc_simulator>(storage_service) // Inject the test handler; the coordinator takes ownership
);
```
## Benefits
1. **Unit Testability**: The topology coordinator's `handle_tablet_migration()` logic can now be tested in unit tests without setting up a multi-node cluster
2. **Separation of Concerns**: Coordinator-side logic (group0 state transitions, barriers) is clearly separated from replica-side logic (RPC operations)
3. **Flexibility**: Different RPC implementations can be provided for different testing scenarios
## Migration Stages
The tablet migration process goes through multiple stages. Each stage has:
- **Coordinator-side logic**: What group0 changes to make, whether barriers are needed, whether rollback is needed
- **Replica-side logic**: RPC operations to perform on tablet replicas
Example stages:
- `allow_write_both_read_old` → coordinator initiates transition
- `write_both_read_old` → coordinator waits for barrier
- `streaming` → coordinator calls `tablet_stream_data()` RPC
- `write_both_read_new` → coordinator waits for barrier
- `use_new` → coordinator transitions to final state
- `cleanup` → coordinator calls `tablet_cleanup()` RPC
- `end_migration` → coordinator removes transition state
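The stage sequence above can be modeled compactly. The sketch below is illustrative only (the real coordinator drives these transitions through group0 and Raft, which are omitted here): it walks a tablet through the listed stages and records whether each one is a pure group0 transition, a barrier wait, or a handler RPC:

```cpp
#include <initializer_list>
#include <string>
#include <vector>

enum class stage {
    allow_write_both_read_old, write_both_read_old, streaming,
    write_both_read_new, use_new, cleanup, end_migration
};

// Records the coordinator-side action taken at each migration stage.
inline std::vector<std::string> run_migration() {
    std::vector<std::string> log;
    for (stage s : {stage::allow_write_both_read_old, stage::write_both_read_old,
                    stage::streaming, stage::write_both_read_new,
                    stage::use_new, stage::cleanup, stage::end_migration}) {
        switch (s) {
        case stage::write_both_read_old:
        case stage::write_both_read_new:
            log.push_back("barrier");             // wait for all replicas
            break;
        case stage::streaming:
            log.push_back("rpc:tablet_stream_data");
            break;
        case stage::cleanup:
            log.push_back("rpc:tablet_cleanup");
            break;
        default:
            log.push_back("group0_transition");   // pure state change
        }
    }
    return log;
}
```

With a recording handler in place of the hard-coded strings, a unit test could assert this ordering directly, which is what the decoupling makes possible.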
## Future Enhancements
The current refactoring enables future enhancements such as:
1. More comprehensive testing of topology coordinator logic
2. Fault injection at the RPC level in tests
3. Better simulation of network conditions in tests
4. Cleaner separation of concerns in the codebase

service/tablet_migration_rpc_handler.hh (new file)

@@ -0,0 +1,73 @@
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later
#pragma once
#include <seastar/core/future.hh>
#include "locator/tablets.hh"
#include "service/tablet_operation.hh"
#include "service/session.hh"
#include "raft/raft.hh"
namespace service {
/// Interface for handling tablet migration RPC operations.
/// This abstraction decouples the topology coordinator from actual RPC sending,
/// allowing unit tests to provide a local implementation that simulates RPC
/// behavior without network communication.
///
/// Each method corresponds to an RPC verb that the topology coordinator sends
/// to tablet replicas during migration stages.
class tablet_migration_rpc_handler {
public:
virtual ~tablet_migration_rpc_handler() = default;
/// Initiates tablet repair on a replica.
/// Called during the rebuild_repair or repair stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to repair
/// @param sid session ID for the repair operation
/// @return repair result containing the repair timestamp
virtual seastar::future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) = 0;
/// Streams tablet data to a replica.
/// Called during the streaming stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to stream
virtual seastar::future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) = 0;
/// Cleans up tablet data on a replica.
/// Called during the cleanup or cleanup_target stage.
///
/// @param dst destination host
/// @param dst_id destination raft server ID
/// @param tablet tablet to clean up
virtual seastar::future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) = 0;
/// Updates compaction controller after repair.
/// Called during the end_repair stage for each tablet replica.
///
/// @param dst destination host
/// @param tablet tablet to update
/// @param sid session ID from the repair operation
virtual seastar::future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) = 0;
};
}

service/topology_coordinator.cc

@@ -73,6 +73,7 @@
#include "idl/repair.dist.hh"
#include "service/topology_coordinator.hh"
#include "service/tablet_migration_rpc_handler.hh"
#include <boost/range/join.hpp>
#include <seastar/core/metrics_registration.hh>
@@ -117,6 +118,51 @@ sstring get_application_state_gently(const gms::application_state_map& epmap, gm
// it's versioned_value::value(), not std::optional::value() - it does not throw
return it->second.value();
}
/// Real implementation of tablet_migration_rpc_handler that sends RPCs via messaging_service.
/// Used by the production topology coordinator to perform actual network communication.
class messaging_tablet_rpc_handler : public tablet_migration_rpc_handler {
netw::messaging_service& _messaging;
abort_source& _as;
public:
messaging_tablet_rpc_handler(netw::messaging_service& messaging, abort_source& as)
: _messaging(messaging), _as(as) {}
future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_repair(
&_messaging, dst, _as, dst_id, tablet, sid);
}
future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_stream_data(
&_messaging, dst, _as, dst_id, tablet);
}
future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
co_return co_await ser::storage_service_rpc_verbs::send_tablet_cleanup(
&_messaging, dst, _as, dst_id, tablet);
}
future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) override {
co_return co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(
&_messaging, dst, tablet, sid);
}
};
} // namespace
class topology_coordinator : public endpoint_lifecycle_subscriber
@@ -144,6 +190,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
tablet_allocator& _tablet_allocator;
std::unique_ptr<db::view::view_building_coordinator> _vb_coordinator;
std::unique_ptr<tablet_migration_rpc_handler> _tablet_rpc_handler;
cdc::generation_service& _cdc_gens;
@@ -1603,8 +1650,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating repair phase of tablet rebuild host={} tablet={}", dst, gid);
return do_with(gids, [this, dst, session_id = trinfo.session_id] (const auto& gids) {
return do_for_each(gids, [this, dst, session_id] (locator::global_tablet_id gid) {
- return ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
-         dst, _as, raft::server_id(dst.uuid()), gid, session_id).discard_result();
+ return _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id).discard_result();
});
});
})) {
@@ -1676,8 +1722,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
auto dst = trinfo.pending_replica->host;
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
- return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
-         dst, _as, raft::server_id(dst.uuid()), gid);
+ return _tablet_rpc_handler->tablet_stream_data(dst, raft::server_id(dst.uuid()), gid);
});
});
})) {
@@ -1746,8 +1791,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet cleanup of {} on {}", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
- return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
-         dst.host, _as, raft::server_id(dst.host.uuid()), gid);
+ return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
});
}).then([] {
return utils::get_local_injector().inject("wait_after_tablet_cleanup", [] (auto& handler) -> future<> {
@@ -1775,8 +1819,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet cleanup of {} on {} to revert migration", gid, dst);
return do_with(gids, [this, dst] (const auto& gids) {
return do_for_each(gids, [this, dst] (locator::global_tablet_id gid) {
- return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
-         dst.host, _as, raft::server_id(dst.host.uuid()), gid);
+ return _tablet_rpc_handler->tablet_cleanup(dst.host, raft::server_id(dst.host.uuid()), gid);
});
});
})) {
@@ -1868,8 +1911,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
auto session_id = utils::get_local_injector().enter("handle_tablet_migration_repair_random_session") ?
service::session_id::create_random_id() : trinfo->session_id;
- auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
-         dst, _as, raft::server_id(dst.uuid()), gid, session_id);
+ auto res = co_await _tablet_rpc_handler->tablet_repair(dst, raft::server_id(dst.uuid()), gid, session_id);
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
@@ -1935,7 +1977,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
tablet_state.repair_update_compaction_ctrl_retried++;
}
bool feature = _feature_service.tablet_incremental_repair;
- if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this, ms = &_messaging,
+ if (advance_in_background(gid, tablet_state.repair_update_compaction_ctrl, "repair_update_compaction_ctrl", [this,
gid = gid, sid = tablet_state.session_id, feature, &tmap] () -> future<> {
if (feature) {
if (utils::get_local_injector().enter("fail_rpc_repair_update_compaction_ctrl")) {
@@ -1944,9 +1986,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber
throw std::runtime_error(msg);
}
auto& replicas = tmap.get_tablet_info(gid.tablet).replicas;
- co_await coroutine::parallel_for_each(replicas, [this, ms, gid, sid] (locator::tablet_replica r) -> future<> {
+ co_await coroutine::parallel_for_each(replicas, [this, gid, sid] (locator::tablet_replica r) -> future<> {
if (!is_excluded(raft::server_id(r.host.uuid()))) {
- co_await ser::repair_rpc_verbs::send_repair_update_compaction_ctrl(ms, r.host, gid, sid);
+ co_await _tablet_rpc_handler->repair_update_compaction_ctrl(r.host, gid, sid);
}
});
}
@@ -3683,7 +3725,8 @@ public:
gms::feature_service& feature_service,
endpoint_lifecycle_notifier& lifecycle_notifier,
qos::service_level_controller& sl_controller,
- topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
+ topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
+ std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler)
: _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
, _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
, _tablet_load_stats_refresh_interval_in_seconds(db.get_config().tablet_load_stats_refresh_interval_in_seconds)
@@ -3694,6 +3737,7 @@ public:
, _raft_topology_cmd_handler(std::move(raft_topology_cmd_handler))
, _tablet_allocator(tablet_allocator)
, _vb_coordinator(std::make_unique<db::view::view_building_coordinator>(_db, _raft, _group0, _sys_ks, _gossiper, _messaging, _vb_sm, _topo_sm, _term, _as))
, _tablet_rpc_handler(tablet_rpc_handler ? std::move(tablet_rpc_handler) : std::make_unique<messaging_tablet_rpc_handler>(_messaging, _as))
, _cdc_gens(cdc_gens)
, _tablet_load_stats_refresh([this] {
return with_scheduling_group(_db.get_gossip_scheduling_group(), [this] {
@@ -4366,7 +4410,8 @@ future<> run_topology_coordinator(
endpoint_lifecycle_notifier& lifecycle_notifier,
gms::feature_service& feature_service,
qos::service_level_controller& sl_controller,
- topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker) {
+ topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
+ std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler) {
topology_coordinator coordinator{
sys_dist_ks, gossiper, messaging, shared_tm,
@@ -4377,7 +4422,8 @@ future<> run_topology_coordinator(
ring_delay,
feature_service, lifecycle_notifier,
sl_controller,
- topology_cmd_rpc_tracker};
+ topology_cmd_rpc_tracker,
+ std::move(tablet_rpc_handler)};
std::exception_ptr ex;
lifecycle_notifier.register_subscriber(&coordinator);

service/topology_coordinator.hh

@@ -58,6 +58,7 @@ namespace service {
class raft_group0;
class tablet_allocator;
class tablet_migration_rpc_handler;
extern logging::logger rtlogger;
@@ -100,7 +101,8 @@ future<> run_topology_coordinator(
endpoint_lifecycle_notifier& lifecycle_notifier,
gms::feature_service& feature_service,
qos::service_level_controller& sl_controller,
- topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker);
+ topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker,
+ std::unique_ptr<tablet_migration_rpc_handler> tablet_rpc_handler = nullptr);
class tablet_ops_metrics {
private:

test/lib/local_tablet_rpc_simulator.hh (new file)

@@ -0,0 +1,68 @@
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later
#pragma once
#include "service/tablet_migration_rpc_handler.hh"
#include "service/storage_service.hh"
#include "utils/log.hh"
namespace service {
extern logging::logger rtlogger;
/// Test implementation of tablet_migration_rpc_handler that calls local RPC handlers
/// instead of sending network RPCs. This allows testing the topology coordinator logic
/// in unit tests without network communication.
///
/// Usage in tests:
/// auto handler = std::make_unique<local_tablet_rpc_simulator>(storage_service);
/// // Pass handler to run_topology_coordinator
class local_tablet_rpc_simulator : public tablet_migration_rpc_handler {
storage_service& _storage_service;
public:
explicit local_tablet_rpc_simulator(storage_service& ss)
: _storage_service(ss) {}
seastar::future<tablet_operation_repair_result> tablet_repair(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet,
session_id sid) override {
rtlogger.debug("local_tablet_rpc_simulator: repair tablet {} on {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.repair_tablet(tablet, sid);
}
seastar::future<> tablet_stream_data(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
rtlogger.debug("local_tablet_rpc_simulator: stream tablet {} to {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.stream_tablet(tablet);
}
seastar::future<> tablet_cleanup(
locator::host_id dst,
raft::server_id dst_id,
locator::global_tablet_id tablet) override {
rtlogger.debug("local_tablet_rpc_simulator: cleanup tablet {} on {} ({})", tablet, dst, dst_id);
// Call the local RPC handler directly
co_return co_await _storage_service.cleanup_tablet(tablet);
}
seastar::future<> repair_update_compaction_ctrl(
locator::host_id dst,
locator::global_tablet_id tablet,
session_id sid) override {
rtlogger.debug("local_tablet_rpc_simulator: repair_update_compaction_ctrl for tablet {} on {}", tablet, dst);
// The test simulator doesn't need to update the compaction controller,
// since no real compaction happens in tests.
// Just log and return success.
co_return;
}
};
}