diff --git a/replica/table.cc b/replica/table.cc
index 0f451b47d7..a21f1c6900 100644
--- a/replica/table.cc
+++ b/replica/table.cc
@@ -961,6 +961,11 @@ bool storage_group::set_split_mode() {
         return false;
     }
     if (!splitting_mode()) {
+        // Don't create new compaction groups if the main cg has compaction disabled
+        if (_main_cg->compaction_disabled()) {
+            tlogger.debug("storage_group::set_split_mode: split ready groups not created due to compaction disabled on the main group");
+            return false;
+        }
         auto create_cg = [this] () -> compaction_group_ptr {
             // TODO: use the actual sub-ranges instead, to help incremental selection on the read path.
             return compaction_group::make_empty_group(*_main_cg);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 0924592e77..928b80e2bb 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -5827,6 +5827,8 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcept {
         });
     };
 
+    co_await utils::get_local_injector().inject("tablet_split_monitor_wait", utils::wait_for_message(1min));
+
     exponential_backoff_retry split_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
 
     while (!_async_gate.is_closed() && !_group0_as.abort_requested()) {
diff --git a/test/cluster/test_truncate_with_tablets.py b/test/cluster/test_truncate_with_tablets.py
index ef53f9f53a..3f5b9ff7a4 100644
--- a/test/cluster/test_truncate_with_tablets.py
+++ b/test/cluster/test_truncate_with_tablets.py
@@ -10,8 +10,8 @@ from cassandra.policies import FallthroughRetryPolicy
 from test.pylib.manager_client import ManagerClient
 from test.cluster.conftest import skip_mode
 from test.cluster.util import get_topology_coordinator, new_test_keyspace
-from test.pylib.tablets import get_all_tablet_replicas
-from test.pylib.util import wait_for_cql_and_get_hosts
+from test.pylib.tablets import get_all_tablet_replicas, get_tablet_count
+from test.pylib.util import wait_for_cql_and_get_hosts, wait_for
 import time
 import pytest
 import logging
@@ -347,3 +347,83 @@ async def test_parallel_truncate(manager: ManagerClient):
         assert row[0].count == 0
         row = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test1', consistency_level=ConsistencyLevel.ALL))
         assert row[0].count == 0
+
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_split_emitted_during_truncate(manager: ManagerClient):
+    """Tests that truncation handles new compaction groups introduced by tablet
+    split after compaction was already disabled on existing groups.
+
+    The scenario:
+    1. Truncation disables compaction on all existing compaction groups, then pauses.
+    2. While paused, tablet split creates new (split-ready) compaction groups.
+    3. Truncation resumes and runs discard_sstables(), which encounters groups
+       that don't have compaction disabled.
+
+    Without the fix, discard_sstables() would fire on_internal_error because the
+    new groups don't have compaction disabled. With the fix, it tolerates them as
+    long as they contain no sstables older than the truncation time.
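+
+    The test drives this interleaving with two error injections:
+    database_truncate_wait, which pauses truncation after compaction has been
+    disabled on the existing groups, and tablet_split_monitor_wait (added above
+    in process_tablet_split_candidate()), which pauses the split monitor.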
+ """ + logger.info("Bootstrapping cluster") + cfg = { 'tablets_mode_for_new_keyspaces': 'enabled', + 'tablet_load_stats_refresh_interval_in_seconds': 1, + } + cmdline = [ + '--logger-log-level', 'table=debug', + '--logger-log-level', 'load_balancer=debug', + '--logger-log-level', 'debug_error_injection=debug', + ] + servers = await manager.servers_add(1, cmdline=cmdline, config=cfg) + server = servers[0] + + cql = manager.get_cql() + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};") + + keys = range(10) + await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + # Flush so sstables exist on disk, exercising the discard logic more thoroughly + await manager.api.flush_keyspace(server.ip_addr, ks) + + await manager.api.enable_injection(server.ip_addr, "database_truncate_wait", True) + await manager.api.enable_injection(server.ip_addr, "tablet_split_monitor_wait", True) + + log = await manager.server_open_log(server.server_id) + mark = await log.mark() + + expected_tablet_count = 2 + # Force split on the test table + await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}") + await log.wait_for("tablet_split_monitor_wait: waiting", from_mark=mark) + + truncate_task = cql.run_async(f"TRUNCATE {ks}.test") + + # Wait for truncation to disable compaction on existing groups and pause + await log.wait_for("database_truncate_wait: waiting", from_mark=mark) + + # Release split monitor so it calls set_split_mode(), creating new compaction groups + await manager.api.message_injection(server.ip_addr, "tablet_split_monitor_wait") + + # Wait for set_split_mode to create the new groups before releasing truncation + await log.wait_for('storage_group::set_split_mode', from_mark=mark) + + # Now release truncation - discard_sstables() will see the new groups + await manager.api.message_injection(server.ip_addr, "database_truncate_wait") + + await truncate_task + + # Verify truncation actually removed the data + row = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test', consistency_level=ConsistencyLevel.ALL)) + assert row[0].count == 0 + + async def finished_splitting(): + tablet_count = await get_tablet_count(manager, server, ks, 'test') + return tablet_count >= expected_tablet_count or None + # Give enough time for split to happen in debug mode + await wait_for(finished_splitting, time.time() + 120)