diff --git a/replica/database.cc b/replica/database.cc index 099c793d91..3da5e94bea 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -3041,6 +3041,10 @@ future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, std: dblog.debug("Discarding sstable data for truncated CF + indexes"); // TODO: notify truncation + if (cf.uses_kv_storage()) { + co_await cf.discard_kv_storage(); + } + db::replay_position rp = co_await cf.discard_sstables(truncated_at); // TODO: indexes. // Note: since discard_sstables was changed to only count tables owned by this shard, diff --git a/replica/database.hh b/replica/database.hh index 958ae8672b..aab17f3aa2 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1066,6 +1066,7 @@ public: bool needs_flush() const; future<> clear(); // discards memtable(s) without flushing them to disk. future discard_sstables(db_clock::time_point); + future<> discard_kv_storage(); bool can_flush() const; diff --git a/replica/logstor/index.hh b/replica/logstor/index.hh index ceffea8904..2a2c81b90d 100644 --- a/replica/logstor/index.hh +++ b/replica/logstor/index.hh @@ -73,6 +73,13 @@ public: } } + void erase(const index_key& key, log_location loc) { + auto it = _index.find(key); + if (it != _index.end() && it->location == loc) { + _index.erase(key); + } + } + auto begin() const { return _index.begin(); } auto end() const { return _index.end(); } }; @@ -118,6 +125,10 @@ public: return get_bucket(key).insert_if_newer(key, std::move(new_entry)); } + void erase(const index_key& key, log_location loc) { + get_bucket(key).erase(key, loc); + } + class const_iterator { using bucket_array = log_index::bucket_array; using bucket_iterator = decltype(std::declval().begin()); diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 7809a3d842..c7a4d5a9f8 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -72,6 +72,10 @@ future<> logstor::do_barrier() { return _segment_manager.do_barrier(); } 
+future<> logstor::truncate_table(table_id tid) { + return _segment_manager.truncate_table(tid); +} + future<> logstor::write(const mutation& m, group_id group) { auto key = calculate_key(*m.schema(), m.decorated_key()); diff --git a/replica/logstor/logstor.hh b/replica/logstor/logstor.hh index 4c561472e5..8d9017c9b1 100644 --- a/replica/logstor/logstor.hh +++ b/replica/logstor/logstor.hh @@ -58,6 +58,8 @@ public: future<> do_barrier(); + future<> truncate_table(table_id); + static index_key calculate_key(const schema&, const dht::decorated_key&); future<> write(const mutation&, group_id); diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index 0f441e976f..8a677eda32 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -688,6 +688,8 @@ public: } } + future> free_compaction_groups(table_id); + future<> write_to_separator(separator&, write_buffer&, log_location base_location); future<> write_to_separator(separator&, log_segment_id); future<> flush_separator(separator&); @@ -704,6 +706,7 @@ private: future<> compact_segments(group_id gid, std::vector segments); void remove_segment(segment_descriptor& desc); + future>> remove_compaction_group(compaction_group_it); separator::buffer& get_separator_buffer(separator& sep, const log_record_writer& writer); @@ -933,6 +936,8 @@ public: future<> do_barrier(); + future<> truncate_table(table_id table); + private: struct segment_allocation_guard { @@ -1510,6 +1515,31 @@ future<> segment_manager_impl::free_segment(log_segment_id segment_id) { co_return; } +future<> segment_manager_impl::truncate_table(table_id table) { + logstor_logger.info("Truncating table {}", table); + + auto holder = _async_gate.hold(); + + // do a barrier to ensure all the table's data is flushed to segments + // in the table's compaction groups. we assume there are no new writes + // to the table by this point. 
+ co_await do_barrier(); + + // stop and free the compaction groups of the table. + auto segments = co_await _compaction_mgr.free_compaction_groups(table); + + // free all segments and remove their records from the index. + for (auto seg_id : segments) { + logstor_logger.debug("Freeing segment {} of truncated table {}", seg_id, table); + co_await for_each_record(seg_id, [this] (log_location loc, log_record record) { + _index.erase(record.key, loc); + return make_ready_future<>(); + }); + + co_await free_segment(seg_id); + } +} + future<> segment_manager_impl::for_each_record(log_segment_id segment_id, std::function(log_location, log_record)> callback) { auto holder = _async_gate.hold(); @@ -1836,6 +1866,42 @@ void compaction_manager::remove_segment(segment_descriptor& desc) { } } +future>> +compaction_manager::remove_compaction_group(compaction_group_it it) { + std::vector segments; + auto& cg = it->second; + auto& hist = cg.segment_hist; + while (!hist.empty()) { + co_await coroutine::maybe_yield(); + auto& desc = hist.one_of_largest(); + auto seg_id = _sm.desc_to_segment_id(desc); + segments.push_back(seg_id); + hist.erase(desc); + } + + if (_next_group_for_compaction == it) { + ++_next_group_for_compaction; + } + it = _compaction_groups.erase(it); + co_return std::make_pair(it, std::move(segments)); +} + +future> compaction_manager::free_compaction_groups(table_id table) { + co_await disable_auto_compaction(table); + + std::vector segments_to_free; + + auto it = _compaction_groups.lower_bound(group_id{table, 0}); + while (it != _compaction_groups.end() && it->first.table == table) { + co_await coroutine::maybe_yield(); + auto [next_it, segments] = co_await remove_compaction_group(it); + segments_to_free.insert(segments_to_free.end(), segments.begin(), segments.end()); + it = next_it; + } + + co_return std::move(segments_to_free); +} + separator::buffer& compaction_manager::get_separator_buffer(separator& sep, const log_record_writer& writer) { auto gid = 
writer.record().group; @@ -2206,6 +2272,10 @@ future<> segment_manager::do_barrier() { return _impl->do_barrier(); } +future<> segment_manager::truncate_table(table_id table) { + return _impl->truncate_table(table); +} + } template<> diff --git a/replica/logstor/segment_manager.hh b/replica/logstor/segment_manager.hh index 50373a315d..6313e84579 100644 --- a/replica/logstor/segment_manager.hh +++ b/replica/logstor/segment_manager.hh @@ -83,6 +83,8 @@ public: // to their group_id. future<> do_barrier(); + future<> truncate_table(table_id); + friend class segment_manager_impl; }; diff --git a/replica/table.cc b/replica/table.cc index 4e8870c18d..9783e6438f 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -4281,6 +4281,12 @@ future table::discard_sstables(db_clock::time_point truncat co_return rp; } +future<> table::discard_kv_storage() { + if (_logstor) { + co_await _logstor->truncate_table(_schema->id()); + } +} + void table::mark_ready_for_writes(db::commitlog* cl) { if (!_readonly) { on_internal_error(dblog, ::format("table {}.{} is already writable", _schema->ks_name(), _schema->cf_name())); diff --git a/test/cluster/test_logstor.py b/test/cluster/test_logstor.py index 17e7164657..7958c3ac4e 100644 --- a/test/cluster/test_logstor.py +++ b/test/cluster/test_logstor.py @@ -330,3 +330,53 @@ async def test_compaction(manager: ManagerClient): metrics = await manager.metrics.query(servers[0].ip_addr) segments_compacted = metrics.get("scylla_logstor_sm_segments_compacted") or 0 assert segments_compacted == 4, f"Expected 4 segments to be compacted, but got {segments_compacted}" + +@pytest.mark.asyncio +async def test_drop_table(manager: ManagerClient): + """ + Test dropping a logstor table: verify its data is discarded, a sibling table is unaffected, and a recreated table with the same name starts empty. 
+ """ + cmdline = ['--logger-log-level', 'logstor=trace'] + cfg = {'enable_logstor': True, 'experimental_features': ['logstor']} + servers = await manager.servers_add(1, cmdline=cmdline, config=cfg) + cql = manager.get_cql() + + async with new_test_keyspace(manager, "WITH tablets={'initial': 4}") as ks: + await cql.run_async(f"CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'") + + # create another table that will not be dropped to verify it's not affected + await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'") + + # write data to fill few segments + value_size = 30 * 1024 + value = 'x' * value_size + for i in range(20): + await cql.run_async(f"INSERT INTO {ks}.test1 (pk, v) VALUES ({i}, '{value}')") + await cql.run_async(f"INSERT INTO {ks}.test2 (pk, v) VALUES ({i}, '{value}')") + + await cql.run_async(f"DROP TABLE {ks}.test1") + + # verify test2 is not affected + for i in range(20): + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test2 WHERE pk = {i}") + assert len(rows) == 1, f"Expected 1 row for key {i} in test2, but got {len(rows)}" + assert rows[0].v == value, f"Expected value of size {value_size} for key {i} in test2, but got {len(rows[0].v)}" + + # recreate the table and verify that old data is not visible + await cql.run_async(f"CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, v text) WITH storage_engine = 'logstor'") + + for i in range(20): + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test1 WHERE pk = {i}") + assert len(rows) == 0, f"Expected no rows for key {i} after table drop, but got {len(rows)}" + + # write new data to the recreated table and verify + await cql.run_async(f"INSERT INTO {ks}.test1 (pk, v) VALUES (1, 'new_value')") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test1 WHERE pk = 1") + assert len(rows) == 1, f"Expected 1 row for key 1 after new insert, but got {len(rows)}" + assert rows[0].v == 'new_value', f"Expected value 
'new_value' for key 1 after new insert, but got {rows[0].v}" + + # verify test2 again + for i in range(20): + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test2 WHERE pk = {i}") + assert len(rows) == 1, f"Expected 1 row for key {i} in test2 after all operations, but got {len(rows)}" + assert rows[0].v == value, f"Expected value of size {value_size} for key {i} in test2 after all operations, but got {len(rows[0].v)}"