Merge 'repair: Speed up ranges calculation when small table optimization is on' from Asias He

repair: Speed up ranges calculation when small table optimization is on

Normally, during bootstrap, in repair_service::bootstrap_with_repair, we
need to calculate which range to sync data from carefully for the new
node. With small table optimization on, we pass a single full range and
all peer nodes to row level repair to sync data with. Now that we only
need to pass a single range and full peers, there is no need to calculate
the ranges and peers in repair_service::bootstrap_with_repair and drop
it later. The calculation takes time which slows down bootstrap, e.g.,

```
Jul 08 22:01:41.927785 cluster-scale-50-200-test-scayle-t-db-node-51209daa-93 scylla[5326]:
[shard 0:strm] repair - bootstrap_with_repair: started with
keyspace=system_distributed_everywhere, nr_ranges=23809

Jul 08 22:01:57.883797 cluster-scale-50-200-test-scayle-t-db-node-51209daa-93 scylla[5326]:
[shard 0:strm] repair - repair[79eac1a1-5d5b-4028-ae1c-06e68bec2d50]:
sync data for keyspace=system_distributed_everywhere, status=started,
reason=bootstrap, small_table_optimization=true
```

The range calculation took 15 seconds for system_distributed_everywhere
table.

To fix, the ranges calculation is skipped if small table optimization is
on for the keyspace.

Before:
cluster    dev   [ PASS ] cluster.test_boot_nodes.1 104.59s

After:
cluster    dev   [ PASS ] cluster.test_boot_nodes.1 89.23s

A 15% improvement to bootstrap 30 node cluster was observed.

Fixes #24817

Closes scylladb/scylladb#24901

* github.com:scylladb/scylladb:
  repair: Speed up ranges calculation when small table optimization is on
  test: Add test_boot_nodes.py
This commit is contained in:
Botond Dénes
2025-07-17 10:23:45 +03:00
2 changed files with 83 additions and 34 deletions

View File

@@ -136,6 +136,26 @@ std::string_view format_as(row_level_diff_detect_algorithm algo) {
return "unknown";
}
bool should_enable_small_table_optimization_for_rbno(const replica::database& db, sstring keyspace, streaming::stream_reason reason) {
bool small_table_optimization = false;
auto enable_small_table_optimization_for_rbno = db.get_config().enable_small_table_optimization_for_rbno();
if (enable_small_table_optimization_for_rbno) {
static const std::unordered_set<sstring> small_table_optimization_enabled_ks = {
"system_distributed",
"system_distributed_everywhere",
"system_replicated_keys",
"system_auth",
"system_traces"
};
if (reason == streaming::stream_reason::bootstrap ||
reason == streaming::stream_reason::rebuild ||
reason == streaming::stream_reason::decommission) {
small_table_optimization = small_table_optimization_enabled_ks.contains(keyspace);
}
}
return small_table_optimization;
}
static size_t get_nr_tables(const replica::database& db, const sstring& keyspace) {
size_t tables = 0;
db.get_tables_metadata().for_each_table_id([&keyspace, &tables] (const std::pair<sstring, sstring>& kscf, table_id) {
@@ -1526,35 +1546,20 @@ future<> repair::data_sync_repair_task_impl::run() {
auto id = get_repair_uniq_id();
auto enable_small_table_optimization_for_rbno = db.get_config().enable_small_table_optimization_for_rbno();
bool small_table_optimization = false;
size_t ranges_reduced_factor = 1;
if (enable_small_table_optimization_for_rbno) {
static const std::unordered_set<sstring> small_table_optimization_enabled_ks = {
"system_distributed",
"system_distributed_everywhere",
"system_replicated_keys",
"system_auth",
"system_traces"
};
if (_reason == streaming::stream_reason::bootstrap ||
_reason == streaming::stream_reason::rebuild ||
_reason == streaming::stream_reason::decommission) {
small_table_optimization = small_table_optimization_enabled_ks.contains(keyspace);
}
if (small_table_optimization) {
auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
ranges_reduced_factor = _ranges.size();
_ranges = {range};
std::unordered_set<locator::host_id> nodes;
for (auto& [_, neighbor] : _neighbors) {
for (auto& n : neighbor.all) {
nodes.insert(n);
}
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace, _reason);
if (small_table_optimization) {
auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
ranges_reduced_factor = _ranges.size();
_ranges = {range};
std::unordered_set<locator::host_id> nodes;
for (auto& [_, neighbor] : _neighbors) {
for (auto& n : neighbor.all) {
nodes.insert(n);
}
auto nodes_vec = std::vector<locator::host_id>(nodes.begin(), nodes.end());
_neighbors = {{range, repair_neighbors(nodes_vec, nodes_vec)}};
}
auto nodes_vec = std::vector<locator::host_id>(nodes.begin(), nodes.end());
_neighbors = {{range, repair_neighbors(nodes_vec, nodes_vec)}};
}
auto start_time = std::chrono::steady_clock::now();
@@ -1643,10 +1648,15 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
if (!db.has_keyspace(keyspace_name)) {
continue;
}
auto nr_tables = get_nr_tables(db, keyspace_name);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason);
if (small_table_optimization) {
nr_ranges_total += 1 * nr_tables;
continue;
}
auto& strat = erm->get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get();
seastar::thread::maybe_yield();
auto nr_tables = get_nr_tables(db, keyspace_name);
nr_ranges_total += desired_ranges.size() * nr_tables;
}
container().invoke_on_all([nr_ranges_total] (repair_service& rs) {
@@ -1659,8 +1669,28 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
continue;
}
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason);
dht::token_range_vector desired_ranges;
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
auto nr_tables = get_nr_tables(db, keyspace_name);
if (small_table_optimization) {
try {
auto germs = make_lw_shared(locator::make_global_effective_replication_map(get_db(), keyspace_name).get());
auto nodes = germs->get().get_token_metadata().get_normal_token_owners();
auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
auto nodes_vec = std::vector<locator::host_id>(nodes.begin(), nodes.end());
desired_ranges = {range};
range_sources = {{range, repair_neighbors(nodes_vec, nodes_vec)}};
rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}, nr_tables={}, small_table_optimization=true",
keyspace_name, desired_ranges.size() * nr_tables, nr_tables);
} catch (data_dictionary::no_such_keyspace) {
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
continue;
}
} else {
auto& strat = erm->get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get();
desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myid, myloc).get();
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
bool everywhere_topology = strat.get_type() == locator::replication_strategy_type::everywhere_topology;
auto replication_factor = erm->get_replication_factor();
@@ -1675,11 +1705,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
auto pending_range_addresses = strat.get_range_host_ids(metadata_clone).get();
metadata_clone.clear_gently().get();
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, repair_neighbors> range_sources;
auto nr_tables = get_nr_tables(db, keyspace_name);
rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}", keyspace_name, desired_ranges.size() * nr_tables);
rlogger.info("bootstrap_with_repair: started with keyspace={}, nr_ranges={}, nr_tables={}", keyspace_name, desired_ranges.size() * nr_tables, nr_tables);
for (auto& desired_range : desired_ranges) {
for (auto& x : range_addresses) {
const wrapping_interval<dht::token>& src_range = x.first;
@@ -1797,9 +1823,10 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
}
}
}
}
auto nr_ranges = desired_ranges.size();
sync_data_using_repair(keyspace_name, erm, std::move(desired_ranges), std::move(range_sources), reason, nullptr).get();
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges * nr_tables);
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}, nr_tables={}", keyspace_name, nr_ranges * nr_tables, nr_tables);
}
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", ks_erms | std::views::keys);
});

View File

@@ -0,0 +1,22 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import pytest
import time
import logging
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_boot(manager):
rbno = True
cfg = {'enable_repair_based_node_ops': rbno, 'num_tokens': 256}
nr_nodes = 3
for i in range(nr_nodes):
logger.info(f"booting node {i+1}")
start = time.time()
await manager.server_add(config=cfg)
end = time.time()
logger.info(f"node {i+1} took {end - start}ms to boot")