topology_coordinator: Make tablet_load_stats_refresh_interval configurable

This commits introduces an config option 'tablet_load_stats_refresh_interval_in_seconds'
that allows overriding the default value without using error injection.

Fixes scylladb/scylladb#24641

Closes scylladb/scylladb#24746
This commit is contained in:
Taras Veretilnyk
2025-06-30 16:05:00 +02:00
committed by Botond Dénes
parent 43f7eecf9e
commit 1d6808aec4
12 changed files with 35 additions and 30 deletions

View File

@@ -1470,6 +1470,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, rf_rack_valid_keyspaces(this, "rf_rack_valid_keyspaces", liveness::MustRestart, value_status::Used, false,
"Enforce RF-rack-valid keyspaces. Additionally, if there are existing RF-rack-invalid "
"keyspaces, attempting to start a node with this option ON will fail.")
// FIXME: make frequency per table in order to reduce work in each iteration.
// Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration.
, tablet_load_stats_refresh_interval_in_seconds(this, "tablet_load_stats_refresh_interval_in_seconds", liveness::LiveUpdate, value_status::Used, 60,
"Tablet load stats refresh rate in seconds.")
, default_log_level(this, "default_log_level", value_status::Used, seastar::log_level::info, "Default log level for log messages")
, logger_log_level(this, "logger_log_level", value_status::Used, {}, "Map of logger name to log level. Valid log levels are 'error', 'warn', 'info', 'debug' and 'trace'")
, log_to_stdout(this, "log_to_stdout", value_status::Used, true, "Send log output to stdout")

View File

@@ -612,6 +612,8 @@ public:
named_value<bool> rf_rack_valid_keyspaces;
named_value<uint32_t> tablet_load_stats_refresh_interval_in_seconds;
static const sstring default_tls_priority;
private:
template<typename T>

View File

@@ -39,6 +39,7 @@
#include "replica/database.hh"
#include "replica/tablet_mutation_builder.hh"
#include "replica/tablets.hh"
#include "db/config.hh"
#include "db/view/view_builder.hh"
#include "service/qos/service_level_controller.hh"
#include "service/migration_manager.hh"
@@ -61,6 +62,7 @@
#include "replica/exceptions.hh"
#include "service/paxos/prepare_response.hh"
#include "idl/storage_proxy.dist.hh"
#include "utils/updateable_value.hh"
#include "service/topology_coordinator.hh"
@@ -114,6 +116,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
locator::shared_token_metadata& _shared_tm;
db::system_keyspace& _sys_ks;
replica::database& _db;
utils::updateable_value<uint32_t> _tablet_load_stats_refresh_interval_in_seconds;
service::raft_group0& _group0;
service::topology_state_machine& _topo_sm;
abort_source& _as;
@@ -131,9 +134,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// to suffer lifetime issues when stats refresh fiber overrides the current stats.
std::unordered_map<locator::host_id, locator::load_stats> _load_stats_per_node;
serialized_action _tablet_load_stats_refresh;
// FIXME: make frequency per table in order to reduce work in each iteration.
// Bigger tables will take longer to be resized. similar-sized tables can be batched into same iteration.
static constexpr std::chrono::seconds tablet_load_stats_refresh_interval = std::chrono::seconds(60);
std::chrono::milliseconds _ring_delay;
@@ -3026,6 +3026,7 @@ public:
topology_coordinator_cmd_rpc_tracker& topology_cmd_rpc_tracker)
: _sys_dist_ks(sys_dist_ks), _gossiper(gossiper), _messaging(messaging)
, _shared_tm(shared_tm), _sys_ks(sys_ks), _db(db)
, _tablet_load_stats_refresh_interval_in_seconds(db.get_config().tablet_load_stats_refresh_interval_in_seconds)
, _group0(group0), _topo_sm(topo_sm), _as(as)
, _feature_service(feature_service)
, _raft(raft_server), _term(raft_server.get_current_term())
@@ -3254,8 +3255,7 @@ future<> topology_coordinator::start_tablet_load_stats_refresher() {
} catch (...) {
rtlogger.warn("Found error while refreshing load stats for tablets: {}, retrying...", std::current_exception());
}
auto refresh_interval = utils::get_local_injector().is_enabled("short_tablet_stats_refresh_interval") ?
std::chrono::seconds(1) : tablet_load_stats_refresh_interval;
auto refresh_interval = std::chrono::seconds(_tablet_load_stats_refresh_interval_in_seconds.get());
if (sleep && can_proceed()) {
try {
co_await seastar::sleep_abortable(refresh_interval, _as);

View File

@@ -294,7 +294,7 @@ async def test_mv_tablet_split(manager: ManagerClient):
'--target-tablet-size-in-bytes', '1024',
]
servers = [await manager.server_add(config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)

View File

@@ -397,7 +397,7 @@ async def test_tablet_resize_task(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
servers = [await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
})]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -437,7 +437,7 @@ async def test_tablet_resize_list(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
servers = [await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
})]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -452,7 +452,7 @@ async def test_tablet_resize_list(manager: ManagerClient):
await prepare_split(manager, servers[0], keyspace, table1, keys)
servers.append(await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
}))
s1_log = await manager.server_open_log(servers[0].server_id)
@@ -495,7 +495,7 @@ async def test_tablet_resize_revoked(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
servers = [await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
})]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)

View File

@@ -20,7 +20,7 @@ logger = logging.getLogger(__name__)
async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
cfg = {
'data_file_capacity': 7000000,
'error_injections_at_startup': ['short_tablet_stats_refresh_interval'],
'tablet_load_stats_refresh_interval_in_seconds': 1
}
servers = await manager.servers_add(3, config=cfg)
host_ids = [await manager.get_host_id(s.server_id) for s in servers]

View File

@@ -938,7 +938,7 @@ async def test_drop_keyspace_while_split(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [ '--target-tablet-size-in-bytes', '8192',
'--smp', '2' ]
config = { 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] }
config = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
servers = [await manager.server_add(config=config, cmdline=cmdline)]
s0_log = await manager.server_open_log(servers[0].server_id)
@@ -1026,10 +1026,10 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
cfg = {
'enable_user_defined_functions': False, 'enable_tablets': True,
'error_injections_at_startup': [
'short_tablet_stats_refresh_interval',
# intially disable transitioning into tablet_resize_finalization topology state
'tablet_split_finalization_postpone',
]
],
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'raft_topology=debug',

View File

@@ -623,7 +623,7 @@ async def test_tablet_split(manager: ManagerClient, injection_error: str):
'--target-tablet-size-in-bytes', '1024',
]
servers = [await manager.server_add(config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -692,7 +692,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
'--target-tablet-size-in-bytes', '1024',
]
servers = [await manager.server_add(config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval'],
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -737,7 +737,7 @@ async def test_correctness_of_tablet_split_finalization_after_restart(manager: M
# Delays refresh of tablet stats, so balancer works with whichever it got last.
await manager.api.disable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing")
await manager.api.disable_injection(servers[0].ip_addr, "short_tablet_stats_refresh_interval")
await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 60)
time.sleep(1)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -1200,7 +1200,7 @@ async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClie
'--target-tablet-size-in-bytes', '5000',
]
servers = [await manager.server_add(config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)

View File

@@ -190,7 +190,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
'--target-tablet-size-in-bytes', '30000',
]
servers = [await manager.server_add(config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -309,7 +309,7 @@ async def test_tablet_split_and_merge(manager: ManagerClient, with_merge: bool):
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.parametrize("wait_stage", [("streaming", "stream_tablet_wait"), ("cleanup", "cleanup_tablet_wait")])
async def test_create_colocated_table_while_base_is_migrating(manager: ManagerClient, wait_stage):
cfg = {'enable_tablets': True, 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] }
cfg = {'enable_tablets': True, 'tablet_load_stats_refresh_interval_in_seconds': 1 }
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'raft_topology=debug',

View File

@@ -45,7 +45,7 @@ async def test_tablet_merge_simple(manager: ManagerClient):
'--target-tablet-size-in-bytes', '30000',
]
servers = [await manager.server_add(config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -191,7 +191,7 @@ async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager:
'--target-tablet-size-in-bytes', '30000',
]
config = {
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
'tablet_load_stats_refresh_interval_in_seconds': 1
}
servers = [await manager.server_add(config=config, cmdline=cmdline),
await manager.server_add(config=config, cmdline=cmdline),
@@ -328,8 +328,7 @@ async def test_tablet_split_and_merge_with_concurrent_topology_changes(manager:
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks):
cmdline = ['--target-tablet-size-in-bytes', '30000',]
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
servers = []
rf = racks
for rack_id in range(0, racks):
@@ -381,7 +380,7 @@ async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks)
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_merge_with_many_tables(manager: ManagerClient, racks = 2):
cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',]
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
servers = []
rf = racks
@@ -522,8 +521,8 @@ async def test_missing_data(manager: ManagerClient):
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
}
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',

View File

@@ -408,7 +408,7 @@ async def test_restart_leaving_replica_during_cleanup(manager: ManagerClient, mi
"""
stage, injection = migration_stage_and_injection
cfg = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
servers = await manager.servers_add(2, config=cfg)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -478,7 +478,7 @@ async def test_restart_in_cleanup_stage_after_cleanup(manager: ManagerClient):
the tablet cleanup stage, after tablet cleanup is completed.
Reproduces issue #24857
"""
cfg = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
cfg = {'tablet_load_stats_refresh_interval_in_seconds': 1}
servers = await manager.servers_add(2, config=cfg)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)

View File

@@ -48,7 +48,7 @@ async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fas
assert rf <= 3, "A keyspace with RF > 3 will be RF-rack-invalid if there are fewer racks than the RF"
if fast_stats_refresh:
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
else:
config = {}
if disable_flush_cache_time: