storage_service, tablets: Extract generate_migration_updates()

This commit is contained in:
Tomasz Grabiec
2023-07-24 23:52:59 +02:00
parent fbc6076e6a
commit c9ea215ce1

View File

@@ -1236,6 +1236,28 @@ class topology_coordinator {
}
}
void generate_migration_update(std::vector<canonical_mutation>& out, const group0_guard& guard, const tablet_migration_info& mig) {
auto s = _db.find_schema(mig.tablet.table);
auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table);
auto last_token = tmap.get_last_token(mig.tablet.tablet);
if (tmap.get_tablet_transition_info(mig.tablet.tablet)) {
slogger.warn("Tablet already in transition, ignoring migration: {}", mig);
return;
}
out.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), mig.tablet.table)
.set_new_replicas(last_token, replace_replica(tmap.get_tablet_info(mig.tablet.tablet).replicas, mig.src, mig.dst))
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
.build());
}
future<> generate_migration_updates(std::vector<canonical_mutation>& out, const group0_guard& guard, const migration_plan& plan) {
for (const tablet_migration_info& mig : plan) {
co_await coroutine::maybe_yield();
generate_migration_update(out, guard, mig);
}
}
future<> handle_tablet_migration(group0_guard guard) {
// This step acts like a pump which advances state machines of individual tablets,
// batching barriers and group0 updates.
@@ -1807,21 +1829,7 @@ future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard gua
std::vector<canonical_mutation> updates;
for (const tablet_migration_info& mig : plan) {
co_await coroutine::maybe_yield();
auto s = _db.find_schema(mig.tablet.table);
auto& tmap = tm->tablets().get_tablet_map(mig.tablet.table);
auto last_token = tmap.get_last_token(mig.tablet.tablet);
if (tmap.get_tablet_transition_info(mig.tablet.tablet)) {
slogger.warn("Tablet {} is already in transition, ignoring migration", mig.tablet);
continue;
}
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), mig.tablet.table)
.set_new_replicas(last_token, replace_replica(tmap.get_tablet_info(mig.tablet.tablet).replicas, mig.src, mig.dst))
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
.build());
}
co_await generate_migration_updates(updates, guard, plan);
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())