Merge 'Introduce repair_service::config and cut dependency from db::config' from Pavel Emelyanov

Spreading db::config around and making all services depend on it is not nice. Most other services that need configuration provide their own config struct that is populated from db::config in main.cc/cql_test_env.cc, and they use that instead of the global config.

This PR does the same for repair_service.

Enhancing component dependencies, not backporting

Closes scylladb/scylladb#29153

* github.com:scylladb/scylladb:
  repair: Remove db/config.hh from repair/*.cc files
  repair: Move repair_multishard_reader options onto repair_service::config
  repair: Move critical_disk_utilization_level onto repair_service::config
  repair: Move repair_partition_count_estimation_ratio onto repair_service::config
  repair: Move repair_hints_batchlog_flush_cache_time_in_ms onto repair_service::config
  repair: Move enable_small_table_optimization_for_rbno onto repair_service::config
  repair: Introduce repair_service::config
This commit is contained in:
Botond Dénes
2026-04-09 11:44:25 +03:00
6 changed files with 65 additions and 25 deletions

12
main.cc
View File

@@ -1807,7 +1807,17 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
checkpoint(stop_signal, "starting repair service");
auto max_memory_repair = memory::stats().total_memory() * 0.1;
repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
auto repair_config = sharded_parameter([&] {
return repair_service::config{
.enable_small_table_optimization_for_rbno = cfg->enable_small_table_optimization_for_rbno,
.repair_hints_batchlog_flush_cache_time_in_ms = cfg->repair_hints_batchlog_flush_cache_time_in_ms,
.repair_partition_count_estimation_ratio = cfg->repair_partition_count_estimation_ratio,
.critical_disk_utilization_level = cfg->critical_disk_utilization_level,
.repair_multishard_reader_buffer_hint_size = cfg->repair_multishard_reader_buffer_hint_size,
.repair_multishard_reader_enable_read_ahead = cfg->repair_multishard_reader_enable_read_ahead,
};
});
repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(mm), max_memory_repair, std::move(repair_config)).get();
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
repair.stop().get();
});

View File

@@ -46,7 +46,9 @@ private:
const dht::sharder& remote_sharder,
unsigned remote_shard,
gc_clock::time_point compaction_time,
incremental_repair_meta inc);
incremental_repair_meta inc,
uint64_t multishard_reader_buffer_hint_size,
bool multishard_reader_enable_read_ahead);
public:
repair_reader(
@@ -60,7 +62,9 @@ public:
uint64_t seed,
read_strategy strategy,
gc_clock::time_point compaction_time,
incremental_repair_meta inc);
incremental_repair_meta inc,
uint64_t multishard_reader_buffer_hint_size,
bool multishard_reader_enable_read_ahead);
future<mutation_fragment_opt>
read_mutation_fragment();

View File

@@ -6,7 +6,6 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/config.hh"
#include "repair.hh"
#include "gms/gossip_address_map.hh"
#include "locator/abstract_replication_strategy.hh"
@@ -137,9 +136,8 @@ std::string_view format_as(row_level_diff_detect_algorithm algo) {
return "unknown";
}
bool should_enable_small_table_optimization_for_rbno(const replica::database& db, sstring keyspace, streaming::stream_reason reason) {
bool should_enable_small_table_optimization_for_rbno(bool enable_small_table_optimization_for_rbno, sstring keyspace, streaming::stream_reason reason) {
bool small_table_optimization = false;
auto enable_small_table_optimization_for_rbno = db.get_config().enable_small_table_optimization_for_rbno();
if (enable_small_table_optimization_for_rbno) {
static const std::unordered_set<sstring> small_table_optimization_enabled_ks = {
"system_distributed",
@@ -1507,7 +1505,7 @@ future<> repair::data_sync_repair_task_impl::run() {
auto id = get_repair_uniq_id();
size_t ranges_reduced_factor = 1;
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace, _reason);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(rs.get_config().enable_small_table_optimization_for_rbno(), keyspace, _reason);
if (small_table_optimization) {
auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
ranges_reduced_factor = _ranges.size();
@@ -1601,7 +1599,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
continue;
}
auto nr_tables = get_nr_tables(db, keyspace_name);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(_config.enable_small_table_optimization_for_rbno(), keyspace_name, reason);
if (small_table_optimization) {
nr_ranges_total += 1 * nr_tables;
continue;
@@ -1621,7 +1619,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
continue;
}
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(_config.enable_small_table_optimization_for_rbno(), keyspace_name, reason);
dht::token_range_vector desired_ranges;
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, repair_neighbors> range_sources;

View File

@@ -47,7 +47,6 @@
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/as_future.hh>
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
#include "db/batchlog_manager.hh"
@@ -287,7 +286,9 @@ mutation_reader repair_reader::make_reader(
const dht::sharder& remote_sharder,
unsigned remote_shard,
gc_clock::time_point compaction_time,
incremental_repair_meta inc) {
incremental_repair_meta inc,
uint64_t multishard_reader_buffer_hint_size,
bool multishard_reader_enable_read_ahead) {
switch (strategy) {
case read_strategy::local: {
auto ms = mutation_source([&cf, compaction_time] (
@@ -313,12 +314,11 @@ mutation_reader repair_reader::make_reader(
}
case read_strategy::multishard_split: {
std::optional<size_t> multishard_reader_buffer_size;
const auto& dbconfig = db.local().get_config();
if (dbconfig.repair_multishard_reader_buffer_hint_size()) {
if (multishard_reader_buffer_hint_size) {
// Setting the repair buffer size as the multishard reader's buffer
// size helps avoid extra cross-shard round-trips and possible
// evict-recreate cycles.
multishard_reader_buffer_size = dbconfig.repair_multishard_reader_buffer_hint_size();
multishard_reader_buffer_size = multishard_reader_buffer_hint_size;
}
return make_multishard_streaming_reader(db, _schema, _permit, [this] {
auto shard_range = _sharder.next();
@@ -326,7 +326,7 @@ mutation_reader repair_reader::make_reader(
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
}, compaction_time, multishard_reader_buffer_size, read_ahead(dbconfig.repair_multishard_reader_enable_read_ahead()));
}, compaction_time, multishard_reader_buffer_size, read_ahead(multishard_reader_enable_read_ahead));
}
case read_strategy::multishard_filter: {
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time, {}, read_ahead::yes),
@@ -354,14 +354,17 @@ repair_reader::repair_reader(
uint64_t seed,
read_strategy strategy,
gc_clock::time_point compaction_time,
incremental_repair_meta inc)
incremental_repair_meta inc,
uint64_t multishard_reader_buffer_hint_size,
bool multishard_reader_enable_read_ahead)
: _schema(s)
, _permit(std::move(permit))
, _range(dht::to_partition_range(range))
, _sharder(remote_sharder, range, remote_shard)
, _seed(seed)
, _local_read_op(strategy == read_strategy::local ? std::optional(cf.read_in_progress()) : std::nullopt)
, _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time, inc))
, _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time, inc,
multishard_reader_buffer_hint_size, multishard_reader_enable_read_ahead))
{ }
future<mutation_fragment_opt>
@@ -1321,7 +1324,9 @@ private:
return read_strategy;
}),
_compaction_time,
_incremental_repair_meta);
_incremental_repair_meta,
_rs.get_config().repair_multishard_reader_buffer_hint_size(),
bool(_rs.get_config().repair_multishard_reader_enable_read_ahead()));
}
try {
while (cur_size < _max_row_buf_size) {
@@ -2630,7 +2635,7 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1);
bool updated = false;
auto now = gc_clock::now();
auto cache_time = std::chrono::milliseconds(get_db().local().get_config().repair_hints_batchlog_flush_cache_time_in_ms());
auto cache_time = std::chrono::milliseconds(_config.repair_hints_batchlog_flush_cache_time_in_ms());
auto cache_disabled = cache_time == std::chrono::milliseconds(0);
auto flush_time = now;
db::all_batches_replayed all_replayed = db::all_batches_replayed::yes;
@@ -3500,7 +3505,7 @@ public:
// To save memory and have less different conditions, we
// use the estimation for RBNO repair as well.
_estimated_partitions *= _shard_task.db.local().get_config().repair_partition_count_estimation_ratio();
_estimated_partitions *= _shard_task.rs.get_config().repair_partition_count_estimation_ratio();
}
parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
@@ -3636,7 +3641,8 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
sharded<db::view::view_building_worker>& vbw,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory)
size_t max_repair_memory,
config cfg)
: _tsm(tsm)
, _gossiper(gossiper)
, _messaging(ms)
@@ -3651,6 +3657,7 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
, _node_ops_metrics(_repair_module)
, _max_repair_memory(max_repair_memory)
, _memory_sem(max_repair_memory)
, _config(std::move(cfg))
{
tm.register_module("repair", _repair_module);
if (this_shard_id() == 0) {
@@ -3661,7 +3668,7 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
future<> repair_service::start(utils::disk_space_monitor* dsm) {
if (dsm && (this_shard_id() == 0)) {
_out_of_space_subscription = dsm->subscribe(_db.local().get_config().critical_disk_utilization_level, [this] (auto threshold_reached) {
_out_of_space_subscription = dsm->subscribe(_config.critical_disk_utilization_level, [this] (auto threshold_reached) {
if (threshold_reached) {
return container().invoke_on_all([] (repair_service& rs) { return rs.drain(); });
}

View File

@@ -109,6 +109,17 @@ struct repair_task_progress {
};
class repair_service : public seastar::peering_sharded_service<repair_service> {
public:
struct config {
utils::updateable_value<bool> enable_small_table_optimization_for_rbno = utils::updateable_value<bool>(true);
utils::updateable_value<uint32_t> repair_hints_batchlog_flush_cache_time_in_ms = utils::updateable_value<uint32_t>(60*1000);
utils::updateable_value<double> repair_partition_count_estimation_ratio = utils::updateable_value<double>(0.1);
utils::updateable_value<float> critical_disk_utilization_level = utils::updateable_value<float>(0.98);
utils::updateable_value<uint64_t> repair_multishard_reader_buffer_hint_size = utils::updateable_value<uint64_t>(1024 * 1024);
utils::updateable_value<uint64_t> repair_multishard_reader_enable_read_ahead = utils::updateable_value<uint64_t>(0);
};
private:
sharded<service::topology_state_machine>& _tsm;
sharded<gms::gossiper>& _gossiper;
netw::messaging_service& _messaging;
@@ -162,6 +173,9 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
sstring keyspace, std::vector<sstring> cfs,
std::unordered_set<locator::host_id> ignore_nodes);
config _config;
static config default_config() { return {}; }
public:
std::unordered_map<locator::global_tablet_id, std::vector<seastar::rwlock::holder>> _repair_compaction_locks;
@@ -177,12 +191,15 @@ public:
sharded<db::view::view_building_worker>& vbw,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory
size_t max_repair_memory,
repair_service::config cfg = default_config()
);
~repair_service();
future<> start(utils::disk_space_monitor* dsm);
future<> stop();
const config& get_config() const noexcept { return _config; }
// shutdown() stops all ongoing repairs started on this node (and
// prevents any further repairs from being started). It returns a future
// saying when all repairs have stopped, and attempts to stop them as

View File

@@ -193,7 +193,9 @@ SEASTAR_TEST_CASE(test_reader_with_different_strategies) {
});
auto read_all = [&](repair_reader::read_strategy strategy) -> future<std::vector<mutation_fragment>> {
auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e),
random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now(), incremental_repair_meta());
random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now(), incremental_repair_meta(),
e.db_config().repair_multishard_reader_buffer_hint_size(),
e.db_config().repair_multishard_reader_enable_read_ahead());
std::vector<mutation_fragment> result;
while (auto mf = co_await reader.read_mutation_fragment()) {
result.push_back(std::move(*mf));
@@ -284,7 +286,9 @@ static future<> run_repair_reader_corruption_test(random_mutation_generator::com
auto test_range = dht::token_range::make_open_ended_both_sides();
auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e),
test_range, local_sharder, 0, 0, repair_reader::read_strategy::local,
gc_clock::now(), incremental_repair_meta());
gc_clock::now(), incremental_repair_meta(),
e.db_config().repair_multishard_reader_buffer_hint_size(),
e.db_config().repair_multishard_reader_enable_read_ahead());
try {
while (auto mf = co_await reader.read_mutation_fragment()) {