diff --git a/replica/database.cc b/replica/database.cc index df607b8ad2..9dda5675d1 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2491,8 +2491,9 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s struct database::table_truncate_state { gate::holder holder; - db_clock::time_point low_mark_at; + // This RP mark accounts for all data (includes memtable) generated until truncated_at. db::replay_position low_mark; + db_clock::time_point truncated_at; std::vector cres; bool did_flush; }; @@ -2529,14 +2530,6 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s st->holder = cf.async_gate().hold(); - // Force mutations coming in to re-acquire higher rp:s - // This creates a "soft" ordering, in that we will guarantee that - // any sstable written _after_ we issue the flush below will - // only have higher rp:s than we will get from the discard_sstable - // call. - st->low_mark_at = db_clock::now(); - st->low_mark = cf.set_low_replay_position_mark(); - st->cres.reserve(1 + cf.views().size()); auto& db = sharded_db.local(); auto& cm = db.get_compaction_manager(); @@ -2577,17 +2570,34 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s auto& cf = *table_shards; auto& st = *table_states[shard]; + // Force mutations coming in to re-acquire higher rp:s + // This creates a "soft" ordering, in that we will guarantee that + // any sstable written _after_ we issue the flush below will + // only have higher rp:s than we will get from the discard_sstables + // call. 
+ st.low_mark = cf.set_low_replay_position_mark(); + co_await flush_or_clear(cf); + co_await coroutine::parallel_for_each(cf.views(), [&] (view_ptr v) -> future<> { auto& vcf = db.find_column_family(v); co_await flush_or_clear(vcf); }); + // Since writes could be appended to the active memtable between getting low_mark above + // and the flush, low_mark has to be adjusted to account for those writes, in case the + // memtable was flushed with a higher replay position than the one obtained above. + st.low_mark = std::max(st.low_mark, cf.highest_flushed_replay_position()); + // truncated_at is a time point that describes the truncation time and also + // serves as a filter: an sstable is only included if it was created before + // truncated_at. The reason for saving it right after the flush is to prevent an + // sstable created after we're done here in this shard from being included, since + // different shards may progress at a different pace. + st.truncated_at = truncated_at_opt.value_or(db_clock::now()); st.did_flush = should_flush; }); - auto truncated_at = truncated_at_opt.value_or(db_clock::now()); - if (with_snapshot) { + auto truncated_at = truncated_at_opt.value_or(db_clock::now()); auto name = snapshot_name_opt.value_or( format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name())); co_await table::snapshot_on_all_shards(sharded_db, table_shards, name); @@ -2598,15 +2608,16 @@ future<> database::truncate_table_on_all_shards(sharded& sharded_db, s auto& cf = *table_shards; auto& st = *table_states[shard]; - return db.truncate(sys_ks.local(), cf, st, truncated_at); + return db.truncate(sys_ks.local(), cf, st); }); dblog.info("Truncated {}.{}", s->ks_name(), s->cf_name()); } -future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state& st, db_clock::time_point truncated_at) { +future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state& st) {
dblog.trace("Truncating {}.{} on shard", cf.schema()->ks_name(), cf.schema()->cf_name()); const auto uuid = cf.schema()->id(); + const auto truncated_at = st.truncated_at; dblog.debug("Discarding sstable data for truncated CF + indexes"); // TODO: notify truncation @@ -2619,10 +2630,11 @@ future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, cons // We nowadays do not flush tables with sstables but autosnapshot=false. This means // the low_mark assertion does not hold, because we maybe/probably never got around to // creating the sstables that would create them. - // If truncated_at is earlier than the time low_mark was taken - // then the replay_position returned by discard_sstables may be - // smaller than low_mark. - SCYLLA_ASSERT(!st.did_flush || rp == db::replay_position() || (truncated_at <= st.low_mark_at ? rp <= st.low_mark : st.low_mark <= rp)); + // + // What we want to assert is that only data generated until truncation time was included, + // since we don't want to leave behind data on disk with RP lower than the one we set + // in the truncation table. + SCYLLA_ASSERT(!st.did_flush || rp == db::replay_position() || st.low_mark >= rp); if (rp == db::replay_position()) { // If this shard had no mutations, st.low_mark will be an empty, default constructed // replay_position. 
This is a problem because an empty replay_position has the shard_id diff --git a/replica/database.hh b/replica/database.hh index a2a4d6a38e..d3129aadf9 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -479,6 +479,9 @@ private: std::optional _sstable_generation_generator; db::replay_position _highest_rp; + // Tracks the highest replay position flushed to an sstable + db::replay_position _highest_flushed_rp; + // Tracks the highest replay position before the flush actually starts db::replay_position _flush_rp; db::replay_position _lowest_allowed_rp; @@ -973,6 +976,7 @@ public: future snapshot_exists(sstring name); db::replay_position set_low_replay_position_mark(); + db::replay_position highest_flushed_replay_position() const; private: using snapshot_file_set = foreign_ptr>>; @@ -1884,7 +1888,7 @@ private: struct table_truncate_state; static future<> truncate_table_on_all_shards(sharded& db, sharded& sys_ks, const global_table_ptr&, std::optional truncated_at_opt, bool with_snapshot, std::optional snapshot_name_opt); - future<> truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state&, db_clock::time_point truncated_at); + future<> truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state&); public: /** Truncates the given column family */ // If truncated_at_opt is not given, it is set to db_clock::now right after flush/clear.
diff --git a/replica/table.cc b/replica/table.cc index b4db130939..f610c2a3d8 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1535,6 +1535,7 @@ table::seal_active_memtable(compaction_group& cg, flush_permit&& flush_permit) n }); cg.memtables()->add_memtable(); + _highest_flushed_rp = std::max(_highest_flushed_rp, old->replay_position()); // no exceptions allowed (nor expected) from this point on _stats.memtable_switch_count++; @@ -2827,6 +2828,9 @@ logalloc::occupancy_stats table::occupancy() const { return res; } +db::replay_position table::highest_flushed_replay_position() const { + return _highest_flushed_rp; +} future<> table::seal_snapshot(sstring jsondir, std::vector file_sets) { diff --git a/test/cluster/test_tablets2.py b/test/cluster/test_tablets2.py index 55c634562c..d021c1b764 100644 --- a/test/cluster/test_tablets2.py +++ b/test/cluster/test_tablets2.py @@ -9,7 +9,7 @@ from test.pylib.internal_types import HostID, ServerInfo, ServerNum from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier from test.pylib.util import wait_for_cql_and_get_hosts, unique_name -from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas +from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, TabletReplicas from test.cluster.conftest import skip_mode from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace @@ -1835,3 +1835,48 @@ async def test_tablet_cleanup_vs_snapshot_race(manager: ManagerClient): await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark) await manager.api.take_snapshot(servers[0].ip_addr, ks, "test_snapshot") + +# Reproduces assert failure when truncating table, either triggered by DROP TABLE or TRUNCATE. 
+# See: https://github.com/scylladb/scylladb/issues/18059 +# It's achieved by migrating a tablet away that contains the highest replay position of a shard, +# so when drop/truncate happens, the highest replay position will be greater than all the data +# found in the table (includes data in memtable). +@pytest.mark.asyncio +@pytest.mark.parametrize("operation", ['DROP TABLE', 'TRUNCATE']) +@skip_mode('release', 'error injections are not supported in release mode') +async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, operation): + cmdline = [ '--smp=2' ] + cfg = { 'auto_snapshot': True } + servers = [await manager.server_add(cmdline=cmdline, config=cfg)] + + await manager.api.disable_tablet_balancing(servers[0].ip_addr) + + cql = manager.get_cql() + ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND TABLETS = {{'initial': 4}}") + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") + + await manager.api.disable_autocompaction(servers[0].ip_addr, ks) + + keys = range(100) + await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys]) + + tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test') + tablet_replicas_in_s0 = list[TabletReplicas]() + + for replica in tablet_replicas: + if replica.replicas[0][1] == 0: + tablet_replicas_in_s0.append(replica) + + assert len(tablet_replicas_in_s0) == 2 + + target_tablet = tablet_replicas_in_s0[0] + + s0_host_id = await manager.get_host_id(servers[0].server_id) + + logger.info("Migrating 1st tablet to shard 1") + await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *(s0_host_id, 0), *(s0_host_id, 1), target_tablet.last_token) + + await manager.api.enable_injection(servers[0].ip_addr, "truncate_disable_compaction_delay", one_shot=True) + + logger.info(f"Running {operation} {ks}.test") + await cql.run_async(f"{operation} 
{ks}.test")