Compare commits
1 Commits
copilot/di
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91dd15d3bf |
@@ -304,8 +304,6 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
|
||||
}
|
||||
}));
|
||||
#endif
|
||||
verify_batch_size(qp, mutations);
|
||||
|
||||
bool mutate_atomic = true;
|
||||
if (_type != type::LOGGED) {
|
||||
_stats.batches_pure_unlogged += 1;
|
||||
@@ -313,6 +311,7 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
|
||||
} else {
|
||||
if (mutations.size() > 1) {
|
||||
_stats.batches_pure_logged += 1;
|
||||
verify_batch_size(qp, mutations);
|
||||
} else {
|
||||
_stats.batches_unlogged_from_logged += 1;
|
||||
mutate_atomic = false;
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:3cbe2dd05945f8fb76ebce2ea70864063d2b282c4d5080af1f290ead43321ab3
|
||||
size 6444732
|
||||
oid sha256:80a47fe93866989aaf7e949168fcd308e95841e78c976a61f9eac20bfdd34d96
|
||||
size 6448960
|
||||
|
||||
@@ -7252,81 +7252,29 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
|
||||
});
|
||||
}
|
||||
|
||||
if (enabled) {
|
||||
// Enabling is immediate, no need to wait for idle topology or use a request
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_tablet_balancing_enabled(true)
|
||||
.build()));
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_tablet_balancing_enabled(enabled)
|
||||
.build()));
|
||||
|
||||
sstring reason = "Enabling tablet balancing";
|
||||
rtlogger.info("{}", reason);
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(true): concurrent modification, retrying");
|
||||
}
|
||||
sstring reason = format("Setting tablet balancing to {}", enabled);
|
||||
rtlogger.info("{}", reason);
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
|
||||
}
|
||||
} else {
|
||||
// Disabling is done via topology request to interrupt tablet scheduler
|
||||
utils::UUID request_id;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
|
||||
|
||||
// In legacy mode (without topology_global_request_queue feature), we can only have one
|
||||
// global request at a time. Check if there's a conflicting request already pending.
|
||||
auto curr_req = _topology_state_machine._topology.global_request;
|
||||
if (!_feature_service.topology_global_request_queue && curr_req && *curr_req != global_topology_request::disable_tablet_balancing) {
|
||||
throw std::runtime_error{
|
||||
"set_tablet_balancing_enabled: a different topology request is already pending, try again later"};
|
||||
}
|
||||
|
||||
utils::chunked_vector<canonical_mutation> muts;
|
||||
topology_mutation_builder builder(guard.write_timestamp());
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
request_id = guard.new_group0_state_id();
|
||||
builder.queue_global_topology_request_id(request_id);
|
||||
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now())
|
||||
.set("request_type", global_topology_request::disable_tablet_balancing);
|
||||
muts.push_back(rtbuilder.build());
|
||||
} else {
|
||||
builder.set_global_topology_request(global_topology_request::disable_tablet_balancing);
|
||||
}
|
||||
muts.push_back(builder.build());
|
||||
|
||||
sstring reason = "Disabling tablet balancing";
|
||||
rtlogger.info("{}", reason);
|
||||
topology_change change{std::move(muts)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
|
||||
|
||||
try {
|
||||
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
|
||||
break;
|
||||
} catch (group0_concurrent_modification&) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(false): concurrent modification, retrying");
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the request to be processed
|
||||
if (_feature_service.topology_global_request_queue) {
|
||||
co_await _topology_state_machine.event.when([this, request_id] {
|
||||
auto& queue = _topology_state_machine._topology.global_requests_queue;
|
||||
return std::find(queue.begin(), queue.end(), request_id) == queue.end();
|
||||
});
|
||||
} else {
|
||||
co_await _topology_state_machine.event.when([this] {
|
||||
return _topology_state_machine._topology.global_request != global_topology_request::disable_tablet_balancing;
|
||||
});
|
||||
}
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): topology is busy");
|
||||
co_await _topology_state_machine.event.when();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1091,21 +1091,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
co_await update_topology_state(std::move(guard), {builder.build()}, "TRUNCATE TABLE requested");
|
||||
}
|
||||
break;
|
||||
case global_topology_request::disable_tablet_balancing: {
|
||||
rtlogger.info("disable_tablet_balancing requested");
|
||||
utils::chunked_vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_tablet_balancing_enabled(false)
|
||||
.del_global_topology_request()
|
||||
.del_global_topology_request_id()
|
||||
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
|
||||
.build()));
|
||||
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
|
||||
.done()
|
||||
.build()));
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "disable tablet balancing");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -194,7 +194,6 @@ static std::unordered_map<global_topology_request, sstring> global_topology_requ
|
||||
{global_topology_request::cleanup, "cleanup"},
|
||||
{global_topology_request::keyspace_rf_change, "keyspace_rf_change"},
|
||||
{global_topology_request::truncate_table, "truncate_table"},
|
||||
{global_topology_request::disable_tablet_balancing, "disable_tablet_balancing"},
|
||||
};
|
||||
|
||||
global_topology_request global_topology_request_from_string(const sstring& s) {
|
||||
|
||||
@@ -76,7 +76,6 @@ enum class global_topology_request: uint16_t {
|
||||
cleanup,
|
||||
keyspace_rf_change,
|
||||
truncate_table,
|
||||
disable_tablet_balancing,
|
||||
};
|
||||
|
||||
struct ring_slice {
|
||||
|
||||
@@ -1,75 +0,0 @@
|
||||
"""
|
||||
Test that disabling tablet balancing via REST API uses topology request mechanism.
|
||||
|
||||
This test verifies that:
|
||||
1. Disabling balancing interrupts ongoing balancing operations
|
||||
2. Enabling balancing is immediate
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disable_tablet_balancing_interrupts_scheduler(manager: ManagerClient):
|
||||
"""
|
||||
Test that disabling tablet balancing via REST properly interrupts the scheduler.
|
||||
|
||||
This test verifies the fix for the issue where disabling balancing would wait
|
||||
for topology state machine to become idle, potentially waiting hours for tablet
|
||||
repair to finish. With the fix, disabling balancing should use a topology request
|
||||
that interrupts the scheduler.
|
||||
"""
|
||||
logger.info("Starting test_disable_tablet_balancing_interrupts_scheduler")
|
||||
|
||||
# Start a single node cluster
|
||||
servers = [await manager.server_add()]
|
||||
|
||||
# Enable balancing initially
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# Disable balancing - this should complete quickly via topology request
|
||||
logger.info("Disabling tablet balancing")
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
logger.info("Tablet balancing disabled successfully")
|
||||
|
||||
# Enable balancing again - this should be immediate
|
||||
logger.info("Enabling tablet balancing")
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
logger.info("Tablet balancing enabled successfully")
|
||||
|
||||
# Disable again to verify the request mechanism works multiple times
|
||||
logger.info("Disabling tablet balancing again")
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
logger.info("Tablet balancing disabled successfully again")
|
||||
|
||||
logger.info("Test completed successfully")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_enable_disable_balancing_idempotent(manager: ManagerClient):
|
||||
"""
|
||||
Test that enabling/disabling balancing multiple times is idempotent.
|
||||
"""
|
||||
logger.info("Starting test_enable_disable_balancing_idempotent")
|
||||
|
||||
# Start a single node cluster
|
||||
servers = [await manager.server_add()]
|
||||
|
||||
# Enable twice - should not cause issues
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# Disable twice - should not cause issues
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
# Enable again
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
logger.info("Test completed successfully")
|
||||
@@ -74,3 +74,58 @@ def test_batch_with_error(cql, table1):
|
||||
# exceptions::exception_code::SERVER_ERROR, it gets converted to NoHostAvailable by the driver
|
||||
with pytest.raises(NoHostAvailable, match="Value too large"):
|
||||
cql.execute(generate_big_batch(table1, 100) + injection_key)
|
||||
|
||||
|
||||
def test_unlogged_batch_size_not_checked(cql, test_keyspace):
|
||||
"""Verifies that UNLOGGED batches are NOT subject to batch size limits.
|
||||
|
||||
Unlogged batches are applied as independent mutations and don't go through
|
||||
the system.batchlog table, so their collective size is irrelevant.
|
||||
This test should succeed even with a batch larger than the fail threshold.
|
||||
"""
|
||||
with new_test_table(cql, test_keyspace, "k int primary key, t text") as table:
|
||||
# Create a batch larger than the fail threshold (1024 KB)
|
||||
# This would fail for a logged batch, but should succeed for unlogged
|
||||
statements = [f"INSERT INTO {table} (k, t) VALUES ({idx}, '{'x' * 743}')" for idx in range(1100)]
|
||||
unlogged_batch = "BEGIN UNLOGGED BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
|
||||
|
||||
# This should not raise an exception
|
||||
cql.execute(unlogged_batch)
|
||||
|
||||
# Verify the data was inserted
|
||||
result = cql.execute(f"SELECT COUNT(*) FROM {table}")
|
||||
assert result.one()[0] == 1100
|
||||
|
||||
|
||||
def test_logged_multi_partition_batch_size_checked(cql, test_keyspace):
|
||||
"""Verifies that LOGGED batches targeting multiple partitions ARE subject to batch size limits.
|
||||
|
||||
Logged multi-partition batches go through system.batchlog and must be size-checked.
|
||||
"""
|
||||
with new_test_table(cql, test_keyspace, "k int primary key, t text") as table:
|
||||
# Create a batch larger than the fail threshold (1024 KB) with multiple partitions
|
||||
statements = [f"INSERT INTO {table} (k, t) VALUES ({idx}, '{'x' * 743}')" for idx in range(1100)]
|
||||
logged_batch = "BEGIN BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
|
||||
|
||||
# This should raise "Batch too large" exception
|
||||
with pytest.raises(InvalidRequest, match="Batch too large"):
|
||||
cql.execute(logged_batch)
|
||||
|
||||
|
||||
def test_logged_single_partition_batch_size_not_checked(cql, test_keyspace):
|
||||
"""Verifies that LOGGED batches targeting a single partition are NOT subject to batch size limits.
|
||||
|
||||
Logged single-partition batches don't go through system.batchlog,
|
||||
so their collective size is not relevant.
|
||||
"""
|
||||
with new_test_table(cql, test_keyspace, "k int, c int, t text, primary key (k, c)") as table:
|
||||
# Create a batch larger than the fail threshold (1024 KB) but all targeting the same partition
|
||||
statements = [f"INSERT INTO {table} (k, c, t) VALUES (1, {idx}, '{'x' * 743}')" for idx in range(1100)]
|
||||
logged_batch = "BEGIN BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
|
||||
|
||||
# This should not raise an exception since it's a single-partition batch
|
||||
cql.execute(logged_batch)
|
||||
|
||||
# Verify the data was inserted
|
||||
result = cql.execute(f"SELECT COUNT(*) FROM {table} WHERE k = 1")
|
||||
assert result.one()[0] == 1100
|
||||
|
||||
Reference in New Issue
Block a user