From bd66edee5ca6ccbf0ee4a7cd6511bc246ad68f29 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Sat, 21 Feb 2026 13:42:25 +0100 Subject: [PATCH] logstor: truncate table implement freeing all segments of a table for table truncate. first do barrier to flush all active and mixed segments and put all the table's data in compaction groups, then stop compaction for the table, then free the table's segments and remove the live entries from the index. --- replica/database.cc | 4 ++ replica/database.hh | 1 + replica/logstor/index.hh | 11 +++++ replica/logstor/logstor.cc | 4 ++ replica/logstor/logstor.hh | 2 + replica/logstor/segment_manager.cc | 70 ++++++++++++++++++++++++++++++ replica/logstor/segment_manager.hh | 2 + replica/table.cc | 6 +++ test/cluster/test_logstor.py | 50 +++++++++++++++++++++ 9 files changed, 150 insertions(+) diff --git a/replica/database.cc b/replica/database.cc index 099c793d91..3da5e94bea 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -3041,6 +3041,10 @@ future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, std: dblog.debug("Discarding sstable data for truncated CF + indexes"); // TODO: notify truncation + if (cf.uses_kv_storage()) { + co_await cf.discard_kv_storage(); + } + db::replay_position rp = co_await cf.discard_sstables(truncated_at); // TODO: indexes. // Note: since discard_sstables was changed to only count tables owned by this shard, diff --git a/replica/database.hh b/replica/database.hh index 958ae8672b..aab17f3aa2 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1066,6 +1066,7 @@ public: bool needs_flush() const; future<> clear(); // discards memtable(s) without flushing them to disk. 
future<db::replay_position> discard_sstables(db_clock::time_point); + future<> discard_kv_storage(); bool can_flush() const; diff --git a/replica/logstor/index.hh b/replica/logstor/index.hh index ceffea8904..2a2c81b90d 100644 --- a/replica/logstor/index.hh +++ b/replica/logstor/index.hh @@ -73,6 +73,13 @@ public: } } + void erase(const index_key& key, log_location loc) { + auto it = _index.find(key); + if (it != _index.end() && it->location == loc) { + _index.erase(key); + } + } + auto begin() const { return _index.begin(); } auto end() const { return _index.end(); } }; @@ -118,6 +125,10 @@ public: return get_bucket(key).insert_if_newer(key, std::move(new_entry)); } + void erase(const index_key& key, log_location loc) { + get_bucket(key).erase(key, loc); + } + class const_iterator { using bucket_array = log_index::bucket_array; using bucket_iterator = decltype(std::declval().begin()); diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 7809a3d842..c7a4d5a9f8 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -72,6 +72,10 @@ future<> logstor::do_barrier() { return _segment_manager.do_barrier(); } +future<> logstor::truncate_table(table_id tid) { + return _segment_manager.truncate_table(tid); +} + future<> logstor::write(const mutation& m, group_id group) { auto key = calculate_key(*m.schema(), m.decorated_key()); diff --git a/replica/logstor/logstor.hh b/replica/logstor/logstor.hh index 4c561472e5..8d9017c9b1 100644 --- a/replica/logstor/logstor.hh +++ b/replica/logstor/logstor.hh @@ -58,6 +58,8 @@ public: future<> do_barrier(); + future<> truncate_table(table_id); + static index_key calculate_key(const schema&, const dht::decorated_key&); future<> write(const mutation&, group_id); diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index 0f441e976f..8a677eda32 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -688,6 +688,8 @@ public: } } + future<std::vector<log_segment_id>>
free_compaction_groups(table_id); + future<> write_to_separator(separator&, write_buffer&, log_location base_location); future<> write_to_separator(separator&, log_segment_id); future<> flush_separator(separator&); @@ -704,6 +706,7 @@ private: future<> compact_segments(group_id gid, std::vector segments); void remove_segment(segment_descriptor& desc); + future<std::pair<compaction_group_it, std::vector<log_segment_id>>> remove_compaction_group(compaction_group_it); separator::buffer& get_separator_buffer(separator& sep, const log_record_writer& writer); @@ -933,6 +936,8 @@ public: future<> do_barrier(); + future<> truncate_table(table_id table); + private: struct segment_allocation_guard { @@ -1510,6 +1515,31 @@ future<> segment_manager_impl::free_segment(log_segment_id segment_id) { co_return; } +future<> segment_manager_impl::truncate_table(table_id table) { + logstor_logger.info("Truncating table {}", table); + + auto holder = _async_gate.hold(); + + // do a barrier to ensure all the table's data is flushed to segments + // in the table's compaction groups. we assume there are no new writes + // to the table by this point. + co_await do_barrier(); + + // stop and free the compaction groups of the table. + auto segments = co_await _compaction_mgr.free_compaction_groups(table); + + // free all segments and remove their records from the index.
+ for (auto seg_id : segments) { + logstor_logger.debug("Freeing segment {} of truncated table {}", seg_id, table); + co_await for_each_record(seg_id, [this] (log_location loc, log_record record) { + _index.erase(record.key, loc); + return make_ready_future<>(); + }); + + co_await free_segment(seg_id); + } +} + future<> segment_manager_impl::for_each_record(log_segment_id segment_id, std::function<future<>(log_location, log_record)> callback) { auto holder = _async_gate.hold(); @@ -1836,6 +1866,42 @@ void compaction_manager::remove_segment(segment_descriptor& desc) { } } +future<std::pair<compaction_group_it, std::vector<log_segment_id>>> +compaction_manager::remove_compaction_group(compaction_group_it it) { + std::vector<log_segment_id> segments; + auto& cg = it->second; + auto& hist = cg.segment_hist; + while (!hist.empty()) { + co_await coroutine::maybe_yield(); + auto& desc = hist.one_of_largest(); + auto seg_id = _sm.desc_to_segment_id(desc); + segments.push_back(seg_id); + hist.erase(desc); + } + + if (_next_group_for_compaction == it) { + ++_next_group_for_compaction; + } + it = _compaction_groups.erase(it); + co_return std::make_pair(it, std::move(segments)); +} + +future<std::vector<log_segment_id>> compaction_manager::free_compaction_groups(table_id table) { + co_await disable_auto_compaction(table); + + std::vector<log_segment_id> segments_to_free; + + auto it = _compaction_groups.lower_bound(group_id{table, 0}); + while (it != _compaction_groups.end() && it->first.table == table) { + co_await coroutine::maybe_yield(); + auto [next_it, segments] = co_await remove_compaction_group(it); + segments_to_free.insert(segments_to_free.end(), segments.begin(), segments.end()); + it = next_it; + } + + co_return std::move(segments_to_free); +} + separator::buffer& compaction_manager::get_separator_buffer(separator& sep, const log_record_writer& writer) { auto gid = writer.record().group; @@ -2206,6 +2272,10 @@ future<> segment_manager::do_barrier() { return _impl->do_barrier(); } +future<> segment_manager::truncate_table(table_id table) { + return _impl->truncate_table(table); +} + }
template<> diff --git a/replica/logstor/segment_manager.hh b/replica/logstor/segment_manager.hh index 50373a315d..6313e84579 100644 --- a/replica/logstor/segment_manager.hh +++ b/replica/logstor/segment_manager.hh @@ -83,6 +83,8 @@ public: // to their group_id. future<> do_barrier(); + future<> truncate_table(table_id); + friend class segment_manager_impl; }; diff --git a/replica/table.cc b/replica/table.cc index 4e8870c18d..9783e6438f 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -4281,6 +4281,12 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat co_return rp; } +future<> table::discard_kv_storage() { + if (_logstor) { + co_await _logstor->truncate_table(_schema->id()); + } +} + void table::mark_ready_for_writes(db::commitlog* cl) { if (!_readonly) { on_internal_error(dblog, ::format("table {}.{} is already writable", _schema->ks_name(), _schema->cf_name())); diff --git a/test/cluster/test_logstor.py b/test/cluster/test_logstor.py index 17e7164657..7958c3ac4e 100644 --- a/test/cluster/test_logstor.py +++ b/test/cluster/test_logstor.py @@ -330,3 +330,53 @@ async def test_compaction(manager: ManagerClient): metrics = await manager.metrics.query(servers[0].ip_addr) segments_compacted = metrics.get("scylla_logstor_sm_segments_compacted") or 0 assert segments_compacted == 4, f"Expected 4 segments to be compacted, but got {segments_compacted}" + +@pytest.mark.asyncio +async def test_drop_table(manager: ManagerClient): + """ + Test truncating a logstor table: drop the table, verify its data is gone after recreating it, and verify a second logstor table is unaffected.
+ """ + cmdline = ['--logger-log-level', 'logstor=trace'] + cfg = {'enable_logstor': True, 'experimental_features': ['logstor']} + servers = await manager.servers_add(1, cmdline=cmdline, config=cfg) + cql = manager.get_cql() + + async with new_test_keyspace(manager, "WITH tablets={'initial': 4}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'") + + # create another table that will not be dropped to verify it's not affected + await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'") + + # write data to fill few segments + value_size = 30 * 1024 + value = 'x' * value_size + for i in range(20): + await cql.run_async(f"INSERT INTO {ks}.test1 (pk, v) VALUES ({i}, '{value}')") + await cql.run_async(f"INSERT INTO {ks}.test2 (pk, v) VALUES ({i}, '{value}')") + + await cql.run_async(f"DROP TABLE {ks}.test1") + + # verify test2 is not affected + for i in range(20): + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test2 WHERE pk = {i}") + assert len(rows) == 1, f"Expected 1 row for key {i} in test2, but got {len(rows)}" + assert rows[0].v == value, f"Expected value of size {value_size} for key {i} in test2, but got {len(rows[0].v)}" + + # recreate the table and verify that old data is not visible + await cql.run_async(f"CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'") + + for i in range(20): + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test1 WHERE pk = {i}") + assert len(rows) == 0, f"Expected no rows for key {i} after table drop, but got {len(rows)}" + + # write new data to the recreated table and verify + await cql.run_async(f"INSERT INTO {ks}.test1 (pk, v) VALUES (1, 'new_value')") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test1 WHERE pk = 1") + assert len(rows) == 1, f"Expected 1 row for key 1 after new insert, but got {len(rows)}" + assert rows[0].v == 'new_value', f"Expected value 
'new_value' for key 1 after new insert, but got {rows[0].v}" + + # verify test2 again + for i in range(20): + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test2 WHERE pk = {i}") + assert len(rows) == 1, f"Expected 1 row for key {i} in test2 after all operations, but got {len(rows)}" + assert rows[0].v == value, f"Expected value of size {value_size} for key {i} in test2 after all operations, but got {len(rows[0].v)}"