Compare commits
4 Commits
next
...
copilot/di
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
69ee5b4203 | ||
|
|
4330f783bf | ||
|
|
5a1f6c350c | ||
|
|
82dca857ad |
@@ -7252,29 +7252,81 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
|
||||
});
|
||||
}
|
||||
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
if (enabled) {
|
||||
// Enabling is immediate, no need to wait for idle topology or use a request
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_tablet_balancing_enabled(enabled)
|
||||
.build()));
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_tablet_balancing_enabled(true)
|
||||
.build()));
|
||||
|
||||
sstring reason = format("Setting tablet balancing to {}", enabled);
|
||||
rtlogger.info("{}", reason);
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
|
||||
sstring reason = "Enabling tablet balancing";
|
||||
rtlogger.info("{}", reason);
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(true): concurrent modification, retrying");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Disabling is done via topology request to interrupt tablet scheduler
|
||||
utils::UUID request_id;
|
||||
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): topology is busy");
|
||||
co_await _topology_state_machine.event.when();
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
// In legacy mode (without topology_global_request_queue feature), we can only have one
|
||||
// global request at a time. Check if there's a conflicting request already pending.
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (!_feature_service.topology_global_request_queue && curr_req && *curr_req != global_topology_request::disable_tablet_balancing) {
|
||||
throw std::runtime_error{
|
||||
"set_tablet_balancing_enabled: a different topology request is already pending, try again later"};
|
||||
}
|
||||
|
||||
utils::chunked_vector<canonical_mutation> muts;
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
request_id = guard.new_group0_state_id();
|
||||
builder.queue_global_topology_request_id(request_id);
|
||||
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", global_topology_request::disable_tablet_balancing);
|
||||
muts.push_back(rtbuilder.build());
|
||||
} else {
|
||||
builder.set_global_topology_request(global_topology_request::disable_tablet_balancing);
|
||||
}
|
||||
muts.push_back(builder.build());
|
||||
|
||||
sstring reason = "Disabling tablet balancing";
|
||||
rtlogger.info("{}", reason);
|
||||
topology_change change{std::move(muts)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(false): concurrent modification, retrying");
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the request to be processed
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
co_await _topology_state_machine.event.when([this, request_id] {
|
||||
auto& queue = _topology_state_machine._topology.global_requests_queue;
|
||||
return std::find(queue.begin(), queue.end(), request_id) == queue.end();
|
||||
});
|
||||
} else {
|
||||
co_await _topology_state_machine.event.when([this] {
|
||||
return _topology_state_machine._topology.global_request != global_topology_request::disable_tablet_balancing;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1091,6 +1091,21 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
co_await update_topology_state(std::move(guard), {builder.build()}, "TRUNCATE TABLE requested");
|
||||
}
|
||||
break;
|
||||
case global_topology_request::disable_tablet_balancing: {
|
||||
rtlogger.info("disable_tablet_balancing requested");
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_tablet_balancing_enabled(false)
|
||||
.del_global_topology_request()
|
||||
.del_global_topology_request_id()
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
|
||||
.build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
|
||||
.done()
|
||||
.build()));
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "disable tablet balancing");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -194,6 +194,7 @@ static std::unordered_map<global_topology_request, sstring> global_topology_requ
|
||||
{global_topology_request::cleanup, "cleanup"},
|
||||
{global_topology_request::keyspace_rf_change, "keyspace_rf_change"},
|
||||
{global_topology_request::truncate_table, "truncate_table"},
|
||||
{global_topology_request::disable_tablet_balancing, "disable_tablet_balancing"},
|
||||
};
|
||||
|
||||
global_topology_request global_topology_request_from_string(const sstring& s) {
|
||||
|
||||
@@ -76,6 +76,7 @@ enum class global_topology_request: uint16_t {
|
||||
cleanup,
|
||||
keyspace_rf_change,
|
||||
truncate_table,
|
||||
disable_tablet_balancing,
|
||||
};
|
||||
|
||||
struct ring_slice {
|
||||
|
||||
75
test/cluster/test_tablet_balancing_disable.py
Normal file
75
test/cluster/test_tablet_balancing_disable.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""
|
||||
Test that disabling tablet balancing via REST API uses topology request mechanism.
|
||||
|
||||
This test verifies that:
|
||||
1. Disabling balancing interrupts ongoing balancing operations
|
||||
2. Enabling balancing is immediate
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_disable_tablet_balancing_interrupts_scheduler(manager: ManagerClient):
    """
    Test that disabling tablet balancing via REST properly interrupts the scheduler.

    This test verifies the fix for the issue where disabling balancing would wait
    for topology state machine to become idle, potentially waiting hours for tablet
    repair to finish. With the fix, disabling balancing should use a topology request
    that interrupts the scheduler.
    """
    logger.info("Starting test_disable_tablet_balancing_interrupts_scheduler")

    # Single-node cluster is enough to exercise the request path.
    server = await manager.server_add()
    node_ip = server.ip_addr

    # Make sure balancing starts in the enabled state.
    await manager.api.enable_tablet_balancing(node_ip)

    # Disabling should complete quickly: it goes through a topology request
    # rather than waiting for the state machine to become idle.
    logger.info("Disabling tablet balancing")
    await manager.api.disable_tablet_balancing(node_ip)
    logger.info("Tablet balancing disabled successfully")

    # Re-enabling takes the immediate path and should return right away.
    logger.info("Enabling tablet balancing")
    await manager.api.enable_tablet_balancing(node_ip)
    logger.info("Tablet balancing enabled successfully")

    # A second disable confirms the request mechanism is reusable.
    logger.info("Disabling tablet balancing again")
    await manager.api.disable_tablet_balancing(node_ip)
    logger.info("Tablet balancing disabled successfully again")

    logger.info("Test completed successfully")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_enable_disable_balancing_idempotent(manager: ManagerClient):
    """
    Test that enabling/disabling balancing multiple times is idempotent.
    """
    logger.info("Starting test_enable_disable_balancing_idempotent")

    # One node suffices; we only exercise the REST endpoints.
    node = await manager.server_add()
    addr = node.ip_addr

    # Repeating enable must be harmless.
    for _ in range(2):
        await manager.api.enable_tablet_balancing(addr)

    # Repeating disable must be harmless as well.
    for _ in range(2):
        await manager.api.disable_tablet_balancing(addr)

    # And we can switch back on afterwards.
    await manager.api.enable_tablet_balancing(addr)

    logger.info("Test completed successfully")
|
||||
Reference in New Issue
Block a user