Compare commits

..

1 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
91dd15d3bf Fix batch size check to apply only to atomic batches; add tests
The verify_batch_size() check was applied to all batches before determining mutate_atomic, causing unnecessary failures for batches that never touch system.batchlog. Only logged multi-partition batches (mutate_atomic=true) are stored as blobs in system.batchlog and require size validation.

Changed:

* Moved verify_batch_size() call in execute_without_conditions() directly
  into the logged multi-partition branch for clearer intent
* The check is now performed exactly on the code path where mutate_atomic
  remains true, making the code more intuitive

Result:
Batch Type  Single-Partition  Size Check
LOGGED      false             YES
LOGGED      true              NO
UNLOGGED    N/A               NO

Tests:

* test_unlogged_batch_size_not_checked: Unlogged batch >1024KB succeeds
* test_logged_multi_partition_batch_size_checked: Logged multi-partition batch >1024KB fails
* test_logged_single_partition_batch_size_not_checked: Logged single-partition batch >1024KB succeeds

Fixes #27605

Co-authored-by: mykaul <yaniv.kaul@scylladb.com>
2025-12-15 10:45:29 +02:00
8 changed files with 77 additions and 167 deletions

View File

@@ -304,8 +304,6 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
}
}));
#endif
verify_batch_size(qp, mutations);
bool mutate_atomic = true;
if (_type != type::LOGGED) {
_stats.batches_pure_unlogged += 1;
@@ -313,6 +311,7 @@ future<coordinator_result<>> batch_statement::execute_without_conditions(
} else {
if (mutations.size() > 1) {
_stats.batches_pure_logged += 1;
verify_batch_size(qp, mutations);
} else {
_stats.batches_unlogged_from_logged += 1;
mutate_atomic = false;

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:3cbe2dd05945f8fb76ebce2ea70864063d2b282c4d5080af1f290ead43321ab3
size 6444732
oid sha256:80a47fe93866989aaf7e949168fcd308e95841e78c976a61f9eac20bfdd34d96
size 6448960

View File

@@ -7252,81 +7252,29 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
});
}
if (enabled) {
// Enabling is immediate, no need to wait for idle topology or use a request
while (true) {
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
while (true) {
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
utils::chunked_vector<canonical_mutation> updates;
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
.set_tablet_balancing_enabled(true)
.build()));
utils::chunked_vector<canonical_mutation> updates;
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
.set_tablet_balancing_enabled(enabled)
.build()));
sstring reason = "Enabling tablet balancing";
rtlogger.info("{}", reason);
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
rtlogger.debug("set_tablet_balancing_enabled(true): concurrent modification, retrying");
}
sstring reason = format("Setting tablet balancing to {}", enabled);
rtlogger.info("{}", reason);
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
}
} else {
// Disabling is done via topology request to interrupt tablet scheduler
utils::UUID request_id;
}
while (true) {
group0_guard guard = co_await _group0->client().start_operation(_group0_as, raft_timeout{});
// In legacy mode (without topology_global_request_queue feature), we can only have one
// global request at a time. Check if there's a conflicting request already pending.
auto curr_req = _topology_state_machine._topology.global_request;
if (!_feature_service.topology_global_request_queue && curr_req && *curr_req != global_topology_request::disable_tablet_balancing) {
throw std::runtime_error{
"set_tablet_balancing_enabled: a different topology request is already pending, try again later"};
}
utils::chunked_vector<canonical_mutation> muts;
topology_mutation_builder builder(guard.write_timestamp());
if (_feature_service.topology_global_request_queue) {
request_id = guard.new_group0_state_id();
builder.queue_global_topology_request_id(request_id);
topology_request_tracking_mutation_builder rtbuilder(request_id, _feature_service.topology_requests_type_column);
rtbuilder.set("done", false)
.set("start_time", db_clock::now())
.set("request_type", global_topology_request::disable_tablet_balancing);
muts.push_back(rtbuilder.build());
} else {
builder.set_global_topology_request(global_topology_request::disable_tablet_balancing);
}
muts.push_back(builder.build());
sstring reason = "Disabling tablet balancing";
rtlogger.info("{}", reason);
topology_change change{std::move(muts)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), _group0_as, raft_timeout{});
break;
} catch (group0_concurrent_modification&) {
rtlogger.debug("set_tablet_balancing_enabled(false): concurrent modification, retrying");
}
}
// Wait for the request to be processed
if (_feature_service.topology_global_request_queue) {
co_await _topology_state_machine.event.when([this, request_id] {
auto& queue = _topology_state_machine._topology.global_requests_queue;
return std::find(queue.begin(), queue.end(), request_id) == queue.end();
});
} else {
co_await _topology_state_machine.event.when([this] {
return _topology_state_machine._topology.global_request != global_topology_request::disable_tablet_balancing;
});
}
while (_topology_state_machine._topology.is_busy()) {
rtlogger.debug("set_tablet_balancing_enabled(): topology is busy");
co_await _topology_state_machine.event.when();
}
}

View File

@@ -1091,21 +1091,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(std::move(guard), {builder.build()}, "TRUNCATE TABLE requested");
}
break;
case global_topology_request::disable_tablet_balancing: {
rtlogger.info("disable_tablet_balancing requested");
utils::chunked_vector<canonical_mutation> updates;
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
.set_tablet_balancing_enabled(false)
.del_global_topology_request()
.del_global_topology_request_id()
.drop_first_global_topology_request_id(_topo_sm._topology.global_requests_queue, req_id)
.build()));
updates.push_back(canonical_mutation(topology_request_tracking_mutation_builder(req_id)
.done()
.build()));
co_await update_topology_state(std::move(guard), std::move(updates), "disable tablet balancing");
}
break;
}
}

View File

@@ -194,7 +194,6 @@ static std::unordered_map<global_topology_request, sstring> global_topology_requ
{global_topology_request::cleanup, "cleanup"},
{global_topology_request::keyspace_rf_change, "keyspace_rf_change"},
{global_topology_request::truncate_table, "truncate_table"},
{global_topology_request::disable_tablet_balancing, "disable_tablet_balancing"},
};
global_topology_request global_topology_request_from_string(const sstring& s) {

View File

@@ -76,7 +76,6 @@ enum class global_topology_request: uint16_t {
cleanup,
keyspace_rf_change,
truncate_table,
disable_tablet_balancing,
};
struct ring_slice {

View File

@@ -1,75 +0,0 @@
"""
Test that disabling tablet balancing via REST API uses topology request mechanism.
This test verifies that:
1. Disabling balancing interrupts ongoing balancing operations
2. Enabling balancing is immediate
"""
import pytest
import asyncio
import logging
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_disable_tablet_balancing_interrupts_scheduler(manager: ManagerClient):
"""
Test that disabling tablet balancing via REST properly interrupts the scheduler.
This test verifies the fix for the issue where disabling balancing would wait
for topology state machine to become idle, potentially waiting hours for tablet
repair to finish. With the fix, disabling balancing should use a topology request
that interrupts the scheduler.
"""
logger.info("Starting test_disable_tablet_balancing_interrupts_scheduler")
# Start a single node cluster
servers = [await manager.server_add()]
# Enable balancing initially
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
# Disable balancing - this should complete quickly via topology request
logger.info("Disabling tablet balancing")
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Tablet balancing disabled successfully")
# Enable balancing again - this should be immediate
logger.info("Enabling tablet balancing")
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
logger.info("Tablet balancing enabled successfully")
# Disable again to verify the request mechanism works multiple times
logger.info("Disabling tablet balancing again")
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
logger.info("Tablet balancing disabled successfully again")
logger.info("Test completed successfully")
@pytest.mark.asyncio
async def test_enable_disable_balancing_idempotent(manager: ManagerClient):
"""
Test that enabling/disabling balancing multiple times is idempotent.
"""
logger.info("Starting test_enable_disable_balancing_idempotent")
# Start a single node cluster
servers = [await manager.server_add()]
# Enable twice - should not cause issues
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
# Disable twice - should not cause issues
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
# Enable again
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
logger.info("Test completed successfully")

View File

@@ -74,3 +74,58 @@ def test_batch_with_error(cql, table1):
# exceptions::exception_code::SERVER_ERROR, it gets converted to NoHostAvailable by the driver
with pytest.raises(NoHostAvailable, match="Value too large"):
cql.execute(generate_big_batch(table1, 100) + injection_key)
def test_unlogged_batch_size_not_checked(cql, test_keyspace):
"""Verifies that UNLOGGED batches are NOT subject to batch size limits.
Unlogged batches are applied as independent mutations and don't go through
the system.batchlog table, so their collective size is irrelevant.
This test should succeed even with a batch larger than the fail threshold.
"""
with new_test_table(cql, test_keyspace, "k int primary key, t text") as table:
# Create a batch larger than the fail threshold (1024 KB)
# This would fail for a logged batch, but should succeed for unlogged
statements = [f"INSERT INTO {table} (k, t) VALUES ({idx}, '{'x' * 743}')" for idx in range(1100)]
unlogged_batch = "BEGIN UNLOGGED BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
# This should not raise an exception
cql.execute(unlogged_batch)
# Verify the data was inserted
result = cql.execute(f"SELECT COUNT(*) FROM {table}")
assert result.one()[0] == 1100
def test_logged_multi_partition_batch_size_checked(cql, test_keyspace):
"""Verifies that LOGGED batches targeting multiple partitions ARE subject to batch size limits.
Logged multi-partition batches go through system.batchlog and must be size-checked.
"""
with new_test_table(cql, test_keyspace, "k int primary key, t text") as table:
# Create a batch larger than the fail threshold (1024 KB) with multiple partitions
statements = [f"INSERT INTO {table} (k, t) VALUES ({idx}, '{'x' * 743}')" for idx in range(1100)]
logged_batch = "BEGIN BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
# This should raise "Batch too large" exception
with pytest.raises(InvalidRequest, match="Batch too large"):
cql.execute(logged_batch)
def test_logged_single_partition_batch_size_not_checked(cql, test_keyspace):
"""Verifies that LOGGED batches targeting a single partition are NOT subject to batch size limits.
Logged single-partition batches don't go through system.batchlog,
so their collective size is not relevant.
"""
with new_test_table(cql, test_keyspace, "k int, c int, t text, primary key (k, c)") as table:
# Create a batch larger than the fail threshold (1024 KB) but all targeting the same partition
statements = [f"INSERT INTO {table} (k, c, t) VALUES (1, {idx}, '{'x' * 743}')" for idx in range(1100)]
logged_batch = "BEGIN BATCH\n" + "\n".join(statements) + "\n APPLY BATCH\n"
# This should not raise an exception since it's a single-partition batch
cql.execute(logged_batch)
# Verify the data was inserted
result = cql.execute(f"SELECT COUNT(*) FROM {table} WHERE k = 1")
assert result.one()[0] == 1100