diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 89fac8d8af..bc9627923d 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -769,80 +769,80 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { break; case global_topology_request::keyspace_rf_change: { rtlogger.info("keyspace_rf_change requested"); - sstring ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name; - std::unordered_map saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data; - cql3::statements::ks_prop_defs new_ks_props{std::map{saved_ks_props.begin(), saved_ks_props.end()}}; + sstring ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name; + std::unordered_map saved_ks_props = *_topo_sm._topology.new_keyspace_rf_change_data; + cql3::statements::ks_prop_defs new_ks_props{std::map{saved_ks_props.begin(), saved_ks_props.end()}}; - auto repl_opts = new_ks_props.get_replication_options(); - repl_opts.erase(cql3::statements::ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY); - utils::UUID req_uuid = *_topo_sm._topology.global_request_id; - std::vector updates; - sstring error; - if (_db.has_keyspace(ks_name)) { - auto& ks = _db.find_keyspace(ks_name); - auto tmptr = get_token_metadata_ptr(); - size_t unimportant_init_tablet_count = 2; // must be a power of 2 - locator::tablet_map new_tablet_map{unimportant_init_tablet_count}; + auto repl_opts = new_ks_props.get_replication_options(); + repl_opts.erase(cql3::statements::ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY); + utils::UUID req_uuid = *_topo_sm._topology.global_request_id; + std::vector updates; + sstring error; + if (_db.has_keyspace(ks_name)) { + auto& ks = _db.find_keyspace(ks_name); + auto tmptr = get_token_metadata_ptr(); + size_t unimportant_init_tablet_count = 2; // must be a power of 2 + locator::tablet_map new_tablet_map{unimportant_init_tablet_count}; - auto tables_with_mvs = ks.metadata()->tables(); - auto views = ks.metadata()->views(); - tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end()); - for (const auto& table_or_mv : tables_with_mvs) { - try { - locator::tablet_map old_tablets = tmptr->tablets().get_tablet_map(table_or_mv->id()); - locator::replication_strategy_params params{repl_opts, old_tablets.tablet_count()}; - auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params); - new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, old_tablets); - } catch (const std::exception& e) { - error = e.what(); - rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, error: {}," - "desired new ks opts: {}", error, new_ks_props.get_replication_options()); - updates.clear(); // remove all tablets mutations ... - break; // ... and only create mutations deleting the global req - } - - replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id()); - co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> { - auto last_token = new_tablet_map.get_last_token(tablet_id); - updates.emplace_back(co_await make_canonical_mutation_gently( - replica::tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id()) - .set_new_replicas(last_token, tablet_info.replicas) - .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old) - .set_transition(last_token, locator::tablet_transition_kind::rebuild) - .build() - )); - co_await coroutine::maybe_yield(); - }); + auto tables_with_mvs = ks.metadata()->tables(); + auto views = ks.metadata()->views(); + tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end()); + for (const auto& table_or_mv : tables_with_mvs) { + try { + locator::tablet_map old_tablets = tmptr->tablets().get_tablet_map(table_or_mv->id()); + locator::replication_strategy_params params{repl_opts, old_tablets.tablet_count()}; + auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params); + new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, old_tablets); + } catch (const std::exception& e) { + error = e.what(); + rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, error: {}," + "desired new ks opts: {}", error, new_ks_props.get_replication_options()); + updates.clear(); // remove all tablets mutations ... + break; // ... and only create mutations deleting the global req } - } else { - error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist"; - } - updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp()) - .set_transition_state(topology::transition_state::tablet_migration) - .set_version(_topo_sm._topology.version + 1) - .del_global_topology_request() - .del_global_topology_request_id() - .build())); - updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_uuid) - .done(error) - .build())); - if (error.empty()) { - const sstring strategy_name = "NetworkTopologyStrategy"; - auto ks_md = keyspace_metadata::new_keyspace(ks_name, strategy_name, repl_opts, - new_ks_props.get_initial_tablets(std::nullopt), - new_ks_props.get_durable_writes(), new_ks_props.get_storage_options()); - auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp()); - for (auto& m: schema_muts) { - updates.emplace_back(m); - } + replica::tablet_mutation_builder tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id()); + co_await new_tablet_map.for_each_tablet([&](locator::tablet_id tablet_id, const locator::tablet_info& tablet_info) -> future<> { + auto last_token = new_tablet_map.get_last_token(tablet_id); + updates.emplace_back(co_await make_canonical_mutation_gently( + replica::tablet_mutation_builder(guard.write_timestamp(), table_or_mv->id()) + .set_new_replicas(last_token, tablet_info.replicas) + .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old) + .set_transition(last_token, locator::tablet_transition_kind::rebuild) + .build() + )); + co_await coroutine::maybe_yield(); + }); } + } else { + error = "Can't ALTER keyspace " + ks_name + ", keyspace doesn't exist"; + } - sstring reason = seastar::format("ALTER tablets KEYSPACE called with options: {}", saved_ks_props); - rtlogger.trace("do update {} reason {}", updates, reason); - mixed_change change{std::move(updates)}; - group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); - co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as); + updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp()) + .set_transition_state(topology::transition_state::tablet_migration) + .set_version(_topo_sm._topology.version + 1) + .del_global_topology_request() + .del_global_topology_request_id() + .build())); + updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_uuid) + .done(error) + .build())); + if (error.empty()) { + const sstring strategy_name = "NetworkTopologyStrategy"; + auto ks_md = keyspace_metadata::new_keyspace(ks_name, strategy_name, repl_opts, + new_ks_props.get_initial_tablets(std::nullopt), + new_ks_props.get_durable_writes(), new_ks_props.get_storage_options()); + auto schema_muts = prepare_keyspace_update_announcement(_db, ks_md, guard.write_timestamp()); + for (auto& m: schema_muts) { + updates.emplace_back(m); + } + } + + sstring reason = seastar::format("ALTER tablets KEYSPACE called with options: {}", saved_ks_props); + rtlogger.trace("do update {} reason {}", updates, reason); + mixed_change change{std::move(updates)}; + group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); + co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as); } break; }