diff --git a/repair/repair.cc b/repair/repair.cc
index af1381ba97..f9d610b6a8 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -136,6 +136,30 @@ std::string_view format_as(row_level_diff_detect_algorithm algo) {
     return "unknown";
 }
 
+// Decides whether the small table optimization applies to a repair-based
+// node operation on `keyspace`. Returns true only when the config option
+// is enabled, `reason` is bootstrap, rebuild or decommission, and the
+// keyspace is one of the system keyspaces listed below.
+bool should_enable_small_table_optimization_for_rbno(const replica::database& db, sstring keyspace, streaming::stream_reason reason) {
+    bool small_table_optimization = false;
+    auto enable_small_table_optimization_for_rbno = db.get_config().enable_small_table_optimization_for_rbno();
+    if (enable_small_table_optimization_for_rbno) {
+        static const std::unordered_set small_table_optimization_enabled_ks = {
+            "system_distributed",
+            "system_distributed_everywhere",
+            "system_replicated_keys",
+            "system_auth",
+            "system_traces"
+        };
+        if (reason == streaming::stream_reason::bootstrap ||
+            reason == streaming::stream_reason::rebuild ||
+            reason == streaming::stream_reason::decommission) {
+            small_table_optimization = small_table_optimization_enabled_ks.contains(keyspace);
+        }
+    }
+    return small_table_optimization;
+}
+
 static size_t get_nr_tables(const replica::database& db, const sstring& keyspace) {
     size_t tables = 0;
     db.get_tables_metadata().for_each_table_id([&keyspace, &tables] (const std::pair& kscf, table_id) {
@@ -1526,35 +1550,20 @@ future<> repair::data_sync_repair_task_impl::run() {
 
     auto id = get_repair_uniq_id();
 
-    auto enable_small_table_optimization_for_rbno = db.get_config().enable_small_table_optimization_for_rbno();
-    bool small_table_optimization = false;
     size_t ranges_reduced_factor = 1;
-    if (enable_small_table_optimization_for_rbno) {
-        static const std::unordered_set small_table_optimization_enabled_ks = {
-            "system_distributed",
-            "system_distributed_everywhere",
-            "system_replicated_keys",
-            "system_auth",
-            "system_traces"
-        };
-        if (_reason == streaming::stream_reason::bootstrap ||
-            _reason == streaming::stream_reason::rebuild ||
-            _reason == streaming::stream_reason::decommission) {
-            small_table_optimization = small_table_optimization_enabled_ks.contains(keyspace);
-        }
-        if (small_table_optimization) {
-            auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
-            ranges_reduced_factor = _ranges.size();
-            _ranges = {range};
-            std::unordered_set nodes;
-            for (auto& [_, neighbor] : _neighbors) {
-                for (auto& n : neighbor.all) {
-                    nodes.insert(n);
-                }
+    bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace, _reason);
+    if (small_table_optimization) {
+        auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
+        ranges_reduced_factor = _ranges.size();
+        _ranges = {range};
+        std::unordered_set nodes;
+        for (auto& [_, neighbor] : _neighbors) {
+            for (auto& n : neighbor.all) {
+                nodes.insert(n);
             }
-            auto nodes_vec = std::vector(nodes.begin(), nodes.end());
-            _neighbors = {{range, repair_neighbors(nodes_vec, nodes_vec)}};
         }
+        auto nodes_vec = std::vector(nodes.begin(), nodes.end());
+        _neighbors = {{range, repair_neighbors(nodes_vec, nodes_vec)}};
     }
 
     auto start_time = std::chrono::steady_clock::now();
@@ -1643,10 +1652,15 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
             if (!db.has_keyspace(keyspace_name)) {
                 continue;
             }
+            auto nr_tables = get_nr_tables(db, keyspace_name);
+            bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason);
+            if (small_table_optimization) {
+                nr_ranges_total += 1 * nr_tables;
+                continue;
+            }
             auto& strat = erm->get_replication_strategy();
             dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get();
             seastar::thread::maybe_yield();
-            auto nr_tables = get_nr_tables(db, keyspace_name);
             nr_ranges_total += desired_ranges.size() * nr_tables;
         }
         container().invoke_on_all([nr_ranges_total] (repair_service& rs) {
@@ -1659,8 +1673,30 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
                 rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
                 continue;
             }
+            bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason);
+            dht::token_range_vector desired_ranges;
+            //Collects the source that will have its range moved to the new node
+            std::unordered_map range_sources;
+            auto nr_tables = get_nr_tables(db, keyspace_name);
+            // Small table optimization: sync a single range spanning minimum_token to
+            // maximum_token, with every normal token owner as a neighbor.
+            if (small_table_optimization) {
+                try {
+                    auto germs = make_lw_shared(locator::make_global_effective_replication_map(get_db(), keyspace_name).get());
+                    auto nodes = germs->get().get_token_metadata().get_normal_token_owners();
+                    auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
+                    auto nodes_vec = std::vector(nodes.begin(), nodes.end());
+                    desired_ranges = {range};
+                    range_sources = {{range, repair_neighbors(nodes_vec, nodes_vec)}};
+                    rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}, nr_tables={}, small_table_optimization=true",
+                            keyspace_name, desired_ranges.size() * nr_tables, nr_tables);
+                } catch (const data_dictionary::no_such_keyspace&) {
+                    rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
+                    continue;
+                }
+            } else {
             auto& strat = erm->get_replication_strategy();
-            dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get();
+            desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get();
             bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
             bool everywhere_topology = strat.get_type() == locator::replication_strategy_type::everywhere_topology;
             auto replication_factor = erm->get_replication_factor();
@@ -1675,11 +1711,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
             auto pending_range_addresses = strat.get_range_host_ids(metadata_clone).get();
             metadata_clone.clear_gently().get();
 
-            //Collects the source that will have its range moved to the new node
-            std::unordered_map range_sources;
-
-            auto nr_tables = get_nr_tables(db, keyspace_name);
-            rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}", keyspace_name, desired_ranges.size() * nr_tables);
+            rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}, nr_tables={}", keyspace_name, desired_ranges.size() * nr_tables, nr_tables);
             for (auto& desired_range : desired_ranges) {
                 for (auto& x : range_addresses) {
                     const wrapping_interval& src_range = x.first;
@@ -1797,9 +1829,10 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
                     }
                 }
             }
+            }
             auto nr_ranges = desired_ranges.size();
             sync_data_using_repair(keyspace_name, erm, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get();
-            rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges * nr_tables);
+            rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}, nr_tables={}", keyspace_name, nr_ranges * nr_tables, nr_tables);
         }
         rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | std::views::keys);
     });
diff --git a/test/cluster/test_boot_nodes.py b/test/cluster/test_boot_nodes.py
new file mode 100644
index 0000000000..dc92e77e6c
--- /dev/null
+++ b/test/cluster/test_boot_nodes.py
@@ -0,0 +1,24 @@
+#
+# Copyright (C) 2025-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
+#
+
+import pytest
+import time
+import logging
+logger = logging.getLogger(__name__)
+
+@pytest.mark.asyncio
+async def test_boot(manager):
+    """Add three nodes one at a time with repair-based node operations
+    enabled, logging how long each server_add call takes."""
+    rbno = True
+    cfg = {'enable_repair_based_node_ops': rbno, 'num_tokens': 256}
+    nr_nodes = 3
+    for i in range(nr_nodes):
+        logger.info(f"booting node {i+1}")
+        start = time.monotonic()
+        await manager.server_add(config=cfg)
+        end = time.monotonic()
+        logger.info(f"node {i+1} took {end - start:.3f}s to boot")