diff --git a/service/storage_service.cc b/service/storage_service.cc index f17e7c4d42..dda8f212f1 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1236,6 +1236,28 @@ class topology_coordinator { } } + void generate_migration_update(std::vector& out, const group0_guard& guard, const tablet_migration_info& mig) { + auto s = _db.find_schema(mig.tablet.table); + auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table); + auto last_token = tmap.get_last_token(mig.tablet.tablet); + if (tmap.get_tablet_transition_info(mig.tablet.tablet)) { + slogger.warn("Tablet already in transition, ignoring migration: {}", mig); + return; + } + out.emplace_back( + replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), mig.tablet.table) + .set_new_replicas(last_token, replace_replica(tmap.get_tablet_info(mig.tablet.tablet).replicas, mig.src, mig.dst)) + .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old) + .build()); + } + + future<> generate_migration_updates(std::vector& out, const group0_guard& guard, const migration_plan& plan) { + for (const tablet_migration_info& mig : plan) { + co_await coroutine::maybe_yield(); + generate_migration_update(out, guard, mig); + } + } + future<> handle_tablet_migration(group0_guard guard) { // This step acts like a pump which advances state machines of individual tablets, // batching barriers and group0 updates. @@ -1807,21 +1829,7 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua std::vector updates; - for (const tablet_migration_info& mig : plan) { - co_await coroutine::maybe_yield(); - auto s = _db.find_schema(mig.tablet.table); - auto& tmap = tm->tablets().get_tablet_map(mig.tablet.table); - auto last_token = tmap.get_last_token(mig.tablet.tablet); - if (tmap.get_tablet_transition_info(mig.tablet.tablet)) { - slogger.warn("Tablet {} is already in transition, ignoring migration", mig.tablet); - continue; - } - updates.emplace_back( - replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), mig.tablet.table) - .set_new_replicas(last_token, replace_replica(tmap.get_tablet_info(mig.tablet.tablet).replicas, mig.src, mig.dst)) - .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old) - .build()); - } + co_await generate_migration_updates(updates, guard, plan); updates.emplace_back( topology_mutation_builder(guard.write_timestamp())