diff --git a/main.cc b/main.cc index 0f25f04f4b..9691752582 100644 --- a/main.cc +++ b/main.cc @@ -1807,7 +1807,17 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl checkpoint(stop_signal, "starting repair service"); auto max_memory_repair = memory::stats().total_memory() * 0.1; - repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(mm), max_memory_repair).get(); + auto repair_config = sharded_parameter([&] { + return repair_service::config{ + .enable_small_table_optimization_for_rbno = cfg->enable_small_table_optimization_for_rbno, + .repair_hints_batchlog_flush_cache_time_in_ms = cfg->repair_hints_batchlog_flush_cache_time_in_ms, + .repair_partition_count_estimation_ratio = cfg->repair_partition_count_estimation_ratio, + .critical_disk_utilization_level = cfg->critical_disk_utilization_level, + .repair_multishard_reader_buffer_hint_size = cfg->repair_multishard_reader_buffer_hint_size, + .repair_multishard_reader_enable_read_ahead = cfg->repair_multishard_reader_enable_read_ahead, + }; + }); + repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(mm), max_memory_repair, std::move(repair_config)).get(); auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] { repair.stop().get(); }); diff --git a/repair/reader.hh b/repair/reader.hh index 49ac0295a7..8a7dcd0b81 100644 --- a/repair/reader.hh +++ b/repair/reader.hh @@ -46,7 +46,9 @@ private: const dht::sharder& remote_sharder, unsigned remote_shard, gc_clock::time_point compaction_time, - incremental_repair_meta inc); + incremental_repair_meta inc, + uint64_t multishard_reader_buffer_hint_size, + bool multishard_reader_enable_read_ahead); public: repair_reader( @@ -60,7 +62,9 @@ public: uint64_t seed, read_strategy strategy, gc_clock::time_point compaction_time, - incremental_repair_meta inc); + incremental_repair_meta inc, + uint64_t multishard_reader_buffer_hint_size, + bool multishard_reader_enable_read_ahead); future read_mutation_fragment(); diff --git a/repair/repair.cc b/repair/repair.cc index 91b397a2d2..465dfb699d 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -6,7 +6,6 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ -#include "db/config.hh" #include "repair.hh" #include "gms/gossip_address_map.hh" #include "locator/abstract_replication_strategy.hh" @@ -137,9 +136,8 @@ std::string_view format_as(row_level_diff_detect_algorithm algo) { return "unknown"; } -bool should_enable_small_table_optimization_for_rbno(const replica::database& db, sstring keyspace, streaming::stream_reason reason) { +bool should_enable_small_table_optimization_for_rbno(bool enable_small_table_optimization_for_rbno, sstring keyspace, streaming::stream_reason reason) { bool small_table_optimization = false; - auto enable_small_table_optimization_for_rbno = db.get_config().enable_small_table_optimization_for_rbno(); if (enable_small_table_optimization_for_rbno) { static const std::unordered_set small_table_optimization_enabled_ks = { "system_distributed", @@ -1507,7 +1505,7 @@ future<> repair::data_sync_repair_task_impl::run() { auto id = get_repair_uniq_id(); size_t ranges_reduced_factor = 1; - bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace, _reason); + bool small_table_optimization = should_enable_small_table_optimization_for_rbno(rs.get_config().enable_small_table_optimization_for_rbno(), keyspace, _reason); if (small_table_optimization) { auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false)); ranges_reduced_factor = _ranges.size(); @@ -1601,7 +1599,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr continue; } auto nr_tables = get_nr_tables(db, keyspace_name); - bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason); + bool small_table_optimization = should_enable_small_table_optimization_for_rbno(_config.enable_small_table_optimization_for_rbno(), keyspace_name, reason); if (small_table_optimization) { nr_ranges_total += 1 * nr_tables; continue; @@ -1621,7 +1619,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name); continue; } - bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason); + bool small_table_optimization = should_enable_small_table_optimization_for_rbno(_config.enable_small_table_optimization_for_rbno(), keyspace_name, reason); dht::token_range_vector desired_ranges; //Collects the source that will have its range moved to the new node std::unordered_map range_sources; diff --git a/repair/row_level.cc b/repair/row_level.cc index b9b0cdbb09..5b5bc2be3b 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -47,7 +47,6 @@ #include #include #include -#include "db/config.hh" #include "db/system_keyspace.hh" #include "service/storage_proxy.hh" #include "db/batchlog_manager.hh" @@ -287,7 +286,9 @@ mutation_reader repair_reader::make_reader( const dht::sharder& remote_sharder, unsigned remote_shard, gc_clock::time_point compaction_time, - incremental_repair_meta inc) { + incremental_repair_meta inc, + uint64_t multishard_reader_buffer_hint_size, + bool multishard_reader_enable_read_ahead) { switch (strategy) { case read_strategy::local: { auto ms = mutation_source([&cf, compaction_time] ( @@ -313,12 +314,11 @@ mutation_reader repair_reader::make_reader( } case read_strategy::multishard_split: { std::optional multishard_reader_buffer_size; - const auto& dbconfig = db.local().get_config(); - if (dbconfig.repair_multishard_reader_buffer_hint_size()) { + if (multishard_reader_buffer_hint_size) { // Setting the repair buffer size as the multishard reader's buffer // size helps avoid extra cross-shard round-trips and possible // evict-recreate cycles. - multishard_reader_buffer_size = dbconfig.repair_multishard_reader_buffer_hint_size(); + multishard_reader_buffer_size = multishard_reader_buffer_hint_size; } return make_multishard_streaming_reader(db, _schema, _permit, [this] { auto shard_range = _sharder.next(); @@ -326,7 +326,7 @@ mutation_reader repair_reader::make_reader( return std::optional(dht::to_partition_range(*shard_range)); } return std::optional(); - }, compaction_time, multishard_reader_buffer_size, read_ahead(dbconfig.repair_multishard_reader_enable_read_ahead())); + }, compaction_time, multishard_reader_buffer_size, read_ahead(multishard_reader_enable_read_ahead)); } case read_strategy::multishard_filter: { return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time, {}, read_ahead::yes), @@ -354,14 +354,17 @@ repair_reader::repair_reader( uint64_t seed, read_strategy strategy, gc_clock::time_point compaction_time, - incremental_repair_meta inc) + incremental_repair_meta inc, + uint64_t multishard_reader_buffer_hint_size, + bool multishard_reader_enable_read_ahead) : _schema(s) , _permit(std::move(permit)) , _range(dht::to_partition_range(range)) , _sharder(remote_sharder, range, remote_shard) , _seed(seed) , _local_read_op(strategy == read_strategy::local ? std::optional(cf.read_in_progress()) : std::nullopt) - , _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time, inc)) + , _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time, inc, + multishard_reader_buffer_hint_size, multishard_reader_enable_read_ahead)) { } future @@ -1321,7 +1324,9 @@ private: return read_strategy; }), _compaction_time, - _incremental_repair_meta); + _incremental_repair_meta, + _rs.get_config().repair_multishard_reader_buffer_hint_size(), + bool(_rs.get_config().repair_multishard_reader_enable_read_ahead())); } try { while (cur_size < _max_row_buf_size) { @@ -2630,7 +2635,7 @@ future repair_service::repair_flush_hints_ auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1); bool updated = false; auto now = gc_clock::now(); - auto cache_time = std::chrono::milliseconds(get_db().local().get_config().repair_hints_batchlog_flush_cache_time_in_ms()); + auto cache_time = std::chrono::milliseconds(_config.repair_hints_batchlog_flush_cache_time_in_ms()); auto cache_disabled = cache_time == std::chrono::milliseconds(0); auto flush_time = now; db::all_batches_replayed all_replayed = db::all_batches_replayed::yes; @@ -3500,7 +3505,7 @@ public: // To save memory and have less different conditions, we // use the estimation for RBNO repair as well. - _estimated_partitions *= _shard_task.db.local().get_config().repair_partition_count_estimation_ratio(); + _estimated_partitions *= _shard_task.rs.get_config().repair_partition_count_estimation_ratio(); } parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> { @@ -3636,7 +3641,8 @@ repair_service::repair_service(sharded& tsm, sharded& vbw, tasks::task_manager& tm, service::migration_manager& mm, - size_t max_repair_memory) + size_t max_repair_memory, + config cfg) : _tsm(tsm) , _gossiper(gossiper) , _messaging(ms) @@ -3651,6 +3657,7 @@ repair_service::repair_service(sharded& tsm, , _node_ops_metrics(_repair_module) , _max_repair_memory(max_repair_memory) , _memory_sem(max_repair_memory) + , _config(std::move(cfg)) { tm.register_module("repair", _repair_module); if (this_shard_id() == 0) { @@ -3661,7 +3668,7 @@ repair_service::repair_service(sharded& tsm, future<> repair_service::start(utils::disk_space_monitor* dsm) { if (dsm && (this_shard_id() == 0)) { - _out_of_space_subscription = dsm->subscribe(_db.local().get_config().critical_disk_utilization_level, [this] (auto threshold_reached) { + _out_of_space_subscription = dsm->subscribe(_config.critical_disk_utilization_level, [this] (auto threshold_reached) { if (threshold_reached) { return container().invoke_on_all([] (repair_service& rs) { return rs.drain(); }); } diff --git a/repair/row_level.hh b/repair/row_level.hh index 1f3c6e644d..f061a8b823 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -109,6 +109,17 @@ struct repair_task_progress { }; class repair_service : public seastar::peering_sharded_service { +public: + struct config { + utils::updateable_value enable_small_table_optimization_for_rbno = utils::updateable_value(true); + utils::updateable_value repair_hints_batchlog_flush_cache_time_in_ms = utils::updateable_value(60*1000); + utils::updateable_value repair_partition_count_estimation_ratio = utils::updateable_value(0.1); + utils::updateable_value critical_disk_utilization_level = utils::updateable_value(0.98); + utils::updateable_value repair_multishard_reader_buffer_hint_size = utils::updateable_value(1024 * 1024); + utils::updateable_value repair_multishard_reader_enable_read_ahead = utils::updateable_value(0); + }; + +private: sharded& _tsm; sharded& _gossiper; netw::messaging_service& _messaging; @@ -162,6 +173,9 @@ class repair_service : public seastar::peering_sharded_service { sstring keyspace, std::vector cfs, std::unordered_set ignore_nodes); + config _config; + static config default_config() { return {}; } + public: std::unordered_map> _repair_compaction_locks; @@ -177,12 +191,15 @@ public: sharded& vbw, tasks::task_manager& tm, service::migration_manager& mm, - size_t max_repair_memory + size_t max_repair_memory, + repair_service::config cfg = default_config() ); ~repair_service(); future<> start(utils::disk_space_monitor* dsm); future<> stop(); + const config& get_config() const noexcept { return _config; } + // shutdown() stops all ongoing repairs started on this node (and // prevents any further repairs from being started). It returns a future // saying when all repairs have stopped, and attempts to stop them as diff --git a/test/boost/repair_test.cc b/test/boost/repair_test.cc index a760d710c2..7de8088565 100644 --- a/test/boost/repair_test.cc +++ b/test/boost/repair_test.cc @@ -193,7 +193,9 @@ SEASTAR_TEST_CASE(test_reader_with_different_strategies) { }); auto read_all = [&](repair_reader::read_strategy strategy) -> future> { auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e), - random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now(), incremental_repair_meta()); + random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now(), incremental_repair_meta(), + e.db_config().repair_multishard_reader_buffer_hint_size(), + e.db_config().repair_multishard_reader_enable_read_ahead()); std::vector result; while (auto mf = co_await reader.read_mutation_fragment()) { result.push_back(std::move(*mf)); @@ -284,7 +286,9 @@ static future<> run_repair_reader_corruption_test(random_mutation_generator::com auto test_range = dht::token_range::make_open_ended_both_sides(); auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e), test_range, local_sharder, 0, 0, repair_reader::read_strategy::local, - gc_clock::now(), incremental_repair_meta()); + gc_clock::now(), incremental_repair_meta(), + e.db_config().repair_multishard_reader_buffer_hint_size(), + e.db_config().repair_multishard_reader_enable_read_ahead()); try { while (auto mf = co_await reader.read_mutation_fragment()) {