From 0f59deffaa362d04945190dfd77f1c3a0a33d680 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 2 Apr 2025 22:24:38 -0300 Subject: [PATCH] replica: Fix truncate and drop table after tablet migration happens When running those operations after a tablet replica is migrated away from a shard, an assert can fail resulting in a crash. Status quo (around the assert in truncate procedure): 1) Highest RP seen by table is saved in low_mark, and the current time in low_mark_at. 2) Then compaction is disabled in order to not mix data written before truncate, and data written later. 3) Then memtable is flushed in order for the data written before truncate to be available in sstables and then removed. 4) Now, current time is saved in truncated_at, which is supposedly the time of truncate to decide which sstables to remove. Note: truncated_at is likely above low_mark_at due to steps 2 and 3. The interesting part of the assert is: (truncated_at <= low_mark_at ? rp <= low_mark : low_mark <= rp) Note: RP in the assert above is the highest RP among all sstables generated before truncated_at. RP is retrieved by table::discard_sstables(). If truncated_at > low_mark_at, maybe newer data was written during steps 2 and 3, and memtable's RP becomes greater than low_mark, resulting in a SSTable with RP > low_mark. So assert's 2nd condition is there to defend against the scenario above. truncated_at and low_mark_at uses millisecond granularity, so even if truncated_at == low_mark_at, data could have been written in steps 2 and 3 (during same MS window), failing the assert. This is fragile. Reproducer: To reproduce the problem, truncated_at must be > low_mark_at, which can easily happen with both drop table and truncate due to steps 2 and 3. If a shard has 2 or more tablets, the table's highest RP refer to just one tablet in that shard. If the tablet with the highest RP is migrated away, then the sstables in that shard will have lower RP than the recorded highest RP (it's a table wide state, which makes sense since CL is shared among tablets). So when either drop table or truncate runs, low_mark will be potentially bigger than highest RP retrieved from sstables. Proposed solution: The current assert is hacked to not fail if writes sneak in, during steps 2 and 3, but it's still fragile and seems not to serve its real purpose, since it's allowing for RP > low_mark. We should be able to say that low_mark >= RP, as a way of asserting we're not leaving data targeted by truncate behind (or that we're not removing the wrong data). But the problem is that we're saving low_mark in step 1, before preparation steps (2 and 3). When truncated_at is recorded in step 4, it's a way of saying all data written so far is targeted for removal. But as of today, low_mark refers to all data written up to step 1. So low_mark is now only one set before issuing flush, and also accounts for all potentially flushed data. Fixes #18059. Signed-off-by: Raphael S. Carvalho Closes scylladb/scylladb#23560 --- replica/database.cc | 46 +++++++++++++++++++++------------- replica/database.hh | 6 ++++- replica/table.cc | 4 +++ test/cluster/test_tablets2.py | 47 ++++++++++++++++++++++++++++++++++- 4 files changed, 84 insertions(+), 19 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index df607b8ad2..9dda5675d1 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2491,8 +2491,9 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s struct database::table_truncate_state { gate::holder holder; - db_clock::time_point low_mark_at; + // This RP mark accounts for all data (includes memtable) generated until truncated_at. db::replay_position low_mark; + db_clock::time_point truncated_at; std::vector cres; bool did_flush; }; @@ -2529,14 +2530,6 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s st->holder = cf.async_gate().hold(); - // Force mutations coming in to re-acquire higher rp:s - // This creates a "soft" ordering, in that we will guarantee that - // any sstable written _after_ we issue the flush below will - // only have higher rp:s than we will get from the discard_sstable - // call. - st->low_mark_at = db_clock::now(); - st->low_mark = cf.set_low_replay_position_mark(); - st->cres.reserve(1 + cf.views().size()); auto& db = sharded_db.local(); auto& cm = db.get_compaction_manager(); @@ -2577,17 +2570,34 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s auto& cf = *table_shards; auto& st = *table_states[shard]; + // Force mutations coming in to re-acquire higher rp:s + // This creates a "soft" ordering, in that we will guarantee that + // any sstable written _after_ we issue the flush below will + // only have higher rp:s than we will get from the discard_sstables + // call. + st.low_mark = cf.set_low_replay_position_mark(); + co_await flush_or_clear(cf); + co_await coroutine::parallel_for_each(cf.views(), [&] (view_ptr v) -> future<> { auto& vcf = db.find_column_family(v); co_await flush_or_clear(vcf); }); + // Since writes could be appended to active memtable between getting low_mark above + // and flush, the low_mark has to be adjusted to account for those writes, where + // memtable was flushed with a higher replay position than the one obtained above. + st.low_mark = std::max(st.low_mark, cf.highest_flushed_replay_position()); + // truncated_at is a time point that describes both the truncation time, and also + // serves as a filter, where a sstable is only filtered in if it was created before + // the truncated_at. The reason for saving it right after flush, is to prevent a + // sstable created after we're done here in this shard from being included, since + // different shards might have different pace. + st.truncated_at = truncated_at_opt.value_or(db_clock::now()); st.did_flush = should_flush; }); - auto truncated_at = truncated_at_opt.value_or(db_clock::now()); - if (with_snapshot) { + auto truncated_at = truncated_at_opt.value_or(db_clock::now()); auto name = snapshot_name_opt.value_or( format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name())); co_await table::snapshot_on_all_shards(sharded_db, table_shards, name); @@ -2598,15 +2608,16 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s auto& cf = *table_shards; auto& st = *table_states[shard]; - return db.truncate(sys_ks.local(), cf, st, truncated_at); + return db.truncate(sys_ks.local(), cf, st); }); dblog.info("Truncated {}.{}", s->ks_name(), s->cf_name()); } -future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state& st, db_clock::time_point truncated_at) { +future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state& st) { dblog.trace("Truncating {}.{} on shard", cf.schema()->ks_name(), cf.schema()->cf_name()); const auto uuid = cf.schema()->id(); + const auto truncated_at = st.truncated_at; dblog.debug("Discarding sstable data for truncated CF + indexes"); // TODO: notify truncation @@ -2619,10 +2630,11 @@ future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, cons // We nowadays do not flush tables with sstables but autosnapshot=false. This means // the low_mark assertion does not hold, because we maybe/probably never got around to // creating the sstables that would create them. - // If truncated_at is earlier than the time low_mark was taken - // then the replay_position returned by discard_sstables may be - // smaller than low_mark. - SCYLLA_ASSERT(!st.did_flush || rp == db::replay_position() || (truncated_at <= st.low_mark_at ? rp <= st.low_mark : st.low_mark <= rp)); + // + // What we want to assert is that only data generated until truncation time was included, + // since we don't want to leave behind data on disk with RP lower than the one we set + // in the truncation table. + SCYLLA_ASSERT(!st.did_flush || rp == db::replay_position() || st.low_mark >= rp); if (rp == db::replay_position()) { // If this shard had no mutations, st.low_mark will be an empty, default constructed // replay_position. This is a problem because an empty replay_position has the shard_id diff --git a/replica/database.hh b/replica/database.hh index a2a4d6a38e..d3129aadf9 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -479,6 +479,9 @@ private: std::optional _sstable_generation_generator; db::replay_position _highest_rp; + // Tracks the highest replay position flushed to a sstable + db::replay_position _highest_flushed_rp; + // Tracks the highest position before flush actually starts db::replay_position _flush_rp; db::replay_position _lowest_allowed_rp; @@ -973,6 +976,7 @@ public: future snapshot_exists(sstring name); db::replay_position set_low_replay_position_mark(); + db::replay_position highest_flushed_replay_position() const; private: using snapshot_file_set = foreign_ptr>>; @@ -1884,7 +1888,7 @@ private: struct table_truncate_state; static future<> truncate_table_on_all_shards(sharded& db, sharded& sys_ks, const global_table_ptr&, std::optional truncated_at_opt, bool with_snapshot, std::optional snapshot_name_opt); - future<> truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state&, db_clock::time_point truncated_at); + future<> truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state&); public: /** Truncates the given column family */ // If truncated_at_opt is not given, it is set to db_clock::now right after flush/clear. diff --git a/replica/table.cc b/replica/table.cc index b4db130939..f610c2a3d8 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1535,6 +1535,7 @@ table::seal_active_memtable(compaction_group& cg, flush_permit&& flush_permit) n }); cg.memtables()->add_memtable(); + _highest_flushed_rp = std::max(_highest_flushed_rp, old->replay_position()); // no exceptions allowed (nor expected) from this point on _stats.memtable_switch_count++; @@ -2827,6 +2828,9 @@ logalloc::occupancy_stats table::occupancy() const { return res; } +db::replay_position table::highest_flushed_replay_position() const { + return _highest_flushed_rp; +} future<> table::seal_snapshot(sstring jsondir, std::vector file_sets) { diff --git a/test/cluster/test_tablets2.py b/test/cluster/test_tablets2.py index 55c634562c..d021c1b764 100644 --- a/test/cluster/test_tablets2.py +++ b/test/cluster/test_tablets2.py @@ -9,7 +9,7 @@ from test.pylib.internal_types import HostID, ServerInfo, ServerNum from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier from test.pylib.util import wait_for_cql_and_get_hosts, unique_name -from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas +from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, TabletReplicas from test.cluster.conftest import skip_mode from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace @@ -1835,3 +1835,48 @@ async def test_tablet_cleanup_vs_snapshot_race(manager: ManagerClient): await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark) await manager.api.take_snapshot(servers[0].ip_addr, ks, "test_snapshot") + +# Reproduces assert failure when truncating table, either triggered by DROP TABLE or TRUNCATE. +# See: https://github.com/scylladb/scylladb/issues/18059 +# It's achieved by migrating a tablet away that contains the highest replay position of a shard, +# so when drop/truncate happens, the highest replay position will be greater than all the data +# found in the table (includes data in memtable). +@pytest.mark.asyncio +@pytest.mark.parametrize("operation", ['DROP TABLE', 'TRUNCATE']) +@skip_mode('release', 'error injections are not supported in release mode') +async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, operation): + cmdline = [ '--smp=2' ] + cfg = { 'auto_snapshot': True } + servers = [await manager.server_add(cmdline=cmdline, config=cfg)] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND TABLETS = {{'initial': 4}}") + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") + + await manager.api.disable_autocompaction(servers[0].ip_addr, ks) + + keys = range(100) + await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test') + tablet_replicas_in_s0 = list[TabletReplicas]() + + for replica in tablet_replicas: + if replica.replicas[0][1] == 0: + tablet_replicas_in_s0.append(replica) + + assert len(tablet_replicas_in_s0) == 2 + + target_tablet = tablet_replicas_in_s0[0] + + s0_host_id = await manager.get_host_id(servers[0].server_id) + + logger.info("Migrating 1st tablet to shard 1") + await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *(s0_host_id, 0), *(s0_host_id, 1), target_tablet.last_token) + + await manager.api.enable_injection(servers[0].ip_addr, "truncate_disable_compaction_delay", one_shot=True) + + logger.info(f"Running {operation} {ks}.test") + await cql.run_async(f"{operation} {ks}.test")